Pipeline pattern — composing operations in sequence

Intermediate · 5 min read · eng-04-012

Concept

The Pipeline pattern composes a sequence of operations (stages, also called pipes) in which the output of one stage becomes the input of the next. Unlike Chain of Responsibility, where a handler can stop the chain, every stage in a Pipeline is expected to process the payload and pass it on to the next stage.

Mental model: Unix pipes, as in cat file | grep 'error' | sort | uniq. Each command processes its input and passes the result to the next command.
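The shell pipeline above can be mirrored in PHP as a rough sketch. The log lines and the three closures ($grep, $sort, $uniq) are illustrative stand-ins invented for this example, and str_contains() assumes PHP 8 or later.

```php
<?php
// Hypothetical log lines standing in for the contents of "file".
$lines = [
    'error: disk full',
    'info: started',
    'error: disk full',
    'error: cache miss',
];

// grep 'error': keep only the lines containing "error".
$grep = fn(array $ls): array => array_values(
    array_filter($ls, fn(string $l): bool => str_contains($l, 'error'))
);

// sort: return a sorted copy of the lines.
$sort = function (array $ls): array {
    sort($ls);
    return $ls;
};

// uniq: drop duplicate lines.
$uniq = fn(array $ls): array => array_values(array_unique($ls));

// Run the commands left to right, feeding each one the previous output.
$output = $lines;
foreach ([$grep, $sort, $uniq] as $command) {
    $output = $command($output);
}
```

With this input, $output ends up as ['error: cache miss', 'error: disk full'].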

Structure:

  • Payload: The data flowing through the pipeline. Each stage may transform it before passing it on.
  • Stage (Pipe): Either a plain callable that takes the payload and returns the result, or an object with a handle($payload, callable $next) method that calls $next to continue.
  • Pipeline: Assembles the stages and runs the payload through them in order.

PHP implementations:

  • array_reduce(): The functional approach. Folds the list of stages over the initial payload, feeding each stage the result of the previous one.
  • Laravel's Pipeline class: app(Pipeline::class)->send($data)->through($stages)->thenReturn().
  • Custom Pipeline class: Composable, fluent.
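As a minimal sketch of the array_reduce() approach without any wrapper class, the stages can be a plain list of callables. The stage list and input string here are made up for illustration.

```php
<?php
// Built-in string functions used as stages; any callable would work.
$stages = ['trim', 'strtolower', 'ucfirst'];

// $carry starts as the initial payload and is replaced by each stage's output.
$result = array_reduce(
    $stages,
    fn(string $carry, callable $stage): string => $stage($carry),
    '  HELLO World '
);
```

Here $result is 'Hello world': the input is trimmed, lowercased, and then has its first letter capitalized, in that order.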

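Pipeline vs Chain of Responsibility: In a Chain of Responsibility, a handler may stop the chain once it has dealt with the request. In a Pipeline, every stage is expected to run and pass the payload on; the flow normally ends early only when a stage throws an exception. A Pipeline can be seen as a specialized Chain of Responsibility in which stages always continue.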
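The difference can be sketched with two small runner functions. runChain() and runPipeline() are hypothetical helpers written for this comparison, not part of PHP or Laravel, and the handlers are toy examples.

```php
<?php
// Chain of Responsibility: the first handler that returns a response stops the chain.
function runChain(array $handlers, string $request): ?string
{
    foreach ($handlers as $handler) {
        $response = $handler($request);
        if ($response !== null) {
            return $response; // handled, so later handlers never run
        }
    }
    return null;
}

// Pipeline: every stage runs and receives the previous stage's output.
function runPipeline(array $stages, string $payload): string
{
    foreach ($stages as $stage) {
        $payload = $stage($payload);
    }
    return $payload;
}

$calls = []; // records which chain handlers were actually invoked
$handlers = [
    function (string $r) use (&$calls): ?string {
        $calls[] = 'cache';
        return $r === 'home' ? 'cached home page' : null;
    },
    function (string $r) use (&$calls): ?string {
        $calls[] = 'render';
        return "rendered $r";
    },
];

$chainResult    = runChain($handlers, 'home');
$pipelineResult = runPipeline(['trim', 'strtoupper', 'strrev'], ' abc ');
```

For the 'home' request only the first handler runs, so $calls contains just 'cache', while the pipeline applies all three stages and yields 'CBA'.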

Laravel Pipeline class:

  • Used internally to run HTTP middleware.
  • Available directly: app(Pipeline::class)->send($job)->through([Stage1::class, Stage2::class])->thenReturn(). Stages given as class names are resolved through the service container.
  • Useful for multi-step data transformations.

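Stage interface in Laravel: Each stage is either a closure or an object with a handle($payload, Closure $next) method. A stage returns $next($payload) to hand the (possibly modified) payload to the following stage.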
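To make the stage contract concrete, here is a framework-free sketch of one class-based stage and one closure stage, nested by hand. StripTags, $collapseWhitespace, and $destination are hypothetical names chosen for this example; the manual nesting only illustrates the idea and is not Laravel's implementation.

```php
<?php
// Class-based stage: exposes handle($payload, Closure $next).
final class StripTags
{
    public function handle(string $payload, Closure $next): mixed
    {
        return $next(strip_tags($payload));
    }
}

// Closure stage: receives the same two arguments.
$collapseWhitespace = function (string $payload, Closure $next): mixed {
    return $next(preg_replace('/\s+/', ' ', trim($payload)));
};

// The destination returns whatever reaches the end of the pipeline.
$destination = fn(string $payload): string => $payload;

// Hand-written nesting for two stages: StripTags runs first,
// then hands its output to $collapseWhitespace via $next.
$clean = (new StripTags())->handle(
    "<b>Hello</b>   \n world ",
    fn(string $p) => $collapseWhitespace($p, $destination)
);
```

The value of $clean is 'Hello world'. A pipeline class builds this kind of nesting for an arbitrary list of stages, so the calling code only has to list them in order.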

Code Example

php
<?php
// Functional pipeline using array_reduce
class Pipeline
{
    private array $stages = [];

    public function pipe(callable $stage): static
    {
        $this->stages[] = $stage;
        return $this;
    }

    public function process(mixed $payload): mixed
    {
        return array_reduce(
            $this->stages,
            fn($carry, $stage) => $stage($carry),
            $payload
        );
    }
}

// Object-oriented stages
interface PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed;
}

class TrimStrings implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        $payload = array_map('trim', $payload);
        return $next($payload);
    }
}

class NormalizeEmail implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        if (isset($payload['email'])) {
            $payload['email'] = strtolower($payload['email']);
        }
        return $next($payload);
    }
}

class HashPassword implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        if (isset($payload['password'])) {
            $payload['password'] = password_hash($payload['password'], PASSWORD_ARGON2ID);
        }
        return $next($payload);
    }
}

class ValidateData implements PipelineStage
{
    public function handle(mixed $payload, callable $next): mixed
    {
        if (empty($payload['email'])) throw new \InvalidArgumentException('Email required');
        if (empty($payload['name']))  throw new \InvalidArgumentException('Name required');
        return $next($payload);
    }
}

// Building and running the pipeline
$formData = ['name' => '  Alice ', 'email' => '  ALICE@EXAMPLE.COM ', 'password' => 'secret123'];

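$stages = [new TrimStrings(), new NormalizeEmail(), new HashPassword(), new ValidateData()];

// Wrap the stages from last to first so that the first stage ends up outermost.
// The innermost closure is the destination: it returns the payload unchanged.
$runner = array_reduce(
    array_reverse($stages),
    fn(callable $next, PipelineStage $stage) => fn($p) => $stage->handle($p, $next),
    fn($p) => $p
);
$processed = $runner($formData);

// Simpler with the callable-based Pipeline class: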

$result = (new Pipeline())
    ->pipe(fn($data) => array_map('trim', $data))
    ->pipe(fn($data) => array_merge($data, ['email' => strtolower($data['email'])]))
    ->pipe(fn($data) => array_merge($data, ['password' => password_hash($data['password'], PASSWORD_ARGON2ID)]))
    ->process($formData);

// Laravel Pipeline (needs the framework installed; left commented out here)
// $processed = app(\Illuminate\Pipeline\Pipeline::class)
//     ->send($formData)
//     ->through([TrimStrings::class, NormalizeEmail::class, HashPassword::class])
//     ->thenReturn();