0

Design a job queue system from scratch

Advanced5 min read·eng-11-002
interviewsql

Concept

Design a job queue system from scratch — understanding what Laravel's queue actually does at a lower level.

Core components:

  1. Producer: The code that creates and pushes a job onto the queue. Queue::push($job).
  2. Queue storage: Where jobs are stored between dispatch and execution. Database table, Redis list, SQS, etc.
  3. Worker: A long-running process that polls the queue, picks up jobs, executes them.
  4. Serialization: Jobs must be serialized to store in queue, deserialized to execute.

Database-backed queue (simplest to understand):

  • Table: jobs(id, queue, payload JSON, attempts, available_at, reserved_at, created_at).
  • Push: INSERT INTO jobs (queue, payload, available_at) VALUES (?, ?, ?).
  • Pop: SELECT ... WHERE queue = ? AND reserved_at IS NULL AND available_at <= NOW() LIMIT 1 FOR UPDATE. Update reserved_at. Execute. DELETE on success.
  • The FOR UPDATE lock prevents two workers from picking the same job.

Redis-backed queue (production standard):

  • LPUSH queue:default $payload to push (O(1)).
  • Worker: BRPOPLPUSH queue:default queue:reserved 0 — atomically moves a job from the main queue to a "reserved" list. If the worker dies, the reserved list lets you recover.
  • On success: LREM queue:reserved 1 $payload.
  • BRPOPLPUSH blocks until a job is available — no polling delay.

Retry logic: attempts column / counter. On failure: increment attempts. If attempts < max_tries: put back in queue with a delay. If attempts >= max_tries: move to failed jobs table.

Delayed jobs: Store available_at = NOW() + delay. Workers only pick jobs where available_at <= NOW().

Failed jobs: failed_jobs(id, queue, payload, exception, failed_at). php artisan queue:retry all.

Code Example

php
<?php
// Minimal database-backed queue implementation
class SimpleQueue
{
    public function __construct(private readonly \PDO $pdo)
    {
        $this->pdo->exec('
            CREATE TABLE IF NOT EXISTS jobs (
                id           INTEGER PRIMARY KEY AUTOINCREMENT,
                queue        TEXT NOT NULL DEFAULT "default",
                payload      TEXT NOT NULL,
                attempts     INTEGER NOT NULL DEFAULT 0,
                available_at INTEGER NOT NULL,
                reserved_at  INTEGER NULL,
                created_at   INTEGER NOT NULL
            )
        ');
    }

    // Producer — push a job
    public function push(object $job, int $delaySeconds = 0): void
    {
        $now  = time();
        $stmt = $this->pdo->prepare(
            'INSERT INTO jobs (payload, available_at, created_at) VALUES (?, ?, ?)'
        );
        $stmt->execute([serialize($job), $now + $delaySeconds, $now]);
    }

    // Worker — pop a job (reserve it atomically)
    public function pop(string $queue = 'default'): ?object
    {
        $this->pdo->beginTransaction();
        $stmt = $this->pdo->prepare('
            SELECT * FROM jobs
            WHERE queue = ? AND reserved_at IS NULL AND available_at <= ?
            ORDER BY id ASC
            LIMIT 1
        ');
        $stmt->execute([$queue, time()]);
        $row = $stmt->fetch(\PDO::FETCH_ASSOC);

        if (!$row) { $this->pdo->rollBack(); return null; }

        $this->pdo->prepare('UPDATE jobs SET reserved_at = ?, attempts = attempts + 1 WHERE id = ?')
            ->execute([time(), $row['id']]);

        $this->pdo->commit();
        return unserialize($row['payload']);
    }

    public function delete(int $jobId): void
    {
        $this->pdo->prepare('DELETE FROM jobs WHERE id = ?')->execute([$jobId]);
    }

    public function release(int $jobId, int $delay = 0): void
    {
        $this->pdo->prepare('UPDATE jobs SET reserved_at = NULL, available_at = ? WHERE id = ?')
            ->execute([time() + $delay, $jobId]);
    }
}

// Worker loop
class QueueWorker
{
    public function __construct(private readonly SimpleQueue $queue) {}

    public function work(): void
    {
        echo "Worker started. Waiting for jobs...\n";
        while (true) {
            $job = $this->queue->pop();
            if ($job === null) { sleep(1); continue; } // poll interval

            try {
                $job->handle();
                echo "Job completed: " . get_class($job) . "\n";
            } catch (\Throwable $e) {
                echo "Job failed: " . $e->getMessage() . "\n";
                // In a real queue: check attempts, retry or move to failed_jobs
            }
        }
    }
}

// Example job
class SendWelcomeEmail
{
    public function __construct(private readonly int $userId) {}

    public function handle(): void
    {
        $user = \App\Models\User::find($this->userId);
        \Mail::to($user)->send(new \App\Mail\WelcomeEmail($user));
        echo "Email sent to {$user->email}\n";
    }
}