
Shared Resources (Context.resources)

PgQueuer lets you provide a single shared resources container that is injected into every job execution context. This makes it easy to initialize heavyweight or shared components (database pools, HTTP clients, caches, ML models, etc.) once at process startup and reuse them across all jobs.

Why Use Shared Resources?

  • Avoid re-initializing expensive objects per job (e.g. HTTP session pools, model weights)
  • Centralize lifecycle (create at startup, optionally close at shutdown)
  • Enable coordinated state (e.g. in‑memory counters, feature flags)
  • Provide a structured place for integrations (tracing, metrics, external APIs)

Providing Resources

You pass a mutable mapping when constructing PgQueuer (or QueueManager directly):

import asyncpg
from contextlib import asynccontextmanager
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job

@asynccontextmanager
async def build_pgqueuer():
    conn = await asyncpg.connect()  # connection settings come from standard PG* environment variables
    driver = AsyncpgDriver(conn)

    resources = {
        "http_client": build_http_client(),   # e.g. httpx.AsyncClient()
        "vector_index": load_vector_index(),  # custom object
        "feature_flags": {"beta_mode": True},
    }

    pgq = PgQueuer(driver, resources=resources)

    @pgq.entrypoint("process_user")
    async def process_user(job: Job) -> None:
        ctx = pgq.qm.get_context(job.id)
        http = ctx.resources["http_client"]
        flags = ctx.resources["feature_flags"]
        # Use shared objects without recreating them
        ...

    try:
        yield pgq
    finally:
        await conn.close()  # release the database connection on shutdown

Internally this mapping is passed into each Context as context.resources. All jobs receive the same object (it is not copied), so mutations are visible across jobs.
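The sharing semantics can be demonstrated without a running queue. In this minimal sketch, the Context class is a hypothetical stand-in for pgqueuer.models.Context, reduced to the one behavior that matters here: storing the mapping by reference.

```python
# Sketch (not pgqueuer code): every Context holds a reference to the same
# resources mapping, so a mutation made in one job is visible in the next.
class Context:  # stand-in for pgqueuer.models.Context
    def __init__(self, resources: dict) -> None:
        self.resources = resources  # stored by reference, never copied

resources = {"counter": 0}
ctx_a = Context(resources)  # context handed to job A
ctx_b = Context(resources)  # context handed to job B

ctx_a.resources["counter"] += 1            # job A mutates shared state
assert ctx_b.resources["counter"] == 1     # job B observes the change
assert ctx_a.resources is ctx_b.resources  # same object, not a copy
```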

Access Inside Custom Executors

If you implement a custom executor (AbstractEntrypointExecutor), the execute(self, job, context) method receives the Context:

from pgqueuer.executors import AbstractEntrypointExecutor
from pgqueuer.models import Job, Context

class LoggingExecutor(AbstractEntrypointExecutor):
    async def execute(self, job: Job, context: Context) -> None:
        logger = context.resources.get("logger")
        if logger:
            logger.info("Processing job %s", job.id)
        # Call wrapped function (if delegating) or implement logic directly

Mutating Resources

Because resources is a shared mutable mapping, jobs can use it for lightweight cross-job state:

context.resources.setdefault("metrics", {}).setdefault("processed", 0)
context.resources["metrics"]["processed"] += 1

If you need stricter control (immutability, lifecycle hooks), you can replace the mapping with a custom registry class; the public contract is simply "object with mapping semantics."

Enabling Context Injection

Entry points only receive the runtime Context when registered with accepts_context=True.

@pgq.entrypoint("process_with_context", accepts_context=True)
async def process_with_context(job: Job, ctx: Context) -> None:
    logger = ctx.resources.get("logger")
    ...

Entry points registered without the flag are invoked with the job only:

@pgq.entrypoint("process_without_context")
async def process_without_context(job: Job) -> None:
    ...

Scheduled Tasks

Scheduled tasks can receive a ScheduleContext with shared resources by setting accepts_context=True:

from pgqueuer.models import Schedule, ScheduleContext

@pgq.schedule("refresh_cache", "*/5 * * * *", accepts_context=True)
async def refresh_cache(schedule: Schedule, ctx: ScheduleContext) -> None:
    http = ctx.resources["http_client"]
    await http.get("https://api.example.com/ping")

Tasks registered without accepts_context continue to work with just the schedule argument:

@pgq.schedule("simple_task", "*/5 * * * *")
async def simple_task(schedule: Schedule) -> None:
    await perform_task()

Testing With Resources

In tests, inject lightweight stand-ins for production resources:

qm = QueueManager(driver, resources={"flag": "test"})

@qm.entrypoint("demo")
async def demo(job: Job) -> None:
    assert qm.get_context(job.id).resources["flag"] == "test"

Summary

Aspect            Behavior
Initialization    Passed at construction: PgQueuer(..., resources=...)
Scope             Shared across all jobs in the same process
Mutation          Visible to subsequent jobs
Scheduled jobs    accepts_context=True injects a ScheduleContext
Custom executors  Receive the Context (and its resources) via execute()