-
-
Notifications
You must be signed in to change notification settings - Fork 317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
messenger pattern #1084
base: main
Are you sure you want to change the base?
messenger pattern #1084
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
from __future__ import annotations | ||
|
||
from collections.abc import AsyncIterator, Awaitable | ||
from typing import Callable | ||
|
||
from anyio import Event, create_memory_object_stream, create_task_group | ||
from anyio.abc import ObjectReceiveStream, ObjectSendStream | ||
|
||
from reactpy.core.types import Message | ||
|
||
_MessageStream = tuple[ObjectSendStream[Message], ObjectReceiveStream[Message]] | ||
|
||
|
||
class Messenger: | ||
"""A messenger for sending and receiving messages""" | ||
|
||
def __init__(self) -> None: | ||
self._task_group = create_task_group() | ||
self._streams: dict[str, list[_MessageStream]] = {} | ||
self._recv_began: dict[str, Event] = {} | ||
|
||
def start_producer(self, producer: Callable[[], AsyncIterator[Message]]) -> None: | ||
"""Add a message producer""" | ||
|
||
async def _producer() -> None: | ||
async for message in producer(): | ||
await self.send(message) | ||
|
||
self._task_group.start_soon(_producer) | ||
|
||
def start_consumer( | ||
self, message_type: str, consumer: Callable[[Message], Awaitable[None]] | ||
) -> None: | ||
"""Add a message consumer""" | ||
|
||
async def _consumer() -> None: | ||
async for message in self.receive(message_type): | ||
self._task_group.start_soon(consumer, message) | ||
|
||
self._task_group.start_soon(_consumer) | ||
|
||
async def send(self, message: Message) -> None: | ||
"""Send a message to all consumers of the message type""" | ||
for send, _ in self._streams.get(message["type"], []): | ||
await send.send(message) | ||
|
||
async def receive(self, message_type: str) -> AsyncIterator[Message]: | ||
"""Receive messages of a given type""" | ||
send, recv = create_memory_object_stream() | ||
self._streams.setdefault(message_type, []).append((send, recv)) | ||
async with recv: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would |
||
async with send: | ||
async for message in recv: | ||
yield message | ||
|
||
async def __aenter__(self) -> Messenger: | ||
await self._task_group.__aenter__() | ||
return self | ||
|
||
async def __aexit__(self, *args) -> None: | ||
await self._task_group.__aexit__(*args) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,12 @@ | |
import asyncio | ||
import json | ||
import logging | ||
from collections.abc import Awaitable | ||
from collections.abc import AsyncIterator, Awaitable | ||
from dataclasses import dataclass | ||
from typing import Any, Callable | ||
|
||
from py.reactpy.reactpy.backend.hooks import MessengerContext | ||
from py.reactpy.reactpy.core.serve import Messenger | ||
from starlette.applications import Starlette | ||
from starlette.middleware.cors import CORSMiddleware | ||
from starlette.requests import Request | ||
|
@@ -141,6 +143,25 @@ async def model_stream(socket: WebSocket) -> None: | |
pathname = pathname[len(options.url_prefix) :] or "/" | ||
search = socket.scope["query_string"].decode() | ||
|
||
async with Messenger() as msgr: | ||
async with Layout( | ||
MessengerContext( | ||
ConnectionContext( | ||
Comment on lines
+148
to
+149
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can consider merging everything into a |
||
component(), | ||
value=Connection( | ||
scope=socket.scope, | ||
location=Location(pathname, f"?{search}" if search else ""), | ||
carrier=socket, | ||
), | ||
), | ||
value=msgr, | ||
) | ||
) as layout: | ||
msgr.start_consumer("layout-event", layout.deliver) | ||
msgr.start_producer(layout.renders) | ||
msgr.start_consumer("layout-update", send) | ||
msgr.start_producer(recv) | ||
|
||
try: | ||
await serve_layout( | ||
Layout( | ||
|
@@ -166,7 +187,8 @@ def _make_send_recv_callbacks( | |
async def sock_send(value: Any) -> None: | ||
await socket.send_text(json.dumps(value)) | ||
|
||
async def sock_recv() -> Any: | ||
return json.loads(await socket.receive_text()) | ||
async def sock_recv() -> AsyncIterator[Any]: | ||
while True: | ||
yield json.loads(await socket.receive_text()) | ||
|
||
return sock_send, sock_recv |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
|
||
import sys | ||
from collections import namedtuple | ||
from collections.abc import Mapping, Sequence | ||
from collections.abc import AsyncIterator, Mapping, Sequence | ||
from types import TracebackType | ||
from typing import ( | ||
TYPE_CHECKING, | ||
|
@@ -73,6 +73,9 @@ class LayoutType(Protocol[_Render, _Event]): | |
async def render(self) -> _Render: | ||
"""Render an update to a view""" | ||
|
||
async def renders(self) -> AsyncIterator[_Render]: | ||
"""Render a series of updates to a view""" | ||
|
||
Comment on lines
+76
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not have Less APIs is generally more intuitive than more. |
||
async def deliver(self, event: _Event) -> None: | ||
"""Relay an event to its respective handler""" | ||
|
||
|
@@ -213,7 +216,14 @@ def __call__( | |
... | ||
|
||
|
||
class LayoutUpdateMessage(TypedDict): | ||
class Message(TypedDict): | ||
"""Base class for all messages""" | ||
|
||
type: str | ||
"""The type of message""" | ||
|
||
|
||
class LayoutUpdateMessage(Message): | ||
"""A message describing an update to a layout""" | ||
|
||
type: Literal["layout-update"] | ||
|
@@ -224,7 +234,7 @@ class LayoutUpdateMessage(TypedDict): | |
"""The model to assign at the given JSON Pointer path""" | ||
|
||
|
||
class LayoutEventMessage(TypedDict): | ||
class LayoutEventMessage(Message): | ||
"""Message describing an event originating from an element in the layout""" | ||
|
||
type: Literal["layout-event"] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These need to return callbacks to stop the consumer/producer - this will work well with
use_effect
.