Reliability Model

This page explains how PgQueuer handles failures, ensures jobs are not lost, supports idempotent enqueuing, and provides an audit trail of every completed job.

Failure Handling

When a job raises an unhandled exception, PgQueuer:

  1. Marks the job status as exception.
  2. Captures the full traceback, exception type, and message into a TracebackRecord.
  3. Moves the record from the active queue (pgqueuer) to the log table (pgqueuer_log).

The job is not automatically retried at this point — it is considered definitively failed for this execution attempt. The traceback is persisted for inspection via pgq dashboard or a direct query:

SELECT job_id, entrypoint,
       traceback->>'exception_type' AS exception_type,
       traceback->>'exception_message' AS exception_message,
       traceback->>'traceback' AS traceback_text
FROM pgqueuer_log
WHERE status = 'exception'
ORDER BY created DESC
LIMIT 20;

Retry Strategies

PgQueuer provides three complementary retry mechanisms.

Database-level retry: durable re-queuing

Raise RetryRequested from your handler to re-queue the job in the database. The job row is updated in-place — the id, payload, and all metadata are preserved. Any worker can pick up the retried job.

from datetime import timedelta

from pgqueuer.errors import RetryRequested
from pgqueuer.models import Job

@pgq.entrypoint("call_api")
async def call_api(job: Job) -> None:
    response = await http_client.post(API_URL, data=job.payload)
    if response.status == 429:
        raise RetryRequested(delay=timedelta(seconds=30), reason="rate limited")

Use DatabaseRetryEntrypointExecutor to automatically convert any exception into a database-level retry with exponential backoff:

from datetime import timedelta

from pgqueuer.executors import DatabaseRetryEntrypointExecutor
from pgqueuer.models import Job

@pgq.entrypoint(
    "flaky_api",
    executor_factory=lambda params: DatabaseRetryEntrypointExecutor(
        parameters=params,
        max_attempts=5,
        initial_delay=timedelta(seconds=1),
    ),
)
async def flaky_api(job: Job) -> None:
    await call_unreliable_service(job.payload)
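The resulting delay schedule can be sketched as a pure function. This is an illustration of the general exponential-backoff pattern, not PgQueuer's internal implementation, and it assumes a simple doubling factor:

```python
from datetime import timedelta

def backoff_delay(initial_delay: timedelta, attempt: int, factor: float = 2.0) -> timedelta:
    """Delay before retry number `attempt` (1-based), growing by `factor` each time."""
    return initial_delay * (factor ** (attempt - 1))

# With initial_delay=1s and max_attempts=5, retries would wait roughly:
delays = [backoff_delay(timedelta(seconds=1), n) for n in range(1, 6)]
# -> 1s, 2s, 4s, 8s, 16s
```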

See Database-Level Retry for full details, backoff configuration, and traceability queries.

Holding terminal failures for manual re-queue

Set on_failure="hold" on an entrypoint to keep the job in the queue table with status='failed' instead of deleting it. The payload, headers, and attempt count are preserved for inspection. Use pgq failed to list held jobs and pgq requeue <id> to send them back.

See Holding Failed Jobs for the full guide with use cases and examples.

Worker-crash recovery: stalled jobs

If a worker process crashes mid-job, the job remains in picked state with a stale heartbeat. The global heartbeat_timeout on pgq.run() controls when stale jobs become eligible for re-pickup by another worker:

from datetime import timedelta

await pgq.run(
    dequeue_timeout=timedelta(seconds=5),
    batch_size=10,
    heartbeat_timeout=timedelta(minutes=5),
)

Any worker can then claim and re-run the stalled job. See Heartbeat Monitoring for stall detection queries.
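The eligibility rule reduces to a timestamp comparison. A minimal sketch of the logic (not PgQueuer's internal code):

```python
from datetime import datetime, timedelta, timezone

def is_stalled(heartbeat: datetime, heartbeat_timeout: timedelta, now: datetime) -> bool:
    """A 'picked' job becomes eligible for re-pickup once its last
    heartbeat is older than the configured heartbeat_timeout."""
    return now - heartbeat > heartbeat_timeout
```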

Design for re-execution

A recovered job will run again from the start. Ensure your job functions are idempotent, or checkpoint progress externally so a restart is safe.
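One common way to make a handler safe to re-run is to key its side effect on a stable identifier and skip work already done. A minimal sketch using an in-memory checkpoint store (in production this would be a database table or similar durable store; all names here are illustrative):

```python
completed: set[str] = set()  # stands in for a durable checkpoint store

def send_invoice(order_id: int) -> str:
    key = f"invoice-{order_id}"
    if key in completed:       # already processed: re-execution is a no-op
        return "skipped"
    # ... perform the side effect (send email, charge card, ...) ...
    completed.add(key)         # checkpoint only after the side effect succeeds
    return "sent"
```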

Idempotency

Pass a dedupe_key to prevent duplicate jobs from entering the queue:

job_ids = await queries.enqueue(
    "send_invoice",
    payload=b'{"order_id": 42}',
    dedupe_key="invoice-order-42",
)

PgQueuer enforces this with a database-level partial unique index, shown schematically:

UNIQUE (dedupe_key)
WHERE status IN ('queued', 'picked') AND dedupe_key IS NOT NULL

If a job with the same dedupe_key already exists in queued or picked state, a DuplicateJobError is raised. Once the job reaches a terminal state (successful, exception, canceled, deleted), the key is released and the same key can be used again.

Choosing a dedupe key: Use a stable, business-meaningful identifier — for example, f"invoice-{order_id}" or f"report-{date}-{user_id}". This turns enqueue into an idempotent operation: calling it twice with the same key and payload is safe.

Poison Jobs

A "poison job" is one that consistently causes worker crashes or hangs without updating its heartbeat. PgQueuer does not include a built-in dead-letter queue, but you can detect and handle poison jobs with a query:

-- Picked jobs whose heartbeat has gone stale (likely crashed or hung workers).
-- Adjust the interval to comfortably exceed your heartbeat_timeout and
-- expected job runtime.
SELECT id, entrypoint, status, heartbeat, updated
FROM pgqueuer
WHERE status = 'picked'
  AND heartbeat < NOW() - INTERVAL '10 minutes'
ORDER BY heartbeat ASC;

Recommended pattern: route these to a separate monitoring alert or move them to a dedicated "quarantine" entrypoint by updating their entrypoint column and re-queuing.
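A quarantine move might look like the following sketch. The 'quarantine' entrypoint name is illustrative, and you should verify the column semantics against your PgQueuer schema version before running it:

```sql
-- Re-queue suspected poison jobs under a dedicated quarantine entrypoint
-- so a specialized handler (or a human) can deal with them.
UPDATE pgqueuer
SET entrypoint = 'quarantine',
    status = 'queued'
WHERE status = 'picked'
  AND heartbeat < NOW() - INTERVAL '10 minutes';
```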

Audit Trail

Every job that leaves the active queue is written to pgqueuer_log:

Event | Status in log
--- | ---
Job completes without error | successful
Job raises an exception | exception (with traceback)
Job is canceled | canceled
Job is deleted without running | deleted

The log is append-only and serves as a permanent audit record. You can query it directly or use pgq dashboard from the CLI.

Log table retention

PgQueuer does not automatically prune pgqueuer_log. Add a periodic DELETE job or PostgreSQL table partition policy to manage log growth in high-throughput systems.
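For example, a periodic cleanup could run a statement like this (the 90-day window is illustrative; choose a retention period that matches your audit requirements):

```sql
-- Drop log rows older than 90 days; run from cron, pg_cron, or a scheduled job.
DELETE FROM pgqueuer_log
WHERE created < NOW() - INTERVAL '90 days';
```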

Summary

Concern | Mechanism
--- | ---
Durable retry across workers | RetryRequested / DatabaseRetryEntrypointExecutor
Worker crash recovery | heartbeat_timeout
Terminal failure parking | on_failure="hold"
Duplicate enqueue prevention | dedupe_key unique index
Failure inspection | pgqueuer_log with traceback
Audit trail | pgqueuer_log for all terminal states