# Source code for edifice.run_subprocess_with_callback
from __future__ import annotations
import asyncio
import dataclasses
import typing
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.context import SpawnContext
if typing.TYPE_CHECKING:
import queue
_T_subprocess = typing.TypeVar("_T_subprocess")
_P_callback = typing.ParamSpec("_P_callback")
class _EndProcess:
pass
@dataclasses.dataclass
class _ExceptionWrapper:
ex: BaseException
def _run_subprocess(
    subprocess: typing.Callable[[typing.Callable[_P_callback, None]], typing.Awaitable[_T_subprocess]],
    qup: queue.Queue,  # in practice a multiprocessing.managers.SyncManager.Queue proxy
) -> _T_subprocess | _ExceptionWrapper:
    """Run the async ``subprocess`` to completion in a fresh event loop.

    This is the function submitted to the ProcessPoolExecutor worker.

    Args:
        subprocess:
            The user's async function. It is awaited with a single argument:
            a callback that puts its ``(args, kwargs)`` tuple onto ``qup``.
        qup:
            Queue used to forward callback invocations up to the main
            process. An ``_EndProcess`` sentinel is always put on it when
            ``subprocess`` finishes, whether it returned or raised.

    Returns:
        The value returned by ``subprocess``, or an ``_ExceptionWrapper``
        holding the exception it raised. The exception is returned rather
        than raised so the main process can unwrap and re-raise it.
    """
    # It would be nice if the subprocess could be cancelled
    # by .cancel() so it could have a chance to handle CancelledError
    # and clean up before it gets .terminate()ed.
    #
    # But we have to ask: what if the subprocess is blocked?
    # Making blocking I/O calls in a subprocess should be supported
    # and encouraged. That's the whole point of using a subprocess.
    #
    # So I think the best and simplest thing to do is to terminate
    # on cancellation.

    async def work() -> _T_subprocess | _ExceptionWrapper:
        def _run_callback(*args: _P_callback.args, **kwargs: _P_callback.kwargs) -> None:
            qup.put((args, kwargs))

        try:
            r = await subprocess(_run_callback)
        except BaseException as e:  # noqa: BLE001
            return _ExceptionWrapper(e)
        else:
            return r
        finally:
            # Always tell the main process's message pump to stop reading qup.
            qup.put(_EndProcess())

    subloop = asyncio.new_event_loop()
    try:
        return subloop.run_until_complete(work())
    finally:
        # Close the loop explicitly. This frees its selector and self-pipe
        # descriptors deterministically. It also avoids the
        # "unclosed event loop" ResourceWarning that an unclosed loop emits
        # when it is garbage collected.
        subloop.close()
