Skip to content

Commit

Permalink
updated to use bluesky-stomp
Browse files Browse the repository at this point in the history
  • Loading branch information
ZohebShaikh committed Aug 23, 2024
1 parent 5dded50 commit e074005
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
},
"python.testing.pytestArgs": [
"tests"
],
}
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ dependencies = [
"aiohttp",
"PyYAML",
"click",
"fastapi[all]",
"fastapi>=0.112.0",
"uvicorn",
"requests",
"dls-bluesky-core", #requires ophyd-async
Expand Down
5 changes: 4 additions & 1 deletion src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import click
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky_stomp.messaging import MessageContext, MessagingTemplate
from bluesky_stomp.models import Broker
from pydantic import ValidationError
from requests.exceptions import ConnectionError

Expand Down Expand Up @@ -146,7 +147,9 @@ def listen_to_events(obj: dict) -> None:
config: ApplicationConfig = obj["config"]
if config.stomp is not None:
event_bus_client = EventBusClient(
MessagingTemplate.autoconfigured(config.stomp)
MessagingTemplate.for_broker(
broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None)
)
)
else:
raise RuntimeError("Message bus needs to be configured")
Expand Down
5 changes: 4 additions & 1 deletion src/blueapi/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from concurrent.futures import Future

from bluesky_stomp.messaging import MessageContext, MessagingTemplate
from bluesky_stomp.models import Broker

from blueapi.config import ApplicationConfig
from blueapi.core.bluesky_types import DataEvent
Expand Down Expand Up @@ -39,7 +40,9 @@ def __init__(
def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
rest = BlueapiRestClient(config.api)
if config.stomp is not None:
template = MessagingTemplate.autoconfigured(config.stomp)
template = MessagingTemplate.for_broker(
broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None)
)
events = EventBusClient(template)
else:
events = None
Expand Down
3 changes: 2 additions & 1 deletion src/blueapi/client/event_bus.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections.abc import Callable

from bluesky_stomp.messaging import MessageContext, MessagingTemplate
from bluesky_stomp.models import MessageQueue

from blueapi.core import DataEvent
from blueapi.worker import ProgressEvent, WorkerEvent
Expand Down Expand Up @@ -33,7 +34,7 @@ def subscribe_to_all_events(
) -> None:
try:
self.app.subscribe(
self.app.destinations.topic("public.worker.event"),
MessageQueue(name="public.worker.event"),
on_event,
)
except Exception as err:
Expand Down
20 changes: 12 additions & 8 deletions src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any

from bluesky_stomp.messaging import MessagingTemplate
from stomp import ConnectFailedException
from bluesky_stomp.models import Broker, DestinationBase, MessageQueue

from blueapi.config import ApplicationConfig
from blueapi.core.context import BlueskyContext
Expand Down Expand Up @@ -52,10 +52,12 @@ def worker() -> TaskWorker:
def messaging_template() -> MessagingTemplate | None:
stomp_config = config().stomp
if stomp_config is not None:
template = MessagingTemplate.autoconfigured(stomp_config)
template = MessagingTemplate.for_broker(
broker=Broker(host=stomp_config.host, port=stomp_config.port, auth=None)
)

task_worker = worker()
event_topic = template.destinations.topic("public.worker.event")
event_topic = MessageQueue(name="public.worker.event")

_publish_event_streams(
{
Expand All @@ -67,7 +69,7 @@ def messaging_template() -> MessagingTemplate | None:
try:
template.connect()
return template
except ConnectFailedException as ex:
except Exception as ex:
logging.exception(msg="Failed to connect to message bus", exc_info=ex)
return None
else:
Expand Down Expand Up @@ -95,15 +97,17 @@ def teardown() -> None:
messaging_template.cache_clear()


def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -> None:
def _publish_event_streams(
streams_to_destinations: Mapping[EventStream, DestinationBase],
) -> None:
for stream, destination in streams_to_destinations.items():
_publish_event_stream(stream, destination)


def _publish_event_stream(stream: EventStream, destination: str) -> None:
def _publish_event_stream(stream: EventStream, destination: DestinationBase) -> None:
def forward_message(event: Any, correlation_id: str | None) -> None:
if (template := messaging_template()) is not None:
template.send(destination, event, None, correlation_id)
template.send(destination, event, None, correlation_id=correlation_id)

stream.subscribe(forward_message)

Expand Down Expand Up @@ -160,7 +164,7 @@ def get_worker_state() -> WorkerState:
return worker().state


def pause_worker(defer: bool | None) -> None:
def pause_worker(defer: bool) -> None:
"""Command the worker to pause"""
worker().pause(defer)

Expand Down
1 change: 0 additions & 1 deletion tests/service/test_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ def test_get_task_by_id(context_mock: MagicMock):
)


@pytest.mark.stomp
def test_stomp_config():
interface.set_config(ApplicationConfig(stomp=StompConfig()))
assert interface.messaging_template() is not None
1 change: 0 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def test_cannot_run_plans_without_stomp_config(runner: CliRunner):
)


@pytest.mark.stomp
def test_valid_stomp_config_for_listener(runner: CliRunner):
result = runner.invoke(
main,
Expand Down

0 comments on commit e074005

Please sign in to comment.