API

Scheduler

class aiojobs.Scheduler(*, close_timeout=0.1, limit=100, pending_limit=10000, exception_handler=None)

A container for managed jobs.

Jobs are created by spawn().

close() should be used for finishing all scheduled jobs.

The class implements collections.abc.Collection contract, jobs could be iterated etc.: len(scheduler), for job in scheduler, job in scheduler operations are supported.

Class must be instantiated within a running event loop (e.g. in an async function).

  • close_timeout is a timeout for job closing, 0.1 by default. If job’s closing time takes more than timeout a message is logged by Scheduler.call_exception_handler().

  • limit is a limit for jobs spawned by scheduler, 100 by default.

  • pending_limit is a limit for amount of jobs awaiting starting, 10000 by default. Use 0 for infinite pending queue size.

  • exception_handler is a callable with handler(scheduler, context) signature to log unhandled exceptions from jobs (see Scheduler.call_exception_handler() for documentation about context and default implementation).

Note

close_timeout pinned down to 0.1 second, it looks too small at first glance. But it is a timeout for waiting cancelled jobs. Normally job is finished immediately if it doesn’t swallow asyncio.CancelledError.

But in last case there is no reasonable timeout with good number for everybody, user should pass a value suitable for his environment anyway.

limit

Concurrency limit (100 by default) or None if the limit is disabled.

pending_limit

A limit for pending queue size (0 for unlimited queue).

See spawn() for details.

Added in version 0.2.

close_timeout

Timeout for waiting for jobs closing, 0.1 by default.

active_count

Count of active (executed) jobs.

pending_count

Count of scheduled but not executed yet jobs.

closed

True if scheduler is closed (close() called).

async spawn(coro)

Spawn a new job for execution coro coroutine.

Return a new Job object.

The job might be started immediately or pushed into pending list if concurrency limit exceeded.

If pending_count is greater than pending_limit and the limit is finite (not 0) the method suspends execution without scheduling a new job (adding it into pending queue) until penging queue size will be reduced to have a free slot.

Changed in version 0.2: The method respects pending_limit now.

async close()

Close scheduler and all its jobs.

It finishing time for particular job exceeds close_timeout this job is logged by call_exception_handler().

exception_handler

A callable with signature (scheduler, context) or None for default handler.

Used by call_exception_handler().

call_exception_handler(context)

Log an information about errors in not explicitly awaited jobs and jobs that close procedure exceeds close_timeout.

By default calls asyncio.AbstractEventLoop.call_exception_handler(), the behavior could be overridden by passing exception_handler parameter into Scheduler.

context is a dict with the following keys:

  • message: error message, str

  • job: failed job, Job instance

  • exception: caught exception, Exception instance

  • source_traceback: a traceback at the moment of job creation (present only for debug event loops, see also PYTHONASYNCIODEBUG).

Job

class aiojobs.Job

A wrapper around spawned async function.

Job has three states:

  • pending: spawn but not executed yet because of concurrency limit

  • active: is executing now

  • closed: job has finished or stopped.

All exception not explicitly awaited by wait() and close() are logged by Scheduler.call_exception_handler()

active

Job is executed now

pending

Job was spawned by actual execution is delayed because scheduler reached concurrency limit.

closed

Job is finished.

async wait(*, timeout=None)

Wait for job finishing.

If timeout exceeded asyncio.TimeoutError raised.

The job is in closed state after finishing the method.

async close(*, timeout=None)

Close the job.

If timeout exceeded asyncio.TimeoutError raised.

The job is in closed state after finishing the method.

Integration with aiohttp web server

For using the project with aiohttp server a scheduler should be installed into app and new function should be used for spawning new jobs.

aiojobs.aiohttp.setup(app, **kwargs)

Register aiohttp.web.Application.on_startup and aiohttp.web.Application.on_cleanup hooks for creating aiojobs.Scheduler on application initialization stage and closing it on web server shutdown.

aiojobs.aiohttp.spawn(request, coro)
:async:

Spawn a new job using scheduler registered into request.app, or a parent aiohttp.web.Application.

Return aiojobs.Job instance

Helpers

aiojobs.aiohttp.get_scheduler(request)

Return a scheduler from request, raise RuntimeError if scheduler was not registered on application startup phase (see setup()). The scheduler will be resolved from the current or any parent aiohttp.web.Application, if available.

aiojobs.aiohttp.get_scheduler_from_app(app)

Return a scheduler from aiohttp application or None if scheduler was not registered on application startup phase (see setup()).

aiojobs.aiohttp.get_scheduler_from_request(request)

Return a scheduler from aiohttp request or None if scheduler was not registered on any application in the hierarchy of parent applications (see setup())

@aiojobs.aiohttp.atomic

Wrap a web-handler to execute the entire handler as a new job.

@atomic
async def handler(request):
    return web.Response()

is a functional equivalent of

async def inner(request):
    return web.Response()

async def handler(request):
    job = await spawn(request, inner(request))
    return await job.wait()