0

Laravel Horizon — real-time queue monitoring

Advanced5 min read·lv-19-007
performance

Concept

Job middleware wraps job execution with reusable cross-cutting concerns — preventing overlap, rate limiting, and throttling exceptions. Applied via the middleware() method on job classes.

WithoutOverlapping(string $key): Acquires an atomic lock before running. If the lock exists (another instance is running), the job is released back to the queue.

  • ->releaseAfter(int $seconds): Delay before re-attempt. Default: 0.
  • ->expireAfter(int $seconds): Lock TTL as a safety valve.
  • ->dontRelease(): Delete the job instead of releasing if lock unavailable.

RateLimited(string $limiterName): Uses a named rate limiter. The job waits and retries if the rate limit is exceeded. Define limiters with RateLimiter::for().

ThrottlesExceptions(int $maxAttempts, int $decayMinutes): Counts exceptions within a time window. Once $maxAttempts exceptions occur in $decayMinutes, jobs are throttled (released back with a delay) instead of being immediately retried.

Skip(bool $skip): Built-in middleware to conditionally skip job execution.

Custom middleware: Any class with handle($job, $next) signature. Can delete, release, or pass the job.

Middleware are not singletons: A new middleware instance is created per job execution. They can be stateful.

Code Example

php
<?php
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
use Illuminate\Support\Facades\RateLimiter;
use Illuminate\Cache\RateLimiting\Limit;

// Register rate limiters in AppServiceProvider::boot()
RateLimiter::for('stripe-api', function(object $job) {
    return Limit::perMinute(100)->by('stripe'); // 100 calls/min to Stripe
});

RateLimiter::for('per-user', function(object $job) {
    return Limit::perHour(50)->by($job->userId); // 50 jobs/hour per user
});

// Job applying multiple middleware
class ChargeCustomer implements \Illuminate\Contracts\Queue\ShouldQueue
{
    use \Illuminate\Bus\Queueable, \Illuminate\Foundation\Bus\Dispatchable;
    use \Illuminate\Queue\InteractsWithQueue, \Illuminate\Queue\SerializesModels;

    public int $tries = 10;

    public function __construct(
        public readonly int $userId,
        public readonly float $amount,
    ) {}

    public function middleware(): array
    {
        return [
            // Prevent duplicate charges for the same user
            (new WithoutOverlapping("charge:user:{$this->userId}"))
                ->releaseAfter(5)
                ->expireAfter(60),

            // Rate limit Stripe API calls
            new \Illuminate\Queue\Middleware\RateLimited('stripe-api'),

            // If Stripe throws exceptions repeatedly, back off
            (new ThrottlesExceptions(5, 10))  // 5 exceptions per 10 minutes
                ->backoff(5),                  // retry after 5 minutes when throttled
        ];
    }

    public function handle(): void
    {
        app(\App\Services\StripeService::class)->charge($this->userId, $this->amount);
    }
}

// Custom middleware — skip if feature flag is off
class RequiresFeatureFlag
{
    public function __construct(private readonly string $feature) {}

    public function handle(object $job, callable $next): void
    {
        if (!\Illuminate\Support\Facades\Feature::active($this->feature)) {
            $job->delete(); // silently discard
            return;
        }
        $next($job);
    }
}

// Apply custom middleware
public function middleware(): array
{
    return [new RequiresFeatureFlag('new_payment_system')];
}