Holding Failed Jobs¶
By default, when a job raises an unhandled exception it is deleted from the queue table and
logged with status exception. The payload is gone. This is fine for many workloads, but some
scenarios require keeping the failed job around so a human (or external system) can inspect it
and decide whether to retry.
Setting on_failure="hold" on an entrypoint changes this behavior: instead of deleting, the
job is updated to status='failed' and stays in the queue table with its payload, headers, and
all metadata intact. The dequeue query skips failed jobs, so they won't be picked up by
workers until explicitly re-queued.
How It Works¶
- A job handler raises an unhandled exception.
- PgQueuer checks the entrypoint's
on_failuresetting. - If
"hold": the job is updated tostatus='failed'in-place and a log entry is written. - If
"delete"(default): the job is deleted and logged withexception(existing behavior). - Failed jobs sit in the queue table until someone re-queues or deletes them.
The hold operation flows through the same batched buffer as normal job completions — there is no performance penalty in the common (non-failure) path.
Use Cases¶
One-shot jobs with manual review¶
You want each job to run exactly once. If it fails, a human inspects the error and decides whether to retry:
@pgq.entrypoint("process_order", on_failure="hold")
async def process_order(job: Job) -> None:
await payment_gateway.charge(job.payload)
If the payment gateway returns an error, the job is parked. An operator can inspect the payload
via pgq failed, fix the upstream issue, and then pgq requeue <id>.
Auto-retry with a safety net¶
Combine DatabaseRetryEntrypointExecutor with on_failure="hold" to get automatic retries
and a safety net. The executor retries N times with exponential backoff; if all attempts
fail, the job is held instead of lost:
from datetime import timedelta
from pgqueuer.executors import DatabaseRetryEntrypointExecutor
@pgq.entrypoint(
"sync_inventory",
on_failure="hold",
executor_factory=lambda params: DatabaseRetryEntrypointExecutor(
parameters=params,
max_attempts=5,
initial_delay=timedelta(seconds=2),
max_delay=timedelta(minutes=10),
),
)
async def sync_inventory(job: Job) -> None:
await inventory_api.sync(job.payload)
After 5 failed attempts the job lands in status='failed' with the full retry history in the
log table.
External API with human-in-the-loop¶
A webhook delivery service where transient failures retry automatically, but persistent failures (4xx responses, invalid payloads) need a human to fix the payload and re-send:
@pgq.entrypoint("deliver_webhook", on_failure="hold")
async def deliver_webhook(job: Job) -> None:
response = await http_client.post(url, data=job.payload)
if response.status == 429:
raise RetryRequested(delay=timedelta(seconds=60), reason="rate limited")
response.raise_for_status()
Rate limits trigger an automatic database-level retry. Other HTTP errors (400, 500, etc.) raise an exception and the job is held for inspection.
Inspecting Failed Jobs¶
CLI¶
Output includes job ID, entrypoint, attempt count, creation time, and payload size.
SQL¶
SELECT id, entrypoint, attempts, created,
octet_length(payload) AS payload_bytes
FROM pgqueuer
WHERE status = 'failed'
ORDER BY created DESC
LIMIT 50;
For the traceback of a specific failed job:
SELECT traceback->>'exception_type' AS type,
traceback->>'exception_message' AS message,
traceback->>'traceback' AS tb
FROM pgqueuer_log
WHERE job_id = 42 AND status = 'failed'
ORDER BY created DESC
LIMIT 1;
Re-queuing Failed Jobs¶
CLI¶
Programmatic¶
What happens on re-queue¶
- Status changes from
failedtoqueued. execute_afteris set toNOW()— the job is immediately eligible.attemptsis reset to0— a fresh start.- A log entry with
status='queued'is written for auditability. - Any worker can pick up the re-queued job.
- If
DatabaseRetryEntrypointExecutoris configured, the job gets a full new set of retry attempts.
Behavior Summary¶
| Scenario | on_failure="delete" (default) |
on_failure="hold" |
|---|---|---|
| Handler raises exception | Job deleted, logged as exception |
Job kept with status='failed' |
Handler raises RetryRequested |
Re-queued with delay (both modes) | Re-queued with delay (both modes) |
| Handler completes | Job deleted, logged as successful |
Job deleted, logged as successful |
DatabaseRetryEntrypointExecutor exhausted |
Job deleted | Job held with status='failed' |