Skip to content

Commit

Permalink
Make setup and cleanup non-async.
Browse files Browse the repository at this point in the history
  • Loading branch information
SyntaxColoring committed Aug 20, 2024
1 parent 514e81d commit 96e8632
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 23 deletions.
2 changes: 1 addition & 1 deletion robot-server/robot_server/app_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def on_startup() -> None:
fbl_mark_persistence_init_complete
],
)
await initialize_notifications(
initialize_notifications(
app_state=app.state,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .publisher_notifier import initialize_pe_publisher_notifier


async def initialize_notifications(app_state: AppState) -> None:
def initialize_notifications(app_state: AppState) -> None:
"""Initialize the notification system for the given app state."""
initialize_notification_client(app_state)
await initialize_pe_publisher_notifier(app_state)
initialize_pe_publisher_notifier(app_state)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import random
import logging
import paho.mqtt.client as mqtt
from anyio import to_thread
from fastapi import Depends
from typing import Annotated, Any, Dict, Optional
from enum import Enum
Expand Down Expand Up @@ -77,10 +76,10 @@ def connect(self) -> None:
)
self._client.loop_start()

async def disconnect(self) -> None:
def disconnect(self) -> None:
"""Disconnect the client from the MQTT broker."""
self._client.loop_stop()
await to_thread.run_sync(self._client.disconnect)
self._client.disconnect()

def publish_advise_refetch(
self,
Expand Down Expand Up @@ -183,6 +182,8 @@ def initialize_notification_client(app_state: AppState) -> None:
)


# todo(mm, 2024-08-20): When ASGI app teardown no longer uses asyncio.gather(),
# this can be non-async.
async def clean_up_notification_client(app_state: AppState) -> None:
"""Clean up the `NotificationClient` stored on `app_state`.
Expand All @@ -193,7 +194,7 @@ async def clean_up_notification_client(app_state: AppState) -> None:
] = _notification_client_accessor.get_from(app_state)

if notification_client is not None:
await notification_client.disconnect()
notification_client.disconnect()


def get_notification_client(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ def register_publish_callbacks(
"""Extend the list of callbacks with a given list of callbacks."""
self._callbacks.extend(callbacks)

async def _initialize(self) -> None:
def _initialize(self) -> None:
"""Initializes an instance of PublisherNotifier. This method should only be called once."""
# fixme(mm, 2024-08-20): This task currently leaks; this class needs a close()
# method or something. This gets easier when app_setup.py switches to using a
# context manager for ASGI app setup and teardown.
self._notifier = asyncio.create_task(self._wait_for_event())

def _notify_publishers(self) -> None:
Expand Down Expand Up @@ -67,7 +70,7 @@ def get_pe_notify_publishers(
return publisher_notifier._notify_publishers


async def initialize_pe_publisher_notifier(app_state: AppState) -> None:
def initialize_pe_publisher_notifier(app_state: AppState) -> None:
"""Create a new `NotificationClient` and store it on `app_state`.
Intended to be called just once, when the server starts up.
Expand All @@ -77,4 +80,4 @@ async def initialize_pe_publisher_notifier(app_state: AppState) -> None:
)
_pe_publisher_notifier_accessor.set_on(app_state, publisher_notifier)

await publisher_notifier._initialize()
publisher_notifier._initialize()
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@
)


async def test_initialize() -> None:
"""It should create a new task."""
publisher_notifier = PublisherNotifier(ChangeNotifier())

await publisher_notifier._initialize()

assert asyncio.get_running_loop()


def test_notify_publishers() -> None:
"""Invoke the change notifier's notify method."""
change_notifier = MagicMock()
Expand Down Expand Up @@ -65,11 +56,9 @@ async def trigger_callbacks() -> None:
await asyncio.sleep(0.1)
change_notifier.notify()

task = asyncio.create_task(publisher_notifier._initialize())
publisher_notifier._initialize()

await asyncio.gather(trigger_callbacks(), task)
await asyncio.gather(trigger_callbacks())

assert callback_called
assert callback_2_called

task.cancel()

0 comments on commit 96e8632

Please sign in to comment.