- class aiojobs.Scheduler(*, close_timeout=0.1, limit=100, pending_limit=10000, exception_handler=None)
A container for managed jobs.
Jobs are created by spawn().
close() should be used for finishing all scheduled jobs.
The class implements the collections.abc.Collection contract, so jobs can be counted, iterated and tested for membership: len(scheduler), for job in scheduler and job in scheduler operations are supported.
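As a rough illustration of what the Collection contract provides, the sketch below defines a hypothetical `JobBag` container (not part of aiojobs) that supports the same three operations. The job names are placeholder strings.

```python
from collections.abc import Collection


class JobBag(Collection):
    """Hypothetical stand-in for a container following the Collection contract."""

    def __init__(self, jobs):
        self._jobs = set(jobs)

    def __len__(self):
        return len(self._jobs)

    def __iter__(self):
        return iter(self._jobs)

    def __contains__(self, job):
        return job in self._jobs


bag = JobBag(["job-a", "job-b"])
size = len(bag)            # sizing
names = sorted(bag)        # iteration
has_a = "job-a" in bag     # membership test
```

A scheduler instance is expected to answer `len()`, `for ... in` and `in` in the same way, with Job objects in place of the strings.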
Class must be instantiated within a running event loop (e.g. in an async function).
close_timeout is a timeout for job closing, 0.1 by default. If a job's closing takes longer than the timeout, a message is logged by Scheduler.call_exception_handler().
limit is a limit for jobs spawned by scheduler, 100 by default.
pending_limit is a limit for the number of jobs awaiting start, 10000 by default. Use 0 for infinite pending queue size.
exception_handler is a callable with handler(scheduler, context) signature to log unhandled exceptions from jobs (see Scheduler.call_exception_handler() for documentation about context and default implementation).
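A minimal sketch of a handler with the handler(scheduler, context) shape, using only asyncio. The `"message"` key and the `fake_scheduler` placeholder are assumptions made for the illustration: the key is borrowed from asyncio's own context dicts, and a real handler would receive the scheduler instance.

```python
import asyncio

collected = []


def handler(scheduler, context):
    # Two-argument shape of the exception_handler parameter.
    # The "message" key is an assumption borrowed from asyncio context dicts.
    collected.append((scheduler, context.get("message", "<no message>")))


async def main():
    loop = asyncio.get_running_loop()
    fake_scheduler = "scheduler-placeholder"  # hypothetical stand-in object
    # asyncio loop handlers take (loop, context); adapt to (scheduler, context).
    loop.set_exception_handler(lambda _loop, ctx: handler(fake_scheduler, ctx))
    loop.call_exception_handler({"message": "job failed"})


asyncio.run(main())
```

Here the handler only records what it was given; a production handler would more likely log the context or forward it to an error tracker.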
close_timeout is pinned down to 0.1 second, which looks too small at first glance. But it is a timeout for waiting for cancelled jobs. Normally a job finishes immediately if it doesn't swallow asyncio.CancelledError.
If it does, there is no reasonable timeout that suits everybody; the user should pass a value suitable for their environment anyway.
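The difference between the two kinds of jobs can be sketched with plain asyncio tasks. The helper `cancel_and_wait` and both coroutines are hypothetical; aiojobs is not imported, and the code only models "cancel, then wait up to close_timeout".

```python
import asyncio


async def well_behaved():
    # Lets CancelledError propagate, so cancellation finishes at once.
    await asyncio.sleep(3600)


async def stubborn():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Swallows the cancellation and keeps working for a while.
        await asyncio.sleep(0.3)


async def cancel_and_wait(coro, close_timeout=0.1):
    """Cancel a task and report whether it stopped within close_timeout."""
    task = asyncio.ensure_future(coro)
    await asyncio.sleep(0)            # let the task start running
    task.cancel()
    done, _pending = await asyncio.wait({task}, timeout=close_timeout)
    finished_in_time = task in done
    if not finished_in_time:
        await asyncio.wait({task})    # tidy up before returning
    return finished_in_time


async def main():
    return (await cancel_and_wait(well_behaved()),
            await cancel_and_wait(stubborn()))


results = asyncio.run(main())
```

The first job stops well inside 0.1 second, while the second one overruns it, which is the situation where a larger timeout may be worth passing.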
Concurrency limit (100 by default) or None if the limit is disabled.
A limit for pending queue size (0 for unlimited queue).
New in version 0.2.
Timeout for waiting for jobs closing, 0.1 by default.
Count of active (executed) jobs.
Count of scheduled but not executed yet jobs.
- async spawn(coro)
Spawn a new job for executing the coro coroutine.
Return a new Job object.
The job might be started immediately or pushed into the pending list if the concurrency limit is exceeded.
If pending_count is greater than pending_limit and the limit is finite (not 0), the method suspends execution without scheduling a new job (adding it to the pending queue) until the pending queue shrinks enough to have a free slot.
Changed in version 0.2: The method respects pending_limit now.
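The interplay of the two limits can be modelled with a small asyncio-only sketch. `TinyScheduler` is a hypothetical simplification written for this illustration, not the aiojobs implementation; it relies on asyncio.Queue, whose maxsize=0 also means "unbounded".

```python
import asyncio


class TinyScheduler:
    """Hypothetical, simplified model of limit and pending_limit."""

    def __init__(self, limit=2, pending_limit=1):
        self._limit = limit
        self._active = set()
        # maxsize=0 means an unbounded queue, like pending_limit=0.
        self._pending = asyncio.Queue(maxsize=pending_limit)

    @property
    def active_count(self):
        return len(self._active)

    @property
    def pending_count(self):
        return self._pending.qsize()

    async def spawn(self, coro):
        if self._limit is None or self.active_count < self._limit:
            self._start(coro)
        else:
            # Suspends here while the pending queue is full.
            await self._pending.put(coro)

    def _start(self, coro):
        task = asyncio.ensure_future(coro)
        self._active.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task):
        self._active.discard(task)
        # A slot is free: promote the oldest pending job, if any.
        if not self._pending.empty():
            self._start(self._pending.get_nowait())


async def main():
    scheduler = TinyScheduler(limit=2, pending_limit=1)
    snapshots = []
    for _ in range(3):
        await scheduler.spawn(asyncio.sleep(0.05))
        snapshots.append((scheduler.active_count, scheduler.pending_count))
    # Both limits are reached, so a fourth spawn has to wait for a free slot.
    fourth = asyncio.ensure_future(scheduler.spawn(asyncio.sleep(0.05)))
    await asyncio.sleep(0)
    blocked = not fourth.done()
    await fourth
    await asyncio.sleep(0.3)          # let every job finish
    final = (scheduler.active_count, scheduler.pending_count)
    return snapshots, blocked, final


snapshots, blocked, final = asyncio.run(main())
```

The snapshots are (active, pending) pairs taken after each of the first three spawns: two jobs start immediately, the third goes to the pending queue, and the fourth call stays suspended until a running job finishes.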
- async close()
Close scheduler and all its jobs.
A callable with signature handler(scheduler, context) or None for default handler.
Log information about errors in jobs that were not explicitly awaited and in jobs whose close procedure exceeds close_timeout.
By default calls asyncio.AbstractEventLoop.call_exception_handler(); the behavior can be overridden by passing the exception_handler parameter into the Scheduler constructor.
context is a dict with the following keys:
- class aiojobs.Job
A wrapper around spawned async function.
Job has three states:
pending: spawned but not executed yet because of concurrency limit
active: is executing now
closed: job has finished or stopped.
Job is executing now.
Job was spawned but actual execution is delayed because the scheduler reached its concurrency limit.
Job is finished.
- async wait(*, timeout=None)
Wait for job finishing.
If timeout exceeded
The job is in closed state after finishing the method.
Integration with aiohttp web server
To use the project with an aiohttp server, a scheduler should be installed into the app, and a new function should be used for spawning new jobs.
- aiojobs.aiohttp.setup(app, **kwargs)
- aiojobs.aiohttp.spawn(request, coro)
Spawn a new job using the scheduler registered into request.app, or a parent aiohttp.web.Application.
coro is a coroutine to be executed inside a new job.
Return a scheduler from request, raise RuntimeError if scheduler was not registered on application startup phase (see setup()). The scheduler will be resolved from the current or any parent aiohttp.web.Application, if available.
Return a scheduler from aiohttp application or None if scheduler was not registered on application startup phase (see setup()).
Return a scheduler from aiohttp request or None if scheduler was not registered on any application in the hierarchy of parent applications (see setup()).
Wrap a web-handler to execute the entire handler as a new job.
    @atomic
    async def handler(request):
        return web.Response()

is a functional equivalent of

    async def inner(request):
        return web.Response()

    async def handler(request):
        job = await spawn(request, inner(request))
        return await job.wait()
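One possible reason to run a whole handler as a separate job is to let it finish even if the caller goes away. The asyncio-only sketch below models that idea with a hypothetical `atomic_sketch` decorator built on asyncio.shield. This is an assumption about the intent, not the aiojobs code, and the string request is a placeholder for a real aiohttp request.

```python
import asyncio
import functools

log = []


def atomic_sketch(func):
    """Hypothetical stand-in: run the wrapped coroutine as its own task."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        task = asyncio.ensure_future(func(*args, **kwargs))
        # shield() keeps the inner task alive if the caller is cancelled.
        return await asyncio.shield(task)
    return wrapper


@atomic_sketch
async def handler(request):
    await asyncio.sleep(0.05)
    log.append(f"handled {request}")
    return "response"


async def main():
    outer = asyncio.ensure_future(handler("req-1"))
    await asyncio.sleep(0.01)
    outer.cancel()                # the caller goes away mid-request
    await asyncio.sleep(0.1)      # the inner task still runs to completion
    return outer.cancelled()


outer_cancelled = asyncio.run(main())
```

The outer call is cancelled, yet the handler body still completes and records its work.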