# [docs]
async def run_subprocess_with_callback(
subprocess: typing.Callable[[typing.Callable[_P_callback, None]], typing.Awaitable[_T_subprocess]],
callback: typing.Callable[_P_callback, None],
) -> _T_subprocess:
"""
Run an
async :code:`subprocess` in a
`ProcessPoolExecutor <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor>`_
and return the result.
The advantage of :func:`run_subprocess_with_callback` is that it behaves
well and cleans up properly in the event of exceptions and
`cancellation <https://docs.python.org/3/library/asyncio-task.html#task-cancellation>`_.
This function is useful for a long-running parallel worker
subprocess for which we want to report progress back to the main GUI event loop.
Like *pytorch* stuff.
Args:
subprocess:
The async function to run in a
`ProcessPoolExecutor <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor>`_.
The :code:`subprocess` function will run in a sub-process in a new event loop.
This :code:`subprocess` function takes a single argument: a function with the same type
as the :code:`callback` function.
The :code:`subprocess` function must be picklable.
callback:
The :code:`callback` function to pass to the :code:`subprocess` when it starts.
This function will run in the main process event loop.
All of the arguments to the :code:`callback` function must be picklable.
The :code:`subprocess` will be started when :func:`run_subprocess_with_callback`
is entered, and the :code:`subprocess` is guaranteed to be terminated when
:func:`run_subprocess_with_callback` completes.
The :code:`subprocess` will be started with the
`'spawn' start method <https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods>`_,
so it will not inherit any file handles from the calling process.
While the :code:`subprocess` is running, it may call the supplied :code:`callback` function.
The :code:`callback` function will run in the main event loop of the calling process.
If this async :func:`run_subprocess_with_callback` function is
`cancelled <https://docs.python.org/3/library/asyncio-task.html#task-cancellation>`_,
the :code:`subprocess` will be terminated by calling
`Process.terminate() <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate>`_.
Termination of the :code:`subprocess` will occur even if
the :code:`subprocess` is blocked. Note that
:code:`CancelledError` will not be raised in the :code:`subprocess`,
instead the :code:`subprocess` will be terminated immediately. If you
want to perform sudden cleanup and halt of the :code:`subprocess` then
send it a message as in the below “Example of Queue messaging.”
Exceptions raised in the :code:`subprocess` will be re-raised from :func:`run_subprocess_with_callback`.
Exceptions raised in the :code:`callback` will be suppressed.
.. code-block:: python
:caption: Example
async def my_subprocess(callback: Callable[[int], None]) -> str:
# This function will run in a subprocess in a new event loop.
callback(1)
await asyncio.sleep(1)
callback(2)
await asyncio.sleep(1)
callback(3)
return "done"
def my_callback(x:int) -> None:
# This function will run in the main process event loop.
print(f"callback {x}")
async def main() -> None:
y = await run_subprocess_with_callback(my_subprocess, my_callback)
# If this main() function is cancelled while awaiting the
# subprocess then the subprocess will be terminated.
print(f"my_subprocess returned {y}")
.. note::
Because “only picklable objects can be executed” by a
`ProcessPoolExecutor <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor>`_,
we cannot pass a local function as the :code:`subprocess`. The best
workaround is to define at the module top-level a :code:`subprocess`
function which takes all its parameters as arguments, and then use
`functools.partial <https://docs.python.org/3/library/functools.html#functools.partial>`_
to bind local values to the :code:`subprocess` parameters.
The :code:`callback` does not have this problem; we can pass a local
function as the :code:`callback`.
The :func:`run_subprocess_with_callback` function provides a :code:`callback`
function for messaging back up to the main process, but it does not provide a
built-in way to message down to the subprocess. To accomplish this we can create
and pass a messaging object to the :code:`subprocess`, for example a
`multiprocessing.managers.SyncManager.Queue <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.managers.SyncManager.Queue>`_.
.. code-block:: python
:caption: Example of Queue messaging from the main process to the subprocess
async def my_subprocess(
# This function will run in a subprocess in a new event loop.
queue: queue.Queue[str],
callback: typing.Callable[[int], None],
) -> str:
while (msg := queue.get()) != "finish":
callback(len(msg))
return "done"
async def main() -> None:
with multiprocessing.Manager() as manager:
msg_queue: queue.Queue[str] = manager.Queue()
def local_callback(x:int) -> None:
# This function will run in the main process event loop.
print(f"callback {x}")
async def send_messages() -> None:
msg_queue.put("one")
msg_queue.put("finish")
y, _ = await asyncio.gather(
run_subprocess_with_callback(
functools.partial(my_subprocess, msg_queue),
local_callback,
),
send_messages())
)
print(f"my_subprocess returned {y}")
"""
with (
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Manager
# “corresponds to a spawned child process”
Manager() as manager,
# We must have 2 parallel workers. Therefore 2 ProcessPoolExecutors.
ProcessPoolExecutor(max_workers=1, mp_context=SpawnContext()) as executor_sub,
ProcessPoolExecutor(max_workers=1, mp_context=SpawnContext()) as executor_queue,
):
try:
qup: queue.Queue = manager.Queue()
loop = asyncio.get_running_loop()
subtask = loop.run_in_executor(executor_sub, _run_subprocess, subprocess, qup)
async def get_messages() -> None:
while type(i := (await loop.run_in_executor(executor_queue, qup.get))) is not _EndProcess:
try:
callback(*(i[0]), **(i[1])) # type: ignore # noqa: PGH003
except: # noqa: PERF203, S110, E722
# We have to suppress callback exceptions because
# asyncio.gather will not cancel the subtask if the
# callback raises an exception.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
# We cannot use TaskGroup because we require Python 3.10.
pass
retval, _ = await asyncio.gather(subtask, get_messages())
match retval:
case _ExceptionWrapper(ex):
raise ex # noqa: TRY301
case _:
return retval
return retval # noqa: TRY300
except BaseException: # including asyncio.CancelledError
# We must terminate the process pool workers because cancelling the
# loop.run_in_executor() call will not terminate the workers.
for process in executor_sub._processes.values():
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate
if process.is_alive():
process.terminate()
process.join()
for process in executor_queue._processes.values():
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate
if process.is_alive():
process.terminate()
process.join()
raise