Pipeline pattern — composing operations in sequence
Intermediate · 5 min read · eng-04-012
Concept
The Pipeline pattern composes a sequence of operations (stages, also called pipes) in which the output of one stage becomes the input of the next. Unlike Chain of Responsibility, where a handler can stop the chain, every stage in a Pipeline is expected to process the payload and pass it on to the next stage.
Mental model: Unix pipes, as in cat file | grep 'error' | sort | uniq. Each command processes its input and passes the result to the next command.
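The shell analogy can be sketched in PHP as a list of callables folded over an input with array_reduce(). This is only an illustration: the log lines are made up, and each closure is a rough stand-in for the corresponding shell command, not an exact equivalent.

```php
<?php
declare(strict_types=1);

// Hypothetical log lines standing in for the contents of `file`.
$lines = [
    'error: disk full',
    'info: started',
    'error: disk full',
    'error: cache miss',
];

// Each stage receives the previous stage's output as its input.
$stages = [
    // grep 'error'
    fn(array $in): array => array_filter($in, fn(string $l) => str_contains($l, 'error')),
    // sort (sort() works in place and reindexes the array)
    function (array $in): array {
        sort($in);
        return $in;
    },
    // uniq
    fn(array $in): array => array_values(array_unique($in)),
];

// Fold the stages over the initial payload, left to right.
$result = array_reduce($stages, fn($carry, callable $stage) => $stage($carry), $lines);

print_r($result); // the two distinct error lines, sorted
```

Because every stage has the same shape (array in, array out), stages can be added, removed, or reordered without touching the others.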
Structure:
- Payload: The data flowing through the pipeline. Can be mutated by each stage.
- Stage (Pipe): A callable or object with a handle($payload, callable $next) signature (or just a callable).
- Pipeline: Assembles the stages and runs the payload through them.
PHP implementations:
- array_reduce(): The functional approach. Reduces stages over the initial payload.
- Laravel's Pipeline class: app(Pipeline::class)->send($data)->through($stages)->thenReturn().
- Custom Pipeline class: Composable, fluent.
Pipeline vs Chain of Responsibility: In a Chain, a handler MAY stop the chain. In a Pipeline, every stage runs (or passes the payload through unchanged). A Pipeline can be seen as a specialized CoR in which stages always continue.
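The difference can be made concrete with a minimal sketch. The function names runChain and runPipeline are hypothetical, and the convention that a handler returns null to mean "not mine, try the next one" is an assumption made for this example only.

```php
<?php
declare(strict_types=1);

// Chain of Responsibility: the first handler that produces a response stops the chain.
function runChain(array $handlers, string $request): ?string
{
    foreach ($handlers as $handler) {
        $response = $handler($request);
        if ($response !== null) {
            return $response; // later handlers never run
        }
    }
    return null; // nobody handled the request
}

// Pipeline: every stage runs and feeds its output to the next stage.
function runPipeline(array $stages, mixed $payload): mixed
{
    foreach ($stages as $stage) {
        $payload = $stage($payload);
    }
    return $payload;
}

$chainResult = runChain([
    fn(string $r) => $r === 'ping' ? 'pong' : null,
    fn(string $r) => 'fallback', // not reached for 'ping'
], 'ping');

$pipelineResult = runPipeline([
    fn(string $s) => trim($s),
    fn(string $s) => strtoupper($s),
    fn(string $s) => $s . '!',
], '  ping ');

echo $chainResult, PHP_EOL;    // pong
echo $pipelineResult, PHP_EOL; // PING!
```

In the chain, the second handler is skipped once the first one answers. In the pipeline, all three stages contribute to the final value.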
Laravel Pipeline class:
- Used internally for middleware.
- Available directly: app(Pipeline::class)->send($job)->through([Stage1::class, Stage2::class])->thenReturn().
- Useful for multi-step data transformations.
Stage interface in Laravel: Each stage must have a handle($payload, Closure $next) method, or be a closure.
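To show how the ($payload, $next) shape fits together without pulling in the framework, here is a small stand-in that mirrors the send()/through()/thenReturn() call shape. MiniPipeline and TrimName are hypothetical names, and this is not Laravel's implementation: it takes stage objects and closures directly instead of resolving class names through the container.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in, NOT Laravel's Pipeline class.
final class MiniPipeline
{
    private mixed $passable = null;
    private array $pipes = [];

    public function send(mixed $passable): static
    {
        $this->passable = $passable;
        return $this;
    }

    public function through(array $pipes): static
    {
        $this->pipes = $pipes;
        return $this;
    }

    public function thenReturn(): mixed
    {
        // Fold from the last pipe to the first so the first pipe ends up outermost.
        $runner = array_reduce(
            array_reverse($this->pipes),
            fn(Closure $next, $pipe) => fn($passable) => is_callable($pipe)
                ? $pipe($passable, $next)          // closure stage
                : $pipe->handle($passable, $next), // object stage with handle()
            fn($passable) => $passable             // final destination: return as-is
        );

        return $runner($this->passable);
    }
}

// A class-based stage using the handle($payload, Closure $next) shape.
final class TrimName
{
    public function handle(array $payload, Closure $next): mixed
    {
        $payload['name'] = trim($payload['name']);
        return $next($payload);
    }
}

$result = (new MiniPipeline())
    ->send(['name' => '  Alice '])
    ->through([
        new TrimName(),
        // A closure stage with the same two-argument shape.
        function (array $payload, Closure $next) {
            $payload['name'] = strtoupper($payload['name']);
            return $next($payload);
        },
    ])
    ->thenReturn();

print_r($result); // name is trimmed first, then upper-cased
```

Each stage decides when to call $next, which is what lets middleware-style stages run code both before and after the rest of the pipeline.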
Code Example
php
<?php
// Functional pipeline using array_reduce
class Pipeline
{
    private array $stages = [];

    // Register a stage; returns $this so calls can be chained.
    public function pipe(callable $stage): static
    {
        $this->stages[] = $stage;
        return $this;
    }

    // Run the payload through every stage, in registration order.
    public function process(mixed $payload): mixed
    {
        return array_reduce(
            $this->stages,
            fn($carry, $stage) => $stage($carry),
            $payload
        );
    }
}

// Object-oriented stages
interface PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed;
}

class TrimStrings implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        // Assumes every value in the payload is a string.
        $payload = array_map('trim', $payload);
        return $next($payload);
    }
}

class NormalizeEmail implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        if (isset($payload['email'])) {
            $payload['email'] = strtolower($payload['email']);
        }
        return $next($payload);
    }
}

class HashPassword implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        if (isset($payload['password'])) {
            // PASSWORD_ARGON2ID needs a PHP build with Argon2 support;
            // PASSWORD_DEFAULT is the portable alternative.
            $payload['password'] = password_hash($payload['password'], PASSWORD_ARGON2ID);
        }
        return $next($payload);
    }
}

class ValidateData implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        if (empty($payload['email'])) {
            throw new \InvalidArgumentException('Email required');
        }
        if (empty($payload['name'])) {
            throw new \InvalidArgumentException('Name required');
        }
        return $next($payload);
    }
}
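
// Building and running the pipeline
$formData = ['name' => ' Alice ', 'email' => ' ALICE@EXAMPLE.COM ', 'password' => 'secret123'];
$stages = [new TrimStrings(), new NormalizeEmail(), new HashPassword(), new ValidateData()];

// Compose the stages back to front so the first stage ends up outermost.
$next = fn($payload) => $payload; // final destination: return the payload unchanged
foreach (array_reverse($stages) as $stage) {
    // Arrow functions capture $stage and the previous $next by value.
    $next = fn($payload) => $stage->handle($payload, $next);
}
$processed = $next($formData);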

// Simpler with a Pipeline class:
$result = (new Pipeline())
    ->pipe(fn($data) => array_map('trim', $data))
    ->pipe(fn($data) => array_merge($data, ['email' => strtolower($data['email'])]))
    ->pipe(fn($data) => array_merge($data, ['password' => password_hash($data['password'], PASSWORD_ARGON2ID)]))
    ->process($formData);

// Laravel Pipeline
// $processed = app(\Illuminate\Pipeline\Pipeline::class)
//     ->send($formData)
//     ->through([TrimStrings::class, NormalizeEmail::class, HashPassword::class])
//     ->thenReturn();