edifice.use_async#
- edifice.use_async(fn_coroutine, dependencies=(), max_concurrent=1)[source]#
Asynchronous side-effect Hook inside a
@componentfunction.- Parameters:
fn_coroutine (
Callable[[],Coroutine[None,None,None]]) – Function of no arguments which returns an async Coroutine to be run as a Task.dependencies (
Any) – Thefn_coroutineTask will be started when thedependenciesare not__eq__to the olddependencies.max_concurrent (
int|None) – Maximum number of concurrentfn_coroutineTasks to allow to run at the same time. If this limit is exceeded, then the oldestfn_coroutineTask will be cancelled to make room for the new Task. Default is1. Set toNoneto allow unlimited concurrency.
- Return type:
Callable[[],None]- Returns:
A function which can be called to cancel the
fn_coroutineTask manually.
Will create a new Task with the
fn_coroutinecoroutine. This Task will be bound to the lifecycle of this@component.The
fn_coroutinewill be called every time thedependencieschange. Only onefn_coroutinewill be allowed to run at a time.Exceptions raised from the
fn_coroutinewill be suppressed.For general advice about
asyncprogramming in Python see Developing with asyncio.use_async to fetch from the network#@component def WordDefinition(self, word:str): definition, definition_set = use_state("") async def fetcher(): try: definition_set("Fetch definition pending") x = await fetch_definition_from_the_internet(word) definition_set(x) except asyncio.CancelledError: definition_set("Fetch definition cancelled") raise except Exception: defintion_set("Fetch definition failed") cancel_fetcher = use_async(fetcher, word) with VBoxView(): Label(text=word) Label(text=definition) Button(text="Cancel fetch", on_click=lambda _:cancel_fetcher())
Cancellation#
Edifice will call cancel() on the async
fn_coroutineTask in three situations:If the
dependencieschange before thefn_coroutineTask completes, then thefn_coroutineTask will be cancelled if themax_concurrentlimit is exceeded so that the newfn_coroutineTask can start.The
use_asyncHook returns a function which can be called to cancel thefn_coroutineTasks manually. In the example above, thecancel_fetcher()function can be called to cancel the fetcher.If this
@componentis unmounted before thefn_coroutineTasks complete, then thefn_coroutineTasks will be cancelled.
Write your async
fn_coroutinefunction in such a way that it cleans itself up after exceptions. If you catch a CancelledError infn_coroutinethen always re-raise it.You may call a
use_state()setter during aCancelledErrorexception. If thefn_coroutineTask was cancelled because the component is being unmounted, then theuse_state()setter will have no effect.See also Task Cancellation.
Timers#
The
use_asyncHook is useful for timers and animation.Here is an example busy-wait UI indicator which is a bit more visually subtle than Qt’s barbershop-pole
QProgressBarwithminimum=0, maximum=0.BusyWaitIndicator component#@ed.component def BusyWaitIndicator( self, visible: bool = True, size: int | None = None, color: QColor | None = None, ): """ Animated busy wait indicator which looks like ⬤⬤⬤⬤⬤ If not visible, will still occupy the same layout space but will be transparent and animation will not run. """ color_: QColor color_ = color if color else QApplication.palette().color(QPalette.ColorRole.Text) tick, tick_set = ed.use_state(0) async def animation(): if visible: while True: await asyncio.sleep(0.2) tick_set(lambda t: (t + 1) % 5) ed.use_async(animation, visible) with ed.HBoxView( size_policy=QSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed), ): for i in range(0, 5): ed.Label( text="⬤", style={ "color": QColor( color_.red(), color_.green(), color_.blue(), 40 + (((i - tick) % 5) * 20) if visible else 0, ), } | {"font-size": size} if size else {}, )
Worker Process#
We can run a
def my_subprocessfunction in a worker Process by usingrun_subprocess_with_callback().run_subprocess_with_callback()is good for spawing a parallel worker Process from a@componentbecause if the@componentis unmounted, thenrun_subprocess_with_callback()will be cancelled and the Process will be immediately terminated. Which is usually what we want.use_async with run_subprocess_with_callback#def my_subprocess(callback: Callable[[str], None]) -> int: # This function will run in a new Process. callback("Starting long calculation") time.sleep(100.0) x = 1 + 2 callback(f"Finished long calculation")) return x @component def LongCalculator(self): calculation_progress, calculation_progress_set = ed.use_state("") def my_callback(progress: str) -> None: # This function will run in the main process event loop. calculation_progress_set(progress) async def run_my_subprocess() -> None: try: x = await run_subprocess_with_callback(my_subprocess, my_callback) calculation_progress_set(f"Result: {x}") except asyncio.CancelledError: raise except Exception as e: calculation_progress_set(f"Error: {str(e)}") use_async(run_my_subprocess) Label(text=calculation_progress)
Yielding to Qt#
Python
asynccoroutines are a form of cooperative multitasking. During anasyncfunction, Qt will get a chance to render the UI and process events every time there is anawait. Sometimes you may want to deliberately yield to the Qt event loop to allow it to render and process events. The way to do that is asyncio.sleep(0).await asyncio.sleep(0)