diff --git a/src/py/reactpy/reactpy/backend/hooks.py b/src/py/reactpy/reactpy/backend/hooks.py index 19ad114ed..7918e87c4 100644 --- a/src/py/reactpy/reactpy/backend/hooks.py +++ b/src/py/reactpy/reactpy/backend/hooks.py @@ -3,12 +3,15 @@ from collections.abc import MutableMapping from typing import Any +from reactpy.backend.messenger import Messenger from reactpy.backend.types import Connection, Location from reactpy.core.hooks import Context, create_context, use_context # backend implementations should establish this context at the root of an app ConnectionContext: Context[Connection[Any] | None] = create_context(None) +MessengerContext: Context[Messenger | None] = create_context(None) + def use_connection() -> Connection[Any]: """Get the current :class:`~reactpy.backend.types.Connection`.""" @@ -27,3 +30,12 @@ def use_scope() -> MutableMapping[str, Any]: def use_location() -> Location: """Get the current :class:`~reactpy.backend.types.Connection`'s location.""" return use_connection().location + + +def use_messenger() -> Messenger: + """Get the current :class:`~reactpy.core.serve.Messenger`.""" + messenger = use_context(MessengerContext) + if messenger is None: # nocov + msg = "No backend established a messenger." + raise RuntimeError(msg) + return messenger diff --git a/src/py/reactpy/reactpy/backend/messenger.py b/src/py/reactpy/reactpy/backend/messenger.py new file mode 100644 index 000000000..4d288878c --- /dev/null +++ b/src/py/reactpy/reactpy/backend/messenger.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator, Awaitable +from typing import Callable + +from anyio import Event, create_memory_object_stream, create_task_group +from anyio.abc import ObjectReceiveStream, ObjectSendStream + +from reactpy.core.types import Message + +_MessageStream = tuple[ObjectSendStream[Message], ObjectReceiveStream[Message]] + + +class Messenger: + """A messenger for sending and receiving messages""" + + def __init__(self) -> None: + self._task_group = create_task_group() + self._streams: dict[str, list[_MessageStream]] = {} + self._recv_began: dict[str, Event] = {} + + def start_producer(self, producer: Callable[[], AsyncIterator[Message]]) -> None: + """Add a message producer""" + + async def _producer() -> None: + async for message in producer(): + await self.send(message) + + self._task_group.start_soon(_producer) + + def start_consumer( + self, message_type: str, consumer: Callable[[Message], Awaitable[None]] + ) -> None: + """Add a message consumer""" + + async def _consumer() -> None: + async for message in self.receive(message_type): + self._task_group.start_soon(consumer, message) + + self._task_group.start_soon(_consumer) + + async def send(self, message: Message) -> None: + """Send a message to all consumers of the message type""" + for send, _ in self._streams.get(message["type"], []): + await send.send(message) + + async def receive(self, message_type: str) -> AsyncIterator[Message]: + """Receive messages of a given type""" + send, recv = create_memory_object_stream() + self._streams.setdefault(message_type, []).append((send, recv)) + async with recv: + async with send: + async for message in recv: + yield message + + async def __aenter__(self) -> Messenger: + await self._task_group.__aenter__() + return self + + async def __aexit__(self, *args) -> None: + await self._task_group.__aexit__(*args) diff --git a/src/py/reactpy/reactpy/backend/starlette.py b/src/py/reactpy/reactpy/backend/starlette.py index 3a9695b33..9988789ff 100644 --- a/src/py/reactpy/reactpy/backend/starlette.py +++ b/src/py/reactpy/reactpy/backend/starlette.py @@ -3,10 +3,12 @@ import asyncio import json import logging -from collections.abc import Awaitable +from collections.abc import AsyncIterator, Awaitable from dataclasses import dataclass from typing import Any, Callable +from py.reactpy.reactpy.backend.hooks import MessengerContext +from py.reactpy.reactpy.core.serve import Messenger from starlette.applications import Starlette from starlette.middleware.cors import CORSMiddleware from starlette.requests import Request @@ -141,6 +143,25 @@ async def model_stream(socket: WebSocket) -> None: pathname = pathname[len(options.url_prefix) :] or "/" search = socket.scope["query_string"].decode() + async with Messenger() as msgr: + async with Layout( + MessengerContext( + ConnectionContext( + component(), + value=Connection( + scope=socket.scope, + location=Location(pathname, f"?{search}" if search else ""), + carrier=socket, + ), + ), + value=msgr, + ) + ) as layout: + msgr.start_consumer("layout-event", layout.deliver) + msgr.start_producer(layout.renders) + msgr.start_consumer("layout-update", send) + msgr.start_producer(recv) + try: await serve_layout( Layout( @@ -166,7 +187,8 @@ def _make_send_recv_callbacks( async def sock_send(value: Any) -> None: await socket.send_text(json.dumps(value)) - async def sock_recv() -> Any: - return json.loads(await socket.receive_text()) + async def sock_recv() -> AsyncIterator[Any]: + while True: + yield json.loads(await socket.receive_text()) return sock_send, sock_recv diff --git a/src/py/reactpy/reactpy/core/events.py b/src/py/reactpy/reactpy/core/events.py index cd5de3228..8d63a68f0 100644 --- a/src/py/reactpy/reactpy/core/events.py +++ b/src/py/reactpy/reactpy/core/events.py @@ -2,7 +2,7 @@ import asyncio from collections.abc import Sequence -from typing import Any, Callable, Literal, overload +from typing import Any, Callable, Literal, Optional, overload from anyio import create_task_group diff --git a/src/py/reactpy/reactpy/core/layout.py b/src/py/reactpy/reactpy/core/layout.py index df24a9a0a..30420a86a 100644 --- a/src/py/reactpy/reactpy/core/layout.py +++ b/src/py/reactpy/reactpy/core/layout.py @@ -3,7 +3,7 @@ import abc import asyncio from collections import Counter -from collections.abc import Iterator +from collections.abc import AsyncIterator, Iterator from contextlib import ExitStack from logging import getLogger from typing import ( @@ -99,6 +99,11 @@ async def deliver(self, event: LayoutEventMessage) -> None: "does not exist or its component unmounted" ) + async def renders(self) -> AsyncIterator[LayoutUpdateMessage]: + """Yield all available renders""" + while True: + yield await self.render() + async def render(self) -> LayoutUpdateMessage: """Await the next available render. This will block until a component is updated""" while True: diff --git a/src/py/reactpy/reactpy/core/serve.py b/src/py/reactpy/reactpy/core/serve.py index 3a530e854..0efd1d4ef 100644 --- a/src/py/reactpy/reactpy/core/serve.py +++ b/src/py/reactpy/reactpy/core/serve.py @@ -1,14 +1,20 @@ from __future__ import annotations -from collections.abc import Awaitable +import warnings +from collections.abc import AsyncIterator, Awaitable from logging import getLogger from typing import Callable -from anyio import create_task_group -from anyio.abc import TaskGroup +from anyio import Event, create_memory_object_stream, create_task_group +from anyio.abc import ObjectReceiveStream, ObjectSendStream, TaskGroup from reactpy.config import REACTPY_DEBUG_MODE -from reactpy.core.types import LayoutEventMessage, LayoutType, LayoutUpdateMessage +from reactpy.core.types import ( + LayoutEventMessage, + LayoutType, + LayoutUpdateMessage, + Message, +) logger = getLogger(__name__) @@ -37,6 +43,11 @@ async def serve_layout( recv: RecvCoroutine, ) -> None: """Run a dispatch loop for a single view instance""" + warnings.warn( + "serve_layout is deprecated. Use a Messenger object instead.", + DeprecationWarning, + stacklevel=2, + ) async with layout: try: async with create_task_group() as task_group: diff --git a/src/py/reactpy/reactpy/core/types.py b/src/py/reactpy/reactpy/core/types.py index 45f300f4f..419208a26 100644 --- a/src/py/reactpy/reactpy/core/types.py +++ b/src/py/reactpy/reactpy/core/types.py @@ -2,7 +2,7 @@ import sys from collections import namedtuple -from collections.abc import Mapping, Sequence +from collections.abc import AsyncIterator, Mapping, Sequence from types import TracebackType from typing import ( TYPE_CHECKING, @@ -73,6 +73,9 @@ class LayoutType(Protocol[_Render, _Event]): async def render(self) -> _Render: """Render an update to a view""" + async def renders(self) -> AsyncIterator[_Render]: + """Render a series of updates to a view""" + async def deliver(self, event: _Event) -> None: """Relay an event to its respective handler""" @@ -213,7 +216,14 @@ def __call__( ... -class LayoutUpdateMessage(TypedDict): +class Message(TypedDict): + """Base class for all messages""" + + type: str + """The type of message""" + + +class LayoutUpdateMessage(Message): """A message describing an update to a layout""" type: Literal["layout-update"] @@ -224,7 +234,7 @@ class LayoutUpdateMessage(TypedDict): """The model to assign at the given JSON Pointer path""" -class LayoutEventMessage(TypedDict): +class LayoutEventMessage(Message): """Message describing an event originating from an element in the layout""" type: Literal["layout-event"]