From ea80fff1e834e54b4eda25a323f570426b6fbe87 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 6 May 2024 09:39:37 +0200 Subject: [PATCH 1/2] Wait for ystore to be started --- pycrdt_websocket/yroom.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pycrdt_websocket/yroom.py b/pycrdt_websocket/yroom.py index a2a8b3d..e087813 100644 --- a/pycrdt_websocket/yroom.py +++ b/pycrdt_websocket/yroom.py @@ -135,7 +135,7 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None): async def _broadcast_updates(self): if self.ystore is not None and not self.ystore.started.is_set(): - self._task_group.start_soon(self.ystore.start) + await self._task_group.start(self.ystore.start) async with self._update_receive_stream: async for update in self._update_receive_stream: From 798120b2b890999f6d483d543e744b42c5590f61 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Tue, 7 May 2024 11:12:55 +0200 Subject: [PATCH 2/2] Add YStore start_lock --- pycrdt_websocket/yroom.py | 6 ++++-- pycrdt_websocket/ystore.py | 15 +++++++++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pycrdt_websocket/yroom.py b/pycrdt_websocket/yroom.py index e087813..f745c03 100644 --- a/pycrdt_websocket/yroom.py +++ b/pycrdt_websocket/yroom.py @@ -134,8 +134,10 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None): self._on_message = value async def _broadcast_updates(self): - if self.ystore is not None and not self.ystore.started.is_set(): - await self._task_group.start(self.ystore.start) + if self.ystore is not None: + async with self.ystore.start_lock: + if not self.ystore.started.is_set(): + await self._task_group.start(self.ystore.start) async with self._update_receive_stream: async for update in self._update_receive_stream: diff --git a/pycrdt_websocket/ystore.py b/pycrdt_websocket/ystore.py index 1bce41e..8660c77 100644 --- a/pycrdt_websocket/ystore.py +++ b/pycrdt_websocket/ystore.py @@ -30,7 +30,8 @@ class BaseYStore(ABC): _started: Event | None = None _stopped: Event | None = None _task_group: TaskGroup | None = None - __start_lock: Lock | None = None + _public_start_lock: Lock | None = None + _private_start_lock: Lock | None = None @abstractmethod def __init__( @@ -57,11 +58,17 @@ def stopped(self) -> Event: self._stopped = Event() return self._stopped + @property + def start_lock(self) -> Lock: + if self._public_start_lock is None: + self._public_start_lock = Lock() + return self._public_start_lock + @property def _start_lock(self) -> Lock: - if self.__start_lock is None: - self.__start_lock = Lock() - return self.__start_lock + if self._private_start_lock is None: + self._private_start_lock = Lock() + return self._private_start_lock async def __aenter__(self) -> BaseYStore: async with self._start_lock: