Pipeline Cookbook
This cookbook provides practical, ready-to-use solutions for common data processing challenges.
Data Cleaning
Safe and Predictable Data Cleaning
Problem: You need to remove null or false values from a dataset without accidentally removing valid data like 0 or empty strings.
Solution: Use select(), which by default removes only null and false.
use function Pipeline\take;

$cleanedData = take($rawData)
    ->select()
    ->toList();
Use plain filter() only when you genuinely want to drop every falsy value, like array_filter() does.
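To see why the distinction matters, the following sketch reproduces both behaviours with plain PHP arrays rather than the library. The sample values are made up for illustration, and the strict callback only approximates what select() is described as doing above.

```php
<?php

// Hypothetical sample input mixing valid "empty-looking" values with null/false.
$rawData = [0, 1, '', 'a', null, false, '0', []];

// array_filter() without a callback drops every falsy value,
// including 0, '', '0' and [].
$loose = array_values(array_filter($rawData));

// A strict predicate drops only null and false and keeps everything else.
$strict = array_values(array_filter(
    $rawData,
    static fn ($value) => $value !== null && $value !== false
));
```

Only 1 and 'a' survive the loose filter, while the strict one keeps six of the eight values.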
Batch Processing
Batching for Databases and APIs
Problem: You need to process a large number of records and send them to a database or API in manageable batches.
Solution: Use chunk() to group items and each() to process each batch.
// Process a large dataset in batches of 1000
take(new SplFileObject('large-dataset.csv'))
    ->map(str_getcsv(...))
    ->chunk(1000)
    ->each(function ($batch) {
        Database::bulkInsert($batch);
    });
Only one batch is in memory at a time. For ramp-up scenarios—a small trial batch first, then full-size batches—see chunkBy().
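As a rough illustration of why only one batch needs to be held at a time, here is a plain-PHP generator that groups any iterable lazily. The helper name lazyChunks() is hypothetical, and this is not the library's implementation of chunk().

```php
<?php

/**
 * Hypothetical helper: lazily groups an iterable into lists of at most $size items.
 *
 * @return Generator<int, list<mixed>>
 */
function lazyChunks(iterable $items, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('Chunk size must be at least 1');
    }

    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $size) {
            yield $batch; // hand over a full batch...
            $batch = [];  // ...and start a fresh one
        }
    }

    if ($batch !== []) {
        yield $batch; // final, possibly smaller, batch
    }
}

$batches = iterator_to_array(lazyChunks(new ArrayIterator(range(1, 5)), 2), false);
```

Five items with a batch size of two produce two full batches and one trailing batch holding the single leftover item.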
Real-Time Analysis
Anomaly Detection
Problem: You have a live stream of data and need to identify outliers in real time.
Solution: Use runningVariance() to maintain live statistics and flag data points that fall outside a normal range.
$stats = null;
take($liveStream)
    // $stats is passed by reference and updated as values flow through
    ->runningVariance($stats)
    ->each(function ($value) use ($stats) {
        if ($stats->getCount() < 30) {
            return; // Not enough data yet
        }
        // Flag values more than three standard deviations from the running mean
        if (abs($value - $stats->getMean()) > 3 * $stats->getStandardDeviation()) {
            AlertSystem::notify("Anomaly detected: $value");
        }
    });
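For readers curious how statistics can be kept current without storing the stream, the sketch below uses Welford's online algorithm, a standard technique for this. The class name WelfordStats is hypothetical and the code is not taken from the library. It assumes the sample (n - 1) variance, which may or may not match what RunningVariance reports.

```php
<?php

// Hypothetical illustration of Welford's online algorithm (not the library's code).
final class WelfordStats
{
    private int $count = 0;
    private float $mean = 0.0;
    private float $m2 = 0.0; // running sum of squared deviations from the mean

    public function observe(float $value): void
    {
        ++$this->count;
        $delta = $value - $this->mean;
        $this->mean += $delta / $this->count;
        $this->m2 += $delta * ($value - $this->mean);
    }

    public function getCount(): int
    {
        return $this->count;
    }

    public function getMean(): float
    {
        return $this->mean;
    }

    // Sample variance; NAN until at least two values have been observed.
    public function getVariance(): float
    {
        return $this->count > 1 ? $this->m2 / ($this->count - 1) : NAN;
    }

    public function getStandardDeviation(): float
    {
        return sqrt($this->getVariance());
    }
}

$stats = new WelfordStats();
foreach ([2, 4, 4, 4, 5, 5, 7, 9] as $value) {
    $stats->observe($value);
}
```

Each observation costs constant time and memory, which is what makes a per-item check like the three-sigma test above affordable on a live stream.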
Sliding Window Calculations
Problem: You need a moving average (or any windowed calculation) over a stream.
Solution: Keep a small rolling buffer in a stateful map() callback, yielding once the window is full.
$window = [];
$movingAverages = take($measurements)
    ->map(function ($value) use (&$window) {
        $window[] = $value;
        if (count($window) > 5) {
            array_shift($window); // Drop the oldest value
        }
        // Nothing is yielded until five values have been collected
        if (count($window) === 5) {
            yield array_sum($window) / 5;
        }
    })
    ->toList();
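The same windowing idea can be written as a standalone generator, which may help when checking the expected output by hand. The function name movingAverage() and the sample readings are hypothetical. A running sum replaces array_sum() so that each step does constant arithmetic.

```php
<?php

/**
 * Hypothetical helper: yields the mean of each full window of $size values.
 *
 * @return Generator<int, float>
 */
function movingAverage(iterable $values, int $size): Generator
{
    $window = [];
    $sum = 0.0;

    foreach ($values as $value) {
        $window[] = $value;
        $sum += $value;

        if (count($window) > $size) {
            $sum -= array_shift($window); // drop the oldest reading
        }

        if (count($window) === $size) {
            yield $sum / $size;
        }
    }
}

// Full windows for this input: 1..5, 2..6, 3..7
$averages = iterator_to_array(movingAverage([1, 2, 3, 4, 5, 6, 7], 5), false);
```

Seven readings with a window of five give three averages. On very long floating-point streams a running sum can accumulate rounding error, so recomputing the sum per window, as the pipeline version does, is the more conservative choice.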
Distributed Data
Parallel Statistics Aggregation
Problem: Your data is distributed across multiple sources, and you need overall statistics without centralizing the data.
Solution: Calculate finalVariance() for each source independently, then merge the resulting RunningVariance objects.
use Pipeline\Helper\RunningVariance;

$stats1 = take($source1)->finalVariance();
$stats2 = take($source2)->finalVariance();

$overallStats = new RunningVariance($stats1, $stats2);
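Merging is possible because a count, a mean, and a sum of squared deviations are enough to combine two summaries exactly. The sketch below applies the pairwise update usually attributed to Chan et al. The summarize() and mergeSummaries() helpers are hypothetical and are not claimed to be how RunningVariance merges internally.

```php
<?php

// Hypothetical helper: summarises a list as count, mean, and sum of squared deviations.
function summarize(array $values): array
{
    $n = count($values);
    $mean = $n > 0 ? array_sum($values) / $n : 0.0;
    $m2 = 0.0;
    foreach ($values as $value) {
        $m2 += ($value - $mean) ** 2;
    }

    return ['n' => $n, 'mean' => $mean, 'm2' => $m2];
}

// Hypothetical helper: combines two summaries without revisiting the raw data.
function mergeSummaries(array $a, array $b): array
{
    $n = $a['n'] + $b['n'];
    if ($n === 0) {
        return ['n' => 0, 'mean' => 0.0, 'm2' => 0.0];
    }

    $delta = $b['mean'] - $a['mean'];

    return [
        'n' => $n,
        'mean' => $a['mean'] + $delta * $b['n'] / $n,
        'm2' => $a['m2'] + $b['m2'] + $delta ** 2 * $a['n'] * $b['n'] / $n,
    ];
}

// Two "sources" summarised independently, then merged.
$merged = mergeSummaries(summarize([2, 4, 4, 4]), summarize([5, 5, 7, 9]));

// The same data summarised in one pass, for comparison.
$direct = summarize([2, 4, 4, 4, 5, 5, 7, 9]);
```

The merged summary agrees with the single-pass one, so each source only needs to ship three numbers rather than its raw data.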
File Processing
CSV Processing
Problem: You need to process a large CSV file efficiently.
Solution: Stream the file line by line, so rows are parsed and filtered one at a time; only the rows that pass the filter are collected by toList().
$data = take(new SplFileObject('data.csv'))
    ->map(str_getcsv(...))
    ->filter(fn($row) => count($row) === 3)
    ->toList();
Log File Analysis
Problem: You need counts and a sample of matching lines from a huge log file, in a single pass.
Solution: Combine runningCount() with the filtering chain; use skipWhile() to ignore a preamble.
$total = 0;
$errors = take(new SplFileObject('app.log'))
    ->skipWhile(fn($line) => !str_contains($line, 'STARTUP COMPLETE'))
    ->runningCount($total)
    ->filter(fn($line) => str_contains($line, 'ERROR'))
    ->slice(0, 100)
    ->toList();
// $errors holds up to the first 100 ERROR lines; $total counts the lines that
// reached runningCount(), starting with the 'STARTUP COMPLETE' line
Infinite Sequences
Generating and Consuming Endless Streams
Problem: You need to process a sequence with no natural end—generated data, polling results, an event stream.
Solution: Seed the pipeline with an infinite generator; laziness guarantees only the consumed portion is ever computed. Bound the consumption with slice() or by breaking out of a foreach.
use function Pipeline\map;

$fibonacci = map(function () {
    yield 0;
    $prev = 0;
    $current = 1;
    while (true) {
        yield $current;
        [$prev, $current] = [$current, $prev + $current];
    }
});

// Statistics for the second hundred Fibonacci numbers, F(101) through F(200):
// slice() first skips 101 values, F(0) through F(100)
$variance = $fibonacci->slice(101, 100)->finalVariance();
$variance->getCount(); // 100
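The laziness that makes this safe is a property of PHP generators themselves, which a library-free sketch can show. Here the SPL LimitIterator stands in for slice(), and the fibonacci() function name is an assumption made for this example.

```php
<?php

// Hypothetical generator: an endless Fibonacci sequence starting at F(0) = 0.
function fibonacci(): Generator
{
    [$prev, $current] = [0, 1];
    while (true) {
        yield $prev;
        [$prev, $current] = [$current, $prev + $current];
    }
}

// Skip F(0) through F(2), then take four values. Only a handful of values
// are ever computed, even though the generator itself never ends.
$window = iterator_to_array(new LimitIterator(fibonacci(), 3, 4), false);
```

The window holds F(3) through F(6). Passing false to iterator_to_array() discards the generator's keys so the result is a plain list.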
Error Handling
Safe Processing
Problem: Some items may cause errors, but you want to continue processing the rest.
Solution: Wrap the risky operation in a try-catch block within a map() transformation.
$results = take($inputs)
    ->map(function ($item) {
        try {
            return ['success' => true, 'data' => process($item)];
        } catch (Exception $e) {
            return ['success' => false, 'error' => $e->getMessage()];
        }
    })
    ->toList();
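Once every item carries a success flag, the collected results can be split into successes and failures with ordinary array functions. In this sketch strictSqrt() is a hypothetical stand-in for process(), and array_map() stands in for the pipeline so that the example runs on its own.

```php
<?php

// Hypothetical risky operation: rejects negative input.
function strictSqrt(int $item): float
{
    if ($item < 0) {
        throw new InvalidArgumentException("Negative input: $item");
    }

    return sqrt($item);
}

// Same try/catch wrapper as in the recipe, applied to a small sample.
$results = array_map(static function (int $item): array {
    try {
        return ['success' => true, 'data' => strictSqrt($item)];
    } catch (Exception $e) {
        return ['success' => false, 'error' => $e->getMessage()];
    }
}, [4, -1, 9]);

// Separate the outcomes; array_column() skips rows that lack the requested key.
$data = array_column($results, 'data');
$errors = array_column($results, 'error');
```

Note that catching Exception does not cover Error subclasses such as TypeError; catching Throwable instead is an option if those should also be recorded rather than abort the run.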
Logging Rejected Items
Problem: You filter out invalid records but still need to know what was dropped and why.
Solution: Use select() with its onReject callback.
$valid = take($records)
    ->select(
        fn($record) => $record->isValid(),
        onReject: fn($record, $key) => $logger->warning("Dropped record $key"),
    )
    ->toList();
Deduplication
Memory-Efficient Deduplication
Problem: You need to remove duplicate values from a large stream without loading it all into memory.
Solution: Track seen keys in an array used as a set; memory grows only with the number of unique keys.
$seen = [];
$unique = take($users)
    ->filter(function ($user) use (&$seen) {
        $key = $user['email'];
        if (isset($seen[$key])) {
            return false;
        }
        $seen[$key] = true;
        return true;
    })
    ->toList();
For an array-backed pipeline of scalar values there is also a shortcut: flip()->flip() deduplicates via array_flip(), just as it would with plain arrays.
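With plain arrays the double flip looks like this. The sample values are made up. Note that array_flip() accepts only integer and string values, and that each surviving value keeps the key of its last occurrence.

```php
<?php

$values = ['a', 'b', 'a', 'c'];

// First flip: values become keys, so duplicates collapse (the later index wins).
$flipped = array_flip($values);

// Second flip: keys become values again, now without duplicates.
$unique = array_flip($flipped);

// Reindex when the original keys do not matter.
$list = array_values($unique);
```

Unlike the filter-based recipe, this approach needs the whole array in memory up front, so it suits small, already materialised collections.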