From e074005ac42aeb9a4f52006c01eace8e31835a3c Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 22 Aug 2024 11:34:49 +0000 Subject: [PATCH] updated to use bluesky-stomp --- .vscode/settings.json | 3 +++ pyproject.toml | 2 +- src/blueapi/cli/cli.py | 5 ++++- src/blueapi/client/client.py | 5 ++++- src/blueapi/client/event_bus.py | 3 ++- src/blueapi/service/interface.py | 20 ++++++++++++-------- tests/service/test_interface.py | 1 - tests/test_cli.py | 1 - 8 files changed, 26 insertions(+), 14 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index c129d991b..276011c2e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,4 +8,7 @@ "[python]": { "editor.defaultFormatter": "charliermarsh.ruff", }, + "python.testing.pytestArgs": [ + "tests" + ], } \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e9b09b86c..d94b24f3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "aiohttp", "PyYAML", "click", - "fastapi[all]", + "fastapi>=0.112.0", "uvicorn", "requests", "dls-bluesky-core", #requires ophyd-async diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 139fd579c..d0dbaa906 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -7,6 +7,7 @@ import click from bluesky.callbacks.best_effort import BestEffortCallback from bluesky_stomp.messaging import MessageContext, MessagingTemplate +from bluesky_stomp.models import Broker from pydantic import ValidationError from requests.exceptions import ConnectionError @@ -146,7 +147,9 @@ def listen_to_events(obj: dict) -> None: config: ApplicationConfig = obj["config"] if config.stomp is not None: event_bus_client = EventBusClient( - MessagingTemplate.autoconfigured(config.stomp) + MessagingTemplate.for_broker( + broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None) + ) ) else: raise RuntimeError("Message bus needs to be configured") diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 9cd953632..6331cbe3f 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -2,6 +2,7 @@ from concurrent.futures import Future from bluesky_stomp.messaging import MessageContext, MessagingTemplate +from bluesky_stomp.models import Broker from blueapi.config import ApplicationConfig from blueapi.core.bluesky_types import DataEvent @@ -39,7 +40,9 @@ def __init__( def from_config(cls, config: ApplicationConfig) -> "BlueapiClient": rest = BlueapiRestClient(config.api) if config.stomp is not None: - template = MessagingTemplate.autoconfigured(config.stomp) + template = MessagingTemplate.for_broker( + broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None) + ) events = EventBusClient(template) else: events = None diff --git a/src/blueapi/client/event_bus.py b/src/blueapi/client/event_bus.py index b6a2c2e7f..d3e6b17e8 100644 --- a/src/blueapi/client/event_bus.py +++ b/src/blueapi/client/event_bus.py @@ -1,6 +1,7 @@ from collections.abc import Callable from bluesky_stomp.messaging import MessageContext, MessagingTemplate +from bluesky_stomp.models import MessageQueue from blueapi.core import DataEvent from blueapi.worker import ProgressEvent, WorkerEvent @@ -33,7 +34,7 @@ def subscribe_to_all_events( ) -> None: try: self.app.subscribe( - self.app.destinations.topic("public.worker.event"), + MessageQueue(name="public.worker.event"), on_event, ) except Exception as err: diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 5962be67e..b4fbf2ae8 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -4,7 +4,7 @@ from typing import Any from bluesky_stomp.messaging import MessagingTemplate -from stomp import ConnectFailedException +from bluesky_stomp.models import Broker, DestinationBase, MessageQueue from blueapi.config import ApplicationConfig from blueapi.core.context import BlueskyContext @@ -52,10 +52,12 @@ def worker() -> TaskWorker: def messaging_template() -> MessagingTemplate | None: stomp_config = config().stomp if stomp_config is not None: - template = MessagingTemplate.autoconfigured(stomp_config) + template = MessagingTemplate.for_broker( + broker=Broker(host=stomp_config.host, port=stomp_config.port, auth=None) + ) task_worker = worker() - event_topic = template.destinations.topic("public.worker.event") + event_topic = MessageQueue(name="public.worker.event") _publish_event_streams( { @@ -67,7 +69,7 @@ def messaging_template() -> MessagingTemplate | None: try: template.connect() return template - except ConnectFailedException as ex: + except Exception as ex: logging.exception(msg="Failed to connect to message bus", exc_info=ex) return None else: @@ -95,15 +97,17 @@ def teardown() -> None: messaging_template.cache_clear() -def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -> None: +def _publish_event_streams( + streams_to_destinations: Mapping[EventStream, DestinationBase], +) -> None: for stream, destination in streams_to_destinations.items(): _publish_event_stream(stream, destination) -def _publish_event_stream(stream: EventStream, destination: str) -> None: +def _publish_event_stream(stream: EventStream, destination: DestinationBase) -> None: def forward_message(event: Any, correlation_id: str | None) -> None: if (template := messaging_template()) is not None: - template.send(destination, event, None, correlation_id) + template.send(destination, event, None, correlation_id=correlation_id) stream.subscribe(forward_message) @@ -160,7 +164,7 @@ def get_worker_state() -> WorkerState: return worker().state -def pause_worker(defer: bool | None) -> None: +def pause_worker(defer: bool) -> None: """Command the worker to pause""" worker().pause(defer) diff --git a/tests/service/test_interface.py b/tests/service/test_interface.py index 6a08eafc2..4454a21d3 100644 --- a/tests/service/test_interface.py +++ b/tests/service/test_interface.py @@ -269,7 +269,6 @@ def test_get_task_by_id(context_mock: MagicMock): ) -@pytest.mark.stomp def test_stomp_config(): interface.set_config(ApplicationConfig(stomp=StompConfig())) assert interface.messaging_template() is not None diff --git a/tests/test_cli.py b/tests/test_cli.py index ab725c005..ab2e38622 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -136,7 +136,6 @@ def test_cannot_run_plans_without_stomp_config(runner: CliRunner): ) -@pytest.mark.stomp def test_valid_stomp_config_for_listener(runner: CliRunner): result = runner.invoke( main,