PSR-14 event dispatcher — how Laravel implements it

Advanced · 5 min read · lv-18-006
psr · laravel-src · framework

Concept

Job middleware wraps job execution in reusable logic such as rate limiting, preventing overlapping runs, and throttling retries. It works like HTTP middleware, but is applied to queued jobs instead of requests.

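Defining job middleware: A class with a single handle(object $job, callable $next) method, like HTTP middleware. The method receives the job instance and calls $next($job) to continue; returning without calling $next skips the job. Middleware is attached by returning instances from the middleware() method on the job class.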
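The handle($job, $next) contract described above can be illustrated without Laravel. The following is a minimal sketch in plain PHP, not Laravel's actual pipeline code: DemoJob, LogAround, SkipWhenDisabled, and runThroughMiddleware are hypothetical names invented for this example. It shows how a list of middleware nests around the job, and how a middleware that never calls $next prevents the job from running.

```php
<?php
// Toy stand-in for a queued job; not a Laravel class.
final class DemoJob
{
    public array $log = [];

    public function handle(): void
    {
        $this->log[] = 'job handled';
    }
}

// Hypothetical middleware: records entries before and after the inner layers run.
final class LogAround
{
    public function __construct(private string $name) {}

    public function handle(object $job, callable $next): void
    {
        $job->log[] = "{$this->name}: before";
        $next($job); // pass control to the next layer
        $job->log[] = "{$this->name}: after";
    }
}

// Hypothetical middleware: skips the job by never calling $next.
final class SkipWhenDisabled
{
    public function __construct(private bool $enabled) {}

    public function handle(object $job, callable $next): void
    {
        if ($this->enabled) {
            $next($job);
        }
    }
}

// Fold the middleware list into one callable, wrapping from the innermost layer outward.
function runThroughMiddleware(object $job, array $middleware): void
{
    $core = fn (object $job) => $job->handle();

    $pipeline = array_reduce(
        array_reverse($middleware),
        fn (callable $next, object $layer) => fn (object $job) => $layer->handle($job, $next),
        $core
    );

    $pipeline($job);
}

$job = new DemoJob();
runThroughMiddleware($job, [new LogAround('outer'), new LogAround('inner')]);
// $job->log: outer: before, inner: before, job handled, inner: after, outer: after

$skipped = new DemoJob();
runThroughMiddleware($skipped, [new SkipWhenDisabled(false), new LogAround('inner')]);
// $skipped->log stays empty because the first middleware never called $next
```

The first middleware in the array is the outermost layer, so the order in which a job returns its middleware decides which checks run first.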

WithoutOverlapping: Built-in middleware that uses atomic locks to prevent multiple instances of the same job from running concurrently. It takes a key that defines what counts as "overlapping": with new WithoutOverlapping($this->userId), only one ProcessUserReport runs per user at a time, while reports for different users still run in parallel.

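RateLimited / ThrottlesExceptions: RateLimited limits how often a job is processed, using a named rate limiter defined with RateLimiter::for(). ThrottlesExceptions delays further attempts once a job has thrown a set number of exceptions, which is useful when an external API is failing or rate-limiting you; it is configured through its constructor arguments rather than a named limiter.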

Custom middleware: Any class with a handle() method can serve as job middleware; no interface or base class is required. Apply it by returning instances from the job's public function middleware(): array.

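WithoutOverlapping release behavior: If the lock can't be acquired because another instance is running, the job is released back to the queue. releaseAfter(int $seconds) sets how long to wait before the released job is attempted again. expireAfter(int $seconds) sets a maximum lock duration, so a job that crashes without releasing its lock does not block later jobs indefinitely.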
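The release and expiry rules above can be modeled in a few lines of plain PHP. This is a toy sketch under stated assumptions, not how Laravel implements WithoutOverlapping (the real middleware relies on the cache's atomic locks): ToyLockStore and attemptJob are hypothetical names, and time is passed in explicitly as an integer number of seconds so the behavior is easy to follow.

```php
<?php
// Toy in-memory lock store: maps a lock key to the time at which the lock expires.
final class ToyLockStore
{
    private array $expiresAt = [];

    // Try to take the lock at time $now, holding it for at most $ttl seconds.
    public function acquire(string $key, int $ttl, int $now): bool
    {
        if (isset($this->expiresAt[$key]) && $this->expiresAt[$key] > $now) {
            return false; // another holder still owns an unexpired lock
        }
        $this->expiresAt[$key] = $now + $ttl;
        return true;
    }

    // Called when a job finishes normally.
    public function release(string $key): void
    {
        unset($this->expiresAt[$key]);
    }
}

// Decide what happens to one job attempt (hypothetical helper).
function attemptJob(ToyLockStore $locks, string $key, int $now, int $releaseAfter, int $expireAfter): string
{
    if (! $locks->acquire($key, $expireAfter, $now)) {
        return "released, retry in {$releaseAfter}s";
    }
    return 'running';
}

$locks = new ToyLockStore();

$first  = attemptJob($locks, 'user:42', 0, 30, 300);   // takes the lock
$second = attemptJob($locks, 'user:42', 10, 30, 300);  // same key, lock still held
$other  = attemptJob($locks, 'user:7', 10, 30, 300);   // different key, unaffected
// Suppose the first job crashed and never released its lock:
$late   = attemptJob($locks, 'user:42', 301, 30, 300); // lock expired, so it can run
```

In this model the second attempt is sent back with a 30-second delay, while the attempt at 301 seconds succeeds only because the 300-second expiry acted as a failsafe.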

Code Example

php
<?php
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\Middleware\RateLimited;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

class ProcessUserReport implements \Illuminate\Contracts\Queue\ShouldQueue
{
    use \Illuminate\Bus\Queueable, \Illuminate\Foundation\Bus\Dispatchable;
    use \Illuminate\Queue\InteractsWithQueue, \Illuminate\Queue\SerializesModels;

    public function __construct(private readonly int $userId) {}

    public function middleware(): array
    {
        return [
            // Only one ProcessUserReport per user at a time
            (new WithoutOverlapping($this->userId))
                ->releaseAfter(30)  // try again in 30s if lock not available
                ->expireAfter(300), // lock expires in 5 minutes (failsafe)
        ];
    }

    public function handle(): void { /* ... */ }
}

// Rate-limited job using a named rate limiter.
// Register the limiter in AppServiceProvider::boot():
\Illuminate\Support\Facades\RateLimiter::for('api-calls', function (object $job) {
    // $job is the job instance; userId must be publicly readable here
    return \Illuminate\Cache\RateLimiting\Limit::perMinute(60)
        ->by($job->userId);
});

class SyncWithExternalApi implements \Illuminate\Contracts\Queue\ShouldQueue
{
    use \Illuminate\Bus\Queueable, \Illuminate\Foundation\Bus\Dispatchable;
    use \Illuminate\Queue\InteractsWithQueue, \Illuminate\Queue\SerializesModels;

    // Public so the 'api-calls' limiter above can read it
    public function __construct(public readonly int $userId) {}

    public function middleware(): array
    {
        return [new RateLimited('api-calls')];
    }

    public function handle(): void { /* ... */ }
}

// ThrottlesExceptions: delay further attempts after repeated exceptions (e.g. a failing API)
class CallExternalService implements \Illuminate\Contracts\Queue\ShouldQueue
{
    use \Illuminate\Bus\Queueable, \Illuminate\Foundation\Bus\Dispatchable;
    use \Illuminate\Queue\InteractsWithQueue, \Illuminate\Queue\SerializesModels;

    public $tries = 10;

    public function middleware(): array
    {
        return [
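            // Throttle after 10 exceptions. The second argument is the throttle
            // period: minutes in Laravel 10 and earlier, seconds in Laravel 11+.
            (new ThrottlesExceptions(10, 5))
                ->backoff(5), // wait 5 minutes between retries before the threshold is reached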
        ];
    }

    public function handle(): void
    {
        // Once this has thrown 10 exceptions, further attempts are delayed
        // until the throttle period passes; the job is released, not failed.
        app(\App\Services\ExternalApi::class)->call();
    }
}

// Custom middleware. Attach it by returning `new EnsureFeatureEnabled` from a job's middleware() method.
class EnsureFeatureEnabled
{
    public function handle(object $job, callable $next): void
    {
        // Feature::active() comes from Laravel Pennant (assumed to be installed)
        if (!\Laravel\Pennant\Feature::active('new_processing')) {
            $job->delete(); // discard job without failing
            return;
        }
        $next($job);
    }
}