run_subprocess_with_callback#

async edifice.run_subprocess_with_callback(subprocess, callback, daemon=None)[source]#

Run an async def subprocess function in a Process and return the result.

Parameters:
  • subprocess (Callable[[Callable[[ParamSpec(_P_callback)], None]], Awaitable[TypeVar(_T_subprocess)]]) –

    The async function to run in a Process. The subprocess function will run in a sub-process in a new event loop. This subprocess function takes a single argument: a function with the same type as the callback function. The subprocess function must be picklable.

  • callback (Callable[[ParamSpec(_P_callback)], None]) – The callback function to pass to the subprocess when it starts. This function will run in the main process event loop. All of the arguments to the callback function must be picklable.

  • daemon (Optional[bool]) – Optional argument which will be passed to the Process daemon argument.

Return type:

TypeVar(_T_subprocess)

The advantage of run_subprocess_with_callback() over run_in_executor ProcessPoolExecutor is that run_subprocess_with_callback() behaves well and cleans up properly in the event of cancellation, exceptions, and crashes. This function is useful for a long-running parallel worker subprocess for which we want to report progress back to the main GUI event loop. Like PyTorch stuff.

The subprocess will be started when await run_subprocess_with_callback() is entered, and the subprocess is guaranteed to be terminated when await run_subprocess_with_callback() completes.

While the subprocess is running, it may call the supplied callback function. The callback function will run in the main event loop of the calling process.

The subprocess will be started with the “spawn” start method, so it will not inherit any file handles from the calling process.

Example#
async def my_subprocess(callback: Callable[[int], None]) -> str:
    # This function will run in a new Process in a new event loop.
    callback(1)
    await asyncio.sleep(1)
    callback(2)
    await asyncio.sleep(1)
    callback(3)
    return "done"

def my_callback(x:int) -> None:
    # This function will run in the main process event loop.
    print(f"callback {x}")

async def main() -> None:
    y = await run_subprocess_with_callback(my_subprocess, my_callback)

    # If this main() function is cancelled while awaiting the
    # subprocess then the subprocess will be terminated.

    print(f"my_subprocess returned {y}")

Cancellation, Exceptions, Crashes#

If this async run_subprocess_with_callback() function is cancelled, then the subprocess will be terminated by calling Process.terminate(). Termination of the subprocess will occur even if the subprocess is blocked. Note that CancelledError will not be raised in the subprocess, instead the subprocess will be terminated immediately. If you want to perform sudden cleanup and halt of the subprocess then send it a message as in the below Example of Queue messaging.

Exceptions raised in the subprocess will be re-raised from run_subprocess_with_callback(), including CancelledError. Because the Exception must be pickled back to the main process, it will lose its traceback. In Python ≥3.11, the traceback string from the subprocess stack will be added to the Exception __notes__.

Exceptions raised in the callback will be suppressed.

If the subprocess exits abnormally without returning a value then a ProcessError will be raised from run_subprocess_with_callback().

Pickling the subprocess#

Note

Because “only picklable objects can be executed” by a Process. we cannot pass a local function as the subprocess. The best workaround is to define at the module top-level a subprocess function which takes all its parameters as arguments, and then use functools.partial to bind local values to the subprocess parameters.

The callback does not have this problem; we can pass a local function as the callback.

Messaging to the subprocess#

The run_subprocess_with_callback() function provides a callback function for messaging back up to the main process, but it does not provide a built-in way to message down to the subprocess.

To message down to the subprocess we can create and pass a messaging object to the subprocess, for example a multiprocessing.Queue.

Because the subprocess is started in the “spawn” context, we must create the Queue in the "spawn" context.

Example of Queue messaging from the main process to the subprocess#
async def my_subprocess(
    # This function will run in a new Process in a new event loop.
    msg_queue: multiprocessing.queues.Queue[str],
    callback: typing.Callable[[int], None],
) -> str:
    while (msg := msg_queue.get()) != "finish":
        callback(len(msg))
    return "done"

async def main() -> None:
    msg_queue: multiprocessing.queues.Queue[str] = multiprocessing.get_context("spawn").Queue()

    def local_callback(x:int) -> None:
        # This function will run in the main process event loop.
        print(f"callback {x}")

    async def send_messages() -> None:
        msg_queue.put("one")
        msg_queue.put("finish")

    # In this example we use gather() to run these 2 functions
    # concurrently (“at the same time”).
    # 1. run_subprocess_with_callback()
    # 2. send_messages()
    y, _ = await asyncio.gather(
        run_subprocess_with_callback(
            functools.partial(my_subprocess, msg_queue),
            local_callback,
        ),
        send_messages())
    )

    print(f"my_subprocess returned {y}")

Note

To get proper type hinting on the Queue:

from __future__ import annotations