Rate Limiting & Concurrency¶
PgQueuer provides fine-grained control over job execution frequency and concurrency at the entrypoint level.
Rate Limiting¶
Define a maximum number of requests per second for a specific job type:
@pgq.entrypoint("data_processing", requests_per_second=10)
async def process_data(job: Job) -> None:
pass
Jobs that exceed the rate limit are held and retried within the same batch cycle.
Concurrency Limiting¶
Limit the number of jobs of a given type that run simultaneously:
@pgq.entrypoint("data_processing", concurrency_limit=4)
async def process_data(job: Job) -> None:
pass
This is useful for protecting external services with connection pool limits or memory-intensive operations.
Serialized Dispatch¶
Ensure jobs of the same type are processed strictly one at a time:
@pgq.entrypoint("shared_resource", serialized_dispatch=True)
async def process_shared_resource(job: Job) -> None:
pass
serialized_dispatch=True is equivalent to concurrency_limit=1.
Global Concurrency Limit¶
You can also cap the total number of concurrently running tasks across all entrypoints at the worker level using the CLI flag:
This limits the total across all entrypoints regardless of individual entrypoint settings.
Combining Controls¶
You can combine multiple controls on a single entrypoint:
@pgq.entrypoint(
"api_call",
requests_per_second=5,
concurrency_limit=3,
)
async def call_external_api(job: Job) -> None:
pass
Configuring Timeouts¶
Two additional parameters control job processing timing:
dequeue_timeout: Maximum time (in seconds) to wait for new jobs before returning an empty batch. Default: 30 seconds.retry_timer: Interval to retry unprocessed jobs. Default: 0 (no retry timer).
These are set at the QueueManager / PgQueuer level, not per entrypoint.