Skip to content

Commit

Permalink
Add async with support (#503)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamsorcerer authored Jul 24, 2024
1 parent 757e547 commit c6c55ab
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 57 deletions.
30 changes: 14 additions & 16 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ Usage example
await asyncio.sleep(timeout)
async def main():
scheduler = aiojobs.Scheduler()
for i in range(100):
# spawn jobs
await scheduler.spawn(coro(i/10))
async with aiojobs.Scheduler() as scheduler:
for i in range(100):
# spawn jobs
await scheduler.spawn(coro(i/10))
await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment
# gracefully wait on tasks before closing any remaining spawned jobs
await scheduler.wait_and_close()
await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment
# Exit from context will gracefully wait on tasks before closing
# any remaining spawned jobs
asyncio.run(main())
Expand Down Expand Up @@ -83,13 +82,12 @@ For example:
await scheduler.shield(important())
async def main():
scheduler = aiojobs.Scheduler()
t = asyncio.create_task(run_something(scheduler))
await asyncio.sleep(0.1)
t.cancel()
with suppress(asyncio.CancelledError):
await t
await scheduler.wait_and_close()
async with aiojobs.Scheduler() as scheduler:
t = asyncio.create_task(run_something(scheduler))
await asyncio.sleep(0.1)
t.cancel()
with suppress(asyncio.CancelledError):
await t
asyncio.run(main())
Expand Down
22 changes: 21 additions & 1 deletion aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import sys
from contextlib import suppress
from types import TracebackType
from typing import (
Any,
Awaitable,
Expand All @@ -11,6 +12,7 @@
Iterator,
Optional,
Set,
Type,
TypeVar,
Union,
)
Expand All @@ -19,9 +21,12 @@

if sys.version_info >= (3, 11):
from asyncio import timeout as asyncio_timeout
from typing import Self
else:
from async_timeout import timeout as asyncio_timeout

Self = TypeVar("Self", bound="Scheduler")

_T = TypeVar("_T")
_FutureLike = Union["asyncio.Future[_T]", Awaitable[_T]]
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]
Expand All @@ -45,6 +50,7 @@ def __init__(
self,
*,
close_timeout: Optional[float] = 0.1,
wait_timeout: Optional[float] = 60,
limit: Optional[int] = 100,
pending_limit: int = 10000,
exception_handler: Optional[ExceptionHandler] = None,
Expand All @@ -58,6 +64,7 @@ def __init__(
self._jobs: Set[Job[object]] = set()
self._shields: Set[asyncio.Task[object]] = set()
self._close_timeout = close_timeout
self._wait_timeout = wait_timeout
self._limit = limit
self._exception_handler = exception_handler
self._failed_tasks: asyncio.Queue[Optional[asyncio.Task[object]]] = (
Expand Down Expand Up @@ -85,6 +92,17 @@ def __repr__(self) -> str:
state += " "
return f"<Scheduler {state}jobs={len(self)}>"

async def __aenter__(self: Self) -> Self:
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
await self.wait_and_close()

@property
def limit(self) -> Optional[int]:
return self._limit
Expand Down Expand Up @@ -164,7 +182,9 @@ def _outer_done_callback(outer: "asyncio.Future[object]") -> None:
outer.add_done_callback(_outer_done_callback)
return outer

async def wait_and_close(self, timeout: float = 60) -> None:
async def wait_and_close(self, timeout: Optional[float] = None) -> None:
if timeout is None:
timeout = self._wait_timeout
with suppress(asyncio.TimeoutError):
async with asyncio_timeout(timeout):
while self._jobs or self._shields:
Expand Down
14 changes: 9 additions & 5 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ Scheduler
Class must be instantiated within a running event loop (e.g. in an
``async`` function).

* *close_timeout* is a timeout for job closing, ``0.1`` by default.
If job's closing time takes more than timeout a
* *close_timeout* is a timeout for job closing after cancellation,
``0.1`` by default. If job's closing time takes more than timeout a
message is logged by :meth:`Scheduler.call_exception_handler`.

* *wait_timeout* is a timeout to allow pending tasks to complete before
being cancelled when using :meth:`Scheduler.wait_and_close` or
the ``async with`` syntax. Defaults to 60 seconds.

* *limit* is a limit for jobs spawned by scheduler, ``100`` by
default.

Expand Down Expand Up @@ -113,12 +117,12 @@ Scheduler
used to ensure that shielded tasks will actually be completed on
application shutdown.

.. py:method:: wait_and_close(timeout=60)
.. py:method:: wait_and_close(timeout=None)
:async:

Wait for currently scheduled tasks to finish gracefully for the given
*timeout*. Then proceed with closing the scheduler, where any
remaining tasks will be cancelled.
*timeout* or *wait_timeout* if *timeout* is ``None``. Then proceed with
closing the scheduler, where any remaining tasks will be cancelled.

.. py:method:: close()
:async:
Expand Down
30 changes: 14 additions & 16 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ Usage example
await asyncio.sleep(timeout)
async def main():
scheduler = aiojobs.Scheduler()
for i in range(100):
# spawn jobs
await scheduler.spawn(coro(i/10))
async with aiojobs.Scheduler() as scheduler:
for i in range(100):
# spawn jobs
await scheduler.spawn(coro(i/10))
await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment
# gracefully wait on tasks before closing any remaining spawned jobs
await scheduler.wait_and_close()
await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment
# Exit from context will gracefully wait on tasks before closing
# any remaining spawned jobs
asyncio.run(main())
Expand Down Expand Up @@ -75,13 +74,12 @@ For example:
await scheduler.shield(important())
async def main():
scheduler = aiojobs.Scheduler()
t = asyncio.create_task(run_something(scheduler))
await asyncio.sleep(0.1)
t.cancel()
with suppress(asyncio.CancelledError):
await t
await scheduler.wait_and_close()
async with aiojobs.Scheduler() as scheduler:
t = asyncio.create_task(run_something(scheduler))
await asyncio.sleep(0.1)
t.cancel()
with suppress(asyncio.CancelledError):
await t
asyncio.run(main())
Expand Down
28 changes: 16 additions & 12 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Spawn a new job::

At the end of program gracefully close the scheduler::

await scheduler.close()
await scheduler.wait_and_close()


Let's collect it altogether into very small but still functional example::
Expand All @@ -47,16 +47,15 @@ Let's collect it altogether into very small but still functional example::
await asyncio.sleep(timeout)

async def main():
scheduler = aiojobs.Scheduler()
for i in range(100):
# spawn jobs
await scheduler.spawn(coro(i/10))
async with aiojobs.Scheduler() as scheduler:
for i in range(100):
# spawn jobs
await scheduler.spawn(coro(i/10))

await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment

# gracefully close spawned jobs
await scheduler.close()
await asyncio.sleep(5.0)
# not all scheduled jobs are finished at the moment
# Exit from context will gracefully wait on tasks before closing
# any remaining spawned jobs

asyncio.run(main())

Expand All @@ -70,8 +69,13 @@ seconds for its execution.
Next we waits for ``5`` seconds. Roughly half of scheduled jobs should
be finished already but ``50`` jobs are still active.

For closing them we calls ``await scheduler.close()``, the call sends
:exc:`asyncio.CancelledError` into every non-closed job to stop it.
For closing them we exit the context manager, which calls
:meth:`aiojobs.Scheduler.wait_and_close`. This waits for a grace period
to allow tasks to complete normally, then after a timeout it sends
:exc:`asyncio.CancelledError` into every non-closed job to stop them.

Alternatively, we could use :meth:`Scheduler.close` to immediately
close/cancel the jobs.

That's pretty much it.

Expand Down
14 changes: 7 additions & 7 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ async def outer() -> None:
assert scheduler.closed


async def test_wait_and_close_timeout_shield(scheduler: Scheduler) -> None:
async def test_wait_and_close_timeout_shield() -> None:
inner_cancelled = outer_cancelled = False

async def inner() -> None:
Expand All @@ -534,13 +534,13 @@ async def outer() -> None:
except asyncio.CancelledError:
outer_cancelled = True

await scheduler.spawn(outer())
await asyncio.sleep(0)
assert not inner_cancelled and not outer_cancelled
assert len(scheduler._shields) == 1
assert len(scheduler._jobs) == 1
async with Scheduler(wait_timeout=0.1) as scheduler:
await scheduler.spawn(outer())
await asyncio.sleep(0)
assert not inner_cancelled and not outer_cancelled
assert len(scheduler._shields) == 1
assert len(scheduler._jobs) == 1

await scheduler.wait_and_close(0.1)
assert inner_cancelled and outer_cancelled # type: ignore[unreachable]
assert len(scheduler._shields) == 0 # type: ignore[unreachable]
assert len(scheduler._jobs) == 0
Expand Down

0 comments on commit c6c55ab

Please sign in to comment.