diff --git a/pycrdt_websocket/yroom.py b/pycrdt_websocket/yroom.py index a2a8b3d..f745c03 100644 --- a/pycrdt_websocket/yroom.py +++ b/pycrdt_websocket/yroom.py @@ -134,8 +134,10 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None): self._on_message = value async def _broadcast_updates(self): - if self.ystore is not None and not self.ystore.started.is_set(): - self._task_group.start_soon(self.ystore.start) + if self.ystore is not None: + async with self.ystore.start_lock: + if not self.ystore.started.is_set(): + await self._task_group.start(self.ystore.start) async with self._update_receive_stream: async for update in self._update_receive_stream: diff --git a/pycrdt_websocket/ystore.py b/pycrdt_websocket/ystore.py index 1bce41e..8660c77 100644 --- a/pycrdt_websocket/ystore.py +++ b/pycrdt_websocket/ystore.py @@ -30,7 +30,8 @@ class BaseYStore(ABC): _started: Event | None = None _stopped: Event | None = None _task_group: TaskGroup | None = None - __start_lock: Lock | None = None + _public_start_lock: Lock | None = None + _private_start_lock: Lock | None = None @abstractmethod def __init__( @@ -57,11 +58,17 @@ def stopped(self) -> Event: self._stopped = Event() return self._stopped + @property + def start_lock(self) -> Lock: + if self._public_start_lock is None: + self._public_start_lock = Lock() + return self._public_start_lock + @property def _start_lock(self) -> Lock: - if self.__start_lock is None: - self.__start_lock = Lock() - return self.__start_lock + if self._private_start_lock is None: + self._private_start_lock = Lock() + return self._private_start_lock async def __aenter__(self) -> BaseYStore: async with self._start_lock: