API
Scheduler
- class aiojobs.Scheduler(*, close_timeout=0.1, wait_timeout=60, limit=100, pending_limit=10000, exception_handler=None)
A container for managed jobs.
Jobs are created by spawn(). close() should be used for finishing all scheduled jobs.
The class implements the collections.abc.Collection contract, so jobs can be iterated and inspected: the len(scheduler), for job in scheduler, and job in scheduler operations are supported.
The class must be instantiated within a running event loop (e.g. in an async function).
close_timeout is the timeout for closing a job after cancellation, 0.1 by default. If closing a job takes longer than the timeout, a message is logged by Scheduler.call_exception_handler().
wait_timeout is the time allowed for pending tasks to complete before they are cancelled when using Scheduler.wait_and_close() or the async with syntax. Defaults to 60 seconds.
limit is the maximum number of concurrently running jobs spawned by the scheduler, 100 by default.
pending_limit is the maximum number of jobs waiting to start, 10000 by default. Use 0 for an unbounded pending queue.
exception_handler is a callable with the handler(scheduler, context) signature used to log unhandled exceptions from jobs (see Scheduler.call_exception_handler() for documentation about context and the default implementation).
Note
close_timeout defaults to 0.1 second, which looks too small at first glance. But it is a timeout for waiting on jobs that have already been cancelled. Normally a job finishes immediately unless it swallows asyncio.CancelledError. In that case there is no single timeout value that suits everybody, so the user should pass a value suitable for their environment anyway.
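The difference described in the note can be seen with plain asyncio tasks. The following is a minimal sketch that does not use aiojobs at all; the helper name cancel_and_wait and the two sample coroutines are hypothetical, and the 0.1 value simply mirrors the default close_timeout.

```python
import asyncio


async def cooperative():
    # Lets CancelledError propagate, so cancellation completes at once.
    await asyncio.sleep(10)


async def stubborn():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Swallows the cancellation and keeps working for a while.
        await asyncio.sleep(0.3)


async def cancel_and_wait(coro_func, close_timeout):
    """Cancel a task and report whether it stopped within close_timeout."""
    task = asyncio.create_task(coro_func())
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    done, pending = await asyncio.wait({task}, timeout=close_timeout)
    finished_in_time = task in done
    if pending:
        # Let the slow task finish so nothing is left running.
        await asyncio.wait(pending)
    return finished_in_time


async def main():
    return (
        await cancel_and_wait(cooperative, 0.1),
        await cancel_and_wait(stubborn, 0.1),
    )


results = asyncio.run(main())
print(results)  # (True, False)
```

A job that behaves like stubborn() is the case where a larger close_timeout would have to be chosen by the user.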
- limit
Concurrency limit (100 by default) or None if the limit is disabled.
- pending_limit
A limit for the pending queue size (0 for an unlimited queue).
See spawn() for details.
Added in version 0.2.
- close_timeout
Timeout for waiting for jobs to close, 0.1 by default.
- active_count
Count of active (currently executing) jobs.
- pending_count
Count of scheduled jobs that have not started executing yet.
- async spawn(coro)
Spawn a new job to execute the coro coroutine.
Return a new Job object.
The job might be started immediately or pushed into the pending list if the concurrency limit is exceeded.
If pending_count is greater than pending_limit and the limit is finite (not 0), the method suspends execution without scheduling a new job (adding it to the pending queue) until the pending queue shrinks enough to have a free slot.
Changed in version 0.2: The method respects pending_limit now.
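The interplay of limit and pending_limit can be illustrated with a toy model built only on asyncio.Queue. This is an analogy under stated assumptions, not how aiojobs is implemented: the function run_with_limits and the stats dictionary are hypothetical, workers stand in for the concurrency limit, and the bounded queue stands in for the pending queue.

```python
import asyncio


async def run_with_limits(coro_factories, *, limit, pending_limit):
    """Run jobs with at most `limit` active and `pending_limit` queued."""
    # maxsize=0 means an unbounded queue, mirroring pending_limit=0.
    queue = asyncio.Queue(maxsize=pending_limit)
    stats = {"active": 0, "peak_active": 0, "done": 0}

    async def worker():
        while True:
            factory = await queue.get()
            stats["active"] += 1
            stats["peak_active"] = max(stats["peak_active"], stats["active"])
            try:
                await factory()
                stats["done"] += 1
            finally:
                stats["active"] -= 1
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(limit)]
    for factory in coro_factories:
        # Suspends while the pending queue is full, similar to what
        # spawn() is described to do above.
        await queue.put(factory)
    await queue.join()  # wait until every queued job has finished
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return stats


async def job():
    await asyncio.sleep(0.01)


stats = asyncio.run(run_with_limits([job] * 10, limit=3, pending_limit=2))
print(stats)
```

In this model the producer loop is the caller of spawn(): it only blocks when the queue already holds pending_limit items, and never more than limit jobs run at the same time.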
- async shield(coro)
Protect an awaitable from being cancelled.
This is a drop-in replacement for asyncio.shield(), with the addition of tracking the shielded task in the scheduler. This can be used to ensure that shielded tasks will actually be completed on application shutdown.
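Since the method is described as a drop-in replacement for asyncio.shield(), the cancellation behavior it builds on can be shown with the standard library alone. The sketch below uses only asyncio.shield(); the coroutine names save_record and handler are hypothetical.

```python
import asyncio

finished = []


async def save_record():
    await asyncio.sleep(0.05)
    finished.append("saved")


async def handler():
    # The awaiting coroutine may be cancelled, but the shielded
    # inner task keeps running.
    await asyncio.shield(save_record())


async def main():
    outer = asyncio.create_task(handler())
    await asyncio.sleep(0.01)
    outer.cancel()  # e.g. the caller went away
    try:
        await outer
    except asyncio.CancelledError:
        pass
    before = list(finished)
    await asyncio.sleep(0.1)  # the inner task completes in the background
    return before, list(finished)


before, after = asyncio.run(main())
print(before, after)
```

With plain asyncio.shield() nothing waits for the inner task if the program shuts down right after the cancellation; tracking the task in the scheduler is what lets it be completed on shutdown, as described above.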
- async wait_and_close(timeout=None)
Wait for currently scheduled tasks to finish gracefully for the given timeout, or for wait_timeout if timeout is None. Then proceed with closing the scheduler, where any remaining tasks will be cancelled.
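The two phases described here (a grace period, then cancellation of whatever is left) can be sketched with asyncio.wait(). This is an illustration of the described semantics using only the standard library, not the library's own code; wait_then_cancel and work are hypothetical names.

```python
import asyncio


async def wait_then_cancel(tasks, timeout):
    """Give tasks `timeout` seconds to finish, then cancel the rest."""
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Wait for the cancelled tasks to actually stop.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def work(delay):
    await asyncio.sleep(delay)
    return delay


async def main():
    tasks = {asyncio.create_task(work(d)) for d in (0.01, 0.02, 5)}
    return await wait_then_cancel(tasks, timeout=0.2)


counts = asyncio.run(main())
print(counts)  # (2, 1)
```

Two short tasks finish within the grace period and the long one is cancelled, which is the outcome the timeout argument controls.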
- async close()
Close the scheduler and all its jobs by cancelling the tasks and then waiting on them.
If the finishing time for a particular job exceeds close_timeout, the job is logged by call_exception_handler().
- exception_handler
A callable with signature (scheduler, context) or None for the default handler.
Used by call_exception_handler().
- call_exception_handler(context)
Log information about errors in jobs that are not explicitly awaited and in jobs whose close procedure exceeds close_timeout.
By default calls asyncio.AbstractEventLoop.call_exception_handler(); the behavior can be overridden by passing the exception_handler parameter into Scheduler.
context is a dict with the following keys:
message: error message, str
job: failed job, Job instance
exception: caught exception, Exception instance
source_traceback: a traceback at the moment of job creation (present only for debug event loops, see also PYTHONASYNCIODEBUG).
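A custom handler only needs the (scheduler, context) signature and the context keys listed above. The following sketch is one possible handler; the names format_job_error and log_job_error, the "jobs" logger, and the sample context are all hypothetical, and the sample uses a plain string in place of a real Job instance.

```python
import logging

logger = logging.getLogger("jobs")


def format_job_error(context):
    """Build a one-line summary from a context dict with the keys above."""
    parts = [context.get("message", "Unhandled job error")]
    if "job" in context:
        parts.append(f"job={context['job']!r}")
    if "exception" in context:
        exc = context["exception"]
        parts.append(f"exception={type(exc).__name__}: {exc}")
    return " | ".join(parts)


def log_job_error(scheduler, context):
    """Handler with the (scheduler, context) signature."""
    logger.error(format_job_error(context), exc_info=context.get("exception"))


# Stand-in context for demonstration; a real one is built by the scheduler.
sample = {
    "message": "Job processing failed",
    "job": "<Job demo>",
    "exception": ValueError("boom"),
}
summary = format_job_error(sample)
print(summary)
```

Such a function would be passed as the exception_handler parameter when creating the Scheduler.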
Job
- class aiojobs.Job
A wrapper around a spawned async function.
A job has three states:
pending: spawned but not executed yet because of the concurrency limit
active: is executing now
closed: the job has finished or stopped.
All exceptions not explicitly awaited by wait() and close() are logged by Scheduler.call_exception_handler().
- active
The job is executing now.
- pending
The job was spawned but actual execution is delayed because the scheduler reached its concurrency limit.
- closed
The job is finished.
- async wait(*, timeout=None)
Wait for the job to finish.
If the timeout is exceeded, asyncio.TimeoutError is raised.
The job is in the closed state after the method finishes.
- async close(*, timeout=None)
Close the job.
If the timeout is exceeded, asyncio.TimeoutError is raised.
The job is in the closed state after the method finishes.
Integration with aiohttp web server
To use the project with an aiohttp server, a scheduler should be installed into the app and a new function should be used for spawning new jobs.
- aiojobs.aiohttp.setup(app, **kwargs)
Register aiohttp.web.Application.on_startup and aiohttp.web.Application.on_cleanup hooks for creating an aiojobs.Scheduler at the application initialization stage and closing it on web server shutdown.
app - aiohttp.web.Application instance.
kwargs - additional named parameters passed to aiojobs.Scheduler.
- async aiojobs.aiohttp.spawn(request, coro)
Spawn a new job using the scheduler registered into request.app, or a parent aiohttp.web.Application.
request - aiohttp.web.Request given from the web handler.
coro - a coroutine to be executed inside a new job.
Return an aiojobs.Job instance.
- async aiojobs.aiohttp.shield(request, coro)
Protect an awaitable from being cancelled while registering the shielded task into the registered scheduler.
Any shielded tasks will then be run to completion when the web app shuts down (assuming it doesn't exceed the shutdown timeout).
Helpers
- aiojobs.aiohttp.get_scheduler(request)
Return a scheduler from the request; raise RuntimeError if a scheduler was not registered during the application startup phase (see setup()). The scheduler will be resolved from the current or any parent aiohttp.web.Application, if available.
- aiojobs.aiohttp.get_scheduler_from_app(app)
Return a scheduler from the aiohttp application, or None if a scheduler was not registered during the application startup phase (see setup()).
- aiojobs.aiohttp.get_scheduler_from_request(request)
Return a scheduler from the aiohttp request, or None if a scheduler was not registered on any application in the hierarchy of parent applications (see setup()).
- @aiojobs.aiohttp.atomic
Wrap a web-handler to execute the entire handler as a new job.
    @atomic
    async def handler(request):
        return web.Response()
is a functional equivalent of
    async def inner(request):
        return web.Response()

    async def handler(request):
        job = await spawn(request, inner(request))
        return await job.wait()