run_subprocess_with_callback

async edifice.run_subprocess_with_callback(subprocess, callback, daemon=None)

Run a subprocess function in a Process and return the result.

Parameters:
  • subprocess (Callable[[Callable[_P_callback, None]], _T_subprocess]) –

    The function to run in a Process. This subprocess function takes a single argument: a function with the same type as the callback function. The subprocess function must be picklable.

  • callback (Callable[_P_callback, None]) – The callback function to pass to the subprocess when it starts. The subprocess may call the callback function at any time. The callback function will run in the main process event loop. All of the arguments to the callback function must be picklable.

  • daemon (Optional[bool]) – Optional. If given, it is passed to the daemon argument of the Process.

Return type:

_T_subprocess (the return type of the subprocess function)

The advantage of run_subprocess_with_callback() over run_in_executor() with a ProcessPoolExecutor is that run_subprocess_with_callback() behaves well and cleans up properly in the event of cancellation, exceptions, and crashes. A task that a ProcessPoolExecutor has already started cannot be cancelled. This function is useful for a CPU-bound parallel worker subprocess that needs to report progress back to the main GUI event loop, such as a PyTorch training job.
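The cancellation limitation can be illustrated with a bare concurrent.futures.Future. An executor marks a future as running with set_running_or_notify_cancel() when it dispatches the task, and after that Future.cancel() returns False. When an asyncio task awaiting run_in_executor() is cancelled, asyncio can only attempt this cancel() on the underlying future, so an already-dispatched task keeps running in its worker. The helper try_cancel is a hypothetical name, and the Future is constructed directly only for illustration.

```python
from concurrent.futures import Future


def try_cancel(mark_running: bool) -> bool:
    """Return whether cancel() succeeds on a fresh Future."""
    fut: Future = Future()  # direct construction: for illustration only
    if mark_running:
        # An executor makes this call when it dispatches the task to a worker.
        fut.set_running_or_notify_cancel()
    return fut.cancel()


pending_cancelled = try_cancel(mark_running=False)
running_cancelled = try_cancel(mark_running=True)
print(pending_cancelled, running_cancelled)  # True False
```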

The subprocess will be started when await run_subprocess_with_callback() is entered, and the subprocess is guaranteed to be terminated when await run_subprocess_with_callback() completes.

While the subprocess is running, it may call the supplied callback function. The callback function will run in the main event loop of the calling process.
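The hop from the worker back into the main event loop can be illustrated without a second process. The sketch below is a thread-based stand-in, not Edifice's implementation: the worker runs in a thread through run_in_executor(), and a proxy callback uses loop.call_soon_threadsafe() so that the real callback runs on the event loop thread, with its exceptions suppressed. A real subprocess version would instead have to send the pickled callback arguments across the process boundary. The names worker, run_with_callback_in_thread, and my_callback are hypothetical.

```python
import asyncio
import threading
from typing import Callable


def worker(callback: Callable[[int], None]) -> str:
    # Stand-in for the subprocess function; here it runs in a worker thread.
    for i in (1, 2, 3):
        callback(i)
    return "done"


async def run_with_callback_in_thread(
    work: Callable[[Callable[[int], None]], str],
    callback: Callable[[int], None],
) -> str:
    loop = asyncio.get_running_loop()

    def safe_callback(x: int) -> None:
        # Runs on the event loop thread; exceptions are suppressed,
        # mirroring the documented callback behavior.
        try:
            callback(x)
        except Exception:
            pass

    def proxy(x: int) -> None:
        # Called from the worker thread: schedule the real callback
        # on the event loop instead of calling it directly.
        loop.call_soon_threadsafe(safe_callback, x)

    return await loop.run_in_executor(None, work, proxy)


received: list[tuple[int, bool]] = []


def my_callback(x: int) -> None:
    # Record the value and whether we are on the main (event loop) thread.
    received.append((x, threading.current_thread() is threading.main_thread()))
    if x == 2:
        raise ValueError("ignored")  # suppressed by safe_callback


result = asyncio.run(run_with_callback_in_thread(worker, my_callback))
print(result, received)  # done [(1, True), (2, True), (3, True)]
```

The scheduled callbacks are queued before the executor's result is delivered to the loop, so all three run before run_with_callback_in_thread() returns.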

The subprocess will be started with the “spawn” start method, so it runs in a fresh Python interpreter and does not inherit the memory state or unneeded file handles of the calling process.

Example
import asyncio
import time
from typing import Callable

from edifice import run_subprocess_with_callback

def my_subprocess(callback: Callable[[int], None]) -> str:
    # This function will run in a new Process.
    callback(1)
    time.sleep(1)
    callback(2)
    time.sleep(1)
    callback(3)
    return "done"

def my_callback(x: int) -> None:
    # This function will run in the main process event loop.
    print(f"callback {x}")

async def main() -> None:
    y = await run_subprocess_with_callback(my_subprocess, my_callback)

    # If this main() function is cancelled while awaiting
    # run_subprocess_with_callback then the subprocess will be terminated.

    print(f"my_subprocess returned {y}")

# The "spawn" start method re-imports this module in the subprocess,
# so the entry point must be guarded.
if __name__ == "__main__":
    asyncio.run(main())

Cancellation, Exceptions, Crashes

If this async run_subprocess_with_callback() function is cancelled, then the subprocess will be terminated by calling Process.terminate(). Termination of the subprocess will occur even if the subprocess is blocked. Note that CancelledError will not be raised in the subprocess; instead the subprocess will be terminated immediately, without running its finally clauses or exit handlers. If you want the subprocess to clean up and halt on its own, send it a message as in the Example of Queue messaging below.
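A minimal sketch of a subprocess that halts cooperatively, assuming a hypothetical "stop" message convention: it polls its queue with get_nowait() between units of work, so it can clean up and return normally instead of being killed by Process.terminate(). The main process would put "stop" on a spawn-context Queue and then await the result rather than cancelling. The function name stoppable_subprocess and the 100-step loop are illustrative.

```python
from __future__ import annotations

import multiprocessing.queues
import queue
import typing


def stoppable_subprocess(
    msg_queue: multiprocessing.queues.Queue[str],
    callback: typing.Callable[[int], None],
) -> str:
    # Hypothetical worker: runs in steps and checks for a "stop" message
    # between steps instead of relying on Process.terminate().
    for step in range(100):
        try:
            if msg_queue.get_nowait() == "stop":
                # Clean up here (close files, save a checkpoint) before returning.
                return f"stopped at step {step}"
        except queue.Empty:
            pass  # no message yet, keep working
        callback(step)  # report progress to the main process
    return "finished"
```

Because the function only calls get_nowait(), a plain queue.Queue can stand in for the multiprocessing Queue when testing it in a single process.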

Exceptions raised in the subprocess will be re-raised from run_subprocess_with_callback(), including CancelledError. Because the Exception must be pickled back to the main process, it will lose its traceback. In Python ≥3.11, the traceback string from the subprocess stack will be added to the Exception __notes__.
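The traceback loss can be reproduced with pickle alone. This sketch assumes the subprocess side catches the exception, formats its traceback with the traceback module, attaches the text with BaseException.add_note() (Python 3.11+), and pickles the exception; the helper names are hypothetical and this is not necessarily how Edifice does it. After unpickling, __traceback__ is None, but the note survives because __notes__ lives in the exception's instance dictionary, which pickle preserves.

```python
import pickle
import traceback


def failing_work() -> None:
    raise ValueError("bad input")


def pickle_exception_for_parent() -> bytes:
    # Hypothetical subprocess side: catch the exception, attach the
    # formatted traceback as a note (add_note() exists on Python >= 3.11),
    # and pickle the exception so it can be sent to the parent.
    try:
        failing_work()
    except Exception as exc:
        tb_text = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        if hasattr(exc, "add_note"):
            exc.add_note(tb_text)
        return pickle.dumps(exc)
    raise AssertionError("failing_work() did not raise")


# Parent side: the unpickled exception has no traceback object,
# but on Python >= 3.11 the traceback text survives in __notes__.
restored = pickle.loads(pickle_exception_for_parent())
print(type(restored).__name__, restored.__traceback__)
print(getattr(restored, "__notes__", ["(no notes before 3.11)"])[0])
```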

Exceptions raised in the callback will be suppressed.

If the subprocess exits abnormally without returning a value then a ProcessError will be raised from run_subprocess_with_callback().

Pickling the subprocess

Because “only picklable objects can be executed” by a Process, we cannot pass a local function with local variable bindings as the subprocess. The best workaround is to define, at module top level, a subprocess function that takes all of its parameters as arguments, and then use functools.partial to bind local values to those parameters. See the Example of Queue messaging below.

The callback does not have this problem; we can pass a local function as the callback.
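A quick way to check the pickling rule above: a closure returned from a function cannot be pickled, while a functools.partial over a top-level function can, and the bound values travel with it. The names my_subprocess, make_local_subprocess, and is_picklable are hypothetical, and the check assumes the code runs as an importable module or script.

```python
import functools
import pickle
from typing import Callable


def my_subprocess(scale: int, callback: Callable[[int], None]) -> int:
    # Defined at module top level, so pickle can refer to it by name.
    callback(scale)
    return scale * 2


def make_local_subprocess(scale: int) -> Callable[[Callable[[int], None]], int]:
    def local_subprocess(callback: Callable[[int], None]) -> int:
        # A local closure over `scale`; pickle cannot refer to it by name.
        callback(scale)
        return scale * 2

    return local_subprocess


def is_picklable(obj: object) -> bool:
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


local_ok = is_picklable(make_local_subprocess(3))
bound = functools.partial(my_subprocess, 3)  # binds scale=3
partial_ok = is_picklable(bound)
print(local_ok, partial_ok)  # False True
```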

Messaging to the subprocess

The run_subprocess_with_callback() function provides a callback function for messaging back up to the main process, but it does not provide a built-in way to message down to the subprocess.

To message down to the subprocess we can create and pass a messaging object to the subprocess, for example a multiprocessing.Queue.

Because the subprocess is started in the “spawn” context, we must create the Queue in the “spawn” context too.

Example of Queue messaging from the main process to the subprocess
from __future__ import annotations

import asyncio
import functools
import multiprocessing
import multiprocessing.queues
import typing

from edifice import run_subprocess_with_callback

def my_subprocess(
    msg_queue: multiprocessing.queues.Queue[str],
    callback: typing.Callable[[int], None],
) -> str:
    # This function will run in a new Process.
    while (msg := msg_queue.get()) != "finish":
        callback(len(msg))
    return "done"

async def main() -> None:
    msg_queue: multiprocessing.queues.Queue[str] = multiprocessing.get_context("spawn").Queue()

    def local_callback(x: int) -> None:
        # This function will run in the main process event loop.
        print(f"callback {x}")

    async def send_messages() -> None:
        msg_queue.put("one")
        msg_queue.put("finish")

    # In this example we use gather() to run these 2 functions
    # concurrently (“at the same time”).
    #
    # 1. run_subprocess_with_callback()
    # 2. send_messages()
    #
    # In your code you will probably instead want to send_messages()
    # in reaction to some event.
    #
    y, _ = await asyncio.gather(
        run_subprocess_with_callback(
            functools.partial(my_subprocess, msg_queue),
            local_callback,
        ),
        send_messages(),
    )

    print(f"my_subprocess returned {y}")

# Guard the entry point, because "spawn" re-imports this module.
if __name__ == "__main__":
    asyncio.run(main())

Note

To get proper type hinting on the Queue, put this at the top of the module:

from __future__ import annotations

Async in the subprocess

If you want the subprocess to run an async function, create a new event loop in the subprocess and run the function on that loop.

Example async subprocess function
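import asyncio
import typing

def my_subprocess(callback: typing.Callable[[int], None]) -> str:
    # This function will run in a new Process.

    async def work() -> str:
        callback(1)
        await asyncio.sleep(1)
        return "done"

    # Run work() on a fresh event loop and close the loop afterwards.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(work())
    finally:
        loop.close()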

A cancelled await in the subprocess will raise CancelledError, which will be propagated and re-raised from run_subprocess_with_callback() in the usual way.
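The propagation of CancelledError out of the subprocess function can be checked in a single process by calling the function directly. In this sketch, work() awaits a hypothetical inner task that gets cancelled, so the CancelledError escapes work() and run_until_complete() re-raises it; when the function runs as the subprocess, that is the exception that would be pickled back to the caller.

```python
import asyncio
import typing


def my_subprocess(callback: typing.Callable[[int], None]) -> str:
    # In real use this runs in a new Process; here it is called directly.

    async def work() -> str:
        callback(1)
        inner = asyncio.ensure_future(asyncio.sleep(10))
        inner.cancel()  # simulate something cancelling the awaited task
        await inner  # raises CancelledError inside work()
        return "done"

    loop = asyncio.new_event_loop()
    try:
        # run_until_complete() re-raises the CancelledError from work().
        return loop.run_until_complete(work())
    finally:
        loop.close()


calls: list[int] = []
try:
    my_subprocess(calls.append)
    outcome = "returned"
except asyncio.CancelledError:
    outcome = "cancelled"
print(outcome, calls)  # cancelled [1]
```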

PyInstaller

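If you build a distribution with PyInstaller, then you should call multiprocessing.freeze_support() at the top of the __main__ module, before its other imports. In a frozen application the spawned Process re-executes __main__, and freeze_support() diverts it into the multiprocessing child code before the remaining imports run, so the spawned Process starts up faster.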
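A sketch of an entry-point module for a PyInstaller build, assuming the application's heavy imports can be moved below the freeze_support() call. As we understand it, in a frozen application freeze_support() recognizes the spawned child, runs its task, and exits, so the child never reaches the imports below it; when the program is not frozen the call does nothing. The module layout and main() are hypothetical.

```python
# Hypothetical main module (for example main.py) for a PyInstaller build.
import multiprocessing

# Call freeze_support() before any other imports. In a frozen app the
# spawned child re-executes this module; freeze_support() runs the
# child's task and exits there. When not frozen it does nothing.
multiprocessing.freeze_support()

import asyncio  # stand-in for heavy application imports (e.g. torch, Qt)


async def main() -> str:
    # Application code would await run_subprocess_with_callback() here.
    return "started"


if __name__ == "__main__":
    print(asyncio.run(main()))
```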

Independence

This run_subprocess_with_callback() module depends only on the Python standard library, so this module file can be copied and pasted into another project without depending on the Edifice package.