# PgQueuer
Your PostgreSQL database is already a job queue.
PgQueuer turns PostgreSQL into a fast, reliable background job processor. Jobs live in the same database as your application data -- one stack, full ACID guarantees, zero additional infrastructure.
## Why PostgreSQL?
If you're already running PostgreSQL, it can do double duty as your job queue. That gives you:
- One fewer service to provision, monitor, and keep available
- Transactional enqueuing -- commit a job in the same transaction as your application data
- Consistent state -- your queue and your data always agree because they share the same database
- Lower latency -- jobs stay local, no round-trip to an external broker
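Transactional enqueuing is the key point. As an illustration of the pattern only (stdlib `sqlite3` stands in here; the table and column names are invented and are not PgQueuer's actual schema), committing the job row and the business row in one transaction means neither can exist without the other:

```python
import sqlite3

# Stand-in schema for illustration; PgQueuer's real tables are created
# by its own installer, and the column names here are invented.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, entrypoint TEXT, payload TEXT)")

try:
    with conn:  # one transaction: both rows commit, or neither does
        conn.execute("INSERT INTO orders (status) VALUES ('pending')")
        conn.execute(
            "INSERT INTO jobs (entrypoint, payload) VALUES ('send_email', 'order-1')"
        )
        raise RuntimeError("simulated failure before commit")
except RuntimeError:
    pass

# The rollback removed the order AND the job -- no orphaned work.
orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
jobs = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
print(orders, jobs)  # 0 0
```

With a separate broker, the job publish happens outside the database transaction, so a rollback can leave a job pointing at data that was never committed.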
## How PgQueuer Works
PgQueuer uses battle-tested PostgreSQL primitives to deliver jobs quickly and safely:
- `FOR UPDATE SKIP LOCKED` -- workers claim jobs atomically; a job is never handed to two workers
- `LISTEN`/`NOTIFY` -- a trigger on the queue table fires `pg_notify()` on every insert, waking workers instantly
- ACID transactions -- jobs are enqueued and processed with the same guarantees as your application data
- Row-level locking -- multiple workers scale horizontally against a single database
```
┌──────────┐  enqueue  ┌────────────┐  NOTIFY  ┌──────────┐
│ Your App │──────────▶│            │─────────▶│ Worker 1 │──┐
└──────────┘           │            │          └──────────┘  │
                       │ PostgreSQL │  NOTIFY  ┌──────────┐  │
                       │            │─────────▶│ Worker 2 │──┤
                       │            │          └──────────┘  │
                       │            │  NOTIFY  ┌──────────┐  │
                       │            │─────────▶│ Worker N │──┤
                       └────────────┘          └──────────┘  │
                             ▲    FOR UPDATE SKIP LOCKED     │
                             └───────────────────────────────┘
```
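The claim step in the diagram is what `FOR UPDATE SKIP LOCKED` buys you: a worker locks the row it grabs, and other workers skip past it instead of blocking or double-claiming. A pure-Python sketch of that exclusivity guarantee, with threads and a lock standing in for database row locks (the SQL in the comment shows the general shape of such a claim query, not PgQueuer's exact statement):

```python
import threading

# Rough shape of a claim query (illustrative, not PgQueuer's exact SQL):
#   UPDATE jobs SET status = 'picked'
#   WHERE id = (SELECT id FROM jobs WHERE status = 'queued'
#               ORDER BY priority DESC LIMIT 1
#               FOR UPDATE SKIP LOCKED)
#   RETURNING id;

queue = list(range(100))     # 100 queued job ids
lock = threading.Lock()      # stands in for row-level locking
claimed: list[int] = []

def worker() -> None:
    while True:
        with lock:           # atomic claim: check and remove together
            if not queue:
                return
            job_id = queue.pop()
        claimed.append(job_id)   # "process" the job outside the lock

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every job was handed to exactly one worker, never two.
print(len(claimed), len(set(claimed)))  # 100 100
```

In the real system the lock lives in PostgreSQL, so the same guarantee holds across worker processes on different machines, not just threads in one process.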
## A Taste of PgQueuer
Define a consumer, register job handlers, and run:
```python
from contextlib import asynccontextmanager

import asyncpg

from pgqueuer import PgQueuer
from pgqueuer.models import Job


@asynccontextmanager
async def main():
    connection = await asyncpg.connect()
    pgq = PgQueuer.from_asyncpg_connection(connection)

    @pgq.entrypoint("send_email")
    async def send_email(job: Job) -> None:
        print(f"Sending email: {job.payload}")

    yield pgq
```
That's it. Just PostgreSQL and your application code.
## Key Features
- **Real-Time Delivery** -- PostgreSQL `LISTEN`/`NOTIFY` pushes jobs to workers instantly. A trigger on the queue table fires `pg_notify()` on every insert, so workers wake up immediately without polling.
- **Concurrency Control** -- A per-entrypoint `concurrency_limit` protects downstream services, or use `serialized_dispatch` to process jobs strictly one at a time.
- **Built-In Scheduler** -- Cron-style recurring tasks via the `@schedule` decorator, with 5-field (minute-level) or 6-field (second-level) expressions. No separate beat process required.
- **Automatic Retries & Heartbeat** -- `DatabaseRetryEntrypointExecutor` converts exceptions into database-level retries with exponential backoff. Heartbeat monitoring detects crashed workers and re-queues stalled jobs.
- **Observability** -- Prometheus metrics, Logfire and Sentry distributed tracing, and a live CLI dashboard (`pgq dashboard`). See what your workers are doing in real time.
- **In-Memory Testing** -- `PgQueuer.in_memory()` provides a drop-in replacement for tests and CI. No Docker, no PostgreSQL instance needed.
- **Completion Tracking** -- `CompletionWatcher` lets callers await job results via `LISTEN`/`NOTIFY`, no polling required. Wait for one job, many jobs, or race them with `asyncio.wait`.
- **Deferred Execution** -- Schedule jobs for a future time with `execute_after`. Jobs stay queued until their timestamp passes, then enter the normal priority queue.
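The retry behavior above can be sketched generically. This is not `DatabaseRetryEntrypointExecutor` itself, just the exponential-backoff idea it names: each failure doubles the delay until the attempt budget runs out. The helper below is invented for illustration, and the sleep function is injected so the schedule is visible without actually waiting:

```python
import time
from typing import Callable

def run_with_retries(
    handler: Callable[[], None],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run handler, retrying with exponential backoff. Returns attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            handler()
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise                     # budget exhausted: surface the error
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")

# A handler that fails twice, then succeeds:
calls = 0
def flaky() -> None:
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")

delays: list[float] = []                  # record delays instead of sleeping
attempts = run_with_retries(flaky, sleep=delays.append)
print(attempts, delays)  # 3 [1.0, 2.0]
```

PgQueuer does this at the database level, which is what lets a retry survive a worker crash: the job row, not in-process state, carries the attempt count.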
## PgQueuer vs Celery at a Glance
| Concern | PgQueuer | Celery |
|---|---|---|
| Infrastructure | PostgreSQL only | PostgreSQL + Redis/RabbitMQ |
| Transactional enqueue | Same transaction as your data | Separate broker |
| Setup | `pip install` + `pgq install` | Broker install + config + beat process |
| Async model | Built on asyncio | Sync-first with async support |
| Recurring tasks | Built-in `@schedule` decorator | Separate celery-beat process |
| Complex workflows | Basic job queue | Chains, chords, groups, canvas |
Celery is a mature, battle-tested project that excels at complex multi-step workflows, canvas primitives, and multi-broker topologies. PgQueuer is a good fit when your jobs are backed by PostgreSQL and you want a simpler operational footprint.
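The deferred-execution model from the feature list (jobs invisible until `execute_after` passes, then ordered by priority) can be sketched with a stdlib heap. The field names below are invented for illustration; PgQueuer implements this ordering in SQL, not in application memory:

```python
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class QueuedJob:
    execute_after: float           # earliest allowed run time (epoch seconds)
    neg_priority: int              # negated so higher priority pops first
    name: str = field(compare=False)

heap: list[QueuedJob] = []
heapq.heappush(heap, QueuedJob(execute_after=200.0, neg_priority=0, name="later"))
heapq.heappush(heap, QueuedJob(execute_after=0.0, neg_priority=-5, name="urgent"))
heapq.heappush(heap, QueuedJob(execute_after=0.0, neg_priority=0, name="normal"))

def pop_ready(now: float) -> list[str]:
    """Pop every job whose timestamp has passed, in priority order."""
    ready = []
    while heap and heap[0].execute_after <= now:
        ready.append(heapq.heappop(heap).name)
    return ready

first = pop_ready(now=100.0)   # 'later' (t=200) stays queued
second = pop_ready(now=300.0)  # now 'later' is eligible
print(first, second)  # ['urgent', 'normal'] ['later']
```

The same two-key ordering (eligibility time, then priority) is what "enter the normal priority queue" means in the feature description.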
## Next Steps
- Installation -- install PgQueuer and set up the database schema
- Quick Start -- build your first consumer and producer in 5 minutes
- Core Concepts -- understand jobs, entrypoints, and the status lifecycle
- Architecture -- how data flows from producer to consumer