Custom Executors
Executors define how jobs are processed once dequeued. PgQueuer provides a default executor, but you can create custom executors to introduce specialized behavior such as advanced logging, conditional execution, or retry logic.
What Are Executors?
Custom executors let you:
- Implement custom logic: Interact with external APIs, add specialized error handling, or build complex workflows.
- Modularize job processing: Keep execution logic separate from application code.
- Enhance flexibility: Define concurrency limits, dynamic resource allocation, or multi-step dispatch patterns.
Creating a Custom Executor
Subclass AbstractEntrypointExecutor and implement the execute method:
from pgqueuer.executors import AbstractEntrypointExecutor
from pgqueuer.models import Context, Job


class NotificationExecutor(AbstractEntrypointExecutor):
    async def execute(self, job: Job, context: Context) -> None:
        # The payload is expected to look like b"email|Hello".
        if job.payload is None:
            raise ValueError("notification job has no payload")
        # Split on the first "|" only, so the message may itself contain "|".
        type_, message = job.payload.decode().split("|", 1)
        if type_ == "email":
            await self.send_email(message)
        elif type_ == "sms":
            await self.send_sms(message)

    async def send_email(self, message: str) -> None:
        print(f"Sending Email: {message}")

    async def send_sms(self, message: str) -> None:
        print(f"Sending SMS: {message}")
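The `type|message` payload convention in the example above is easy to get wrong when the payload is missing or the separator is absent. As a minimal sketch, the parsing step could be pulled into a standalone helper that is testable without a queue. The name `parse_notification` and its error behavior are assumptions for illustration, not part of PgQueuer.

```python
from __future__ import annotations


def parse_notification(payload: bytes | None) -> tuple[str, str]:
    """Split a b"type|message" payload into (type, message)."""
    if payload is None:
        raise ValueError("job has no payload")
    # partition() splits on the first "|" only, so any later "|"
    # characters stay inside the message.
    type_, separator, message = payload.decode().partition("|")
    if not separator:
        raise ValueError("payload must look like 'type|message'")
    return type_, message


kind, text = parse_notification(b"email|Welcome aboard")
```

An executor's `execute` method could call such a helper and let the `ValueError` propagate, so malformed jobs fail loudly instead of being dispatched to the wrong channel.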
Registering a Custom Executor
Pass the executor class via executor_factory:
@pgq.entrypoint("user_notification", executor_factory=NotificationExecutor)
async def notification_task(job: Job) -> None:
    # NotificationExecutor.execute does the work, so this body stays empty.
    pass
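Both examples on this page call the factory with a single parameters object, which is why either a class or a lambda can be passed as `executor_factory`. The sketch below illustrates that pattern with stand-in types only. `Parameters`, `BaseExecutor`, `TaggedExecutor`, and `build` are hypothetical names and do not reflect PgQueuer's internals.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Parameters:
    """Hypothetical stand-in for the object handed to a factory."""
    name: str


@dataclass
class BaseExecutor:
    """Hypothetical stand-in for an executor base class."""
    parameters: Parameters


@dataclass
class TaggedExecutor(BaseExecutor):
    tag: str = "default"


# A factory is any callable that takes parameters and returns an executor.
ExecutorFactory = Callable[[Parameters], BaseExecutor]


def build(factory: ExecutorFactory, name: str) -> BaseExecutor:
    return factory(Parameters(name=name))


# Passing the class works because calling a class constructs an instance.
plain = build(TaggedExecutor, "user_notification")
# A lambda makes it possible to supply extra constructor arguments.
custom = build(
    lambda p: TaggedExecutor(parameters=p, tag="urgent"), "user_notification"
)
```

Use the class directly when the executor needs no extra configuration, and a lambda (or `functools.partial`) when it does.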
Retry with Backoff Executor
RetryWithBackoffEntrypointExecutor handles transient failures with exponential backoff and jitter.
Features
- Automatic retries on failure: reduces manual intervention.
- Exponential backoff: progressively longer delays between attempts.
- Jitter: randomised delays that keep many failed jobs from retrying at the same instant (the thundering herd problem).
- Configurable limits: max attempts, backoff cap, and total retry time.
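To make the backoff and jitter terms concrete, here is a minimal sketch of one common variant, often called "full jitter": the ceiling doubles with each attempt up to a cap, and the actual delay is drawn uniformly below that ceiling. The function name `backoff_delay` and the `base` and `cap` defaults are assumptions for illustration. The exact formula used by RetryWithBackoffEntrypointExecutor may differ.

```python
from __future__ import annotations

import random


def backoff_delay(
    attempt: int,
    base: float = 0.1,
    cap: float = 0.5,
    rng: random.Random | None = None,
) -> float:
    """Return a delay in seconds for a zero-based attempt number."""
    if rng is None:
        rng = random.Random()
    # Exponential growth, bounded by the cap.
    ceiling = min(cap, base * (2 ** attempt))
    # Jitter: pick a random point below the ceiling.
    return rng.uniform(0.0, ceiling)


rng = random.Random(42)  # seeded only to make the sketch repeatable
delays = [backoff_delay(n, rng=rng) for n in range(5)]
```

Each value in `delays` stays at or below its ceiling (0.1, 0.2, 0.4, 0.5, 0.5 seconds for attempts 0 through 4), so independent workers that fail together are unlikely to retry together.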
When to Use It
- Calling rate-limited or occasionally unavailable external APIs
- Handling transient network timeouts or disconnections
- Retrying database operations during brief contention or outages
Example
from datetime import timedelta

import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.executors import RetryWithBackoffEntrypointExecutor
from pgqueuer.models import Job


async def create_pgqueuer() -> PgQueuer:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    pgq = PgQueuer(driver)

    # Wrap the entrypoint in an executor that retries failed jobs.
    @pgq.entrypoint(
        "retry_with_backoff",
        executor_factory=lambda parameters: RetryWithBackoffEntrypointExecutor(
            parameters=parameters,
            max_attempts=5,
            max_delay=timedelta(seconds=0.5),
            max_time=timedelta(seconds=1),
        ),
    )
    async def retry_with_backoff(job: Job) -> None:
        print(f"Processing job: {job!r}")

    return pgq
Parameters
| Parameter | Description |
|---|---|
| max_attempts | Maximum number of retry attempts |
| max_delay | Cap on exponential backoff delay between retries |
| max_time | Maximum total time allowed for all retry attempts combined |
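The sketch below shows one way the three limits could interact in a plain asyncio retry loop. It is an illustration under stated assumptions, not the library's implementation: `retry_with_limits`, `initial_delay`, and `flaky` are hypothetical names, and the sketch counts every call (including the first) toward `max_attempts`.

```python
from __future__ import annotations

import asyncio
import random
import time
from datetime import timedelta
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_limits(
    func: Callable[[], Awaitable[T]],
    max_attempts: int,
    max_delay: timedelta,
    max_time: timedelta,
    initial_delay: float = 0.01,  # assumed starting delay in seconds
) -> T:
    deadline = time.monotonic() + max_time.total_seconds()
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception:
            # Give up when attempts or the total time budget run out.
            last_attempt = attempt == max_attempts - 1
            if last_attempt or time.monotonic() >= deadline:
                raise
            # Exponential growth capped by max_delay, then jittered.
            ceiling = min(max_delay.total_seconds(), initial_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0.0, ceiling))
    raise RuntimeError("max_attempts must be at least 1")


state = {"calls": 0}


async def flaky() -> str:
    # Fails twice with a transient error, then succeeds.
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(
    retry_with_limits(
        flaky,
        max_attempts=5,
        max_delay=timedelta(seconds=0.05),
        max_time=timedelta(seconds=1),
    )
)
```

Here `max_delay` bounds each individual wait, while `max_time` bounds the whole sequence, so a long run of failures stops even when attempts remain.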