Completion Tracking¶
CompletionWatcher lets you await the final status of a job using PostgreSQL
LISTEN/NOTIFY instead of polling.
Parameters¶
| Parameter | Type | Default | Purpose |
|---|---|---|---|
| refresh_interval | timedelta \| None | 5 s | Safety-net poll in case a NOTIFY was lost. Pass None to disable polling and rely solely on notifications. |
| debounce | timedelta | 50 ms | Coalesces bursts of NOTIFYs to reduce query load. |
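To make the debounce row more concrete, here is a minimal sketch of one way to coalesce a burst of events into a single callback with asyncio. It is not pgqueuer's implementation: the Coalescer class, its trigger method, and the demo function are hypothetical names used only for illustration.

```python
import asyncio
from datetime import timedelta
from typing import Callable, Optional


class Coalescer:
    """Hypothetical helper: run `callback` once per burst of trigger() calls."""

    def __init__(self, callback: Callable[[], None], window: timedelta) -> None:
        self._callback = callback
        self._window = window.total_seconds()
        self._handle: Optional[asyncio.TimerHandle] = None

    def trigger(self) -> None:
        # If a call is already scheduled, it will cover this event as well.
        if self._handle is None:
            loop = asyncio.get_running_loop()
            self._handle = loop.call_later(self._window, self._fire)

    def _fire(self) -> None:
        self._handle = None  # allow the next burst to schedule a new call
        self._callback()


async def demo() -> int:
    refreshes = []
    coalescer = Coalescer(lambda: refreshes.append(1), timedelta(milliseconds=50))
    for _ in range(10):  # ten notifications arriving back to back
        coalescer.trigger()
    await asyncio.sleep(0.2)  # give the 50 ms window time to elapse
    return len(refreshes)
```

In this sketch, ten back-to-back triggers inside a 50 ms window lead to a single callback, which is the kind of saving the debounce parameter is meant to provide.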
Basic Usage¶
from pgqueuer.core.completion import CompletionWatcher
from pgqueuer.queries import Queries

queries = Queries(driver)

async with CompletionWatcher(driver, queries=queries) as watcher:
    status = await watcher.wait_for(job_id)
    # status: "successful", "exception", "canceled", or "deleted"
Completion Watcher State Flow¶
The watcher monitors a job's progression until it reaches a terminal state:
                ┌────────┐
                │ queued │
                └───┬──┬─┘
              claim │  │ delete
                    ▼  ▼
            ┌────────┐ ┌─────────┐
            │ picked │ │ deleted │
            └┬─┬──┬─┬┘ └─────────┘
             │ │  │ │
    complete │ │  │ │ cancel
             │ │  │ │
             ▼ │  │ ▼
┌────────────┐ │  │ ┌───────────┐
│ successful │ │  │ │ canceled  │
└────────────┘ │  │ └───────────┘
         error │  │ hold
               ▼  ▼
    ┌───────────┐ ┌────────┐
    │ exception │ │ failed │
    └───────────┘ └────────┘
failed is not a terminal state for the watcher
A held job (on_failure="hold") lands in failed, but wait_for resolves
only on successful, exception, canceled, or deleted. A held job stays
unresolved until it is re-queued and reaches one of those states.
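Because a held job can leave a waiter pending indefinitely, callers may want to bound the wait. The sketch below shows one way to do that with standard asyncio tools. It assumes the object returned by wait_for can be awaited like an asyncio future; wait_with_timeout and demo are hypothetical names, and the demo uses plain futures as stand-ins for real waiters.

```python
import asyncio
from typing import Optional


async def wait_with_timeout(waiter: "asyncio.Future[str]", timeout: float) -> Optional[str]:
    """Return the resolved status, or None if nothing arrived within `timeout` seconds."""
    try:
        # shield() keeps the underlying waiter alive when the timeout fires,
        # so the caller can await it again later.
        return await asyncio.wait_for(asyncio.shield(waiter), timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    held = loop.create_future()  # stand-in for a held job: never resolves
    finished = loop.create_future()  # stand-in for a job that already finished
    finished.set_result("successful")
    return (
        await wait_with_timeout(held, 0.05),
        await wait_with_timeout(finished, 0.05),
    )
```

A None result here only means the job has not reached a terminal state yet. It says nothing about whether the job is held, still queued, or still running.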
Tracking Many Jobs at Once¶
from asyncio import gather

from pgqueuer.core.completion import CompletionWatcher

image_ids = await qm.queries.enqueue(["render_img"] * 20, [b"..."] * 20, [0] * 20)
report_ids = await qm.queries.enqueue(["generate_pdf"] * 10, [b"..."] * 10, [0] * 10)
cleanup_ids = await qm.queries.enqueue(["cleanup"] * 5, [b"..."] * 5, [0] * 5)

async with CompletionWatcher(driver, queries=queries) as w:
    img_statuses, pdf_statuses, clean_statuses = await gather(
        gather(*[w.wait_for(j) for j in image_ids]),
        gather(*[w.wait_for(j) for j in report_ids]),
        gather(*[w.wait_for(j) for j in cleanup_ids]),
    )
Terminal states: canceled, deleted, exception, successful.
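Once a batch has resolved, it is often handy to tally the outcomes per group. The following is a small sketch using only the standard library; summarize and TERMINAL_STATES are hypothetical names, and statuses are treated as plain strings here.

```python
from collections import Counter
from typing import Iterable

# The four terminal states listed above.
TERMINAL_STATES = frozenset({"canceled", "deleted", "exception", "successful"})


def summarize(statuses: Iterable[str]) -> Counter:
    """Count how many jobs ended in each terminal state."""
    counts = Counter(statuses)
    unexpected = set(counts) - TERMINAL_STATES
    if unexpected:
        # A non-terminal value such as "failed" should never come back from a waiter.
        raise ValueError(f"non-terminal statuses: {sorted(unexpected)}")
    return counts
```

For example, summarize(img_statuses) would give a per-status count for the image jobs, which can then be compared against the number of jobs enqueued.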
Helper Patterns¶
Below are two ready-to-use patterns you can copy into your own code for common completion-tracking scenarios.
Wait for all jobs¶
Block until every supplied job finishes; return statuses in the same order as the input IDs.
import asyncio
from datetime import timedelta

from pgqueuer import db, models
from pgqueuer.core.completion import CompletionWatcher
from pgqueuer.queries import Queries


async def wait_for_all(
    driver: db.Driver,
    job_ids: list[models.JobId],
    refresh_interval: timedelta = timedelta(seconds=5),
    debounce: timedelta = timedelta(milliseconds=50),
) -> list[models.JOB_STATUS]:
    async with CompletionWatcher(
        driver,
        queries=Queries(driver),
        refresh_interval=refresh_interval,
        debounce=debounce,
    ) as watcher:
        waiters = [watcher.wait_for(jid) for jid in job_ids]
        return await asyncio.gather(*waiters)
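Since the statuses come back in the same order as the input IDs, the two lists can be zipped to find which jobs need attention. This is a minimal sketch; unsuccessful is a hypothetical helper, and job IDs are modelled as plain integers for the sake of a self-contained example.

```python
from typing import Sequence


def unsuccessful(job_ids: Sequence[int], statuses: Sequence[str]) -> dict[int, str]:
    """Map each job ID that did not end as "successful" to its final status."""
    return {
        jid: status
        for jid, status in zip(job_ids, statuses)
        if status != "successful"
    }
```

Passing the original job_ids together with the result of wait_for_all would yield only the jobs that ended as exception, canceled, or deleted.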
Wait for first job¶
Return as soon as any job hits a terminal state; cancel pending waiters.
import asyncio
from datetime import timedelta

from pgqueuer import db, models
from pgqueuer.core.completion import CompletionWatcher
from pgqueuer.queries import Queries


async def wait_for_first(
    driver: db.Driver,
    job_ids: list[models.JobId],
    refresh_interval: timedelta = timedelta(seconds=5),
    debounce: timedelta = timedelta(milliseconds=50),
) -> models.JOB_STATUS:
    async with CompletionWatcher(
        driver,
        queries=Queries(driver),
        refresh_interval=refresh_interval,
        debounce=debounce,
    ) as watcher:
        waiters = [watcher.wait_for(jid) for jid in job_ids]
        done, pending = await asyncio.wait(
            waiters, return_when=asyncio.FIRST_COMPLETED
        )
        # Cancel the waiters that are still pending.
        for fut in pending:
            fut.cancel()
        return next(iter(done)).result()
Notification Reliability¶
To improve reliability without heavy polling:
- Listener health check: run with the pgq run --shutdown-on-listener-failure flag (or pass shutdown_on_listener_failure=True to QueueManager.run()) so that the manager stops, and can be restarted by a supervisor, if the LISTEN channel becomes unhealthy.
- Minimize the refresh poll: set a long refresh_interval to rely primarily on notifications when your channel is stable: