run_subprocess_with_callback#
- async edifice.run_subprocess_with_callback(subprocess, callback, daemon=None)[source]#
Run an
async def subprocess
function in a Process and return the result.- Parameters:
subprocess (
Callable
[[Callable
[[ParamSpec
(_P_callback
)],None
]],Awaitable
[TypeVar
(_T_subprocess
)]]) –The async function to run in a Process. The
subprocess
function will run in a sub-process in a new event loop. Thissubprocess
function takes a single argument: a function with the same type as thecallback
function. Thesubprocess
function must be picklable.callback (
Callable
[[ParamSpec
(_P_callback
)],None
]) – Thecallback
function to pass to thesubprocess
when it starts. This function will run in the main process event loop. All of the arguments to thecallback
function must be picklable.daemon (
Optional
[bool
]) – Optional argument which will be passed to the Process daemon argument.
- Return type:
TypeVar
(_T_subprocess
)
The advantage of
run_subprocess_with_callback()
over run_in_executor ProcessPoolExecutor is thatrun_subprocess_with_callback()
behaves well and cleans up properly in the event of cancellation, exceptions, and crashes. This function is useful for a long-running parallel worker subprocess for which we want to report progress back to the main GUI event loop. Like PyTorch stuff.The
subprocess
will be started whenawait
run_subprocess_with_callback()
is entered, and thesubprocess
is guaranteed to be terminated whenawait
run_subprocess_with_callback()
completes.While the
subprocess
is running, it may call the suppliedcallback
function. Thecallback
function will run in the main event loop of the calling process.The
subprocess
will be started with the “spawn” start method, so it will not inherit any file handles from the calling process.Example#async def my_subprocess(callback: Callable[[int], None]) -> str: # This function will run in a new Process in a new event loop. callback(1) await asyncio.sleep(1) callback(2) await asyncio.sleep(1) callback(3) return "done" def my_callback(x:int) -> None: # This function will run in the main process event loop. print(f"callback {x}") async def main() -> None: y = await run_subprocess_with_callback(my_subprocess, my_callback) # If this main() function is cancelled while awaiting the # subprocess then the subprocess will be terminated. print(f"my_subprocess returned {y}")
Cancellation, Exceptions, Crashes#
If this
async
run_subprocess_with_callback()
function is cancelled, then thesubprocess
will be terminated by calling Process.terminate(). Termination of thesubprocess
will occur even if thesubprocess
is blocked. Note that CancelledError will not be raised in thesubprocess
, instead thesubprocess
will be terminated immediately. If you want to perform sudden cleanup and halt of thesubprocess
then send it a message as in the below Example of Queue messaging.Exceptions raised in the
subprocess
will be re-raised fromrun_subprocess_with_callback()
, including CancelledError. Because the Exception must be pickled back to the main process, it will lose its traceback. In Python ≥3.11, the traceback string from thesubprocess
stack will be added to the Exception __notes__.Exceptions raised in the
callback
will be suppressed.If the
subprocess
exits abnormally without returning a value then a ProcessError will be raised fromrun_subprocess_with_callback()
.Pickling the subprocess#
Note
Because “only picklable objects can be executed” by a Process. we cannot pass a local function as the
subprocess
. The best workaround is to define at the module top-level asubprocess
function which takes all its parameters as arguments, and then use functools.partial to bind local values to thesubprocess
parameters.The
callback
does not have this problem; we can pass a local function as thecallback
.Messaging to the subprocess#
The
run_subprocess_with_callback()
function provides acallback
function for messaging back up to the main process, but it does not provide a built-in way to message down to thesubprocess
.To message down to the
subprocess
we can create and pass a messaging object to thesubprocess
, for example a multiprocessing.Queue.Because the
subprocess
is started in the “spawn” context, we must create theQueue
in the"spawn"
context.Example of Queue messaging from the main process to the subprocess#async def my_subprocess( # This function will run in a new Process in a new event loop. msg_queue: multiprocessing.queues.Queue[str], callback: typing.Callable[[int], None], ) -> str: while (msg := msg_queue.get()) != "finish": callback(len(msg)) return "done" async def main() -> None: msg_queue: multiprocessing.queues.Queue[str] = multiprocessing.get_context("spawn").Queue() def local_callback(x:int) -> None: # This function will run in the main process event loop. print(f"callback {x}") async def send_messages() -> None: msg_queue.put("one") msg_queue.put("finish") # In this example we use gather() to run these 2 functions # concurrently (“at the same time”). # 1. run_subprocess_with_callback() # 2. send_messages() y, _ = await asyncio.gather( run_subprocess_with_callback( functools.partial(my_subprocess, msg_queue), local_callback, ), send_messages()) ) print(f"my_subprocess returned {y}")
Note
To get proper type hinting on the
Queue
:from __future__ import annotations