from __future__ import annotations
import asyncio
import functools
import inspect
import logging
import threading
import typing as tp
from collections import defaultdict
from collections.abc import Callable, Coroutine, Iterable, Iterator
from copy import copy
from dataclasses import dataclass
from textwrap import dedent
from typing_extensions import Self
from .qt import QT_VERSION
if QT_VERSION == "PyQt6" and not tp.TYPE_CHECKING:
from PyQt6 import QtCore, QtGui, QtWidgets
else:
from PySide6 import QtCore, QtGui, QtWidgets
_T_use_state = tp.TypeVar("_T_use_state")
_T_Element = tp.TypeVar("_T_Element", bound="Element")
_T_widget = tp.TypeVar("_T_widget", bound=QtWidgets.QWidget)
logger = logging.getLogger("Edifice")
P = tp.ParamSpec("P")
def stylevalue_to_str(v: QtGui.QColor | tp.Any) -> str:
match v:
case QtGui.QColor():
# should be equivalent to f"rgba({color.red()}, {color.green()}, {color.blue()}, {color.alpha()})"
return v.name(format=QtGui.QColor.NameFormat.HexArgb)
case _:
return v
def _dict_to_style(d: dict[str, tp.Any]) -> str:
return "{" + (";".join(f"{k}: {stylevalue_to_str(v)}" for (k, v) in d.items())) + "}"
# TODO
# https://stackoverflow.com/questions/37278647/fire-and-forget-python-async-await/37345564#37345564
# “Replace asyncio.ensure_future with asyncio.create_task everywhere if you're
# using Python >= 3.7 It's a newer, nicer way to spawn tasks.”
def _ensure_future(fn):
# Ensures future if fn is a coroutine, otherwise don't modify fn
if inspect.iscoroutinefunction(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
return asyncio.ensure_future(fn(*args, **kwargs))
return wrapper
return fn
class CommandType:
def __init__(self, fn: Callable[P, tp.Any], *args: P.args, **kwargs: P.kwargs):
# The return value of fn is ignored and should thus return None. However, in
# order to test with equality on the CommandTypes we need to allow fn to return
# Any value to not allocate wrapper functions ignoring the return value.
self.fn = fn
self.args = args
self.kwargs = kwargs
def __str__(self):
return f"{self.fn}(*{self.args},**{self.kwargs})"
def __repr__(self):
return f"{self.fn.__repr__()}(*{self.args.__repr__()},**{self.kwargs.__repr__()})"
def __eq__(self, other):
if not isinstance(other, CommandType):
return False
if self.fn != other.fn:
return False
if len(self.args) != len(other.args):
return False
if len(self.kwargs) != len(other.kwargs):
return False
for i, arg in enumerate(self.args):
if arg != other.args[i]:
return False
for i, arg in self.kwargs.items():
if arg != other.kwargs[i]:
return False
return True
def __hash__(self):
return (self.fn, *self.args, *self.kwargs.items()).__hash__()
_T_use_state = tp.TypeVar("_T_use_state")
"""
Deferred function call. A tuple with a Callable, and all of the values of its arguments.
"""
class PropsDict:
"""An immutable dictionary for storing props.
Props may be accessed either by indexing
props["myprop"]
or by attribute
props.myprop
By convention, all PropsDict methods will start with :code:`_` to
not conflict with keys.
.. document private functions
.. autoproperty:: _keys
.. autoproperty:: _items
"""
__slots__ = ("_d",)
def __init__(self, dictionary: tp.Mapping[str, tp.Any]):
super().__setattr__("_d", dictionary)
def __getitem__(self, key) -> tp.Any:
return self._d[key]
def __setitem__(self, key, value) -> tp.NoReturn:
raise ValueError("Props are immutable")
@property
def _keys(self) -> Iterator:
"""Returns the keys of the props dict as a list."""
return self._d.keys()
@property
def _items(self) -> Iterator:
"""Returns the (key, value) of the props dict as a list."""
return self._d.items()
def _get(self, key: str, default: tp.Any | None = None) -> tp.Any:
"""Gets item by key.
Equivalent to dictionary.get(key, default)
Args:
key: key to the PropsDict
default: returned value if key is not found in props
Returns:
The corresponding value if key is in PropsDict, the default
value otherwise.
"""
return self._d.get(key, default)
def __len__(self) -> int:
return len(self._d)
def __iter__(self):
return iter(self._d)
def __contains__(self, k) -> bool:
return k in self._d
def __getattr__(self, key) -> tp.Any:
if key in self._d:
return self._d[key]
raise KeyError(str(key) + " not in props")
def __repr__(self):
return "PropsDict(" + str(repr(self._d)) + ")"
def __str__(self) -> str:
return "PropsDict(" + str(self._d) + ")"
def __setattr__(self, key, value) -> tp.NoReturn:
raise ValueError("Props are immutable")
[docs]
class Reference(tp.Generic[_T_Element]):
"""Reference to an :class:`Element` for imperative commands.
Edifice tries to abstract away the need to issue
imperative commands to widgets but this is not always possible and not
every feature of Qt Widgets is supported by Edifice.
In some cases, we might need to issue imperative commands to the
Elements and their underlying Qt Widgets.
:class:`Reference` gives us access to a rendered :class:`Element`.
Create a :class:`Reference` with the :func:`use_ref` Hook.
:func:`Element.register_ref` registers the :class:`Reference` object
to the :class:`Element`.
:class:`Reference` can be dereferenced by calling it. An instance of
type :code:`Reference[Label]` will dereference to an instance of
:code:`Label` when called.
Initially, a :class:`Reference` object will dereference to :code:`None`.
After the first render, it will dereference to the rendered :class:`Element`.
When the rendered :class:`Element` dismounts, the reference will once again
dereference to :code:`None`.
:class:`Reference` is valid whenever it is not :code:`None`.
:class:`Reference` will evaluate false if the :class:`Element` is :code:`None`.
Access the QWidget underlying a :class:`QtWidgetElement`,
through the :code:`underlying` attribute of the :class:`QtWidgetElement`.
:func:`use_effect` Hooks always run after the Elements are fully rendered.
.. code-block:: python
@component
def MyComp(self):
label_ref:Reference[Label] = use_ref()
def did_render():
label = label_ref()
if label and label.underlying:
# Set the text of the Label’s underlying QLabel widget.
label.underlying.setText("After")
use_effect(did_render, ())
with VBoxView():
Label("Before").register_ref(label_ref)
"""
[docs]
def __init__(self):
self._value: _T_Element | None = None
def __bool__(self) -> bool:
return self._value is not None
def __hash__(self) -> int:
return id(self)
def __call__(self) -> _T_Element | None:
return self._value
T = tp.TypeVar("T")
class AppProtocol(tp.Protocol):
"""Protocol for App"""
def _request_rerender(self, components: Iterable[Element], kwargs: dict[str, tp.Any]):
pass
def _defer_rerender(self, element: Element):
pass
def stop(self):
pass
class _Tracker:
"""
During a render, track the current element and the children being
added to the current element.
"""
def __init__(self, component: Element):
self.component = component
self.children: list[Element] = []
def append_child(self, component: Element):
self.children.append(component)
class _RenderContext:
"""
Encapsulates various state that's needed for rendering.
One _RenderContext is created when a render begins, and it is destroyed
at the end of the render.
"""
__slots__ = (
"_callback_queue",
"component_to_old_props",
"component_tree",
"current_element",
"engine",
"enqueued_deletions",
"need_qt_command_reissue",
"trackers",
"widget_tree",
)
trackers: list[_Tracker]
"""Stack of _Tracker"""
current_element: Element | None
"""The Element currently being rendered."""
# I guess static scope typing of instance members is normal in Python?
# https://peps.python.org/pep-0526/#class-and-instance-variable-annotations
def __init__(
self,
engine: RenderEngine,
):
self.engine = engine
self.need_qt_command_reissue = {}
self.component_to_old_props = {}
self.component_tree: dict[Element, list[Element]] = {}
"""
Map of a component to its children.
"""
self.widget_tree: dict[Element, _WidgetTree] = {}
"""
Map of a component to its rendered widget tree.
"""
self.enqueued_deletions: list[Element] = []
self._callback_queue = []
self.trackers = []
self.current_element = None
def mark_props_change(self, component: Element, newprops: PropsDict):
if component not in self.component_to_old_props:
self.component_to_old_props[component] = component.props
component._props = newprops._d
def get_old_props(self, component):
if component in self.component_to_old_props:
return self.component_to_old_props[component]
return PropsDict({})
def mark_qt_rerender(self, component: QtWidgetElement, need_rerender: bool):
self.need_qt_command_reissue[component] = need_rerender
def need_rerender(self, component: QtWidgetElement):
return self.need_qt_command_reissue.get(component, False)
local_state = threading.local()
def get_render_context() -> _RenderContext:
return getattr(local_state, "render_context") # noqa: B009
def get_render_context_maybe() -> _RenderContext | None:
return getattr(local_state, "render_context", None)
[docs]
def child_place(element: Element) -> None:
"""
Place a child passed through the special :code:`children` **props** into
the layout of a parent :func:`@component<component>`.
"""
get_render_context().trackers[-1].append_child(element)
[docs]
class Element:
"""The base class for Edifice Elements.
In user code you should almost never use :class:`Element` directly. Instead
use :doc:`Base Elements <../base_components>` and :func:`@component<component>` Elements.
A :class:`Element` is a stateful container wrapping a render function.
Elements have both internal and external properties.
The external properties, **props**, are passed into the :class:`Element` by another
:class:`Element`’s render function through the constructor. These values are owned
by the external caller and should not be modified by this :class:`Element`.
"""
_render_changes_context: dict | None = None
_render_unwind_context: dict | None = None
_controller: AppProtocol | None = None
_edifice_internal_references: set[Reference[Self]] | None = None
[docs]
def __init__(self):
super().__setattr__("_edifice_internal_references", set())
self._props: dict[str, tp.Any] = {"children": []}
# Ensure we only construct this element once
assert getattr(self, "_initialized", False) is False
self._initialized = True
self._key: str | None = None
ctx = get_render_context_maybe()
if ctx is not None:
trackers = ctx.trackers
if len(trackers) > 0:
parent = trackers[-1]
parent.append_child(self)
# We don't really need these hook indices to be per-instance state.
# They are only used during a render.
self._hook_state_index: int = 0
"""use_state hook index for current render."""
self._hook_effect_index: int = 0
"""use_effect hook index for current render."""
self._hook_async_index: int = 0
"""use_async hook index for current render."""
def __enter__(self: Self) -> Self:
ctx = get_render_context()
tracker = _Tracker(self)
ctx.trackers.append(tracker)
return self
def __exit__(self, *args):
ctx = get_render_context()
tracker = ctx.trackers.pop()
prop: list[Element] = list(self._props.get("children", ()))
prop.extend(tracker.children)
self._props["children"] = tuple(prop)
def _register_props(self, props: tp.Mapping[str, tp.Any]) -> None:
"""Register props.
Args:
props: a dictionary representing the props to register.
"""
self._props.update(props)
[docs]
def set_key(self: Self, key: str | None) -> Self:
"""Set the key of an :class:`Element`.
The key is used when re-rendering to match new child Elements
with old child Elements.
The diffing algorithm will assume that child Elements with the same key
are identical.
Each key must be unique and persistent. The **Edifice Rules
of Keys** are the same as the
`React Rules of Keys <https://react.dev/learn/rendering-lists#rules-of-keys>`_.
- **Keys must be unique among siblings.** However, it’s okay to use the same keys for Elements of different parents.
- **Keys must not change** or that defeats their purpose! Don’t generate them while rendering.
Returns the Element to allow for chaining.
Example::
with VBoxView():
if english:
Label("Hello").set_key("en")
if french:
Label("Bonjour").set_key("fr")
if spanish:
Label("Hola").set_key("es")
Args:
key: The Element unique key string.
Returns:
The Element.
"""
self._key = key
return self
[docs]
def register_ref(self: Self, reference: Reference[Self]) -> Self:
"""Registers provided :class:`Reference` to this Element.
During render, the provided reference will be set to point to the
currently rendered instance of this Element.
Args:
reference: the Reference to register
Returns:
The Element self.
"""
assert self._edifice_internal_references is not None
self._edifice_internal_references.add(reference)
return self
@property
def children(self) -> tuple[Element, ...]:
"""The children of this Element."""
return self._props["children"]
@property
def props(self) -> PropsDict:
"""The props of this Element."""
return PropsDict(self._props)
def _should_update(self, newprops: PropsDict) -> bool:
"""Determines if the Element should rerender upon receiving new props and state.
The :code:`newprops` reflect the
props that change: they
may be a subset of the props. When this function is called,
all props of this Element are the old values, so you can compare
:code:`self.props` to :code:`newprops`
to determine changes.
Args:
newprops: the new set of props
Returns:
Whether or not the Element should be rerendered.
"""
# TODO if an Element has children, then _should_update will always
# return True, because the children will always be different, because
# _recycle_children hasn't been called yet. Is that correct behavior?
for k, v in newprops._items:
if k in self.props:
# If the prop is in the old props, then we check if it's changed.
v2 = self.props._get(k)
if v2 != v:
return True
else:
# If the prop is not in the old props, then we rerender.
return True
return False
def __call__(self, *args):
children = []
for a in args:
if isinstance(a, list):
children.extend(a)
elif a:
children.append(a)
self._props["children"] = tuple(children)
return self
def __hash__(self):
return id(self)
def _tags(self):
classname = self.__class__.__name__
return [
f"<{classname} id=0x%x %s>"
% (id(self), " ".join(f"{p}={val}" for (p, val) in self.props._items if p != "children")),
f"</{classname}>",
f"<{classname} id=0x%x %s />"
% (id(self), " ".join(f"{p}={val}" for (p, val) in self.props._items if p != "children")),
]
def __str__(self):
tags = self._tags()
return tags[2]
def _render_element(self) -> Element | None:
"""Logic for rendering, must be overridden.
The render logic for this Element, not implemented for this abstract class.
The render function itself should be purely stateless, because the application
state should not depend on whether or not the render function is called.
To introduce state or effects in the render function, use
:func:`use_state` or :func:`use_effect`.
Args:
None
Returns:
An Element object.
"""
raise NotImplementedError
P = tp.ParamSpec("P")
selfT = tp.TypeVar("selfT", bound=Element)
def not_ignored(arg: tuple[str, tp.Any]) -> bool:
return arg[0][0] != "_"
[docs]
def component(f: Callable[tp.Concatenate[selfT, P], None]) -> Callable[P, Element]:
"""
Decorator which turns a render function of **props** into an :class:`Element`.
The first argument of the render function must be :code:`self`, for
internal technical reasons.
Props
-----
The **props** are the arguments passed to the :func:`@component<component>` function.
The :func:`@component<component>` will be re-rendered when some of its **props** are not
:code:`__eq__` to the **props** from the last time the
:func:`@component<component>` rendered.
If the **props** are all :code:`__eq__`, the :func:`@component<component>`
will not re-render.
Arguments with a leading underscore :code:`_` will not count as **props**.
They are normal function arguments, and changing their value will not
cause the :func:`@component<component>` to re-render.
Render
------
The @component function must render exactly one :class:`Element`.
In the @component function, declare a tree of :class:`Element` with a
single root.
.. code-block:: python
:caption: a tree with a single root
@component
def MySimpleComp(self, prop1, prop2, prop3):
with VBoxView():
Label(text=prop1)
Label(text=prop2)
Label(text=prop3)
State
-----
The :func:`@component<component>` function is a stateless function from **props**
to an :class:`Element`.
To introduce **state** into a :func:`@component<component>`, use :doc:`Hooks <../hooks>`.
Composition
-----------
An :func:`@component<component>`’s children can be passed to it as **props**. This allows a
:func:`@component<component>` to act as a parent container for other :class:`Element` s,
and to render them in a specific layout.
There are two features to accomplish this.
1. The special :code:`children` **prop** is a tuple of :class:`Element` s.
The special :code:`children` **prop** must
be declared as a **keyword argument** with a default empty tuple,
like this :code:`children:tuple[Element, ...]=()`.
Do not explicitly pass the :code:`children` **prop** when calling
the :func:`@component<component>`. The children will be passed implicitly.
2. The :func:`child_place` function is used to place the :code:`children`
in the parent :func:`@component<component>`’s render function.
With these two features, you can declare how the parent
container :func:`@component<component>` will render its children.
.. code-block:: python
:caption: Example ContainerComponent with children props
@component
def ContainerComponent(self:Element, children:tuple[Element, ...]=()):
with VBoxView():
for child in children:
with VBoxView():
child_place(child)
.. code-block:: python
:caption: Example ContainerComponent usage
with ContainerComponent():
Label(text="First Child")
Label(text="Second Child")
Label(text="Third Child")
Args:
f:
The render function to wrap.
Its first argument must be :code:`self`.
Subsequent arguments are **props**.
"""
varnames = f.__code__.co_varnames[1:]
signature = inspect.signature(f).parameters
defaults = {k: v.default for k, v in signature.items() if v.default is not inspect.Parameter.empty and k[0] != "_"}
class ComponentElement(Element):
_edifice_original = f
@functools.wraps(f)
def __init__(self, *args: P.args, **kwargs: P.kwargs):
super().__init__()
name_to_val = defaults.copy()
name_to_val.update(filter(not_ignored, zip(varnames, args, strict=False)))
name_to_val.update((k, v) for (k, v) in kwargs.items() if k[0] != "_")
name_to_val["children"] = name_to_val.get("children") or ()
self._register_props(name_to_val)
def _render_element(self):
props: dict[str, tp.Any] = self.props._d
params = props.copy()
if "children" not in varnames:
del params["children"]
# We cannot type this because PropsDict forgets the types
# call the render function
f(self, **params) # type: ignore[reportGeneralTypeIssues]
def __repr__(self):
return f.__name__
ComponentElement.__name__ = f.__name__
return tp.cast(Callable[P, Element], ComponentElement)
@component
def Container(self):
pass
ContextMenuType = tp.Mapping[str, tp.Union[None, tp.Callable[[], tp.Any], "ContextMenuType"]]
def _create_qmenu(menu: ContextMenuType, parent, title: str | None = None):
widget = QtWidgets.QMenu(parent)
if title is not None:
widget.setTitle(title)
for key, value in menu.items():
if isinstance(value, dict):
sub_menu = _create_qmenu(value, widget, key)
widget.addMenu(sub_menu)
elif value is None:
widget.addSeparator()
else:
widget.addAction(key, value)
return widget
_CURSORS = {
"default": QtCore.Qt.CursorShape.ArrowCursor,
"arrow": QtCore.Qt.CursorShape.ArrowCursor,
"pointer": QtCore.Qt.CursorShape.PointingHandCursor,
"grab": QtCore.Qt.CursorShape.OpenHandCursor,
"grabbing": QtCore.Qt.CursorShape.ClosedHandCursor,
"text": QtCore.Qt.CursorShape.IBeamCursor,
"crosshair": QtCore.Qt.CursorShape.CrossCursor,
"move": QtCore.Qt.CursorShape.SizeAllCursor,
"wait": QtCore.Qt.CursorShape.WaitCursor,
"ew-resize": QtCore.Qt.CursorShape.SizeHorCursor,
"ns-resize": QtCore.Qt.CursorShape.SizeVerCursor,
"nesw-resize": QtCore.Qt.CursorShape.SizeBDiagCursor,
"nwse-resize": QtCore.Qt.CursorShape.SizeFDiagCursor,
"not-allowed": QtCore.Qt.CursorShape.ForbiddenCursor,
"forbidden": QtCore.Qt.CursorShape.ForbiddenCursor,
}
def _css_to_number(a):
if not isinstance(a, str):
return a
if a.endswith("px"):
return float(a[:-2])
return float(a)
qtT = tp.TypeVar("qtT", bound=QtWidgetElement)
def qt_component(
f: Callable[
tp.Concatenate[qtT, list[str], Callable[[_T_widget, QtWidgets.QLayout | None], list[CommandType]], P],
list[CommandType],
],
) -> Callable[P, QtWidgetElement[_T_widget]]:
varnames = f.__code__.co_varnames[3:]
signature = inspect.signature(f).parameters
defaults = {k: v.default for k, v in signature.items() if v.default is not inspect.Parameter.empty and k[0] != "_"}
class ComponentElement(QtWidgetElement):
_edifice_original = f
@functools.wraps(f)
def __init__(self, *args: P.args, **kwargs: P.kwargs):
super().__init__()
name_to_val = defaults.copy()
name_to_val.update(filter(not_ignored, zip(varnames, args, strict=False)))
# kwards prefixed with _ are excluded from props
name_to_val.update((k, v) for (k, v) in kwargs.items() if k[0] != "_")
self._register_props(name_to_val)
def _qt_update_commands(
self,
widget_trees: dict[Element, _WidgetTree],
newprops: PropsDict,
) -> list[CommandType]:
props: dict[str, tp.Any] = self.props._d
params = props.copy()
newkeys = list(newprops._d.keys())
def super_commands(underlying: QtWidgets.QWidget, layout: QtWidgets.QLayout | None):
return super(ComponentElement, self)._qt_update_commands_super(
widget_trees,
newprops,
underlying,
layout,
)
me = tp.cast(qtT, self)
return f(me, newkeys, super_commands, **params)
def __repr__(self):
return f.__name__
ComponentElement.__name__ = f.__name__
return tp.cast(Callable[P, QtWidgetElement], ComponentElement)
class _WidgetTree:
"""
A QtWidgetElement and its QtWidgetElement children.
"""
__slots__ = ("children", "component")
def __init__(self, component: QtWidgetElement, children: list[QtWidgetElement]):
self.component: QtWidgetElement = component
self.children: list[QtWidgetElement] = children
def _get_widget_children(widget_trees: dict[Element, _WidgetTree], element: QtWidgetElement) -> list[QtWidgetElement]:
if (w := widget_trees.get(element, None)) is not None:
return w.children
return []
def _dereference_tree(widget_trees: dict[Element, _WidgetTree], widget_tree: _WidgetTree, address: list[int]):
"""
This method used only for testing
"""
for index in address:
widget_tree = widget_trees[widget_tree.children[index]]
return widget_tree
def print_tree(widget_trees: dict[Element, _WidgetTree], element: Element, indent=0):
t = widget_trees[element]
tags = t.component._tags()
if t.children:
print(" " * indent + tags[0]) # noqa: T201
for child in t.children:
print_tree(widget_trees, child, indent=indent + 1)
print(" " * indent + tags[1]) # noqa: T201
else:
print(" " * indent + tags[2]) # noqa: T201
class RenderResult:
"""Encapsulates the results of a render.
Concretely, it stores information such as commands,
which must be executed by the caller.
"""
def __init__(
self,
commands: list[CommandType],
):
self.commands: list[CommandType] = commands
@dataclass
class _HookState:
state: tp.Any
updaters: list[tp.Callable[[tp.Any], tp.Any]]
element: Element
engine: RenderEngine
# Stable setter function will always be the same function
# returned by repeated calls to use_state. So it can be used
# as a stable prop.
def setter(self, updater):
if self.element not in self.engine._hook_state:
# Then the component has been deleted and unmounted.
# This might happen if the setter is called during a
# a use_async CancelledError handler.
# In that case, we don't want to update the state.
return
self.updaters.append(updater)
self.engine._hook_state_setted.add(self.element)
assert self.engine._app is not None
self.engine._app._defer_rerender(self.element)
@dataclass
class _HookEffect:
setup: tp.Callable[[], tp.Callable[[], None] | None] | None
cleanup: tp.Callable[[], None] | None
"""
Cleanup function called on unmount and overwrite
"""
dependencies: tp.Any
@dataclass
class _HookAsync:
task: asyncio.Task[tp.Any] | None
"""
The currently executing async effect task.
"""
queue: list[tp.Callable[[], Coroutine[None, None, None]]]
"""
The queue of waiting async effect tasks. Max length 1.
"""
dependencies: tp.Any
"""
The dependencies of use_async().
"""
def elements_match(a: Element, b: Element) -> bool:
"""
Should return True if element b can be used to update element a
by _update_old_component().
"""
# Elements must be of the same __class__.
# Elements must have the same __class__.__name__. This is to distinguish
# between different @component Components. (Why does class __eq__ return
# True if the class __name__ is different?)
return (a.__class__ == b.__class__) and (a.__class__.__name__ == b.__class__.__name__) and (a._key == b._key)
class RenderEngine:
"""
One RenderEngine instance persists across the life of the App.
"""
__slots__ = (
"_app",
"_component_tree",
"_hook_async",
"_hook_effect",
"_hook_state",
"_hook_state_setted",
"_root",
"_widget_tree",
"is_stopped",
)
def __init__(self, root: Element, app: AppProtocol | None = None):
self._component_tree: dict[Element, list[Element]] = {}
"""
The _component_tree maps an Element to its children.
"""
self._widget_tree: dict[Element, _WidgetTree] = {}
"""
Map of an Element to its rendered widget tree.
TODO This should actually be a map of each QtWidgetElement to its rendered
children, of type dict[QtWidgetElement, list[QtWidgetElement]].
"""
self._root = root
self._app: AppProtocol | None = app
self._hook_state: defaultdict[Element, list[_HookState]] = defaultdict(list)
"""
The per-element hooks for use_state().
"""
self._hook_state_setted: set[Element] = set()
"""
The set of elements which have had their use_state() setters called
since the last render.
"""
self._hook_effect: defaultdict[Element, list[_HookEffect]] = defaultdict(list)
"""
The per-element hooks for use_effect().
"""
self._hook_async: defaultdict[Element, list[_HookAsync]] = defaultdict(list)
"""
The per-element hooks for use_async().
"""
self.is_stopped: bool = False
"""
Flag determining if the render engine has been stopped.
"""
def is_hook_async_done(self, element: Element) -> bool:
"""
True if all of the async hooks for an Element are done.
"""
if element not in self._hook_async:
return True
hooks = self._hook_async[element]
for hook in hooks:
if hook.task is not None:
if not hook.task.done():
return False
return True
def _delete_component(self, component: Element, recursive: bool):
# Delete component from render trees
sub_components = self._component_tree[component]
if recursive:
for sub_comp in sub_components:
self._delete_component(sub_comp, recursive)
# Node deletion
# Clean up use_effect for the component
if component in self._hook_effect:
for hook in self._hook_effect[component]:
if hook.cleanup is not None:
# None indicates that the setup effect failed,
# or that there is no cleanup function.
try:
hook.cleanup()
except Exception: # noqa: S110, BLE001
pass
del self._hook_effect[component]
# Clean up use_async for the component
if component in self._hook_async:
for hook in self._hook_async[component]:
hook.queue.clear()
if hook.task is not None:
# If there are some running tasks, wait until they are
# done and then delete this HookAsync object.
def done_callback(_future_object):
if component in self._hook_async:
if self.is_hook_async_done(component):
del self._hook_async[component]
hook.task.add_done_callback(done_callback)
hook.task.cancel()
if self.is_hook_async_done(component):
# If there are no running tasks, then we can delete this
# HookAsync object immediately.
del self._hook_async[component]
# Clean up use_state for the component
if component in self._hook_state:
del self._hook_state[component]
self._hook_state_setted.discard(component)
# Clean up component references
# Do this after use_effect cleanup, so that the cleanup function
# can still access the component References.
assert component._edifice_internal_references is not None
for ref in component._edifice_internal_references:
ref._value = None
del self._component_tree[component]
del self._widget_tree[component]
def _refresh_by_class(self, classes) -> None:
# This refresh is done only for a hot reload. It refreshes all
# elements which were defined in a module which was changed
# on the filesystem.
if self.is_stopped:
return
# Algorithm:
# 1) Find all old components that's not a child of another component
# components_to_replace is (old_component, new_component_class, parent component, new_component)
components_to_replace = []
# classes should be only ComponentElement, because only ComponentElement can change in user code.
old_components = [cls for cls, _ in classes]
def traverse(comp, parent):
if comp.__class__ in old_components and parent is not None: # We can't replace the unparented root
new_component_class = next(new_cls for old_cls, new_cls in classes if old_cls == comp.__class__)
if new_component_class is None:
raise ValueError(f"Error after updating code: cannot find class {comp.__class__}")
components_to_replace.append([comp, new_component_class, parent, None])
return
sub_components = self._component_tree[comp]
if isinstance(sub_components, list):
for sub_comp in sub_components:
traverse(sub_comp, comp)
else:
traverse(sub_components, comp)
traverse(self._root, None)
# 2) For all such old components, construct a new component and merge in old component props
for parts in components_to_replace:
old_comp, new_comp_class, _, _ = parts
parameters = list(inspect.signature(new_comp_class.__init__).parameters.items())
try:
kwargs = {
k: old_comp.props[k]
for k, v in parameters[1:]
if v.default is inspect.Parameter.empty and k[0] != "_" and k != "kwargs"
}
# We don't actually need all the kwargs, just enough
# to construct new_comp_class.
# The other kwargs will be set with _props.update.
except KeyError as err:
k = None
for k, _ in parameters[1:]:
if k not in old_comp.props:
break
raise ValueError(
f"Error while reloading {old_comp}: New class expects prop ({k}) not present in old class",
) from err
parts[3] = new_comp_class(**kwargs)
parts[3]._props.update(old_comp._props)
parts[3]._key = old_comp._key
# 3) Replace old component in the place in the tree where they first appear, with a reference to new component
backup = {}
for old_comp, _, parent_comp, new_comp in components_to_replace:
parent_comp_children = list(parent_comp.children)
backup[parent_comp] = copy(parent_comp.children)
for i, comp in enumerate(parent_comp.children):
if comp is old_comp:
parent_comp_children[i] = new_comp
# Move the hook states to the new component.
# We want to be careful that the hooks don't have
# any references to the old component, especially
# function closures. I think this code is okay.
#
# During the effect functions and the async coroutine, usually
# what happens is that some use_state setters are called,
# and those use_state setters would be closures on the
# state which was moved, not references to the old_comp.
#
# Because this is only during hot-reload, so only during
# development, it's not catastrophic if some references
# to old_comp are retained and cause bugs.
if old_comp in self._hook_state:
self._hook_state[new_comp] = self._hook_state[old_comp]
del self._hook_state[old_comp]
if old_comp in self._hook_effect:
self._hook_effect[new_comp] = self._hook_effect[old_comp]
del self._hook_effect[old_comp]
if old_comp in self._hook_async:
self._hook_async[new_comp] = self._hook_async[old_comp]
del self._hook_async[old_comp]
parent_comp._props["children"] = tuple(parent_comp_children)
# 5) call _render for all new component parents
try:
logger.info(
"Rerendering parents of: %s",
[new_comp_class.__name__ for _, new_comp_class, _, _ in components_to_replace],
)
logger.info("Rerendering: %s", [parent for _, _, parent, _ in components_to_replace])
self._request_rerender([parent_comp for _, _, parent_comp, _ in components_to_replace])
except Exception:
# Restore components
for parent_comp, backup_val in backup.items():
parent_comp._props["children"] = backup_val
raise
def _update_old_component(
self,
component: Element,
new_component: Element,
render_context: _RenderContext,
) -> _WidgetTree:
# new_component is a new rendering of old component, so update
# old component to have props of new_component.
# The new_component will be discarded.
assert component._edifice_internal_references is not None
assert new_component._edifice_internal_references is not None
newprops = new_component.props
# TODO are we leaking memory by holding onto the old references?
component._edifice_internal_references.update(new_component._edifice_internal_references)
# component needs re-rendering if
# 1) props changed
# 2) state changed
# 3) it has any pending _hook_state updates
# 4) it has any references
if (
component._should_update(newprops)
or len(component._edifice_internal_references) > 0
or component in self._hook_state_setted
):
render_context.mark_props_change(component, newprops)
rerendered_obj = self._render(component, render_context)
render_context.mark_qt_rerender(rerendered_obj.component, True)
return rerendered_obj
render_context.mark_props_change(component, newprops)
return self._widget_tree[component]
def _recycle_children(self, component: QtWidgetElement, render_context: _RenderContext) -> list[Element]:
# Children diffing and reconciliation
#
# Returns element children, which contains all the future children of the component:
# a mixture of old components (if they can be updated) and new ones
#
# Returns children widget trees, cached or newly rendered.
children_old_bykey: dict[str, Element] = {}
children_new_bykey: dict[str, Element] = {}
children_old_ = self._component_tree[component]
widgettree = _WidgetTree(component, [])
# We will mutate children_old to reuse and remove old elements if we can match them.
# Ordering of children_old must be preserved for reverse deletion.
children_old: list[Element] = children_old_[:]
for child_old in children_old:
if child_old._key is not None:
children_old_bykey[child_old._key] = child_old
# We will mutate children_new to replace them with old elements if we can match them.
children_new: list[Element] = list(component.children)
for child_new in children_new:
if child_new._key is not None:
if children_new_bykey.get(child_new._key, None) is not None:
raise ValueError("Duplicate keys found in " + str(component))
children_new_bykey[child_new._key] = child_new
# We will not try to intelligently handle the situation where
# an unkeyed element is added or removed.
# If the elements are unkeyed then try to match them pairwise.
i_old = 0
i_new = 0
while i_new < len(children_new):
child_new = children_new[i_new]
if (key := child_new._key) is not None:
if (child_old_bykey := children_old_bykey.get(key, None)) is not None and elements_match(
child_old_bykey,
child_new,
):
# then we have a match for reusing the old child
child_wtree = self._update_old_component(child_old_bykey, child_new, render_context)
children_new[i_new] = child_old_bykey
widgettree.children.append(child_wtree.component)
render_context.widget_tree[child_wtree.component] = child_wtree
children_old.remove(child_old_bykey)
else:
# new child so render
widgettree.children.append(self._render(child_new, render_context).component)
# this component will need qt rerender
render_context.mark_qt_rerender(component, True)
elif i_old < len(children_old):
child_old = children_old[i_old]
if elements_match(child_old, child_new):
# then we have a match for reusing the old child
child_wtree = self._update_old_component(child_old, child_new, render_context)
children_new[i_new] = child_old
widgettree.children.append(child_wtree.component)
render_context.widget_tree[child_wtree.component] = child_wtree
del children_old[i_old]
else:
# new child so render
widgettree.children.append(self._render(child_new, render_context).component)
# this component will need qt rerender
render_context.mark_qt_rerender(component, True)
# leave this old element to be deleted
i_old += 1
else:
# new child so render
widgettree.children.append(self._render(child_new, render_context).component)
# this component will need qt rerender
render_context.mark_qt_rerender(component, True)
i_new += 1
render_context.enqueued_deletions.extend(children_old)
render_context.component_tree[component] = children_new
render_context.widget_tree[component] = widgettree
return children_new
def _render_base_component(self, component: QtWidgetElement, render_context: _RenderContext) -> _WidgetTree:
if component not in self._component_tree:
# New component, simply render everything
render_context.component_tree[component] = list(component.children)
rendered_children = [self._render(child, render_context) for child in component.children]
widgettree = _WidgetTree(component, [c.component for c in rendered_children])
render_context.widget_tree[component] = widgettree
render_context.mark_qt_rerender(component, True)
return widgettree
# Figure out which children can be re-used
children = self._recycle_children(component, render_context)
props_dict = dict(component.props._items)
props_dict["children"] = tuple(children)
render_context.mark_props_change(component, PropsDict(props_dict))
return render_context.widget_tree[component]
def _render(self, component: Element, render_context: _RenderContext) -> _WidgetTree:
if component in render_context.widget_tree:
return render_context.widget_tree[component]
try:
assert component._edifice_internal_references is not None
for ref in component._edifice_internal_references:
ref._value = component
except TypeError as err:
raise ValueError(
f"{component.__class__} is not correctly initialized. "
"Did you remember to call super().__init__() in the constructor? "
"(alternatively, the register_props decorator will also correctly initialize the component)",
) from err
component._controller = self._app
if isinstance(component, QtWidgetElement):
return self._render_base_component(component, render_context)
# Before the render, set the hooks indices to 0.
component._hook_state_index = 0
component._hook_effect_index = 0
component._hook_async_index = 0
# Record that we are rendering this component with current use_state
self._hook_state_setted.discard(component)
# Call user provided render function and retrieve old results
with Container() as container:
prev_element = render_context.current_element
render_context.current_element = component
sub_component = component._render_element()
render_context.current_element = prev_element
# If the component.render() call evaluates to an Element
# we use that as the sub_component the component renders as.
if sub_component is None:
# If the render() method doesn't render as
# an Element (always the case for @component Components)
# we obtain the rendered sub_component as either:
#
# 1. The only child of the Container wrapping the render, or
# 2. A View element containing the children of the Container
if len(container.children) == 1:
sub_component = container.children[0]
else:
newline = "\n"
message = dedent(
f"""\
A @component must render as exactly one Element.
Element {component} renders as {len(container.children)} elements.""",
) + newline.join([child.__str__() for child in container.children])
raise ValueError(message)
old_rendering: list[Element] | None = self._component_tree.get(component, None)
if old_rendering is not None and elements_match(old_rendering[0], sub_component):
# TODO Why do we set the key of the widget_tree to be a #component
# Element here? This is not used anywhere. This widget_tree[component]
# insertion should not happen. widget_tree key should be a QtWidgetElement.
# See _widget_tree.
render_context.widget_tree[component] = self._update_old_component(
old_rendering[0],
sub_component,
render_context,
)
else:
if old_rendering is not None:
render_context.enqueued_deletions.extend(old_rendering)
render_context.component_tree[component] = [sub_component]
# TODO Why do we set the key of the widget_tree to be a #component
# Element here? This is not used anywhere. This widget_tree[component]
# insertion should not happen. widget_tree key should be a QtWidgetElement.
# See _widget_tree.
render_context.widget_tree[component] = self._render(sub_component, render_context)
return render_context.widget_tree[component]
def gen_qt_commands(self, element: QtWidgetElement, render_context: _RenderContext) -> list[CommandType]:
"""
Recursively generate the update commands for the widget tree.
"""
commands: list[CommandType] = []
if self.is_stopped:
return commands
children = _get_widget_children(render_context.widget_tree, element)
for child in children:
rendered = self.gen_qt_commands(child, render_context)
commands.extend(rendered)
if not render_context.need_rerender(element):
return commands
old_props = render_context.get_old_props(element)
new_props = PropsDict({k: v for k, v in element.props._items if k not in old_props or old_props[k] != v})
# Call user provided render function and retrieve old results
prev_element = render_context.current_element
render_context.current_element = element
commands.extend(element._qt_update_commands(render_context.widget_tree, new_props))
render_context.current_element = prev_element
return commands
def _request_rerender(self, components: list[Element]) -> RenderResult:
if self.is_stopped:
return RenderResult([])
components_ = components[:]
# Before the render, reduce the _hook_state updaters.
# We can't do this after the render, because there may have been state
# updates from event handlers.
for element in self._hook_state_setted:
hooks = self._hook_state[element]
for hook in hooks:
state0 = hook.state
for updater in hook.updaters:
if callable(updater):
hook.state = updater(hook.state)
# We don't catch the state updater exceptions.
# We want the program to crash if state updaters throw.
else:
hook.state = updater
if state0 != hook.state:
# State changed so we need to re-render this component.
if element not in components_:
components_.append(element)
hook.updaters.clear()
all_commands: list[CommandType] = []
# Here is the problem.
# We need to render the child before parent if the child state changed.
# We need to render the parent before child if the child props changed.
# So we do a complete render of each component individually, and then
# we don't have to solve the problem of the order of rendering.
for component in components_:
commands: list[CommandType] = []
render_context = _RenderContext(self)
local_state.render_context = render_context
widget_tree = self._render(component, render_context)
# Generate the update commands from the widget trees
commands.extend(self.gen_qt_commands(widget_tree.component, render_context))
# Update the stored component trees and widget trees
self._component_tree.update(render_context.component_tree)
self._widget_tree.update(render_context.widget_tree)
# This is the phase of the render when the commands run.
for command in commands:
try:
command.fn(*command.args, **command.kwargs)
except Exception: # noqa: PERF203
logger.exception(f"Exception while running command:\n{command}") # noqa: G004
all_commands.extend(commands)
# Delete components that should be deleted (and call the respective unmounts)
for component_delete in render_context.enqueued_deletions:
self._delete_component(component_delete, True)
# after render, call the use_effect setup functions.
# we want to guarantee that elements are fully rendered before
# effects are performed.
for hooks in self._hook_effect.values():
for hook in hooks:
if hook.setup is not None:
if hook.cleanup is not None:
try:
hook.cleanup()
except Exception: # noqa: S110, BLE001
pass
finally:
hook.cleanup = None
try:
hook.cleanup = hook.setup()
except Exception: # noqa: BLE001
hook.cleanup = None
finally:
hook.setup = None
# We return all the commands but that's only needed for testing.
return RenderResult(all_commands)
def use_state(
self,
element: Element,
initial_state: _T_use_state | tp.Callable[[], _T_use_state],
) -> tuple[
_T_use_state, # current value
tp.Callable[[_T_use_state | tp.Callable[[_T_use_state], _T_use_state]], None], # updater
]:
hooks = self._hook_state[element]
h_index = element._hook_state_index
element._hook_state_index += 1
if len(hooks) <= h_index:
# Then this is the first render.
hook = _HookState(
# Call the initializer function if it is a function.
initial_state() if callable(initial_state) else initial_state,
[],
element,
self,
)
if callable(hook.state):
raise ValueError("The state value of use_state cannot be Callable.")
hooks.append(hook)
else:
hook = hooks[h_index]
return (hook.state, hook.setter)
def use_effect(
self,
element: Element,
setup: tp.Callable[[], tp.Callable[[], None] | None],
dependencies: tp.Any = None,
) -> None:
# https://legacy.reactjs.org/docs/hooks-effect.html#example-using-hooks
# effects happen “after render”.
# React guarantees the DOM has been updated by the time it runs the effects.
hooks = self._hook_effect[element]
h_index = element._hook_effect_index
element._hook_effect_index += 1
if len(hooks) <= h_index:
# then this is the first render
hook = _HookEffect(setup, None, dependencies)
hooks.append(hook)
else:
# then this is not the first render
hook = hooks[h_index]
if hook.dependencies is None or hook.dependencies != dependencies:
# deps changed
hook.setup = setup
hook.dependencies = dependencies
def use_async(
self,
element: Element,
fn_coroutine: tp.Callable[[], Coroutine[None, None, None]],
dependencies: tp.Any,
) -> Callable[[], None]:
hooks = self._hook_async[element]
h_index = element._hook_async_index
element._hook_async_index += 1
# When the done_callback is called,
# this component might have already unmounted. In that case
# this done_callback will still be holding a reference to the
# _HookAsync, and the _HookAsync.queue will be cleared.
# After the done_callback is called, the _HookAsync object
# should be garbage collected.
def done_callback(hook: _HookAsync, _task:asyncio.Task[tp.Any]):
if hook.task is not None:
try:
# https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.result
# If there is an exception, retrieve the exception and throw it away.
# Otherwise asyncio complains
# “Task exception was never retrieved”
_r = hook.task.result()
except: # noqa: S110, E722
pass
hook.task = None
if len(hook.queue) > 0:
# There is another async task waiting in the queue
hook.task = asyncio.create_task(hook.queue.pop(0)())
hook.task.add_done_callback(lambda t: done_callback(hook, t))
if len(hooks) <= h_index:
# then this is the first render.
task = asyncio.create_task(fn_coroutine())
hook = _HookAsync(
task=task,
dependencies=dependencies,
queue=[],
)
hooks.append(hook)
task.add_done_callback(lambda t: done_callback(hook, t))
def cancel():
task.cancel()
return cancel
if dependencies != (hook := hooks[h_index]).dependencies:
# then this is not the first render and deps changed
hook.dependencies = dependencies
if hook.task is not None:
# There's already an old async effect in flight, so enqueue
# the new async effect and cancel the old async effect.
# We also want to cancel all of the other async effects
# in the queue, so the queue should have max len 1.
# Maybe queue should be type Optional instead of list? That
# would be weird though.
hook.queue.clear()
hook.queue.append(fn_coroutine)
hook.task.cancel()
else:
hook.task = asyncio.create_task(fn_coroutine())
hook.task.add_done_callback(lambda t: done_callback(hook, t))
def cancel():
if hook.task is not None:
hook.task.cancel()
else:
hook.queue.clear()
return cancel
# not first render, dependencies did not change
hook = hooks[h_index]
def cancel():
if hook.task is not None:
hook.task.cancel()
else:
hook.queue.clear()
return cancel