edifice.use_async#

edifice.use_async(fn_coroutine, dependencies=(), max_concurrent=1)[source]#

Asynchronous side-effect Hook inside a @component function.

Parameters:
  • fn_coroutine (Callable[[], Coroutine[None, None, None]]) – Function of no arguments which returns an async Coroutine to be run as a Task.

  • dependencies (Any) – The fn_coroutine Task will be started when the dependencies are not __eq__ to the old dependencies.

  • max_concurrent (int | None) – Maximum number of concurrent fn_coroutine Tasks to allow to run at the same time. If this limit is exceeded, then the oldest fn_coroutine Task will be cancelled to make room for the new Task. Default is 1. Set to None to allow unlimited concurrency.

Return type:

Callable[[], None]

Returns:

A function which can be called to cancel the fn_coroutine Task manually.

Will create a new Task with the fn_coroutine coroutine. This Task will be bound to the lifecycle of this @component.

The fn_coroutine will be called every time the dependencies change. Only one fn_coroutine will be allowed to run at a time.

Exceptions raised from the fn_coroutine will be suppressed.

For general advice about async programming in Python see Developing with asyncio.

use_async to fetch from the network#
@component
def WordDefinition(self, word:str):
    definition, definition_set = use_state("")

    async def fetcher():
        try:
            definition_set("Fetch definition pending")
            x = await fetch_definition_from_the_internet(word)
            definition_set(x)
        except asyncio.CancelledError:
            definition_set("Fetch definition cancelled")
            raise
        except Exception:
            defintion_set("Fetch definition failed")

    cancel_fetcher = use_async(fetcher, word)

    with VBoxView():
        Label(text=word)
        Label(text=definition)
        Button(text="Cancel fetch", on_click=lambda _:cancel_fetcher())

Cancellation#

Edifice will call cancel() on the async fn_coroutine Task in three situations:

  1. If the dependencies change before the fn_coroutine Task completes, then the fn_coroutine Task will be cancelled if the max_concurrent limit is exceeded so that the new fn_coroutine Task can start.

  2. The use_async Hook returns a function which can be called to cancel the fn_coroutine Tasks manually. In the example above, the cancel_fetcher() function can be called to cancel the fetcher.

  3. If this @component is unmounted before the fn_coroutine Tasks complete, then the fn_coroutine Tasks will be cancelled.

Write your async fn_coroutine function in such a way that it cleans itself up after exceptions. If you catch a CancelledError in fn_coroutine then always re-raise it.

You may call a use_state() setter during a CancelledError exception. If the fn_coroutine Task was cancelled because the component is being unmounted, then the use_state() setter will have no effect.

See also Task Cancellation.

Timers#

The use_async Hook is useful for timers and animation.

Here is an example busy-wait UI indicator which is a bit more visually subtle than Qt’s barbershop-pole QProgressBar with minimum=0, maximum=0.

BusyWaitIndicator component#
@ed.component
def BusyWaitIndicator(
    self,
    visible: bool = True,
    size: int | None = None,
    color: QColor | None = None,
):
    """
    Animated busy wait indicator which looks like ⬤⬤⬤⬤⬤

    If not visible, will still occupy the same layout space but will be
    transparent and animation will not run.
    """

    color_: QColor
    color_ = color if color else QApplication.palette().color(QPalette.ColorRole.Text)
    tick, tick_set = ed.use_state(0)

    async def animation():
        if visible:
            while True:
                await asyncio.sleep(0.2)
                tick_set(lambda t: (t + 1) % 5)

    ed.use_async(animation, visible)

    with ed.HBoxView(
        size_policy=QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed),
    ):
        for i in range(0, 5):
            ed.Label(
                text="⬤",
                style={
                    "color": QColor(
                        color_.red(),
                        color_.green(),
                        color_.blue(),
                        40 + (((i - tick) % 5) * 20) if visible else 0,
                    ),
                }
                | {"font-size": size}
                if size
                else {},
            )

Worker Process#

We can run a def my_subprocess function in a worker Process by using run_subprocess_with_callback().

run_subprocess_with_callback() is good for spawing a parallel worker Process from a @component because if the @component is unmounted, then run_subprocess_with_callback() will be cancelled and the Process will be immediately terminated. Which is usually what we want.

use_async with run_subprocess_with_callback#
def my_subprocess(callback: Callable[[str], None]) -> int:
    # This function will run in a new Process.
    callback("Starting long calculation")
    time.sleep(100.0)
    x = 1 + 2
    callback(f"Finished long calculation"))
    return x

@component
def LongCalculator(self):
    calculation_progress, calculation_progress_set = ed.use_state("")

    def my_callback(progress: str) -> None:
        # This function will run in the main process event loop.
        calculation_progress_set(progress)

    async def run_my_subprocess() -> None:
        try:
            x = await run_subprocess_with_callback(my_subprocess, my_callback)
            calculation_progress_set(f"Result: {x}")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            calculation_progress_set(f"Error: {str(e)}")

    use_async(run_my_subprocess)

    Label(text=calculation_progress)

Yielding to Qt#

Python async coroutines are a form of cooperative multitasking. During an async function, Qt will get a chance to render the UI and process events every time there is an await. Sometimes you may want to deliberately yield to the Qt event loop to allow it to render and process events. The way to do that is asyncio.sleep(0).

await asyncio.sleep(0)