# Database-Level Retry

PgQueuer supports database-level retry: when a job handler raises `RetryRequested`, the
job stays in the queue table and is re-queued for another attempt. The job row is updated
in place (not deleted and re-inserted), so the `id`, payload, headers, and all metadata
are preserved across retries.
## How It Works

- Your handler raises `RetryRequested` with an optional delay and reason.
- PgQueuer catches the exception and atomically:
    - Resets the job status to `queued`.
    - Bumps `execute_after` by the requested delay.
    - Increments the `attempts` counter.
    - Writes a log entry to `pgqueuer_log` with full traceability context.
- The job becomes eligible for dequeue again after the delay expires.
- On the next execution, `job.attempts` reflects how many previous attempts occurred.
Because the job row is updated (not deleted), the same `job.id` is stable across all
retry attempts. This makes it straightforward to trace the full retry graph by querying the
log table (see Traceability below).
## Basic Usage

Raise `RetryRequested` from your handler when you detect a transient failure:

```python
from datetime import timedelta

from pgqueuer import PgQueuer, Job
from pgqueuer.errors import RetryRequested

pgq = PgQueuer(driver)


@pgq.entrypoint("call_api")
async def call_api(job: Job) -> None:
    response = await http_client.post(API_URL, data=job.payload)
    if response.status == 429:
        raise RetryRequested(
            delay=timedelta(seconds=30),
            reason="rate limited",
        )
    response.raise_for_status()
```
### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `delay` | `timedelta` | `timedelta(0)` | Time to wait before the next attempt |
| `reason` | `str \| None` | `None` | Human-readable explanation (stored in the log) |
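Both parameters are optional. With the defaults, the job is re-queued for an immediate
retry and no reason is recorded. A minimal sketch, continuing the setup above (the
entrypoint name and `try_once` helper are illustrative):

```python
@pgq.entrypoint("retry_immediately")
async def retry_immediately(job: Job) -> None:
    ok = await try_once(job.payload)  # try_once is a hypothetical helper
    if not ok:
        # No delay, no reason: re-queue for an immediate retry,
        # using the defaults from the table above.
        raise RetryRequested()
```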
## Reading the Attempt Counter

The `job.attempts` field tells you how many previous attempts have been made. On the first
execution it is `0`, after one retry it is `1`, and so on:
@pgq.entrypoint("resilient_task")
async def resilient_task(job: Job) -> None:
if job.attempts > 5:
# Give up after 5 retries — let it fail terminally
raise RuntimeError("Too many retries, giving up")
try:
await do_work(job.payload)
except TransientError:
raise RetryRequested(
delay=timedelta(seconds=2 ** job.attempts),
reason=f"transient error on attempt {job.attempts}",
)
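With `delay=timedelta(seconds=2 ** job.attempts)`, the wait grows exponentially: 1s before
the first retry, then 2s, 4s, 8s, and so on. Combining this with an attempts cap, as above,
bounds the total retry window.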
## Automatic Retry with DatabaseRetryEntrypointExecutor

For cases where you want any unhandled exception to trigger a retry (not just explicit
`RetryRequested` raises), use `DatabaseRetryEntrypointExecutor`. It wraps your handler and
converts exceptions into `RetryRequested` with exponential backoff:

```python
from datetime import timedelta

from pgqueuer import PgQueuer, Job
from pgqueuer.executors import DatabaseRetryEntrypointExecutor

pgq = PgQueuer(driver)


@pgq.entrypoint(
    "flaky_api",
    executor_factory=lambda params: DatabaseRetryEntrypointExecutor(
        parameters=params,
        max_attempts=5,
        initial_delay=timedelta(seconds=1),
        max_delay=timedelta(minutes=5),
        backoff_multiplier=2.0,
    ),
)
async def flaky_api(job: Job) -> None:
    await call_unreliable_service(job.payload)
```
After `max_attempts` consecutive failures, the original exception propagates as a terminal
failure (the job is deleted and logged with status `exception`).

If your handler raises `RetryRequested` directly, it passes through the executor unchanged;
the executor only converts non-retry exceptions.
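You can combine both styles: under the executor, an explicit `RetryRequested` keeps its own
delay, while everything else falls back to the automatic backoff. A sketch, continuing the
setup above (the entrypoint name and the `http_client`/`API_URL` placeholders follow the
earlier example and are illustrative):

```python
@pgq.entrypoint(
    "flaky_api_mixed",
    executor_factory=lambda params: DatabaseRetryEntrypointExecutor(
        parameters=params,
        max_attempts=5,
    ),
)
async def flaky_api_mixed(job: Job) -> None:
    response = await http_client.post(API_URL, data=job.payload)
    if response.status == 429:
        # Passes through the executor unchanged: this delay is used
        # instead of the executor's backoff schedule.
        raise RetryRequested(delay=timedelta(minutes=1), reason="rate limited")
    # Any other exception raised here is converted to RetryRequested
    # with exponential backoff, up to max_attempts.
    response.raise_for_status()
```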
### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_attempts` | `int` | `5` | Maximum retries before the exception becomes terminal |
| `initial_delay` | `timedelta` | 1s | Delay before the first retry |
| `max_delay` | `timedelta` | 5m | Cap on exponential backoff |
| `backoff_multiplier` | `float` | `2.0` | Multiplier applied to the delay after each attempt |
### Backoff Schedule Example

With `initial_delay=1s`, `backoff_multiplier=2.0`, and `max_delay=60s`:
| Attempt | Delay |
|---|---|
| 0 | 1s |
| 1 | 2s |
| 2 | 4s |
| 3 | 8s |
| 4 | 16s |
| 5 | 32s |
| 6+ | 60s (capped) |
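The schedule is a standard capped exponential: the delay for attempt *n* is
`initial_delay * backoff_multiplier ** n`, clamped to `max_delay`. A minimal sketch of the
arithmetic (illustrative, not the library's internal code):

```python
from datetime import timedelta


def backoff_delay(
    attempt: int,
    initial_delay: timedelta = timedelta(seconds=1),
    backoff_multiplier: float = 2.0,
    max_delay: timedelta = timedelta(seconds=60),
) -> timedelta:
    # delay = initial_delay * multiplier**attempt, capped at max_delay
    delay = initial_delay * (backoff_multiplier ** attempt)
    return min(delay, max_delay)


assert backoff_delay(0) == timedelta(seconds=1)
assert backoff_delay(5) == timedelta(seconds=32)
assert backoff_delay(6) == timedelta(seconds=60)  # capped
```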
## Retry vs. Heartbeat Recovery

PgQueuer has two complementary retry mechanisms:

| Mechanism | Scope | When to use |
|---|---|---|
| `RetryRequested` / `DatabaseRetryEntrypointExecutor` | Database-level | Transient failures where you want the job to survive across worker restarts and be visible to any worker |
| Heartbeat timeout | Worker-crash recovery | Stalled jobs whose worker has crashed without updating the heartbeat |

`RetryRequested` re-queues the job in the database, so any worker can pick it up.
Heartbeat recovery handles the case where a worker crashes without raising an exception.
## Traceability

Every retry writes a log entry to `pgqueuer_log` with:

- `status = 'queued'` (the job was re-queued)
- `traceback` containing a `TracebackRecord` with the exception details
- `additional_context` with retry metadata
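Based on the keys used in the query below, the retry metadata contains at least the
attempt number, the reason, and the requested delay. An illustrative sketch (the values
and exact serialization are made up):

```python
# Illustrative contents of traceback->'additional_context';
# keys match the query below, values are examples only.
{
    "attempt": 3,
    "reason": "rate limited",
    "retry_delay": "0:00:30",
}
```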
Query the retry history for a specific job:

```sql
SELECT job_id, status, created,
       traceback->'additional_context'->>'attempt' AS attempt,
       traceback->'additional_context'->>'reason' AS reason,
       traceback->'additional_context'->>'retry_delay' AS delay
FROM pgqueuer_log
WHERE job_id = 42
ORDER BY created;
```
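To run the same query from application code, here is a small helper, assuming an asyncpg
connection (asyncpg is an assumption; any PostgreSQL driver works):

```python
import asyncpg


async def retry_history(
    conn: asyncpg.Connection, job_id: int
) -> list[asyncpg.Record]:
    # Same query as above, parameterized on the job id.
    return await conn.fetch(
        """
        SELECT job_id, status, created,
               traceback->'additional_context'->>'attempt' AS attempt,
               traceback->'additional_context'->>'reason' AS reason,
               traceback->'additional_context'->>'retry_delay' AS delay
        FROM pgqueuer_log
        WHERE job_id = $1
        ORDER BY created
        """,
        job_id,
    )
```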
## Behavior Summary

| Scenario | What happens |
|---|---|
| Handler raises `RetryRequested` | Job updated to `queued`, `attempts` incremented, log entry written |
| Handler raises any other exception | Job deleted, logged as `exception` (or held if `on_failure="hold"`; see Holding Failed Jobs) |
| Handler completes normally | Job deleted, logged as `successful` (existing behavior) |
| `DatabaseRetryEntrypointExecutor` + exception + attempts < max | Converted to `RetryRequested` with backoff |
| `DatabaseRetryEntrypointExecutor` + exception + attempts >= max | Exception propagates as terminal failure (or held if `on_failure="hold"`) |