From c6c55abcf9416abf61118408a695223e0e86d9af Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Wed, 24 Jul 2024 19:03:32 +0100 Subject: [PATCH] Add async with support (#503) --- README.rst | 30 ++++++++++++++---------------- aiojobs/_scheduler.py | 22 +++++++++++++++++++++- docs/api.rst | 14 +++++++++----- docs/index.rst | 30 ++++++++++++++---------------- docs/quickstart.rst | 28 ++++++++++++++++------------ tests/test_scheduler.py | 14 +++++++------- 6 files changed, 81 insertions(+), 57 deletions(-) diff --git a/README.rst b/README.rst index f0a759b..4f11d41 100644 --- a/README.rst +++ b/README.rst @@ -39,16 +39,15 @@ Usage example await asyncio.sleep(timeout) async def main(): - scheduler = aiojobs.Scheduler() - for i in range(100): - # spawn jobs - await scheduler.spawn(coro(i/10)) + async with aiojobs.Scheduler() as scheduler: + for i in range(100): + # spawn jobs + await scheduler.spawn(coro(i/10)) - await asyncio.sleep(5.0) - # not all scheduled jobs are finished at the moment - - # gracefully wait on tasks before closing any remaining spawned jobs - await scheduler.wait_and_close() + await asyncio.sleep(5.0) + # not all scheduled jobs are finished at the moment + # Exit from context will gracefully wait on tasks before closing + # any remaining spawned jobs asyncio.run(main()) @@ -83,13 +82,12 @@ For example: await scheduler.shield(important()) async def main(): - scheduler = aiojobs.Scheduler() - t = asyncio.create_task(run_something(scheduler)) - await asyncio.sleep(0.1) - t.cancel() - with suppress(asyncio.CancelledError): - await t - await scheduler.wait_and_close() + async with aiojobs.Scheduler() as scheduler: + t = asyncio.create_task(run_something(scheduler)) + await asyncio.sleep(0.1) + t.cancel() + with suppress(asyncio.CancelledError): + await t asyncio.run(main()) diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index 0a4abc6..3954700 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -1,6 +1,7 @@ import asyncio import sys from contextlib import suppress +from types import TracebackType from typing import ( Any, Awaitable, @@ -11,6 +12,7 @@ Iterator, Optional, Set, + Type, TypeVar, Union, ) @@ -19,9 +21,12 @@ if sys.version_info >= (3, 11): from asyncio import timeout as asyncio_timeout + from typing import Self else: from async_timeout import timeout as asyncio_timeout + Self = TypeVar("Self", bound="Scheduler") + _T = TypeVar("_T") _FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]] ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] @@ -45,6 +50,7 @@ def __init__( self, *, close_timeout: Optional[float] = 0.1, + wait_timeout: Optional[float] = 60, limit: Optional[int] = 100, pending_limit: int = 10000, exception_handler: Optional[ExceptionHandler] = None, @@ -58,6 +64,7 @@ def __init__( self._jobs: Set[Job[object]] = set() self._shields: Set[asyncio.Task[object]] = set() self._close_timeout = close_timeout + self._wait_timeout = wait_timeout self._limit = limit self._exception_handler = exception_handler self._failed_tasks: asyncio.Queue[Optional[asyncio.Task[object]]] = ( @@ -85,6 +92,17 @@ def __repr__(self) -> str: state += " " return f"" + async def __aenter__(self: Self) -> Self: + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + await self.wait_and_close() + @property def limit(self) -> Optional[int]: return self._limit @@ -164,7 +182,9 @@ def _outer_done_callback(outer: "asyncio.Future[object]") -> None: outer.add_done_callback(_outer_done_callback) return outer - async def wait_and_close(self, timeout: float = 60) -> None: + async def wait_and_close(self, timeout: Optional[float] = None) -> None: + if timeout is None: + timeout = self._wait_timeout with suppress(asyncio.TimeoutError): async with asyncio_timeout(timeout): while self._jobs or self._shields: diff --git a/docs/api.rst b/docs/api.rst index 8a4fb02..3c91be6 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -27,10 +27,14 @@ Scheduler Class must be instantiated within a running event loop (e.g. in an ``async`` function). - * *close_timeout* is a timeout for job closing, ``0.1`` by default. - If job's closing time takes more than timeout a + * *close_timeout* is a timeout for job closing after cancellation, + ``0.1`` by default. If job's closing time takes more than timeout a message is logged by :meth:`Scheduler.call_exception_handler`. + * *wait_timeout* is a timeout to allow pending tasks to complete before + being cancelled when using :meth:`Scheduler.wait_and_close` or + the ``async with`` syntax. Defaults to 60 seconds. + * *limit* is a limit for jobs spawned by scheduler, ``100`` by default. @@ -113,12 +117,12 @@ Scheduler used to ensure that shielded tasks will actually be completed on application shutdown. - .. py:method:: wait_and_close(timeout=60) + .. py:method:: wait_and_close(timeout=None) :async: Wait for currently scheduled tasks to finish gracefully for the given - *timeout*. Then proceed with closing the scheduler, where any - remaining tasks will be cancelled. + *timeout* or *wait_timeout* if *timeout* is ``None``. Then proceed with + closing the scheduler, where any remaining tasks will be cancelled. .. py:method:: close() :async: diff --git a/docs/index.rst b/docs/index.rst index e1ab465..14d9f09 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -28,16 +28,15 @@ Usage example await asyncio.sleep(timeout) async def main(): - scheduler = aiojobs.Scheduler() - for i in range(100): - # spawn jobs - await scheduler.spawn(coro(i/10)) + async with aiojobs.Scheduler() as scheduler: + for i in range(100): + # spawn jobs + await scheduler.spawn(coro(i/10)) - await asyncio.sleep(5.0) - # not all scheduled jobs are finished at the moment - - # gracefully wait on tasks before closing any remaining spawned jobs - await scheduler.wait_and_close() + await asyncio.sleep(5.0) + # not all scheduled jobs are finished at the moment + # Exit from context will gracefully wait on tasks before closing + # any remaining spawned jobs asyncio.run(main()) @@ -75,13 +74,12 @@ For example: await scheduler.shield(important()) async def main(): - scheduler = aiojobs.Scheduler() - t = asyncio.create_task(run_something(scheduler)) - await asyncio.sleep(0.1) - t.cancel() - with suppress(asyncio.CancelledError): - await t - await scheduler.wait_and_close() + async with aiojobs.Scheduler() as scheduler: + t = asyncio.create_task(run_something(scheduler)) + await asyncio.sleep(0.1) + t.cancel() + with suppress(asyncio.CancelledError): + await t asyncio.run(main()) diff --git a/docs/quickstart.rst b/docs/quickstart.rst index faeeefc..ab25048 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -35,7 +35,7 @@ Spawn a new job:: At the end of program gracefully close the scheduler:: - await scheduler.close() + await scheduler.wait_and_close() Let's collect it altogether into very small but still functional example:: @@ -47,16 +47,15 @@ Let's collect it altogether into very small but still functional example:: await asyncio.sleep(timeout) async def main(): - scheduler = aiojobs.Scheduler() - for i in range(100): - # spawn jobs - await scheduler.spawn(coro(i/10)) + async with aiojobs.Scheduler() as scheduler: + for i in range(100): + # spawn jobs + await scheduler.spawn(coro(i/10)) - await asyncio.sleep(5.0) - # not all scheduled jobs are finished at the moment - - # gracefully close spawned jobs - await scheduler.close() + await asyncio.sleep(5.0) + # not all scheduled jobs are finished at the moment + # Exit from context will gracefully wait on tasks before closing + # any remaining spawned jobs asyncio.run(main()) @@ -70,8 +69,13 @@ seconds for its execution. Next we waits for ``5`` seconds. Roughly half of scheduled jobs should be finished already but ``50`` jobs are still active. -For closing them we calls ``await scheduler.close()``, the call sends -:exc:`asyncio.CancelledError` into every non-closed job to stop it. +For closing them we exit the context manager, which calls +:meth:`aiojobs.Scheduler.wait_and_close`. This waits for a grace period +to allow tasks to complete normally, then after a timeout it sends +:exc:`asyncio.CancelledError` into every non-closed job to stop them. + +Alternatively, we could use :meth:`Scheduler.close` to immediately +close/cancel the jobs. That's pretty much it. diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index b365c8f..8e10dc0 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -516,7 +516,7 @@ async def outer() -> None: assert scheduler.closed -async def test_wait_and_close_timeout_shield(scheduler: Scheduler) -> None: +async def test_wait_and_close_timeout_shield() -> None: inner_cancelled = outer_cancelled = False async def inner() -> None: @@ -534,13 +534,13 @@ async def outer() -> None: except asyncio.CancelledError: outer_cancelled = True - await scheduler.spawn(outer()) - await asyncio.sleep(0) - assert not inner_cancelled and not outer_cancelled - assert len(scheduler._shields) == 1 - assert len(scheduler._jobs) == 1 + async with Scheduler(wait_timeout=0.1) as scheduler: + await scheduler.spawn(outer()) + await asyncio.sleep(0) + assert not inner_cancelled and not outer_cancelled + assert len(scheduler._shields) == 1 + assert len(scheduler._jobs) == 1 - await scheduler.wait_and_close(0.1) assert inner_cancelled and outer_cancelled # type: ignore[unreachable] assert len(scheduler._shields) == 0 # type: ignore[unreachable] assert len(scheduler._jobs) == 0