run_subprocess_with_callback#
- async edifice.run_subprocess_with_callback(subprocess, callback, daemon=None)[source]#
Run a
subprocessfunction in a Process and return the result.- Parameters:
subprocess (
Callable[[Callable[[ParamSpec(_P_callback)],None]],TypeVar(_T_subprocess)]) –The function to run in a Process. This
subprocessfunction takes a single argument: a function with the same type as thecallbackfunction. Thesubprocessfunction must be picklable.callback (
Callable[[ParamSpec(_P_callback)],None]) – Thecallbackfunction to pass to thesubprocesswhen it starts. Thesubprocessmay call thecallbackfunction at any time. Thecallbackfunction will run in the main process event loop. All of the arguments to thecallbackfunction must be picklable.daemon (
Optional[bool]) – Optional argument which will be passed to the Process daemon argument.
- Return type:
TypeVar(_T_subprocess)
The advantage of
run_subprocess_with_callback()over run_in_executor ProcessPoolExecutor is thatrun_subprocess_with_callback()behaves well and cleans up properly in the event of cancellation, exceptions, and crashes. ProcessPoolExecutor cannot be cancelled.. This function is useful for a CPU-bound parallel worker subprocess for which we want to report progress back to the main GUI event loop. Like PyTorch stuff.The
subprocesswill be started whenawaitrun_subprocess_with_callback()is entered, and thesubprocessis guaranteed to be terminated whenawaitrun_subprocess_with_callback()completes.While the
subprocessis running, it may call the suppliedcallbackfunction. Thecallbackfunction will run in the main event loop of the calling process.The
subprocesswill be started with the “spawn” start method, so it will not inherit any file handles from the calling process.Example#def my_subprocess(callback: Callable[[int], None]) -> str: # This function will run in a new Process. callback(1) time.sleep(1) callback(2) time.sleep(1) callback(3) return "done" def my_callback(x:int) -> None: # This function will run in the main process event loop. print(f"callback {x}") async def main() -> None: y = await run_subprocess_with_callback(my_subprocess, my_callback) # If this main() function is cancelled while awaiting # run_subprocess_with_callback then the subprocess will be terminated. print(f"my_subprocess returned {y}")
Cancellation, Exceptions, Crashes#
If this
asyncrun_subprocess_with_callback()function is cancelled, then thesubprocesswill be terminated by calling Process.terminate(). Termination of thesubprocesswill occur even if thesubprocessis blocked. Note that CancelledError will not be raised in thesubprocess, instead thesubprocesswill be terminated immediately. If you want to perform sudden cleanup and halt of thesubprocessthen send it a message as in the below Example of Queue messaging.Exceptions raised in the
subprocesswill be re-raised fromrun_subprocess_with_callback(), including CancelledError. Because the Exception must be pickled back to the main process, it will lose its traceback. In Python ≥3.11, the traceback string from thesubprocessstack will be added to the Exception __notes__.Exceptions raised in the
callbackwill be suppressed.If the
subprocessexits abnormally without returning a value then a ProcessError will be raised fromrun_subprocess_with_callback().Pickling the subprocess#
Because “only picklable objects can be executed” by a Process. we cannot pass a local function with local variable bindings as the
subprocess. The best workaround is to define at the module top-level asubprocessfunction which takes all its parameters as arguments, and then use functools.partial to bind local values to thesubprocessparameters. See below Example of Queue messaging.The
callbackdoes not have this problem; we can pass a local function as thecallback.Messaging to the subprocess#
The
run_subprocess_with_callback()function provides acallbackfunction for messaging back up to the main process, but it does not provide a built-in way to message down to thesubprocess.To message down to the
subprocesswe can create and pass a messaging object to thesubprocess, for example a multiprocessing.Queue.Because the
subprocessis started in the “spawn” context, we must create theQueuein the"spawn"context.Example of Queue messaging from the main process to the subprocess#def my_subprocess( # This function will run in a new Process. msg_queue: multiprocessing.queues.Queue[str], callback: typing.Callable[[int], None], ) -> str: while (msg := msg_queue.get()) != "finish": callback(len(msg)) return "done" async def main() -> None: msg_queue: multiprocessing.queues.Queue[str] = multiprocessing.get_context("spawn").Queue() def local_callback(x:int) -> None: # This function will run in the main process event loop. print(f"callback {x}") async def send_messages() -> None: msg_queue.put("one") msg_queue.put("finish") # In this example we use gather() to run these 2 functions # concurrently (“at the same time”). # # 1. run_subprocess_with_callback() # 2. send_messages() # # In your code you will probably instead want to send_messages() # in reaction to some event. # y, _ = await asyncio.gather( run_subprocess_with_callback( functools.partial(my_subprocess, msg_queue), local_callback, ), send_messages(), ) print(f"my_subprocess returned {y}")
Note
To get proper type hinting on the
Queue:from __future__ import annotations
Async in the subprocess#
If you want the
subprocessto run anasyncfunction then make a new event loop and use that as the event loop in thesubprocess.Example async subprocess function#def my_subprocess(callback: typing.Callable[[int], None]) -> str: # This function will run in a new Process. async def work() -> str: callback(1) await asyncio.sleep(1) return "done" return asyncio.new_event_loop().run_until_complete(work())
A cancelled
awaitin thesubprocesswill raise CancelledError which will be propagated and re-raised fromrun_subprocess_with_callback()in the usual way.PyInstaller#
If you build a distribution with PyInstaller then you should call multiprocessing.freeze_support() to divert the spawn Process before the __main__ imports so that the spawn Process starts up faster.
Independence#
This
run_subprocess_with_callback()module depends only on the Python standard library so this module file can be copied and pasted into another project without depending on the Edifice package.