# MIT License
#
# Copyright 2021 David Ding
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Edifice run_subprocess_with_callback 2025 by James D. Brock
#
# This run_subprocess_with_callback module file depends only on the Python
# standard library so it can be copied and pasted into any project without
# modification.
from __future__ import annotations
import asyncio
import dataclasses
import multiprocessing
import multiprocessing.process
import multiprocessing.queues
import queue
import traceback
import typing
if typing.TYPE_CHECKING:
from multiprocessing.context import SpawnContext
_T_subprocess = typing.TypeVar("_T_subprocess")
_P_callback = typing.ParamSpec("_P_callback")
@dataclasses.dataclass
class _EndProcess:
"""
The result returned by the subprocess
"""
result: typing.Any
@dataclasses.dataclass
class _ExceptionWrapper:
"""
Wrap an exception raised in the subprocess
"""
ex: BaseException
ex_string: list[str]
def _run_subprocess(
subprocess: typing.Callable[[typing.Callable[_P_callback, None]], typing.Awaitable[_T_subprocess]],
callback_send: multiprocessing.queues.Queue,
) -> None:
subloop = asyncio.new_event_loop()
async def work() -> None:
def _run_callback(*args: _P_callback.args, **kwargs: _P_callback.kwargs) -> None:
callback_send.put((args, kwargs))
try:
r = await subprocess(_run_callback)
# It would be nice if the subprocess could be cancelled
# by .cancel() so it it could have a chance to handle CancelledError
# and clean up before it gets .terminate()ed.
#
# But we have to ask: what if the subprocess is blocked?
# Making blocking I/O calls in a subprocess should be supported
# and encouranged. That's the whole point of using a subprocess.
#
# So I think the best and simplest thing to do is to terminate
# on cancellation.
except BaseException as e: # noqa: BLE001
callback_send.put(_ExceptionWrapper(e, traceback.format_exception(e)))
else:
callback_send.put(_EndProcess(r))
return subloop.run_until_complete(work())
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue.join_thread
# > “By default if a process is not the creator of the queue then on
# > exit it will attempt to join the queue's background thread.”
#
# https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
# > “if a child process has put items on a queue (and it has not used
# > JoinableQueue.cancel_join_thread), then that process will not
# > terminate until all buffered items have been flushed to the pipe.”
async def run_subprocess_with_callback(
subprocess: typing.Callable[[typing.Callable[_P_callback, None]], typing.Awaitable[_T_subprocess]],
callback: typing.Callable[_P_callback, None],
daemon: bool | None = None,
) -> _T_subprocess:
"""
Run an
:code:`async def subprocess` function in a
`Process <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process>`_
and return the result.
Args:
subprocess:
The async function to run in a
`Process <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process>`_.
The :code:`subprocess` function will run in a sub-process in a new event loop.
This :code:`subprocess` function takes a single argument: a function with the same type
as the :code:`callback` function.
The :code:`subprocess` function must be picklable.
callback:
The :code:`callback` function to pass to the :code:`subprocess` when it starts.
This function will run in the main process event loop.
All of the arguments to the :code:`callback` function must be picklable.
daemon:
Optional argument which will be passed to the Process
`daemon <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.daemon>`_
argument.
The advantage of :func:`run_subprocess_with_callback` over
`run_in_executor <https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor>`_
`ProcessPoolExecutor <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor>`_
is that :func:`run_subprocess_with_callback` behaves
well and cleans up properly in the event of
`cancellation <https://docs.python.org/3/library/asyncio-task.html#task-cancellation>`_,
exceptions, and crashes.
This function is useful for a long-running parallel worker
subprocess for which we want to report progress back to the main GUI event loop.
Like *PyTorch* stuff.
The :code:`subprocess` will be started when :code:`await` :func:`run_subprocess_with_callback`
is entered, and the :code:`subprocess` is guaranteed to be terminated when
:code:`await` :func:`run_subprocess_with_callback` completes.
While the :code:`subprocess` is running, it may call the supplied :code:`callback` function.
The :code:`callback` function will run in the main event loop of the calling process.
The :code:`subprocess` will be started with the
`"spawn" start method <https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods>`_,
so it will not inherit any file handles from the calling process.
.. code-block:: python
:caption: Example
async def my_subprocess(callback: Callable[[int], None]) -> str:
# This function will run in a new Process in a new event loop.
callback(1)
await asyncio.sleep(1)
callback(2)
await asyncio.sleep(1)
callback(3)
return "done"
def my_callback(x:int) -> None:
# This function will run in the main process event loop.
print(f"callback {x}")
async def main() -> None:
y = await run_subprocess_with_callback(my_subprocess, my_callback)
# If this main() function is cancelled while awaiting the
# subprocess then the subprocess will be terminated.
print(f"my_subprocess returned {y}")
Cancellation, Exceptions, Crashes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If this :code:`async` :func:`run_subprocess_with_callback` function is
`cancelled <https://docs.python.org/3/library/asyncio-task.html#task-cancellation>`_,
then the :code:`subprocess` will be terminated by calling
`Process.terminate() <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate>`_.
Termination of the :code:`subprocess` will occur even if
the :code:`subprocess` is blocked. Note that
`CancelledError <https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError>`_
will not be raised in the :code:`subprocess`,
instead the :code:`subprocess` will be terminated immediately. If you
want to perform sudden cleanup and halt of the :code:`subprocess` then
send it a message as in the below `Example of Queue messaging.`
Exceptions raised in the :code:`subprocess` will be re-raised from :func:`run_subprocess_with_callback`,
including
`CancelledError <https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError>`_.
Because the Exception must be pickled back to the main process, it will
lose its `traceback <https://docs.python.org/3/reference/datamodel.html#traceback-objects>`_.
In Python ≥3.11, the traceback string from the :code:`subprocess` stack will be added
to the Exception `__notes__ <https://docs.python.org/3/library/exceptions.html#BaseException.__notes__>`_.
Exceptions raised in the :code:`callback` will be suppressed.
If the :code:`subprocess` exits abnormally without returning a value then a
`ProcessError <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.ProcessError>`_
will be raised from :func:`run_subprocess_with_callback`.
Pickling the subprocess
^^^^^^^^^^^^^^^^^^^^^^^
.. note::
Because “only picklable objects can be executed” by a
`Process <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process>`_.
we cannot pass a local function as the :code:`subprocess`. The best
workaround is to define at the module top-level a :code:`subprocess`
function which takes all its parameters as arguments, and then use
`functools.partial <https://docs.python.org/3/library/functools.html#functools.partial>`_
to bind local values to the :code:`subprocess` parameters.
The :code:`callback` does not have this problem; we can pass a local
function as the :code:`callback`.
Messaging to the subprocess
^^^^^^^^^^^^^^^^^^^^^^^^^^^
The :func:`run_subprocess_with_callback` function provides a :code:`callback`
function for messaging back up to the main process, but it does not provide a
built-in way to message down to the :code:`subprocess`.
To message down to the :code:`subprocess` we can create
and pass a messaging object to the :code:`subprocess`, for example a
`multiprocessing.Queue <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue>`_.
Because the :code:`subprocess` is started in the
`"spawn" context <https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods>`_,
we must create the :code:`Queue` in the :code:`"spawn"` context.
.. code-block:: python
:caption: Example of Queue messaging from the main process to the subprocess
async def my_subprocess(
# This function will run in a new Process in a new event loop.
msg_queue: multiprocessing.queues.Queue[str],
callback: typing.Callable[[int], None],
) -> str:
while (msg := msg_queue.get()) != "finish":
callback(len(msg))
return "done"
async def main() -> None:
msg_queue: multiprocessing.queues.Queue[str] = multiprocessing.get_context("spawn").Queue()
def local_callback(x:int) -> None:
# This function will run in the main process event loop.
print(f"callback {x}")
async def send_messages() -> None:
msg_queue.put("one")
msg_queue.put("finish")
# In this example we use gather() to run these 2 functions
# concurrently (“at the same time”).
# 1. run_subprocess_with_callback()
# 2. send_messages()
y, _ = await asyncio.gather(
run_subprocess_with_callback(
functools.partial(my_subprocess, msg_queue),
local_callback,
),
send_messages())
)
print(f"my_subprocess returned {y}")
.. note::
To get proper type hinting on the :code:`Queue`:
.. code-block:: python
from __future__ import annotations
"""
# multiprocessing.Queue can only be used with spawn startmethod
# if we get it from a spawn context.
# https://stackoverflow.com/questions/34847203/queue-objects-should-only-be-shared-between-processes-through-inheritance
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
#
# “You generally can't pass a multiprocessing.Queue as argument after a
# Process has already started, you need to pass it already to the constructor
# of the Process object.”
# https://stackoverflow.com/questions/63419229/passing-a-queue-with-concurrent-futures-regardless-of-executor-type#comment112144271_63419229
spawncontext: SpawnContext = multiprocessing.get_context("spawn")
callback_send = spawncontext.Queue()
proc = spawncontext.Process(
group=None,
target=_run_subprocess,
args=(subprocess, callback_send),
daemon=daemon,
)
# We alternate waiting on the queue and waiting on the event loop.
# There is no good way in Python to wait on both at the same time.
# Mostly we wait on the event loop, and poll the queue.
# Because we want to be able to cancel the task.
# Because then the event loop can run.
# Unfortunately this means
# 1. We raise queue.Empty errors all the time internally.
# 2. There is an extra <100ms delay calling the callback.
# 3. There is some extra CPU busy-waiting while the subprocess is running.
proc.start()
while True:
try:
while True:
# Pull messages out of the queue as fast as we can until
# the queue is empty.
# Then wait on the event loop for 100ms and check the queue
# again.
message = callback_send.get_nowait()
match message:
case _EndProcess(r):
# subprocess returned
proc.join() # We know that process end is imminent.
return r
case _ExceptionWrapper(ex, ex_string):
# subprocess raised an exception
if hasattr(ex, "add_note"):
# https://docs.python.org/3/library/exceptions.html#BaseException.add_note
ex.add_note("".join(ex_string)) # type: ignore # noqa: PGH003
proc.join() # We know that process end is imminent.
raise ex # including CancelledError
case (args, kwargs):
# subprocess called callback
try:
callback(*args, **kwargs) # type: ignore # noqa: PGH003
except: # noqa: S110, E722
# We suppress exceptions in the callback.
# TODO I don't like suppressing exceptions but
# but it's tricky to decide what to do with them
# here. If we raise them then we must also
# terminate the subprocess.
pass
case _:
raise RuntimeError("unreachable")
except queue.Empty:
pass
# Can the callback_send queue raise any other kind of exception?
if not proc.is_alive() and callback_send.empty():
# Is that extra empty() check necessary and sufficient to avoid a
# race condition when the process returns normally and exits?
# Yes, because
# https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
# > “if a child process has put items on a queue (and it has not used
# > JoinableQueue.cancel_join_thread), then that process will not
# > terminate until all buffered items have been flushed to the pipe.”
#
# This is a lot of extra system calls, too bad we have to poll like this.
raise multiprocessing.ProcessError(f"subprocess exited with code {proc.exitcode}")
try:
await asyncio.sleep(0.1) # CancelledError can be raised here
except asyncio.CancelledError:
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate
# > “Warning: If this method is used when the associated process is
# > using a pipe or queue then the pipe or queue is liable to become
# corrupted and may become unusable by other process.”
#
# Really?
# https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
# > “Warning If a process is killed using Process.terminate() while
# > it is trying to use a Queue, then the data in the queue is
# > likely to become corrupted. This may cause any other process to
# get an exception when it tries to use the queue later on.”
#
# What kind of exception?
# Windows
# https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-terminateprocess#remarks
# > “The terminated process cannot exit until all pending I/O has
# > been completed or canceled.”
proc.terminate()
# https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
# > ”This means that if you try joining that process you may get a
# > deadlock unless you are sure that all items which have been put
# > on the queue have been consumed.”
try:
while True:
callback_send.get_nowait()
except queue.Empty:
pass
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.join
proc.join()
# proc.close()?
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.close
# https://stackoverflow.com/questions/58866837/python3-multiprocessing-terminate-vs-kill-vs-close/58866932#58866932
# > “close allows you to ensure the resources are definitely cleaned at a
# > specific point in time”
# We don't need to call close() because we are not worried about
# the OS running out of file descriptors.
raise