# Custom Executors

Executors define how jobs are processed once they are dequeued. PgQueuer provides a default executor, but you can create custom executors to introduce specialized behavior such as advanced logging, conditional execution, or retry logic.
## What Are Executors?
Custom executors let you:
- Implement custom logic: Interact with external APIs, add specialized error handling, or build complex workflows.
- Modularize job processing: Keep execution logic separate from application code.
- Enhance flexibility: Define concurrency limits, dynamic resource allocation, or multi-step dispatch patterns.
## Creating a Custom Executor

Subclass `AbstractEntrypointExecutor` and implement the `execute` method:
```python
from pgqueuer.executors import AbstractEntrypointExecutor
from pgqueuer.models import Job, Context


class NotificationExecutor(AbstractEntrypointExecutor):
    async def execute(self, job: Job, context: Context) -> None:
        type_, message = job.payload.decode().split("|")
        if type_ == "email":
            await self.send_email(message)
        elif type_ == "sms":
            await self.send_sms(message)

    async def send_email(self, message: str) -> None:
        print(f"Sending Email: {message}")

    async def send_sms(self, message: str) -> None:
        print(f"Sending SMS: {message}")
```
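The executor above assumes payloads are encoded as `type|message` bytes. That convention is specific to this example, not something PgQueuer mandates; a minimal sketch of encoding and decoding it:

```python
# Build and parse a payload using the "type|message" convention
# from the example above. PgQueuer payloads are raw bytes.
payload = b"email|Welcome aboard!"

# maxsplit=1 keeps any "|" characters inside the message intact.
type_, message = payload.decode().split("|", 1)
print(type_, message)
```

Using `maxsplit=1` is slightly more defensive than the plain `split("|")` in the executor: a message containing a literal `|` would otherwise raise a `ValueError` on unpacking.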
## Registering a Custom Executor

Pass the executor class via the `executor_factory` argument:
```python
@pgq.entrypoint("user_notification", executor_factory=NotificationExecutor)
async def notification_task(job: Job) -> None:
    pass
```
## Database Retry Executor

`DatabaseRetryEntrypointExecutor` converts unhandled exceptions into database-level retries via `RetryRequested`. The job is re-queued in the database so any worker can pick it up after the delay; retries survive worker restarts.
### When to Use It
- Failures that may take minutes to resolve (e.g., downstream service outages)
- Jobs that must survive worker restarts between retry attempts
- Scenarios where you want the retry to be visible in the queue and log tables
### Example
```python
from datetime import timedelta

from pgqueuer import PgQueuer, Job
from pgqueuer.executors import DatabaseRetryEntrypointExecutor

pgq = PgQueuer(driver)


@pgq.entrypoint(
    "sync_inventory",
    executor_factory=lambda params: DatabaseRetryEntrypointExecutor(
        parameters=params,
        max_attempts=5,
        initial_delay=timedelta(seconds=2),
        max_delay=timedelta(minutes=10),
        backoff_multiplier=3.0,
    ),
)
async def sync_inventory(job: Job) -> None:
    await inventory_api.sync(job.payload)
```
### Parameters

| Parameter | Default | Description |
|---|---|---|
| `max_attempts` | `5` | Maximum retries before the exception becomes terminal |
| `initial_delay` | `1s` | Delay before the first retry |
| `max_delay` | `5m` | Cap on exponential backoff |
| `backoff_multiplier` | `2.0` | Multiplier applied to the delay after each attempt |
If the handler raises `RetryRequested` directly, it passes through unchanged; the executor only converts non-retry exceptions. See Database-Level Retry for the full guide.
**Combine with `on_failure="hold"`**

After `max_attempts` is exhausted, the exception propagates as a terminal failure. Add `on_failure="hold"` to park the job instead of deleting it, giving you a chance to inspect it and manually re-queue. See Holding Failed Jobs.
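Putting the two together might look like the following sketch; it assumes `on_failure` is accepted alongside `executor_factory` on the same decorator, as described above:

```python
@pgq.entrypoint(
    "sync_inventory",
    executor_factory=lambda params: DatabaseRetryEntrypointExecutor(
        parameters=params,
        max_attempts=5,
    ),
    on_failure="hold",  # park the job after the final failed attempt
)
async def sync_inventory(job: Job) -> None:
    await inventory_api.sync(job.payload)
```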