Reliability Model¶
This page explains how PgQueuer handles failures, ensures jobs are not lost, supports idempotent enqueuing, and provides an audit trail of every completed job.
Failure Handling¶
When a job raises an unhandled exception, PgQueuer:
- Marks the job status as exception.
- Captures the full traceback, exception type, and message into a TracebackRecord.
- Moves the record from the active queue (pgqueuer) to the log table (pgqueuer_log).
The job is not automatically retried at this point — it is considered definitively failed
for this execution attempt. The traceback is persisted for inspection via pgq dashboard
or a direct query:
SELECT job_id, entrypoint,
traceback->>'exception_type' AS exception_type,
traceback->>'exception_message' AS exception_message,
traceback->>'traceback' AS traceback_text
FROM pgqueuer_log
WHERE status = 'exception'
ORDER BY created DESC
LIMIT 20;
Retry Strategies¶
PgQueuer provides three complementary retry mechanisms.
Database-level retry: durable re-queuing¶
Raise RetryRequested from your handler to re-queue the job in the database. The job row is
updated in-place — the id, payload, and all metadata are preserved. Any worker can pick
up the retried job.
from datetime import timedelta

from pgqueuer.errors import RetryRequested
from pgqueuer.models import Job

@pgq.entrypoint("call_api")
async def call_api(job: Job) -> None:
    response = await http_client.post(API_URL, data=job.payload)
    if response.status == 429:
        raise RetryRequested(delay=timedelta(seconds=30), reason="rate limited")
Use DatabaseRetryEntrypointExecutor to automatically convert any exception into a
database-level retry with exponential backoff:
from datetime import timedelta

from pgqueuer.executors import DatabaseRetryEntrypointExecutor

@pgq.entrypoint(
    "flaky_api",
    executor_factory=lambda params: DatabaseRetryEntrypointExecutor(
        parameters=params,
        max_attempts=5,
        initial_delay=timedelta(seconds=1),
    ),
)
async def flaky_api(job: Job) -> None:
    await call_unreliable_service(job.payload)
See Database-Level Retry for full details, backoff configuration, and traceability queries.
Holding terminal failures for manual re-queue¶
Set on_failure="hold" on an entrypoint to keep the job in the queue table with
status='failed' instead of deleting it. The payload, headers, and attempt count are preserved
for inspection. Use pgq failed to list held jobs and pgq requeue <id> to send them back.
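A minimal sketch, reusing the pgq registry and Job import from the earlier examples (the import_batch name and process_batch helper are illustrative):
@pgq.entrypoint("import_batch", on_failure="hold")
async def import_batch(job: Job) -> None:
    # On an unhandled exception the job stays in the queue table with
    # status='failed', ready for `pgq requeue <id>` after inspection.
    await process_batch(job.payload)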
See Holding Failed Jobs for the full guide with use cases and examples.
Worker-crash recovery: stalled jobs¶
If a worker process crashes mid-job, the job remains in picked state with a stale heartbeat.
The global heartbeat_timeout on pgq.run() controls when stale jobs become eligible for
re-pickup by another worker:
from datetime import timedelta
await pgq.run(
    dequeue_timeout=timedelta(seconds=5),
    batch_size=10,
    heartbeat_timeout=timedelta(minutes=5),
)
Any worker can then claim and re-run the stalled job. See Heartbeat Monitoring for stall detection queries.
Design for re-execution
A recovered job will run again from the start. Ensure your job functions are idempotent, or checkpoint progress externally so a restart is safe.
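For example, a handler becomes safely re-runnable when its side effect is an upsert. This is an illustrative sketch: db_pool (an asyncpg pool), the invoices table, and the create_invoice entrypoint are assumptions, not part of PgQueuer's API:
import json

# Re-running this job after crash recovery is harmless: the second
# execution hits ON CONFLICT DO NOTHING and writes nothing new.
@pgq.entrypoint("create_invoice")
async def create_invoice(job: Job) -> None:
    order = json.loads(job.payload)
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO invoices (order_id, amount)
            VALUES ($1, $2)
            ON CONFLICT (order_id) DO NOTHING
            """,
            order["order_id"],
            order["amount"],
        )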
Idempotency¶
Pass a dedupe_key to prevent duplicate jobs from entering the queue:
job_ids = await queries.enqueue(
    "send_invoice",
    payload=b'{"order_id": 42}',
    dedupe_key="invoice-order-42",
)
PgQueuer enforces this with a database-level unique constraint: if a job with the same
dedupe_key already exists in queued or picked state, a DuplicateJobError is raised.
Once the job reaches a terminal state (successful, exception, canceled, deleted), the
key is released and the same key can be used again.
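Conceptually, the guarantee behaves like a partial unique index of this shape (an illustrative sketch only, not the exact definition PgQueuer's schema installs):
-- Illustrative: uniqueness applies only while the job is active,
-- so the key frees up once the job reaches a terminal state.
CREATE UNIQUE INDEX dedupe_key_active_idx
    ON pgqueuer (dedupe_key)
    WHERE status IN ('queued', 'picked') AND dedupe_key IS NOT NULL;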
Choosing a dedupe key: Use a stable, business-meaningful identifier — for example,
f"invoice-{order_id}" or f"report-{date}-{user_id}". This turns enqueue into an
idempotent operation: calling it twice with the same key and payload is safe.
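A sketch of the resulting caller-side pattern (assuming DuplicateJobError is importable from pgqueuer.errors, like RetryRequested above):
from pgqueuer.errors import DuplicateJobError

try:
    await queries.enqueue(
        "send_invoice",
        payload=b'{"order_id": 42}',
        dedupe_key="invoice-order-42",
    )
except DuplicateJobError:
    pass  # Already queued or running; nothing to do.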
Poison Jobs¶
A "poison job" is one that consistently causes worker crashes or hangs without updating its heartbeat. PgQueuer does not include a built-in dead-letter queue, but you can detect and handle poison jobs with a query:
-- Picked jobs whose heartbeat has gone stale (likely crashed or hung workers)
-- Adjust the interval to exceed your heartbeat_timeout and expected job runtime
SELECT id, entrypoint, status, heartbeat, updated
FROM pgqueuer
WHERE status = 'picked'
AND heartbeat < NOW() - INTERVAL '10 minutes'
ORDER BY heartbeat ASC;
Recommended pattern: route these to a separate monitoring alert or move them to a dedicated "quarantine" entrypoint by updating their entrypoint column and re-queuing.
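A quarantine move might look like the following sketch; the 'quarantine' entrypoint name and job id 123 are placeholders, and a handler must be registered under that entrypoint for the job to run:
-- Reroute a suspected poison job to a dedicated quarantine handler
-- and make it eligible for pickup again.
UPDATE pgqueuer
SET entrypoint = 'quarantine', status = 'queued'
WHERE id = 123;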
Audit Trail¶
Every job that leaves the active queue is written to pgqueuer_log:
| Event | Status in log |
|---|---|
| Job completes without error | successful |
| Job raises an exception | exception (with traceback) |
| Job is canceled | canceled |
| Job is deleted without running | deleted |
The log is append-only and serves as a permanent audit record. You can query it directly
or use pgq dashboard from the CLI.
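For example, a quick per-entrypoint health summary:
-- Terminal-state counts per entrypoint over the last 24 hours.
SELECT entrypoint, status, COUNT(*) AS jobs
FROM pgqueuer_log
WHERE created > NOW() - INTERVAL '24 hours'
GROUP BY entrypoint, status
ORDER BY entrypoint, status;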
Log table retention
PgQueuer does not automatically prune pgqueuer_log. Add a periodic DELETE job or
PostgreSQL table partition policy to manage log growth in high-throughput systems.
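A minimal retention sketch (the 90-day window is an arbitrary example; choose one that fits your audit requirements):
-- Prune audit records older than 90 days.
DELETE FROM pgqueuer_log
WHERE created < NOW() - INTERVAL '90 days';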
Summary¶
| Concern | Mechanism |
|---|---|
| Durable retry across workers | RetryRequested / DatabaseRetryEntrypointExecutor |
| Worker crash recovery | heartbeat_timeout |
| Terminal failure parking | on_failure="hold" |
| Duplicate enqueue prevention | dedupe_key unique constraint |
| Failure inspection | pgqueuer_log with traceback |
| Audit trail | pgqueuer_log for all terminal states |