Add blocking progress mode to Python async #116
Conversation
Force-pushed from 37ea715 to 1c1ab87.
I think the core of this looks fine, but I am reminded that this epoll_wait issue with asyncio doesn't actually quite work correctly with the code we are using (see the discussion in rapidsai/ucx-py#888).
# - All asyncio tasks that isn't waiting on UCX must be executed
#   so that the asyncio's next state is epoll wait.
#   See <https://github.com/rapidsai/ucx-py/issues/413>
Although I think I understand the constraint, I am not sure what it means for the "next state" to be epoll_wait. Surely there can be arbitrary non-ucx tasks?
I'm gonna be honest and say my understanding here is also a bit fuzzy, and as you noted yourself this "doesn't work" (except when it does), with the original being adapted from https://stackoverflow.com/a/48491563. In my understanding, what epoll_wait refers to here is the socket state, which is there solely to provide a mechanism to prevent asyncio from running out of "ready" tasks, so epoll_wait will ensure that if nothing useful happens in the event loop, asyncio will still be woken up at some point to allow the loop to reevaluate.
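For what it's worth, the general trick being described is roughly the following. This is a minimal standalone sketch, not the UCXX code; the socketpair and callback names are made up. A dummy socket registered with the event loop gives epoll_wait a file descriptor to report on, so something outside the normal task flow can always wake the loop up:

import asyncio
import socket


async def wakeup_on_socket_sketch():
    loop = asyncio.get_running_loop()
    rsock, wsock = socket.socketpair()
    rsock.setblocking(False)
    wsock.setblocking(False)

    def on_readable():
        # Drain the wake-up byte; in a real progress loop this is where the
        # worker would be progressed.
        rsock.recv(1)
        print("loop woken up by epoll_wait reporting the socket")

    loop.add_reader(rsock.fileno(), on_readable)

    # Anything with a reference to wsock (e.g. another thread) can force the
    # loop out of epoll_wait by writing a byte.
    wsock.send(b"\0")
    await asyncio.sleep(0.1)  # give the callback a chance to run

    loop.remove_reader(rsock.fileno())
    rsock.close()
    wsock.close()


asyncio.run(wakeup_on_socket_sketch())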
if self.worker.arm():
    # At this point we know that asyncio's next state is
    # epoll wait.
Who does the epoll_wait call?
As per my #116 (comment), I believe this is related to the sockets.

Yes, I remember that. The purpose of this is not necessarily to be used long-term, but rather to have a fallback to the original UCX-Py behavior. This will allow us to test as closely as possible to what UCX-Py did in the past, and we may later deprecate/remove this if we are confident we have a better option (e.g., …).
Thanks for the explanations!
Force-pushed from 52e4d1d to 91ab7bf.
I had some small docstring comments, but I still think this looks good. Is there anything else to do here?
The blocking progress mode ensure the worker is progress whenever the UCX
worker reports an event on its epoll file descriptor. In certain
circumstances the epoll file descriptor may not
nit: Grammar suggestion:

Blocking progress mode ensures that the worker is progressed whenever the UCX worker reports an event on its epoll file descriptor.

nit: The second sentence appears to be incomplete.
Fixed that and a few more issues with the progress timeout in c800ca4.
event_loop_close = self.event_loop.close

def _event_loop_close(*args, **kwargs):
    if not self.event_loop.is_closed() and self.asyncio_task is not None:
        try:
            self.asyncio_task.cancel()
            self.event_loop.run_until_complete(self.asyncio_task)
        except asyncio.exceptions.CancelledError:
            pass
        finally:
            event_loop_close(*args, **kwargs)

self.event_loop.close = _event_loop_close
@wence- would you mind having one more look at this? It is a real solution for the years-long "coroutine was never awaited" / "Task was destroyed but it is pending!" warnings that we've attempted to resolve in many instances, including rapidsai/ucx-py#929, yet it is very intrusive. To me it doesn't look like it can be too harmful, but maybe you'll have some other thoughts or opinions.
Some questions:

- This changes the behaviour of EventLoop.close. Does it only do so for this instance?
- What happens if multiple of these ProgressTasks are stacked up? I guess each one overwrites the close method, but remembers the previous one, so we do unwind everything?
- This changes the behaviour of EventLoop.close. Does it only do so for this instance?

That's right, the change only applies to self.event_loop, not the whole class; here's an example:
import asyncio


async def run_patch():
    loop = asyncio.get_running_loop()
    print(f"run_patch: {loop}")

    loop_close = loop.close

    def _patch_close(*args, **kwargs):
        if not loop.is_closed():
            print("_patch_close")
            loop_close(*args, **kwargs)

    loop.close = _patch_close


async def run_orig():
    loop = asyncio.get_running_loop()
    print(f"run_orig: {loop}")


loop = asyncio.new_event_loop()
loop.run_until_complete(run_patch())
loop.close()

loop2 = asyncio.new_event_loop()
loop2.run_until_complete(run_orig())
loop2.close()
Which prints:
run_patch: <_UnixSelectorEventLoop running=True closed=False debug=False>
_patch_close
run_orig: <_UnixSelectorEventLoop running=True closed=False debug=False>
IOW, _patch_close only applies to loop, as expected.
- What happens if multiple of these ProgressTasks are stacked up? I guess each one overwrites the close method, but remembers the previous one, so we do unwind everything?
This is a very good catch. Indeed this may not work and can cause infinite recursion due to the local event_loop_close in its original form, but it does work when passing the original loop.close function to a partial, such as:
import asyncio
from functools import partial


async def run_patch():
    loop = asyncio.get_running_loop()
    print(f"run_patch: {loop}")

    loop_close = loop.close

    def _patch_close(loop_close, *args, **kwargs):
        if not loop.is_closed():
            print(f"_patch_close: {loop_close}")
            loop_close(*args, **kwargs)

    loop.close = partial(_patch_close, loop_close)

    loop_close = loop.close

    def _patch_close2(loop_close, *args, **kwargs):
        if not loop.is_closed():
            print(f"_patch_close2: {loop_close}")
            loop_close(*args, **kwargs)

    loop.close = partial(_patch_close2, loop_close)


loop = asyncio.new_event_loop()
loop.run_until_complete(run_patch())
loop.close()
If I'm not overlooking anything, the sample above is equivalent to having multiple ProgressTasks stacking up: each one rewrites loop.close with a wrapper that calls the loop.close that was previously set, so closing the loop unwinds through the whole chain down to the original close. This change is now reflected in c5c2ceb.
if not self.event_loop.is_closed() and self.asyncio_task is not None:
    try:
        self.asyncio_task.cancel()
        self.event_loop.run_until_complete(self.asyncio_task)
suggestion: should we set self.asyncio_task = None after running until complete?
OK, so the idea is that you don't have control over who is closing the event loop, so you instead hook into close and this task cancels itself during event loop closing/teardown?
suggestion: should we set self.asyncio_task = None after running until complete?

That's a good idea, done in 279cb4c.
OK, so the idea is that you don't have control over who is closing the event loop, so you instead hook into close and this task cancels itself during event loop closing/teardown?

Exactly, there doesn't seem to be another way, since we have no control over whether the user will close the loop before resetting UCXX.
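For reference, here is a stripped-down sketch of the pattern under discussion (the class name, ProgressTaskSketch, and its internals are hypothetical, not the actual UCXX implementation): a never-ending task whose owner patches loop.close via partial, so the task cancels itself whenever the user closes the loop.

import asyncio
from functools import partial


class ProgressTaskSketch:
    """Hypothetical sketch: a long-running task that hooks ``loop.close``
    so it can cancel itself during event loop teardown."""

    def __init__(self, event_loop):
        self.event_loop = event_loop
        self.asyncio_task = event_loop.create_task(self._progress_forever())

        original_close = event_loop.close

        def _event_loop_close(original_close, *args, **kwargs):
            # Cancel and await our own task before the loop goes away.
            if not self.event_loop.is_closed() and self.asyncio_task is not None:
                try:
                    self.asyncio_task.cancel()
                    self.event_loop.run_until_complete(self.asyncio_task)
                except asyncio.CancelledError:
                    pass
                finally:
                    self.asyncio_task = None
            original_close(*args, **kwargs)

        # partial() captures whatever close was previously set, so stacked
        # instances unwind one another down to the original close.
        event_loop.close = partial(_event_loop_close, original_close)

    async def _progress_forever(self):
        while True:
            # Stand-in for progressing the UCX worker.
            await asyncio.sleep(0)


loop = asyncio.new_event_loop()
progress = ProgressTaskSketch(loop)
loop.run_until_complete(asyncio.sleep(0.01))  # let the progress task run briefly
loop.close()  # patched close cancels the task, then closes the loop for real

Because the task is cancelled and awaited inside the patched close, no "Task was destroyed but it is pending!" warning is emitted at teardown, regardless of when the user closes the loop.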
Thanks Peter
/merge

Thanks all for the reviews here!
Implements the blocking progress mode (the UCX-Py default), which had not yet been implemented in UCXX.
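As a rough illustration of what "blocking progress mode" means here (a sketch only; worker.epoll_file_descriptor, worker.arm() and worker.progress() are assumed names standing in for the real UCX worker API): instead of busy-polling, the worker is progressed only when its epoll file descriptor reports an event, and the fd is handed to asyncio so that epoll_wait in the event loop's selector does the blocking.

import asyncio


async def blocking_progress_sketch(worker):
    # Sketch of blocking progress; `worker` is assumed to expose
    # `epoll_file_descriptor`, `arm()` and `progress()` (hypothetical names).
    loop = asyncio.get_running_loop()
    event = asyncio.Event()

    # Let the event loop's selector (epoll_wait) watch the worker's fd and
    # wake us when the worker has events to progress.
    loop.add_reader(worker.epoll_file_descriptor, event.set)
    try:
        while True:
            worker.progress()      # drain anything currently pending
            if worker.arm():       # armed: safe to block until the fd fires
                await event.wait()
                event.clear()
            else:
                # Could not arm (events still pending); yield to other tasks
                # and progress again.
                await asyncio.sleep(0)
    finally:
        loop.remove_reader(worker.epoll_file_descriptor)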