# PgQueuer
High-performance PostgreSQL job queue for Python. No Redis. No RabbitMQ. Just PostgreSQL.
## Install
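PgQueuer is published on PyPI; a typical install (package name as used throughout these docs) is:

```shell
pip install pgqueuer
```

Optional extras (listed below) can be added with the usual bracket syntax, e.g. `pip install "pgqueuer[asyncpg]"`.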
Then set up the database schema:
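PgQueuer ships a `pgq` CLI. Assuming your PostgreSQL connection settings are available (for example via the standard `PG*` environment variables), the schema can be installed with:

```shell
pgq install
```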
### Optional extras

| Extra | Purpose |
|---|---|
| `asyncpg` | asyncpg async driver |
| `psycopg` | psycopg async + sync driver |
| `logfire` | Logfire distributed tracing |
| `sentry` | Sentry distributed tracing |
| `fastapi` | FastAPI Prometheus metrics router |
## Quick Start

### 1. Define a consumer
Create myapp.py:
Using a single asyncpg connection:

```python
from datetime import datetime

import asyncpg

from pgqueuer import PgQueuer
from pgqueuer.models import Job, Schedule


async def main() -> PgQueuer:
    connection = await asyncpg.connect()
    pgq = PgQueuer.from_asyncpg_connection(connection)

    @pgq.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed: {job!r}")

    @pgq.schedule("cleanup", "0 * * * *")
    async def cleanup(schedule: Schedule) -> None:
        print(f"Hourly cleanup: {datetime.now()!r}")

    return pgq
```
Or, using an asyncpg pool:

```python
from datetime import datetime

import asyncpg

from pgqueuer import PgQueuer
from pgqueuer.models import Job, Schedule


async def main() -> PgQueuer:
    pool = await asyncpg.create_pool()
    pgq = PgQueuer.from_asyncpg_pool(pool)

    @pgq.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed: {job!r}")

    @pgq.schedule("cleanup", "0 * * * *")
    async def cleanup(schedule: Schedule) -> None:
        print(f"Hourly cleanup: {datetime.now()!r}")

    return pgq
```
Or, using psycopg:

```python
from datetime import datetime

import psycopg

from pgqueuer import PgQueuer
from pgqueuer.models import Job, Schedule


async def main() -> PgQueuer:
    connection = await psycopg.AsyncConnection.connect(autocommit=True)
    pgq = PgQueuer.from_psycopg_connection(connection)

    @pgq.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed: {job!r}")

    @pgq.schedule("cleanup", "0 * * * *")
    async def cleanup(schedule: Schedule) -> None:
        print(f"Hourly cleanup: {datetime.now()!r}")

    return pgq
```
For tests and CI, the in-memory adapter runs without PostgreSQL:

```python
from pgqueuer import PgQueuer
from pgqueuer.domain.models import Job
from pgqueuer.domain.types import QueueExecutionMode


async def test_job_handler():
    pq = PgQueuer.in_memory()

    @pq.entrypoint("say_hello")
    async def say_hello(job: Job) -> None:
        print(f"Processing: {job.payload}")

    await pq.qm.queries.enqueue(["say_hello"], [b"hello"], [0])
    await pq.qm.run(mode=QueueExecutionMode.drain)
```
> **Note**
> psycopg connections must use `autocommit=True`.
### 2. Run the consumer
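Assuming the factory above is saved as `myapp.py`, start the consumer through the `pgq` CLI:

```shell
pgq run myapp:main
```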
This registers signal handlers, processes jobs as they arrive, runs scheduled tasks, and shuts down gracefully on Ctrl+C.
### 3. Enqueue jobs
From another process or script:
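A minimal sketch, assuming the `AsyncpgDriver`/`Queries` helpers and that `enqueue` takes an entrypoint name, a bytes payload, and a priority (check the API reference for the exact signature):

```python
import asyncio

import asyncpg

from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main() -> None:
    connection = await asyncpg.connect()
    queries = Queries(AsyncpgDriver(connection))
    # Enqueue a job for the "fetch" entrypoint defined above, priority 0.
    await queries.enqueue("fetch", b"this is the payload", 0)


if __name__ == "__main__":
    asyncio.run(main())
```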
### 4. Monitor
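A live view of queue activity is available from the CLI (command name assumed from the `pgq` CLI reference):

```shell
pgq dashboard
```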
## Features

- **Real-time delivery** — PostgreSQL `LISTEN/NOTIFY` pushes jobs to workers instantly. No polling, no external broker.
- **Rate limiting** — Per-entrypoint `requests_per_second` and `concurrency_limit` protect downstream services.
- **Built-in scheduler** — Cron-style recurring tasks without celery-beat or any extra process.
- **Custom executors** — Plug in retry-with-backoff, notification dispatch, or any execution strategy.
- **Observability** — Prometheus metrics, Logfire and Sentry tracing, live CLI dashboard.
- **In-memory adapter** — Drop-in replacement for testing and CI. No Docker, no PostgreSQL required.
- **Completion tracking** — `CompletionWatcher` lets callers await job results in real time.
- **Deferred execution** — Schedule jobs for a future time with `execute_after`.
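Deferred execution can be sketched like this; the `execute_after` name comes from the feature list above, while the accepted type (a `timedelta` here) is an assumption to verify against the API reference:

```python
import asyncio
from datetime import timedelta

import asyncpg

from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main() -> None:
    connection = await asyncpg.connect()
    queries = Queries(AsyncpgDriver(connection))
    # Run the "fetch" job no earlier than one hour from now.
    await queries.enqueue(
        "fetch", b"deferred payload", 0, execute_after=timedelta(hours=1)
    )


if __name__ == "__main__":
    asyncio.run(main())
```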
## Why PgQueuer?

If you're already running PostgreSQL, PgQueuer eliminates the need for Redis, RabbitMQ, or
any other message broker. Your jobs live in the same database as your application data — with
full ACID guarantees, row-level locking via `FOR UPDATE SKIP LOCKED`, and real-time
notifications via `LISTEN/NOTIFY`.
See the Celery comparison for a detailed side-by-side.
## Next Steps
- Shared Resources — share DB pools, HTTP clients, and ML models across jobs
- Scheduling — cron-style recurring tasks
- Rate Limiting — throttle and limit concurrency
- CLI Reference — full list of `pgq` commands
- Architecture — how jobs flow through the system