Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-128041: Add a terminate_workers method to ProcessPoolExecutor #128043

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
12 changes: 12 additions & 0 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,18 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
require the *fork* start method for :class:`ProcessPoolExecutor` you must
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.

.. method:: terminate_workers(signal=signal.SIGTERM)

Attempt to terminate all living worker processes immediately by sending
each of them the given signal. If the signal is not specified, the default
signal :data:`signal.SIGTERM` is used.

After calling this method the caller should no longer submit tasks to the
executor. It is also recommended to still call :meth:`Executor.shutdown`
to ensure that all other resources associated with the executor are freed.

.. versionadded:: next

.. _processpoolexecutor-example:

ProcessPoolExecutor Example
Expand Down
4 changes: 4 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ concurrent.futures
supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`.
(Contributed by Gregory P. Smith in :gh:`84559`.)

* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
csm10495 marked this conversation as resolved.
Show resolved Hide resolved
a way to terminate all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)

ctypes
------

Expand Down
31 changes: 31 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import weakref
from functools import partial
import itertools
import signal
import sys
from traceback import format_exception

Expand Down Expand Up @@ -855,3 +856,33 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__

def terminate_workers(self, signal=signal.SIGTERM):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, can we perhaps name it kill? (namely, you kill the pool's workers with the given signal) or is kill already taken (possibly for something else in the future)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I thought this was explicit. I'm open for either or.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so after some research:

  • Process.terminate -- mimics SIGTERM
  • Process.kill -- sends SIGKILL to the process.

Many other interfaces use terminate for sending SIGTERM and kill for sending SIGKILL. Now, we also have multiprocessing.Pool.terminate, so maybe we could mimic it by just naming it terminate(signal=...)? It would be slightly inconsistent with the others terminate methods since this one would be able to send a SIGKILL as well.

So it's up to you (or up to Gregory)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could definitely be down for terminate to match multiprocessing.Pool. (Though i'd still like to be able to change the signal sent so it would be a slight api difference). I'll wait to see what Gregory thinks to either change it or leave as is.

"""Attempts to terminate the executor's workers using the given signal.
Iterates through all of the current processes and sends the given signal if
the process is still alive.

After terminating workers, the pool will be in a broken state
and no longer usable (for instance, new tasks should not be
submitted).

Args:
signal: The signal to send to each worker process. Defaults to
signal.SIGTERM.
"""
if not self._processes:
return

for pid, proc in self._processes.items():
try:
if not proc.is_alive():
continue
except ValueError:
# The process is already exited/closed out.
continue

try:
os.kill(pid, signal)
except ProcessLookupError:
# The process just ended before our signal
continue
73 changes: 73 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import multiprocessing
import os
import queue
import signal
import sys
import threading
import time
import unittest
import unittest.mock
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool

Expand All @@ -22,6 +26,12 @@ def __init__(self, mgr):
def __del__(self):
self.event.set()

def _put_sleep_put(queue):
""" Used as part of test_process_pool_executor_terminate_workers """
queue.put('started')
time.sleep(2)
queue.put('finished')


class ProcessPoolExecutorTest(ExecutorTest):

Expand Down Expand Up @@ -218,6 +228,69 @@ def mock_start_new_thread(func, *args, **kwargs):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()

def test_process_pool_executor_terminate_workers(self):
manager = multiprocessing.Manager()
q = manager.Queue()

with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(_put_sleep_put, q)

# We should get started, but not finished since we'll terminate the workers just after
self.assertEqual(q.get(timeout=1), 'started')

executor.terminate_workers()

self.assertRaises(queue.Empty, q.get, timeout=1)


def test_process_pool_executor_terminate_workers_dead_workers(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(os._exit, 1)
self.assertRaises(BrokenProcessPool, future.result)

# Patching in here instead of at the function level since we only want
# to patch it for this function call, not other parts of the flow.
with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill:
executor.terminate_workers()

mock_kill.assert_not_called()

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_not_started_yet(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
# The worker has not been started yet, terminate_workers should basically no-op
executor.terminate_workers()
csm10495 marked this conversation as resolved.
Show resolved Hide resolved

mock_kill.assert_not_called()

def test_process_pool_executor_terminate_workers_stops_pool(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(time.sleep, 0).result()

executor.terminate_workers()

future = executor.submit(time.sleep, 0)
self.assertRaises(BrokenProcessPool, future.result)

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 0)
future.result()

executor.terminate_workers(signal.SIGABRT)

worker_process = list(executor._processes.values())[0]
mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT)

def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 0)
future.result()

# 'potatoes' isn't a valid signal, so os.kill will raise a TypeError
self.assertRaises(TypeError, executor.terminate_workers, 'potatoes')


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
a way to terminate all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)
Loading