From 6dae328d93dcd719833c1f49149a7118ebbf9ef6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 2 Nov 2023 07:01:23 -0700 Subject: [PATCH 01/22] Expose `ucxx::Worker` epoll file descriptor getter --- cpp/include/ucxx/worker.h | 17 +++++++++++++++++ cpp/src/worker.cpp | 8 ++++++++ python/ucxx/_lib/libucxx.pyx | 17 +++++++++++++++++ python/ucxx/_lib/ucxx_api.pxd | 2 ++ 4 files changed, 44 insertions(+) diff --git a/cpp/include/ucxx/worker.h b/cpp/include/ucxx/worker.h index 727aad65..4b87d75b 100644 --- a/cpp/include/ucxx/worker.h +++ b/cpp/include/ucxx/worker.h @@ -245,6 +245,23 @@ class Worker : public Component { */ void initBlockingProgressMode(); + /** + * @brief Get the epoll file descriptor associated with the worker. + * + * Get the epoll file descriptor associated with the worker when running in blocking mode. + * The worker only has an associated epoll file descriptor after + * `initBlockingProgressMode()` is executed. + * + * The file descriptor is destroyed as part of the `ucxx::Worker` destructor, thus any + * reference to it shall not be used after that. + * + * @throws std::runtime_error if `initBlockingProgressMode()` was not executed to run the + * worker in blocking progress mode. + * + * @returns the file descriptor. + */ + int getEpollFileDescriptor(); + /** * @brief Arm the UCP worker. * diff --git a/cpp/src/worker.cpp b/cpp/src/worker.cpp index fe63ed64..477b02c2 100644 --- a/cpp/src/worker.cpp +++ b/cpp/src/worker.cpp @@ -207,6 +207,14 @@ void Worker::initBlockingProgressMode() if (err != 0) throw std::ios_base::failure(std::string("epoll_ctl() returned " + err)); } +int Worker::getEpollFileDescriptor() +{ + if (_epollFileDescriptor == 0) + throw std::runtime_error("Worker not running in blocking progress mode"); + + return _epollFileDescriptor; +} + bool Worker::arm() { ucs_status_t status = ucp_worker_arm(_handle); diff --git a/python/ucxx/_lib/libucxx.pyx b/python/ucxx/_lib/libucxx.pyx index 3e26bfba..66b83379 100644 --- a/python/ucxx/_lib/libucxx.pyx +++ b/python/ucxx/_lib/libucxx.pyx @@ -504,6 +504,23 @@ cdef class UCXWorker(): with nogil: self._worker.get().initBlockingProgressMode() + def arm(self): + cdef bint armed + + with nogil: + armed = self._worker.get().arm() + + return armed + + @property + def epoll_file_descriptor(self): + cdef int epoll_file_descriptor = 0 + + with nogil: + epoll_file_descriptor = self._worker.get().getEpollFileDescriptor() + + return epoll_file_descriptor + def progress(self): with nogil: self._worker.get().progress() diff --git a/python/ucxx/_lib/ucxx_api.pxd b/python/ucxx/_lib/ucxx_api.pxd index 3ee28346..e06982f1 100644 --- a/python/ucxx/_lib/ucxx_api.pxd +++ b/python/ucxx/_lib/ucxx_api.pxd @@ -228,6 +228,8 @@ cdef extern from "" namespace "ucxx" nogil: uint16_t port, ucp_listener_conn_callback_t callback, void *callback_args ) except +raise_py_error void initBlockingProgressMode() except +raise_py_error + int getEpollFileDescriptor() + bint arm() except +raise_py_error void progress() bint progressOnce() void progressWorkerEvent(int epoll_timeout) From 155ab589df7494b54a1e021b1ddaa5c7cbeea47f Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 2 Nov 2023 07:11:18 -0700 Subject: [PATCH 02/22] Add blocking progress mode to Python async --- python/ucxx/_lib_async/application_context.py | 11 +++-- .../_lib_async/continuous_ucx_progress.py | 49 +++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/python/ucxx/_lib_async/application_context.py b/python/ucxx/_lib_async/application_context.py index 3303c23e..01cd8584 100644 --- a/python/ucxx/_lib_async/application_context.py +++ b/python/ucxx/_lib_async/application_context.py @@ -11,7 +11,7 @@ from ucxx._lib.arr import Array from ucxx.exceptions import UCXMessageTruncatedError -from .continuous_ucx_progress import PollingMode, ThreadMode +from .continuous_ucx_progress import BlockingMode, PollingMode, ThreadMode from .endpoint import Endpoint from .exchange_peer_info import exchange_peer_info from .listener import ActiveClients, Listener, _listener_handler @@ -60,6 +60,9 @@ def __init__( enable_python_future=enable_python_future, ) + if self.progress_mode == "blocking": + self.worker.init_blocking_progress_mode() + self.start_notifier_thread() weakref.finalize(self, self.progress_tasks.clear) @@ -77,7 +80,7 @@ def _check_progress_mode(progress_mode): else: progress_mode = "thread" - valid_progress_modes = ["polling", "thread", "thread-polling"] + valid_progress_modes = ["blocking", "polling", "thread", "thread-polling"] if not isinstance(progress_mode, str) or not any( progress_mode == m for m in valid_progress_modes ): @@ -107,7 +110,7 @@ def _check_enable_delayed_submission(enable_delayed_submission, progress_mode): and explicit_enable_delayed_submission ): raise ValueError( - f"Delayed submission requested, but {progress_mode} does not " + f"Delayed submission requested, but '{progress_mode}' mode does not " "support it, 'thread' or 'thread-polling' progress mode required." ) @@ -400,6 +403,8 @@ def continuous_ucx_progress(self, event_loop=None): task = ThreadMode(self.worker, loop, polling_mode=True) elif self.progress_mode == "polling": task = PollingMode(self.worker, loop) + elif self.progress_mode == "blocking": + task = BlockingMode(self.worker, loop, self.worker.epoll_file_descriptor) self.progress_tasks.append(task) diff --git a/python/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/_lib_async/continuous_ucx_progress.py index 8c31fcd7..d9850bcf 100644 --- a/python/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/_lib_async/continuous_ucx_progress.py @@ -3,6 +3,8 @@ import asyncio +import socket +import weakref class ProgressTask(object): @@ -70,3 +72,50 @@ async def _progress_task(self): worker.progress() # Give other co-routines a chance to run. await asyncio.sleep(0) + + +class BlockingMode(ProgressTask): + def __init__(self, worker, event_loop, epoll_fd): + super().__init__(worker, event_loop) + + # Creating a job that is ready straightaway but with low priority. + # Calling `await self.event_loop.sock_recv(self.rsock, 1)` will + # return when all non-IO tasks are finished. + # See . + self.rsock, wsock = socket.socketpair() + self.rsock.setblocking(0) + wsock.setblocking(0) + wsock.close() + + # Bind an asyncio reader to a UCX epoll file descripter + event_loop.add_reader(epoll_fd, self._fd_reader_callback) + + # Remove the reader and close socket on finalization + weakref.finalize(self, event_loop.remove_reader, epoll_fd) + weakref.finalize(self, self.rsock.close) + + def _fd_reader_callback(self): + self.worker.progress() + + # Notice, we can safely overwrite `self.dangling_arm_task` + # since previous arm task is finished by now. + assert self.asyncio_task is None or self.asyncio_task.done() + self.asyncio_task = self.event_loop.create_task(self._arm_worker()) + + async def _arm_worker(self): + # When arming the worker, the following must be true: + # - No more progress in UCX (see doc of ucp_worker_arm()) + # - All asyncio tasks that isn't waiting on UCX must be executed + # so that the asyncio's next state is epoll wait. + # See + while True: + self.worker.progress() + + # This IO task returns when all non-IO tasks are finished. + # Notice, we do NOT hold a reference to `worker` while waiting. + await self.event_loop.sock_recv(self.rsock, 1) + + if self.worker.arm(): + # At this point we know that asyncio's next state is + # epoll wait. + break From 1c1ab874f9f571a77e0a0e3b500de4d8b39cd531 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 2 Nov 2023 15:17:47 -0700 Subject: [PATCH 03/22] Test blocking mode in CI --- ci/test_python.sh | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/ci/test_python.sh b/ci/test_python.sh index 169674e2..d9601e64 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -109,24 +109,28 @@ rapids-logger "Python Async Tests" # run_tests_async PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE SKIP run_tests_async thread 0 0 0 run_tests_async thread 1 1 0 +run_tests_async blocking 0 1 0 rapids-logger "Python Benchmarks" # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW run_py_benchmark ucxx-core thread 0 0 0 1 0 run_py_benchmark ucxx-core thread 1 0 0 1 0 -for nbuf in 1 8; do - if [[ ! $RAPIDS_CUDA_VERSION =~ 11.2.* ]]; then - # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW - run_py_benchmark ucxx-async thread 0 0 0 ${nbuf} 0 - run_py_benchmark ucxx-async thread 0 0 1 ${nbuf} 0 - run_py_benchmark ucxx-async thread 0 1 0 ${nbuf} 0 - run_py_benchmark ucxx-async thread 0 1 1 ${nbuf} 0 - fi +for progress_mode in "blocking" "thread"; do + for nbuf in 1 8; do + if [[ ! $RAPIDS_CUDA_VERSION =~ 11.2.* ]]; then + # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW + run_py_benchmark ucxx-async ${progress_mode} 0 0 0 ${nbuf} 0 + run_py_benchmark ucxx-async ${progress_mode} 0 0 1 ${nbuf} 0 + run_py_benchmark ucxx-async ${progress_mode} 0 1 0 ${nbuf} 0 + run_py_benchmark ucxx-async ${progress_mode} 0 1 1 ${nbuf} 0 + fi + done done rapids-logger "Distributed Tests" # run_distributed_ucxx_tests PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE +run_distributed_ucxx_tests blocking 0 0 run_distributed_ucxx_tests polling 0 0 run_distributed_ucxx_tests thread 0 0 run_distributed_ucxx_tests thread 0 1 From e5f4a40c4f750b7c0c2d33c6aff37dd27df188b2 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 6 Nov 2023 09:24:23 -0800 Subject: [PATCH 04/22] Disable Python future on `blocking` mode testing --- ci/test_python.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/test_python.sh b/ci/test_python.sh index d9601e64..eca2494d 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -109,7 +109,7 @@ rapids-logger "Python Async Tests" # run_tests_async PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE SKIP run_tests_async thread 0 0 0 run_tests_async thread 1 1 0 -run_tests_async blocking 0 1 0 +run_tests_async blocking 0 0 0 rapids-logger "Python Benchmarks" # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW From edd31925b867b27d348592eb1443f690e622953e Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 17 Jan 2024 03:09:24 -0800 Subject: [PATCH 05/22] Add timeout to Python's async blocking progress mode --- python/ucxx/_lib_async/application_context.py | 2 +- .../_lib_async/continuous_ucx_progress.py | 92 ++++++++++++++++++- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/python/ucxx/_lib_async/application_context.py b/python/ucxx/_lib_async/application_context.py index e7ccbada..7ad1e914 100644 --- a/python/ucxx/_lib_async/application_context.py +++ b/python/ucxx/_lib_async/application_context.py @@ -405,7 +405,7 @@ def continuous_ucx_progress(self, event_loop=None): elif self.progress_mode == "polling": task = PollingMode(self.worker, loop) elif self.progress_mode == "blocking": - task = BlockingMode(self.worker, loop, self.worker.epoll_file_descriptor) + task = BlockingMode(self.worker, loop) self.progress_tasks.append(task) diff --git a/python/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/_lib_async/continuous_ucx_progress.py index d9850bcf..323ffda6 100644 --- a/python/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/_lib_async/continuous_ucx_progress.py @@ -4,8 +4,11 @@ import asyncio import socket +import time import weakref +from ucxx._lib.libucxx import UCXWorker + class ProgressTask(object): def __init__(self, worker, event_loop): @@ -75,7 +78,28 @@ async def _progress_task(self): class BlockingMode(ProgressTask): - def __init__(self, worker, event_loop, epoll_fd): + def __init__( + self, + worker: UCXWorker, + event_loop: asyncio.AbstractEventLoop, + progress_timeout: float = 1.0, + ): + """Progress the UCX worker in blocking mode. + + The blocking progress mode ensure the worker is progress whenever the UCX + worker reports an event on its epoll file descriptor. In certain + circumstances the epoll file descriptor may not + + Parameters + ---------- + worker: UCXWorker + Worker object from the UCXX Cython API to progress. + event_loop: asyncio.AbstractEventLoop + Asynchronous event loop where to schedule async tasks. + progress_timeout: float + The timeout to sleep until calling checking again whether the worker should + be progressed. + """ super().__init__(worker, event_loop) # Creating a job that is ready straightaway but with low priority. @@ -87,6 +111,8 @@ def __init__(self, worker, event_loop, epoll_fd): wsock.setblocking(0) wsock.close() + epoll_fd = worker.epoll_file_descriptor + # Bind an asyncio reader to a UCX epoll file descripter event_loop.add_reader(epoll_fd, self._fd_reader_callback) @@ -94,21 +120,56 @@ def __init__(self, worker, event_loop, epoll_fd): weakref.finalize(self, event_loop.remove_reader, epoll_fd) weakref.finalize(self, self.rsock.close) + self.blocking_asyncio_task = None + self.last_progress_time = time.monotonic() - progress_timeout + self.asyncio_task = event_loop.create_task(self._timeout_progress(1.0)) + + def __del__(self): + """Cancel asynchronous blocking progress task. + + Cancel asynchronouns blocking progress task. + + .. warning:: + This only works if the event loop is still running. If the event loop has + been closed before this runs the following error will be printed by the + interpreter on the standard output: + + ``` + Task was destroyed but it is pending! + ``` + """ + if self.event_loop is not None and self.event_loop.is_running(): + if self.blocking_asyncio_task is not None: + self.call_soon_threadsafe(self.blocking_asyncio_task.cancel()) + + super().__del__() + def _fd_reader_callback(self): + """Schedule new progress task upon worker event. + + Schedule new progress task when a new event occurs in the worker's epoll file + descriptor. + """ self.worker.progress() # Notice, we can safely overwrite `self.dangling_arm_task` # since previous arm task is finished by now. - assert self.asyncio_task is None or self.asyncio_task.done() - self.asyncio_task = self.event_loop.create_task(self._arm_worker()) + assert self.blocking_asyncio_task is None or self.blocking_asyncio_task.done() + self.blocking_asyncio_task = self.event_loop.create_task(self._arm_worker()) async def _arm_worker(self): + """Progress the worker and rearm. + + Progress and rearm the worker to watch for new events on its epoll file + descriptor. + """ # When arming the worker, the following must be true: # - No more progress in UCX (see doc of ucp_worker_arm()) # - All asyncio tasks that isn't waiting on UCX must be executed # so that the asyncio's next state is epoll wait. # See while True: + self.last_progress_time = time.monotonic() self.worker.progress() # This IO task returns when all non-IO tasks are finished. @@ -119,3 +180,28 @@ async def _arm_worker(self): # At this point we know that asyncio's next state is # epoll wait. break + + async def _timeout_progress(self, progress_timeout: float = 1.0): + """Protect worker from never progressing again. + + To ensure the worker progresses if no events are raised and the asyncio loop + getting stuck we must ensure the worker is progressed every so often. This + method ensures the worker is progressed independent of what the epoll file + descriptor does if longer than `progress_timeout` has elapsed since last check, + thus preventing a deadlock. + + Parameters + ---------- + progress_timeout: float + The timeout to sleep until calling checking again whether the worker should + be progressed. + """ + while True: + worker = self.worker + if worker is None: + return + if time.monotonic() > self.last_progress_time + progress_timeout: + self.last_progress_time = time.monotonic() + worker.progress() + # Give other co-routines a chance to run. + await asyncio.sleep(progress_timeout) From e3f9cc3dfafb49a9c9881aa80edb9022e01571c6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 17 Jan 2024 04:12:38 -0800 Subject: [PATCH 06/22] Support blocking mode in 'send_recv` Python benchmark --- python/ucxx/benchmarks/send_recv.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/ucxx/benchmarks/send_recv.py b/python/ucxx/benchmarks/send_recv.py index bfe7ae4a..51f217b3 100644 --- a/python/ucxx/benchmarks/send_recv.py +++ b/python/ucxx/benchmarks/send_recv.py @@ -294,8 +294,8 @@ def parse_args(): parser.add_argument( "--progress-mode", default="thread", - help="Progress mode for the UCP worker. Valid options are: " - "'thread' (default) and 'blocking'.", + help="Progress mode for the UCP worker. Valid options are: 'blocking, " + "'polling', 'thread' and 'thread-polling. (Default: 'thread')'", type=str, ) parser.add_argument( @@ -350,8 +350,6 @@ def parse_args(): if args.progress_mode not in ["blocking", "polling", "thread", "thread-polling"]: raise RuntimeError(f"Invalid `--progress-mode`: '{args.progress_mode}'") - if args.progress_mode == "blocking" and args.backend == "ucxx-async": - raise RuntimeError("Blocking progress mode not supported for ucxx-async yet") if args.asyncio_wait and not args.progress_mode.startswith("thread"): raise RuntimeError( "`--asyncio-wait` requires `--progress-mode=thread` or " From b5f95f01aab1ab8d726b7f7a82738a73381c02d6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 17 Jan 2024 04:20:21 -0800 Subject: [PATCH 07/22] Schedule cancelation in `ProgressTask` deleter --- ci/test_cpp.sh | 2 +- ci/test_python.sh | 2 +- python/ucxx/_lib_async/continuous_ucx_progress.py | 12 +++++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/ci/test_cpp.sh b/ci/test_cpp.sh index 5d72d463..c32e834f 100755 --- a/ci/test_cpp.sh +++ b/ci/test_cpp.sh @@ -94,7 +94,7 @@ run_port_retry() { } rapids-logger "Downloading artifacts from previous jobs" -CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp) +CPP_CHANNEL=${RAPIDS_CONDA_BLD_OUTPUT_DIR} rapids-mamba-retry install \ --channel "${CPP_CHANNEL}" \ diff --git a/ci/test_python.sh b/ci/test_python.sh index a81f204f..256dbd21 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -87,7 +87,7 @@ run_distributed_ucxx_tests() { } rapids-logger "Downloading artifacts from previous jobs" -CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp) +CPP_CHANNEL=${RAPIDS_CONDA_BLD_OUTPUT_DIR} rapids-mamba-retry install \ --channel "${CPP_CHANNEL}" \ diff --git a/python/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/_lib_async/continuous_ucx_progress.py index 323ffda6..37469e78 100644 --- a/python/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/_lib_async/continuous_ucx_progress.py @@ -30,11 +30,13 @@ def __init__(self, worker, event_loop): self.asyncio_task = None def __del__(self): - if self.asyncio_task is not None: - # FIXME: This does not work, the cancellation must be awaited. - # Running with polling mode will always cause - # `Task was destroyed but it is pending!` errors at ucxx.reset(). - self.asyncio_task.cancel() + # FIXME: This only works if the event loop is still running and awaits the + # cancelation. + # Running with blocking and polling modes may cause + # `Task was destroyed but it is pending!` errors at ucxx.reset(). + if self.event_loop is not None and self.event_loop.is_running(): + if self.asyncio_task is not None: + self.call_soon_threadsafe(self.asyncio_task.cancel()) # Hash and equality is based on the event loop def __hash__(self): From 60e49d19a4834f4a7b1eda5b40762f3491885be1 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 17 Jan 2024 05:39:43 -0800 Subject: [PATCH 08/22] Rerun CI From 91ab7bf0421757829a51877a9f67c5c59f8ee153 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Wed, 17 Jan 2024 07:59:29 -0800 Subject: [PATCH 09/22] Revert accidental CI script changes --- ci/test_cpp.sh | 2 +- ci/test_python.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/test_cpp.sh b/ci/test_cpp.sh index c32e834f..5d72d463 100755 --- a/ci/test_cpp.sh +++ b/ci/test_cpp.sh @@ -94,7 +94,7 @@ run_port_retry() { } rapids-logger "Downloading artifacts from previous jobs" -CPP_CHANNEL=${RAPIDS_CONDA_BLD_OUTPUT_DIR} +CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp) rapids-mamba-retry install \ --channel "${CPP_CHANNEL}" \ diff --git a/ci/test_python.sh b/ci/test_python.sh index 256dbd21..a81f204f 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -87,7 +87,7 @@ run_distributed_ucxx_tests() { } rapids-logger "Downloading artifacts from previous jobs" -CPP_CHANNEL=${RAPIDS_CONDA_BLD_OUTPUT_DIR} +CPP_CHANNEL=$(rapids-download-conda-from-s3 cpp) rapids-mamba-retry install \ --channel "${CPP_CHANNEL}" \ From 26480a5f729af8da75bf568d0607f37d89d94619 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 18 Jan 2024 04:55:11 -0800 Subject: [PATCH 10/22] Disable blocking progress mode delayed submission benchmarks --- ci/test_python.sh | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ci/test_python.sh b/ci/test_python.sh index a81f204f..7aa5190b 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -122,8 +122,12 @@ for progress_mode in "blocking" "thread"; do # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW run_py_benchmark ucxx-async ${progress_mode} 0 0 0 ${nbuf} 0 run_py_benchmark ucxx-async ${progress_mode} 0 0 1 ${nbuf} 0 - run_py_benchmark ucxx-async ${progress_mode} 0 1 0 ${nbuf} 0 - run_py_benchmark ucxx-async ${progress_mode} 0 1 1 ${nbuf} 0 + if [[ ${progress_mode} != "blocking" ]]; then + # Delayed submission isn't support by blocking progress mode + # run_py_benchmark BACKEND PROGRESS_MODE ASYNCIO_WAIT ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE NBUFFERS SLOW + run_py_benchmark ucxx-async ${progress_mode} 0 1 0 ${nbuf} 0 + run_py_benchmark ucxx-async ${progress_mode} 0 1 1 ${nbuf} 0 + fi fi done done From 77b3659738f5e04aaba9cc576dd3958027a73425 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 30 Jan 2024 10:00:14 -0800 Subject: [PATCH 11/22] Remove `pytest.mark.gpu` --- python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py | 2 -- .../distributed-ucxx/distributed_ucxx/tests/test_ucxx_config.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py b/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py index 1f820629..0163d1a8 100644 --- a/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py +++ b/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py @@ -27,8 +27,6 @@ import distributed_ucxx # noqa: E402 from distributed_ucxx.utils_test import gen_test -pytestmark = pytest.mark.gpu - try: HOST = ucxx.get_address() except Exception: diff --git a/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx_config.py b/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx_config.py index d167d39f..08b11226 100644 --- a/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx_config.py +++ b/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx_config.py @@ -13,8 +13,6 @@ from distributed_ucxx.ucxx import _prepare_ucx_config from distributed_ucxx.utils_test import gen_test -pytestmark = pytest.mark.gpu - try: HOST = get_ip() except Exception: From e22b227bd56924ad101720d0de6ceab30b87d106 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 25 Jul 2024 07:39:39 -0700 Subject: [PATCH 12/22] Re-enable blocking Distributed tests in CI --- ci/test_python_distributed.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ci/test_python_distributed.sh b/ci/test_python_distributed.sh index 46160707..c955f4bf 100755 --- a/ci/test_python_distributed.sh +++ b/ci/test_python_distributed.sh @@ -33,6 +33,7 @@ print_ucx_config rapids-logger "Run distributed-ucxx tests with conda package" # run_distributed_ucxx_tests PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE +run_distributed_ucxx_tests blocking 0 0 run_distributed_ucxx_tests polling 0 0 run_distributed_ucxx_tests thread 0 0 run_distributed_ucxx_tests thread 0 1 @@ -42,6 +43,7 @@ run_distributed_ucxx_tests thread 1 1 install_distributed_dev_mode # run_distributed_ucxx_tests_internal PROGRESS_MODE ENABLE_DELAYED_SUBMISSION ENABLE_PYTHON_FUTURE +run_distributed_ucxx_tests_internal blocking 0 0 run_distributed_ucxx_tests_internal polling 0 0 run_distributed_ucxx_tests_internal thread 0 0 run_distributed_ucxx_tests_internal thread 0 1 From c800ca4e1afac5c6ecea3a98827592ba78754326 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 30 Sep 2024 14:03:31 -0700 Subject: [PATCH 13/22] Fix progress timeout and docstrings --- .../_lib_async/continuous_ucx_progress.py | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index 997b8fd3..ce7813f9 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -88,9 +88,11 @@ def __init__( ): """Progress the UCX worker in blocking mode. - The blocking progress mode ensure the worker is progress whenever the UCX - worker reports an event on its epoll file descriptor. In certain - circumstances the epoll file descriptor may not + The blocking progress mode ensure the worker is progresses whenever the + UCX worker reports an event on its epoll file descriptor. In certain + circumstances the epoll file descriptor may not present an event, thus + the `progress_timeout` will ensure the UCX worker is progressed to + prevent a potential deadlock. Parameters ---------- @@ -103,6 +105,7 @@ def __init__( be progressed. """ super().__init__(worker, event_loop) + self._progress_timeout = progress_timeout # Creating a job that is ready straight away but with low priority. # Calling `await self.event_loop.sock_recv(self.rsock, 1)` will @@ -123,8 +126,8 @@ def __init__( weakref.finalize(self, self.rsock.close) self.blocking_asyncio_task = None - self.last_progress_time = time.monotonic() - progress_timeout - self.asyncio_task = event_loop.create_task(self._timeout_progress(1.0)) + self.last_progress_time = time.monotonic() - self._progress_timeout + self.asyncio_task = event_loop.create_task(self._progress_with_timeout()) def __del__(self): """Cancel asynchronous blocking progress task. @@ -176,34 +179,30 @@ async def _arm_worker(self): # This IO task returns when all non-IO tasks are finished. # Notice, we do NOT hold a reference to `worker` while waiting. - await self.event_loop.sock_recv(self.rsock, 1) + await asyncio.wait_for( + self.event_loop.sock_recv(self.rsock, 1), self._progress_timeout + ) if self.worker.arm(): # At this point we know that asyncio's next state is # epoll wait. break - async def _timeout_progress(self, progress_timeout: float = 1.0): + async def _progress_with_timeout(self): """Protect worker from never progressing again. To ensure the worker progresses if no events are raised and the asyncio loop getting stuck we must ensure the worker is progressed every so often. This method ensures the worker is progressed independent of what the epoll file - descriptor does if longer than `progress_timeout` has elapsed since last check, - thus preventing a deadlock. - - Parameters - ---------- - progress_timeout: float - The timeout to sleep until calling checking again whether the worker should - be progressed. + descriptor does if longer than `self._progress_timeout` has elapsed since + last check, thus preventing a deadlock. """ while True: worker = self.worker if worker is None: return - if time.monotonic() > self.last_progress_time + progress_timeout: + if time.monotonic() > self.last_progress_time + self._progress_timeout: self.last_progress_time = time.monotonic() worker.progress() # Give other co-routines a chance to run. - await asyncio.sleep(progress_timeout) + await asyncio.sleep(self._progress_timeout) From 18e3cf08e5166e552efbe426ff96d51eb7f6598d Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 30 Sep 2024 14:12:54 -0700 Subject: [PATCH 14/22] Cancel progress tasks before closing of event loop --- .../_lib_async/continuous_ucx_progress.py | 41 ++++++------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index ce7813f9..e59346c0 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -29,14 +29,19 @@ def __init__(self, worker, event_loop): self.event_loop = event_loop self.asyncio_task = None - def __del__(self): - # FIXME: This only works if the event loop is still running and awaits the - # cancelation. - # Running with blocking and polling modes may cause - # `Task was destroyed but it is pending!` errors at ucxx.reset(). - if self.event_loop is not None and self.event_loop.is_running(): - if self.asyncio_task is not None: - self.call_soon_threadsafe(self.asyncio_task.cancel()) + event_loop_close = self.event_loop.close + + def _event_loop_close(*args, **kwargs): + if not self.event_loop.is_closed() and self.asyncio_task is not None: + try: + self.asyncio_task.cancel() + self.event_loop.run_until_complete(self.asyncio_task) + except asyncio.exceptions.CancelledError: + pass + finally: + event_loop_close(*args, **kwargs) + + self.event_loop.close = _event_loop_close # Hash and equality is based on the event loop def __hash__(self): @@ -129,26 +134,6 @@ def __init__( self.last_progress_time = time.monotonic() - self._progress_timeout self.asyncio_task = event_loop.create_task(self._progress_with_timeout()) - def __del__(self): - """Cancel asynchronous blocking progress task. - - Cancel asynchronouns blocking progress task. - - .. warning:: - This only works if the event loop is still running. If the event loop has - been closed before this runs the following error will be printed by the - interpreter on the standard output: - - ``` - Task was destroyed but it is pending! - ``` - """ - if self.event_loop is not None and self.event_loop.is_running(): - if self.blocking_asyncio_task is not None: - self.call_soon_threadsafe(self.blocking_asyncio_task.cancel()) - - super().__del__() - def _fd_reader_callback(self): """Schedule new progress task upon worker event. From c5c2ceb0514f0bf652a4ffe48036befcba6bfc66 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 1 Oct 2024 04:31:20 -0700 Subject: [PATCH 15/22] Use `partial` to rewrite `event_loop.close` --- python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index e59346c0..a177771d 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -6,6 +6,7 @@ import socket import time import weakref +from functools import partial from ucxx._lib.libucxx import UCXWorker @@ -29,9 +30,9 @@ def __init__(self, worker, event_loop): self.event_loop = event_loop self.asyncio_task = None - event_loop_close = self.event_loop.close + event_loop_close_original = self.event_loop.close - def _event_loop_close(*args, **kwargs): + def _event_loop_close(event_loop_close_original, *args, **kwargs): if not self.event_loop.is_closed() and self.asyncio_task is not None: try: self.asyncio_task.cancel() @@ -39,9 +40,9 @@ def _event_loop_close(*args, **kwargs): except asyncio.exceptions.CancelledError: pass finally: - event_loop_close(*args, **kwargs) + event_loop_close_original(*args, **kwargs) - self.event_loop.close = _event_loop_close + self.event_loop.close = partial(_event_loop_close, event_loop_close_original) # Hash and equality is based on the event loop def __hash__(self): From 279cb4c0b4d11fa9df6df29aaf4083e27e30018e Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 1 Oct 2024 04:32:07 -0700 Subject: [PATCH 16/22] Reset `self.asyncio_task` to `None` after cancellation --- python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index a177771d..b694610a 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -40,6 +40,7 @@ def _event_loop_close(event_loop_close_original, *args, **kwargs): except asyncio.exceptions.CancelledError: pass finally: + self.asyncio_task = None event_loop_close_original(*args, **kwargs) self.event_loop.close = partial(_event_loop_close, event_loop_close_original) From c624898a2f50f4460a51be8e050691a5efd2ef67 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 1 Oct 2024 13:37:05 +0200 Subject: [PATCH 17/22] Fix comments' phrasing Co-authored-by: Lawrence Mitchell --- python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index b694610a..fdd25ceb 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -144,7 +144,7 @@ def _fd_reader_callback(self): """ self.worker.progress() - # Notice, we can safely overwrite `self.dangling_arm_task` + # Notice, we can safely overwrite `self.blocking_asyncio_task` # since previous arm task is finished by now. assert self.blocking_asyncio_task is None or self.blocking_asyncio_task.done() self.blocking_asyncio_task = self.event_loop.create_task(self._arm_worker()) @@ -157,7 +157,7 @@ async def _arm_worker(self): """ # When arming the worker, the following must be true: # - No more progress in UCX (see doc of ucp_worker_arm()) - # - All asyncio tasks that isn't waiting on UCX must be executed + # - All asyncio tasks that aren't waiting on UCX must be executed # so that the asyncio's next state is epoll wait. # See while True: From 5769f312516c3c3fe2695a001f5a9eb8da1de668 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 1 Oct 2024 10:14:13 -0700 Subject: [PATCH 18/22] Cancel `_arm_worker` instead of `sock_recv` --- .../ucxx/_lib_async/continuous_ucx_progress.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index fdd25ceb..d8124134 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -166,9 +166,7 @@ async def _arm_worker(self): # This IO task returns when all non-IO tasks are finished. # Notice, we do NOT hold a reference to `worker` while waiting. - await asyncio.wait_for( - self.event_loop.sock_recv(self.rsock, 1), self._progress_timeout - ) + await self.event_loop.sock_recv(self.rsock, 1) if self.worker.arm(): # At this point we know that asyncio's next state is @@ -190,6 +188,18 @@ async def _progress_with_timeout(self): return if time.monotonic() > self.last_progress_time + self._progress_timeout: self.last_progress_time = time.monotonic() + + # Cancel `_arm_worker` task if available. `loop.sock_recv` does not + # seem to respect timeout with `asyncio.wait_for`, thus we cancel + # it here instead. It will get recreated after a new event on + # `worker.epoll_file_descriptor`. + if self.blocking_asyncio_task is not None: + self.blocking_asyncio_task.cancel() + try: + await self.blocking_asyncio_task + except asyncio.exceptions.CancelledError: + pass + worker.progress() # Give other co-routines a chance to run. await asyncio.sleep(self._progress_timeout) From dbb63860046c41caece892ea5a12e6d4b089ee53 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 18 Oct 2024 08:12:17 -0700 Subject: [PATCH 19/22] Revert "Resolve thread-safety issues in distributed-ucxx (#295)" This reverts commit a7d36f5a714f2993ef041294d8a0d4072c7212fa. --- .../distributed-ucxx/distributed_ucxx/ucxx.py | 84 +++++++++++-------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/python/distributed-ucxx/distributed_ucxx/ucxx.py b/python/distributed-ucxx/distributed_ucxx/ucxx.py index 881dc160..1f5fc1df 100644 --- a/python/distributed-ucxx/distributed_ucxx/ucxx.py +++ b/python/distributed-ucxx/distributed_ucxx/ucxx.py @@ -14,7 +14,6 @@ import struct import weakref from collections.abc import Awaitable, Callable, Collection -from threading import Lock from typing import TYPE_CHECKING, Any from unittest.mock import patch @@ -50,13 +49,6 @@ pre_existing_cuda_context = False cuda_context_created = False multi_buffer = None -# Lock protecting access to _resources dict -_resources_lock = Lock() -# Mapping from UCXX context handles to sets of registered dask resource IDs -# Used to track when there are no more users of the context, at which point -# its progress task and notification thread can be shut down. -# See _register_dask_resource and _deregister_dask_resource. -_resources = dict() _warning_suffix = ( @@ -103,13 +95,13 @@ def make_register(): count = itertools.count() def register() -> int: - """Register a Dask resource with the resource tracker. + """Register a Dask resource with the UCXX context. - Generate a unique ID for the resource and register it with the resource - tracker. The resource ID is later used to deregister the resource from - the tracker calling `_deregister_dask_resource(resource_id)`, which - stops the notifier thread and progress tasks when no more UCXX resources - are alive. + Register a Dask resource with the UCXX context and keep track of it with the + use of a unique ID for the resource. The resource ID is later used to + deregister the resource from the UCXX context calling + `_deregister_dask_resource(resource_id)`, which stops the notifier thread + and progress tasks when no more UCXX resources are alive. Returns ------- @@ -118,13 +110,9 @@ def register() -> int: `_deregister_dask_resource` during stop/destruction of the resource. """ ctx = ucxx.core._get_ctx() - handle = ctx.context.handle - with _resources_lock: - if handle not in _resources: - _resources[handle] = set() - + with ctx._dask_resources_lock: resource_id = next(count) - _resources[handle].add(resource_id) + ctx._dask_resources.add(resource_id) ctx.start_notifier_thread() ctx.continuous_ucx_progress() return resource_id @@ -138,11 +126,11 @@ def register() -> int: def _deregister_dask_resource(resource_id): - """Deregister a Dask resource from the resource tracker. + """Deregister a Dask resource with the UCXX context. - Deregister a Dask resource from the resource tracker with given ID, and if - no resources remain after deregistration, stop the notifier thread and - progress tasks. + Deregister a Dask resource from the UCXX context with given ID, and if no + resources remain after deregistration, stop the notifier thread and progress + tasks. Parameters ---------- @@ -156,22 +144,40 @@ def _deregister_dask_resource(resource_id): return ctx = ucxx.core._get_ctx() - handle = ctx.context.handle # Check if the attribute exists first, in tests the UCXX context may have # been reset before some resources are deregistered. - with _resources_lock: - try: - _resources[handle].remove(resource_id) - except KeyError: - pass + if hasattr(ctx, "_dask_resources_lock"): + with ctx._dask_resources_lock: + try: + ctx._dask_resources.remove(resource_id) + except KeyError: + pass + + # Stop notifier thread and progress tasks if no Dask resources using + # UCXX communicators are running anymore. + if len(ctx._dask_resources) == 0: + ctx.stop_notifier_thread() + ctx.progress_tasks.clear() - # Stop notifier thread and progress tasks if no Dask resources using - # UCXX communicators are running anymore. - if handle in _resources and len(_resources[handle]) == 0: - ctx.stop_notifier_thread() - ctx.progress_tasks.clear() - del _resources[handle] + +def _allocate_dask_resources_tracker() -> None: + """Allocate Dask resources tracker. + + Allocate a Dask resources tracker in the UCXX context. This is useful to + track Distributed communicators so that progress and notifier threads can + be cleanly stopped when no UCXX communicators are alive anymore. + """ + ctx = ucxx.core._get_ctx() + if not hasattr(ctx, "_dask_resources"): + # TODO: Move the `Lock` to a file/module-level variable for true + # lock-safety. The approach implemented below could cause race + # conditions if this function is called simultaneously by multiple + # threads. + from threading import Lock + + ctx._dask_resources = set() + ctx._dask_resources_lock = Lock() def init_once(): @@ -181,6 +187,11 @@ def init_once(): global multi_buffer if ucxx is not None: + # Ensure reallocation of Dask resources tracker if the UCXX context was + # reset since the previous `init_once()` call. This may happen in tests, + # where the `ucxx_loop` fixture will reset the context after each test. + _allocate_dask_resources_tracker() + return # remove/process dask.ucx flags for valid ucx options @@ -243,6 +254,7 @@ def init_once(): # environment, so the user's external environment can safely # override things here. ucxx.init(options=ucx_config, env_takes_precedence=True) + _allocate_dask_resources_tracker() pool_size_str = dask.config.get("distributed.rmm.pool-size") From 8e4bc383c8b1cfbc6c103e3d51b51d17311e7a0e Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 22 Oct 2024 08:40:02 -0700 Subject: [PATCH 20/22] Adjust properties and blocking progress mode initialization --- .../ucxx/ucxx/_lib_async/application_context.py | 16 +++++++--------- .../ucxx/_lib_async/continuous_ucx_progress.py | 4 +++- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/application_context.py b/python/ucxx/ucxx/_lib_async/application_context.py index 0b08ad93..4a488309 100644 --- a/python/ucxx/ucxx/_lib_async/application_context.py +++ b/python/ucxx/ucxx/_lib_async/application_context.py @@ -56,13 +56,10 @@ def __init__( self.context = ucx_api.UCXContext(config_dict) self.worker = ucx_api.UCXWorker( self.context, - enable_delayed_submission=self._enable_delayed_submission, - enable_python_future=self._enable_python_future, + enable_delayed_submission=self.enable_delayed_submission, + enable_python_future=self.enable_python_future, ) - if self.progress_mode == "blocking": - self.worker.init_blocking_progress_mode() - self.start_notifier_thread() weakref.finalize(self, self.progress_tasks.clear) @@ -90,7 +87,7 @@ def progress_mode(self, progress_mode): progress_mode == m for m in valid_progress_modes ): raise ValueError( - f"Unknown progress mode {progress_mode}, valid modes are: " + f"Unknown progress mode '{progress_mode}', valid modes are: " "'blocking', 'polling', 'thread' or 'thread-polling'" ) @@ -124,8 +121,9 @@ def enable_delayed_submission(self, enable_delayed_submission): and explicit_enable_delayed_submission ): raise ValueError( - f"Delayed submission requested, but {self.progress_mode} does not " - "support it, 'thread' or 'thread-polling' progress mode required." + f"Delayed submission requested, but '{self.progress_mode}' does " + "not support it, 'thread' or 'thread-polling' progress mode " + "required." ) self._enable_delayed_submission = explicit_enable_delayed_submission @@ -156,7 +154,7 @@ def enable_python_future(self, enable_python_future): and explicit_enable_python_future ): logger.warning( - f"Notifier thread requested, but {self.progress_mode} does not " + f"Notifier thread requested, but '{self.progress_mode}' does not " "support it, using Python wait_yield()." ) explicit_enable_python_future = False diff --git a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py index d8124134..c959f2f4 100644 --- a/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py +++ b/python/ucxx/ucxx/_lib_async/continuous_ucx_progress.py @@ -114,6 +114,8 @@ def __init__( super().__init__(worker, event_loop) self._progress_timeout = progress_timeout + self.worker.init_blocking_progress_mode() + # Creating a job that is ready straight away but with low priority. # Calling `await self.event_loop.sock_recv(self.rsock, 1)` will # return when all non-IO tasks are finished. @@ -123,7 +125,7 @@ def __init__( wsock.setblocking(0) wsock.close() - epoll_fd = worker.epoll_file_descriptor + epoll_fd = self.worker.epoll_file_descriptor # Bind an asyncio reader to a UCX epoll file descriptor event_loop.add_reader(epoll_fd, self._fd_reader_callback) From 9e2d017221e4a480b905c205d7c716092a3a68f6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 22 Oct 2024 09:05:53 -0700 Subject: [PATCH 21/22] Fix unreachable test --- .../ucxx/_lib_async/tests/test_from_worker_address_error.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ucxx/ucxx/_lib_async/tests/test_from_worker_address_error.py b/python/ucxx/ucxx/_lib_async/tests/test_from_worker_address_error.py index 35d580f8..5aebedf8 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_from_worker_address_error.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_from_worker_address_error.py @@ -62,6 +62,11 @@ async def run(): # "Endpoint timeout" after UCX_UD_TIMEOUT seconds have passed. # We need to keep progressing ucxx until timeout is raised. ep = await ucxx.create_endpoint_from_worker_address(remote_address) + while ep.alive: + await asyncio.sleep(0) + if not ucxx.core._get_ctx().progress_mode.startswith("thread"): + ucxx.progress() + ep._ep.raise_on_error() else: # Create endpoint to remote worker, and: # From caf67f9fa936dd74f2f5a8aa2a592589ee7fc933 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Tue, 22 Oct 2024 12:02:23 -0700 Subject: [PATCH 22/22] Ensure writer is closed to prevent Distributed check failure --- python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py b/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py index 1e19993b..409f957a 100644 --- a/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py +++ b/python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py @@ -411,7 +411,10 @@ async def test_comm_closed_on_read_error(): with pytest.raises((asyncio.TimeoutError, CommClosedError)): await wait_for(reader.read(), 0.01) + await writer.close() + assert reader.closed() + assert writer.closed() @pytest.mark.flaky(