From 017b3aee4acd07b301e969cd598e081f043c26e6 Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Tue, 6 Aug 2024 16:31:42 +0100 Subject: [PATCH 01/22] Replace references to blueapi messaging package with references to the bluesky stomp library --- src/blueapi/cli/cli.py | 5 +- src/blueapi/client/client.py | 5 +- src/blueapi/client/event_bus.py | 3 +- src/blueapi/messaging/__init__.py | 12 -- src/blueapi/messaging/base.py | 196 ------------------- src/blueapi/messaging/context.py | 12 -- src/blueapi/messaging/stomptemplate.py | 249 ------------------------ src/blueapi/messaging/utils.py | 27 --- src/blueapi/service/interface.py | 6 +- tests/client/test_client.py | 2 +- tests/client/test_event_bus.py | 2 +- tests/conftest.py | 21 -- tests/messaging/__init__.py | 0 tests/messaging/test_stomptemplate.py | 256 ------------------------- tests/messaging/test_utils.py | 34 ---- 15 files changed, 12 insertions(+), 818 deletions(-) delete mode 100644 src/blueapi/messaging/__init__.py delete mode 100644 src/blueapi/messaging/base.py delete mode 100644 src/blueapi/messaging/context.py delete mode 100644 src/blueapi/messaging/stomptemplate.py delete mode 100644 src/blueapi/messaging/utils.py delete mode 100644 tests/messaging/__init__.py delete mode 100644 tests/messaging/test_stomptemplate.py delete mode 100644 tests/messaging/test_utils.py diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 199fe33f5..139fd579c 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -6,6 +6,7 @@ import click from bluesky.callbacks.best_effort import BestEffortCallback +from bluesky_stomp.messaging import MessageContext, MessagingTemplate from pydantic import ValidationError from requests.exceptions import ConnectionError @@ -16,8 +17,6 @@ from blueapi.client.rest import BlueskyRemoteControlError from blueapi.config import ApplicationConfig, ConfigLoader from blueapi.core import DataEvent -from blueapi.messaging import MessageContext -from blueapi.messaging.stomptemplate import StompMessagingTemplate from blueapi.service.main import start from blueapi.service.openapi import ( DOCS_SCHEMA_LOCATION, @@ -147,7 +146,7 @@ def listen_to_events(obj: dict) -> None: config: ApplicationConfig = obj["config"] if config.stomp is not None: event_bus_client = EventBusClient( - StompMessagingTemplate.autoconfigured(config.stomp) + MessagingTemplate.autoconfigured(config.stomp) ) else: raise RuntimeError("Message bus needs to be configured") diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 859491bd2..9cd953632 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -1,9 +1,10 @@ import time from concurrent.futures import Future +from bluesky_stomp.messaging import MessageContext, MessagingTemplate + from blueapi.config import ApplicationConfig from blueapi.core.bluesky_types import DataEvent -from blueapi.messaging import MessageContext, StompMessagingTemplate from blueapi.service.model import ( DeviceModel, DeviceResponse, @@ -38,7 +39,7 @@ def __init__( def from_config(cls, config: ApplicationConfig) -> "BlueapiClient": rest = BlueapiRestClient(config.api) if config.stomp is not None: - template = StompMessagingTemplate.autoconfigured(config.stomp) + template = MessagingTemplate.autoconfigured(config.stomp) events = EventBusClient(template) else: events = None diff --git a/src/blueapi/client/event_bus.py b/src/blueapi/client/event_bus.py index bfd0afd18..b6a2c2e7f 100644 --- a/src/blueapi/client/event_bus.py +++ b/src/blueapi/client/event_bus.py @@ -1,7 +1,8 @@ from collections.abc import Callable +from bluesky_stomp.messaging import MessageContext, MessagingTemplate + from blueapi.core import DataEvent -from blueapi.messaging import MessageContext, MessagingTemplate from blueapi.worker import ProgressEvent, WorkerEvent diff --git a/src/blueapi/messaging/__init__.py b/src/blueapi/messaging/__init__.py deleted file mode 100644 index 0aeb5eb6d..000000000 --- a/src/blueapi/messaging/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -from .base import DestinationProvider, MessageListener, MessagingTemplate -from .context import MessageContext -from .stomptemplate import StompDestinationProvider, StompMessagingTemplate - -__all__ = [ - "MessageListener", - "MessagingTemplate", - "MessageContext", - "StompMessagingTemplate", - "DestinationProvider", - "StompDestinationProvider", -] diff --git a/src/blueapi/messaging/base.py b/src/blueapi/messaging/base.py deleted file mode 100644 index 6c350639a..000000000 --- a/src/blueapi/messaging/base.py +++ /dev/null @@ -1,196 +0,0 @@ -from abc import ABC, abstractmethod -from collections.abc import Callable -from concurrent.futures import Future -from typing import Any - -from .context import MessageContext - -MessageListener = Callable[[MessageContext, Any], None] - - -class DestinationProvider(ABC): - """ - Class that provides destinations for specific types of message bus. - Implementation may be eager or lazy. - """ - - @abstractmethod - def default(self, name: str) -> str: - """ - A default type of destination with a given name. - For example, the provider could default to using queues if no - preference is specified. - - Args: - name (str): The name of the destination - - Returns: - str: Identifier for the destination - """ - - @abstractmethod - def queue(self, name: str) -> str: - """ - A queue with the given name - - Args: - name (str): Name of the queue - - Returns: - str: Identifier for the queue - """ - - @abstractmethod - def topic(self, name: str) -> str: - """ - A topic with the given name - - Args: - name (str): Name of the topic - - Returns: - str: Identifier for the topic - """ - - @abstractmethod - def temporary_queue(self, name: str) -> str: - """ - A temporary queue with the given name - - Args: - name (str): Name of the queue - - Returns: - str: Identifier for the queue - """ - - -class MessagingTemplate(ABC): - """ - Class meant for quickly building message-based applications. - Includes helpers for asynchronous production/consumption and - synchronous send/receive model - """ - - @property - @abstractmethod - def destinations(self) -> DestinationProvider: - """ - Get a destination provider that can create destination - identifiers for this particular template - - Returns: - DestinationProvider: Destination provider - """ - - def send_and_receive( - self, - destination: str, - obj: Any, - reply_type: type = str, - correlation_id: str | None = None, - ) -> Future: - """ - Send a message expecting a single reply. - - Args: - destination (str): Destination to send the message - obj (Any): Message to send, must be serializable - reply_type (Type, optional): Expected type of reply, used - in deserialization. Defaults to str. - correlation_id (Optional[str]): An id which correlates this request with - requests it spawns or the request which - spawned it etc. - Returns: - Future: Future representing the reply - """ - - future: Future = Future() - - def callback(_: MessageContext, reply: Any) -> None: - future.set_result(reply) - - callback.__annotations__["reply"] = reply_type - self.send(destination, obj, callback, correlation_id) - return future - - @abstractmethod - def send( - self, - destination: str, - obj: Any, - on_reply: MessageListener | None = None, - correlation_id: str | None = None, - ) -> None: - """ - Send a message to a destination - - Args: - destination (str): Destination to send the message - obj (Any): Message to send, must be serializable - on_reply (Optional[MessageListener], optional): Callback function for - a reply. Defaults to None. - correlation_id (Optional[str]): An id which correlates this request with - requests it spawns or the request which - spawned it etc. - """ - - def listener(self, destination: str): - """ - Decorator for subscribing to a topic: - - @my_app.listener("my-destination") - def callback(context: MessageContext, message: ???) -> None: - ... - - Args: - destination (str): Destination to subscribe to - """ - - def decorator(callback: MessageListener) -> MessageListener: - self.subscribe(destination, callback) - return callback - - return decorator - - @abstractmethod - def subscribe( - self, - destination: str, - callback: MessageListener, - ) -> None: - """ - Subscribe to messages from a particular destination. Requires - a callback of the form: - - def callback(context: MessageContext, message: ???) -> None: - ... - - The type annotation of the message will be inspected and used in - deserialization. - - Args: - destination (str): Destination to subscribe to - callback (MessageListener): What to do with each message - """ - - @abstractmethod - def connect(self) -> None: - """ - Connect the app to transport - """ - - @abstractmethod - def disconnect(self) -> None: - """ - Disconnect the app from transport - """ - - @abstractmethod - def is_connected(self) -> bool: - """ - Returns status of the connection between the app and the transport. - - Returns: - status (bool): Returns True if connected, False otherwise - """ diff --git a/src/blueapi/messaging/context.py b/src/blueapi/messaging/context.py deleted file mode 100644 index d202b700e..000000000 --- a/src/blueapi/messaging/context.py +++ /dev/null @@ -1,12 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class MessageContext: - """ - Context that comes with a message, provides useful information such as how to reply - """ - - destination: str - reply_destination: str | None - correlation_id: str | None diff --git a/src/blueapi/messaging/stomptemplate.py b/src/blueapi/messaging/stomptemplate.py deleted file mode 100644 index d535c0089..000000000 --- a/src/blueapi/messaging/stomptemplate.py +++ /dev/null @@ -1,249 +0,0 @@ -import itertools -import json -import logging -import time -import uuid -from collections.abc import Callable -from dataclasses import dataclass -from threading import Event -from typing import Any - -import orjson -import stomp -from pydantic import parse_obj_as -from stomp.exception import ConnectFailedException -from stomp.utils import Frame - -from blueapi.config import BasicAuthentication, StompConfig -from blueapi.utils import handle_all_exceptions, serialize - -from .base import DestinationProvider, MessageListener, MessagingTemplate -from .context import MessageContext -from .utils import determine_deserialization_type - -LOGGER = logging.getLogger(__name__) - -CORRELATION_ID_HEADER = "correlation-id" - - -class StompDestinationProvider(DestinationProvider): - """ - Destination provider for stomp, stateless so just - uses naming conventions - """ - - def queue(self, name: str) -> str: - return f"/queue/{name}" - - def topic(self, name: str) -> str: - return f"/topic/{name}" - - def temporary_queue(self, name: str) -> str: - return f"/temp-queue/{name}" - - default = queue - - -@dataclass -class StompReconnectPolicy: - """ - Details of how often stomp will try to reconnect if connection is unexpectedly lost - """ - - initial_delay: float = 0.0 - attempt_period: float = 10.0 - - -@dataclass -class Subscription: - """ - Details of a subscription, the template needs its own representation to - defer subscriptions until after connection - """ - - destination: str - callback: Callable[[Frame], None] - - -class StompMessagingTemplate(MessagingTemplate): - """ - MessagingTemplate that uses the stomp protocol, meant for use - with ActiveMQ. - """ - - _conn: stomp.Connection - _reconnect_policy: StompReconnectPolicy - _authentication: BasicAuthentication - _sub_num: itertools.count - _listener: stomp.ConnectionListener - _subscriptions: dict[str, Subscription] - _pending_subscriptions: set[str] - _disconnected: Event - - # Stateless implementation means attribute can be static - _destination_provider: DestinationProvider = StompDestinationProvider() - - def __init__( - self, - conn: stomp.Connection, - reconnect_policy: StompReconnectPolicy | None = None, - authentication: BasicAuthentication | None = None, - ) -> None: - self._conn = conn - self._reconnect_policy = reconnect_policy or StompReconnectPolicy() - self._authentication = authentication or BasicAuthentication() - - self._sub_num = itertools.count() - self._listener = stomp.ConnectionListener() - - self._listener.on_message = self._on_message - self._conn.set_listener("", self._listener) - - self._subscriptions = {} - - @classmethod - def autoconfigured(cls, config: StompConfig) -> MessagingTemplate: - return cls( - stomp.Connection( - [(config.host, config.port)], - auto_content_length=False, - ), - authentication=config.auth, - ) - - @property - def destinations(self) -> DestinationProvider: - return self._destination_provider - - def send( - self, - destination: str, - obj: Any, - on_reply: MessageListener | None = None, - correlation_id: str | None = None, - ) -> None: - self._send_str( - destination, - orjson.dumps(serialize(obj), option=orjson.OPT_SERIALIZE_NUMPY), - on_reply, - correlation_id, - ) - - def _send_str( - self, - destination: str, - message: bytes, - on_reply: MessageListener | None = None, - correlation_id: str | None = None, - ) -> None: - LOGGER.info(f"SENDING {message!r} to {destination}") - - headers: dict[str, Any] = {"JMSType": "TextMessage"} - if on_reply is not None: - reply_queue_name = self.destinations.temporary_queue(str(uuid.uuid1())) - headers = {**headers, "reply-to": reply_queue_name} - self.subscribe(reply_queue_name, on_reply) - if correlation_id: - headers = {**headers, CORRELATION_ID_HEADER: correlation_id} - self._conn.send(headers=headers, body=message, destination=destination) - - def subscribe(self, destination: str, callback: MessageListener) -> None: - LOGGER.debug(f"New subscription to {destination}") - obj_type = determine_deserialization_type(callback, default=str) - - def wrapper(frame: Frame) -> None: - as_dict = json.loads(frame.body) - value: Any = parse_obj_as(obj_type, as_dict) - - context = MessageContext( - frame.headers["destination"], - frame.headers.get("reply-to"), - frame.headers.get(CORRELATION_ID_HEADER), - ) - callback(context, value) - - sub_id = ( - destination - if destination.startswith("/temp-queue/") - else str(next(self._sub_num)) - ) - self._subscriptions[sub_id] = Subscription(destination, wrapper) - # If we're connected, subscribe immediately, otherwise the subscription is - # deferred until connection. - self._ensure_subscribed([sub_id]) - - def connect(self) -> None: - if self._conn.is_connected(): - return - - connected: Event = Event() - - def finished_connecting(_: Frame): - connected.set() - - self._listener.on_connected = finished_connecting - self._listener.on_disconnected = self._on_disconnected - - LOGGER.info("Connecting...") - - try: - self._conn.connect( - username=self._authentication.username, - passcode=self._authentication.passcode, - wait=True, - ) - connected.wait() - except ConnectFailedException as ex: - LOGGER.exception(msg="Failed to connect to message bus", exc_info=ex) - - self._ensure_subscribed() - - def _ensure_subscribed(self, sub_ids: list[str] | None = None) -> None: - # We must defer subscription until after connection, because stomp literally - # sends a SUB to the broker. But it still nice to be able to call subscribe - # on template before it connects, then just run the subscribes after connection. - if self._conn.is_connected(): - for sub_id in sub_ids or self._subscriptions.keys(): - sub = self._subscriptions[sub_id] - LOGGER.info(f"Subscribing to {sub.destination}") - self._conn.subscribe(destination=sub.destination, id=sub_id, ack="auto") - - def disconnect(self) -> None: - LOGGER.info("Disconnecting...") - if not self.is_connected(): - LOGGER.info("Already disconnected") - return - # We need to synchronise the disconnect on an event because the stomp Connection - # object doesn't do it for us - disconnected = Event() - self._listener.on_disconnected = disconnected.set - self._conn.disconnect() - disconnected.wait() - self._listener.on_disconnected = None - - @handle_all_exceptions - def _on_disconnected(self) -> None: - LOGGER.warn( - "Stomp connection lost, will attempt reconnection with " - f"policy {self._reconnect_policy}" - ) - time.sleep(self._reconnect_policy.initial_delay) - while not self._conn.is_connected(): - try: - self.connect() - except ConnectFailedException: - LOGGER.exception("Reconnect failed") - time.sleep(self._reconnect_policy.attempt_period) - - @handle_all_exceptions - def _on_message(self, frame: Frame) -> None: - LOGGER.info(f"Received {frame}") - sub_id = frame.headers.get("subscription") - sub = self._subscriptions.get(sub_id) - if sub is not None: - sub.callback(frame) - else: - LOGGER.warn(f"No subscription active for id: {sub_id}") - - def is_connected(self) -> bool: - return self._conn.is_connected() diff --git a/src/blueapi/messaging/utils.py b/src/blueapi/messaging/utils.py deleted file mode 100644 index 175005c65..000000000 --- a/src/blueapi/messaging/utils.py +++ /dev/null @@ -1,27 +0,0 @@ -import inspect - -from .base import MessageListener - - -def determine_deserialization_type( - listener: MessageListener, default: type = str -) -> type: - """ - Inspect a message listener function to determine the type to deserialize - a message to - - Args: - listener (MessageListener): The function that takes a deserialized message - default (Type, optional): If the type cannot be determined, what default - should we fall back on? Defaults to str. - - Returns: - Type: _description_ - """ - - _, message = inspect.signature(listener).parameters.values() - a_type = message.annotation - if a_type is not inspect.Parameter.empty: - return a_type - else: - return default diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 55cf6018f..93340fdcc 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -3,11 +3,11 @@ from functools import lru_cache from typing import Any +from bluesky_stomp.messaging import MessagingTemplate + from blueapi.config import ApplicationConfig from blueapi.core.context import BlueskyContext from blueapi.core.event import EventStream -from blueapi.messaging.base import MessagingTemplate -from blueapi.messaging.stomptemplate import StompMessagingTemplate from blueapi.service.model import DeviceModel, PlanModel, WorkerTask from blueapi.worker.event import TaskStatusEnum, WorkerState from blueapi.worker.task import Task @@ -51,7 +51,7 @@ def worker() -> TaskWorker: def messaging_template() -> MessagingTemplate | None: stomp_config = config().stomp if stomp_config is not None: - template = StompMessagingTemplate.autoconfigured(stomp_config) + template = MessagingTemplate.autoconfigured(stomp_config) task_worker = worker() event_topic = template.destinations.topic("public.worker.event") diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 433e7115e..5a541f0e4 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -2,12 +2,12 @@ from unittest.mock import MagicMock, Mock, call import pytest +from bluesky_stomp.messaging import MessageContext from blueapi.client.client import BlueapiClient from blueapi.client.event_bus import AnyEvent, BlueskyStreamingError, EventBusClient from blueapi.client.rest import BlueapiRestClient, BlueskyRemoteControlError from blueapi.core import DataEvent -from blueapi.messaging.context import MessageContext from blueapi.service.model import ( DeviceModel, DeviceResponse, diff --git a/tests/client/test_event_bus.py b/tests/client/test_event_bus.py index 45aab501b..ff5eb3101 100644 --- a/tests/client/test_event_bus.py +++ b/tests/client/test_event_bus.py @@ -1,9 +1,9 @@ from unittest.mock import ANY, Mock import pytest +from bluesky_stomp.messaging import MessagingTemplate from blueapi.client.event_bus import BlueskyStreamingError, EventBusClient -from blueapi.messaging import MessagingTemplate @pytest.fixture diff --git a/tests/conftest.py b/tests/conftest.py index ed2caf1a8..838d4b219 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,27 +6,6 @@ from bluesky.run_engine import TransitionError -def pytest_addoption(parser): - parser.addoption( - "--skip-stomp", - action="store_true", - default=False, - help="skip stomp tests (e.g. because a server is unavailable)", - ) - - -def pytest_configure(config): - config.addinivalue_line("markers", "stomp: mark test as requiring stomp broker") - - -def pytest_collection_modifyitems(config, items): - if config.getoption("--skip-stomp"): - skip_stomp = pytest.mark.skip(reason="skipping stomp tests at user request") - for item in items: - if "stomp" in item.keywords: - item.add_marker(skip_stomp) - - @pytest.fixture(scope="function") def RE(request): loop = asyncio.new_event_loop() diff --git a/tests/messaging/__init__.py b/tests/messaging/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/messaging/test_stomptemplate.py b/tests/messaging/test_stomptemplate.py deleted file mode 100644 index f805ea7e0..000000000 --- a/tests/messaging/test_stomptemplate.py +++ /dev/null @@ -1,256 +0,0 @@ -import itertools -from collections.abc import Iterable -from concurrent.futures import Future -from queue import Queue -from typing import Any -from unittest.mock import ANY, MagicMock, call, patch - -import numpy as np -import pytest -from pydantic import BaseModel, BaseSettings, Field -from stomp import Connection -from stomp.exception import ConnectFailedException, NotConnectedException - -from blueapi.config import StompConfig -from blueapi.messaging import MessageContext, MessagingTemplate, StompMessagingTemplate - -_TIMEOUT: float = 10.0 -_COUNT = itertools.count() - - -class StompTestingSettings(BaseSettings): - blueapi_test_stomp_ports: list[int] = Field(default=[61613]) - - def test_stomp_configs(self) -> Iterable[StompConfig]: - for port in self.blueapi_test_stomp_ports: - yield StompConfig(port=port) - - -@pytest.fixture(params=StompTestingSettings().test_stomp_configs()) -def disconnected_template(request: pytest.FixtureRequest) -> MessagingTemplate: - stomp_config = request.param - template = StompMessagingTemplate.autoconfigured(stomp_config) - assert template is not None - return template - - -@pytest.fixture(params=StompTestingSettings().test_stomp_configs()) -def template(request: pytest.FixtureRequest) -> Iterable[MessagingTemplate]: - stomp_config = request.param - template = StompMessagingTemplate.autoconfigured(stomp_config) - assert template is not None - template.connect() - yield template - template.disconnect() - - -@pytest.fixture -def test_queue(template: MessagingTemplate) -> str: - return template.destinations.queue(f"test-{next(_COUNT)}") - - -@pytest.fixture -def test_queue_2(template: MessagingTemplate) -> str: - return template.destinations.queue(f"test-{next(_COUNT)}") - - -@pytest.fixture -def test_topic(template: MessagingTemplate) -> str: - return template.destinations.topic(f"test-{next(_COUNT)}") - - -def test_disconnected_error(template: MessagingTemplate, test_queue: str) -> None: - acknowledge(template, test_queue) - - f: Future = Future() - - def callback(ctx: MessageContext, message: str) -> None: - f.set_result(message) - - if template.is_connected(): - template.disconnect() - with pytest.raises(NotConnectedException): - template.send(test_queue, "test_message", callback) - - with patch( - "blueapi.messaging.stomptemplate.LOGGER.info", autospec=True - ) as mock_logger: - template.disconnect() - assert not template.is_connected() - expected_calls = [ - call("Disconnecting..."), - call("Already disconnected"), - ] - mock_logger.assert_has_calls(expected_calls) - - -@pytest.mark.stomp -def test_send(template: MessagingTemplate, test_queue: str) -> None: - f: Future = Future() - - def callback(ctx: MessageContext, message: str) -> None: - f.set_result(message) - - template.subscribe(test_queue, callback) - template.send(test_queue, "test_message") - assert f.result(timeout=_TIMEOUT) - - -@pytest.mark.stomp -def test_send_to_topic(template: MessagingTemplate, test_topic: str) -> None: - f: Future = Future() - - def callback(ctx: MessageContext, message: str) -> None: - f.set_result(message) - - template.subscribe(test_topic, callback) - template.send(test_topic, "test_message") - assert f.result(timeout=_TIMEOUT) - - -@pytest.mark.stomp -def test_send_on_reply(template: MessagingTemplate, test_queue: str) -> None: - acknowledge(template, test_queue) - - f: Future = Future() - - def callback(ctx: MessageContext, message: str) -> None: - f.set_result(message) - - template.send(test_queue, "test_message", callback) - assert f.result(timeout=_TIMEOUT) - - -@pytest.mark.stomp -def test_send_and_receive(template: MessagingTemplate, test_queue: str) -> None: - acknowledge(template, test_queue) - reply = template.send_and_receive(test_queue, "test", str).result(timeout=_TIMEOUT) - assert reply == "ack" - - -@pytest.mark.stomp -def test_listener(template: MessagingTemplate, test_queue: str) -> None: - @template.listener(test_queue) - def server(ctx: MessageContext, message: str) -> None: - reply_queue = ctx.reply_destination - if reply_queue is None: - raise RuntimeError("reply queue is None") - template.send(reply_queue, "ack", correlation_id=ctx.correlation_id) - - reply = template.send_and_receive(test_queue, "test", str).result(timeout=_TIMEOUT) - assert reply == "ack" - - -class Foo(BaseModel): - a: int - b: str - - -@pytest.mark.stomp -@pytest.mark.parametrize( - "message,message_type", - [ - ("test", str), - (1, int), - (Foo(a=1, b="test"), Foo), - (np.array([1, 2, 3]), list), - ], -) -def test_deserialization( - template: MessagingTemplate, test_queue: str, message: Any, message_type: type -) -> None: - def server(ctx: MessageContext, message: message_type) -> None: # type: ignore - reply_queue = ctx.reply_destination - if reply_queue is None: - raise RuntimeError("reply queue is None") - template.send(reply_queue, message, correlation_id=ctx.correlation_id) - - template.subscribe(test_queue, server) - reply = template.send_and_receive(test_queue, message, message_type).result( - timeout=_TIMEOUT - ) - if type(message) == np.ndarray: - message = message.tolist() - assert reply == message - - -@pytest.mark.stomp -def test_subscribe_before_connect( - disconnected_template: MessagingTemplate, test_queue: str -) -> None: - acknowledge(disconnected_template, test_queue) - disconnected_template.connect() - reply = disconnected_template.send_and_receive(test_queue, "test", str).result( - timeout=_TIMEOUT - ) - assert reply == "ack" - - -@pytest.mark.stomp -def test_reconnect(template: MessagingTemplate, test_queue: str) -> None: - acknowledge(template, test_queue) - reply = template.send_and_receive(test_queue, "test", str).result(timeout=_TIMEOUT) - assert reply == "ack" - template.disconnect() - assert not template.is_connected() - template.connect() - assert template.is_connected() - reply = template.send_and_receive(test_queue, "test", str).result(timeout=_TIMEOUT) - assert reply == "ack" - - -@pytest.fixture() -def failing_template() -> MessagingTemplate: - def connection_exception(*args, **kwargs): - raise ConnectFailedException - - connection = Connection() - connection.connect = MagicMock(side_effect=connection_exception) - return StompMessagingTemplate(connection) - - -@pytest.mark.stomp -def test_failed_connect(failing_template: MessagingTemplate, test_queue: str) -> None: - assert not failing_template.is_connected() - with patch( - "blueapi.messaging.stomptemplate.LOGGER.error", autospec=True - ) as mock_logger: - failing_template.connect() - assert not failing_template.is_connected() - mock_logger.assert_called_once_with( - "Failed to connect to message bus", exc_info=ANY - ) - - -@pytest.mark.stomp -def test_correlation_id( - template: MessagingTemplate, test_queue: str, test_queue_2: str -) -> None: - correlation_id = "foobar" - q: Queue = Queue() - - def server(ctx: MessageContext, msg: str) -> None: - q.put(ctx) - template.send(test_queue_2, msg, correlation_id=ctx.correlation_id) - - def client(ctx: MessageContext, msg: str) -> None: - q.put(ctx) - - template.subscribe(test_queue, server) - template.subscribe(test_queue_2, client) - template.send(test_queue, "test", None, correlation_id) - - ctx_req: MessageContext = q.get(timeout=_TIMEOUT) - assert ctx_req.correlation_id == correlation_id - ctx_ack: MessageContext = q.get(timeout=_TIMEOUT) - assert ctx_ack.correlation_id == correlation_id - - -def acknowledge(template: MessagingTemplate, destination: str) -> None: - def server(ctx: MessageContext, message: str) -> None: - reply_queue = ctx.reply_destination - if reply_queue is None: - raise RuntimeError("reply queue is None") - template.send(reply_queue, "ack", correlation_id=ctx.correlation_id) - - template.subscribe(destination, server) diff --git a/tests/messaging/test_utils.py b/tests/messaging/test_utils.py deleted file mode 100644 index feafd2afa..000000000 --- a/tests/messaging/test_utils.py +++ /dev/null @@ -1,34 +0,0 @@ -from collections.abc import Mapping -from dataclasses import dataclass -from typing import Any - -import pytest - -from blueapi.messaging.utils import determine_deserialization_type - - -@dataclass -class Foo: - bar: int - baz: str - - -def test_determine_deserialization_type() -> None: - def on_message(headers: Mapping[str, Any], message: Foo) -> None: ... - - deserialization_type = determine_deserialization_type(on_message) # type: ignore - assert deserialization_type is Foo - - -def test_determine_deserialization_type_with_no_type() -> None: - def on_message(headers: Mapping[str, Any], message) -> None: ... - - deserialization_type = determine_deserialization_type(on_message) # type: ignore - assert deserialization_type is str - - -def test_determine_deserialization_type_with_wrong_signature() -> None: - def on_message(message: Foo) -> None: ... - - with pytest.raises(ValueError): - determine_deserialization_type(on_message) # type: ignore From cbcac76f2f7576ac912c59a3d9cb0fcb1d33583c Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Wed, 7 Aug 2024 16:34:37 +0100 Subject: [PATCH 02/22] Handle connect exception in blueapi --- src/blueapi/service/interface.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 93340fdcc..5962be67e 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -4,6 +4,7 @@ from typing import Any from bluesky_stomp.messaging import MessagingTemplate +from stomp import ConnectFailedException from blueapi.config import ApplicationConfig from blueapi.core.context import BlueskyContext @@ -63,8 +64,12 @@ def messaging_template() -> MessagingTemplate | None: task_worker.data_events: event_topic, } ) - template.connect() - return template + try: + template.connect() + return template + except ConnectFailedException as ex: + logging.exception(msg="Failed to connect to message bus", exc_info=ex) + return None else: return None From bce80816951aea9cf0ace65c930dedf9e1bd7fde Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 22 Aug 2024 11:34:49 +0000 Subject: [PATCH 03/22] updated to use bluesky-stomp --- dev-requirements.txt | 1 + pyproject.toml | 4 ++-- src/blueapi/cli/cli.py | 5 ++++- src/blueapi/client/client.py | 5 ++++- src/blueapi/client/event_bus.py | 3 ++- src/blueapi/service/interface.py | 20 ++++++++++++-------- src/blueapi/service/main.py | 5 ++++- 7 files changed, 29 insertions(+), 14 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 6e0905761..abe658dd2 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -18,6 +18,7 @@ black==24.8.0 bluesky==1.13.0a4 bluesky-kafka==0.10.0 bluesky-live==0.0.8 +bluesky-stomp @ git+https://github.com/DiamondLightSource/bluesky-stomp@4dbdb6b144b4b03243a1784f2f53dc13cbbef30e boltons==24.0.0 cachetools==5.4.0 caproto==1.1.1 diff --git a/pyproject.toml b/pyproject.toml index e9b09b86c..12d4cf07c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "aiohttp", "PyYAML", "click", - "fastapi[all]", + "fastapi>=0.112.0", "uvicorn", "requests", "dls-bluesky-core", #requires ophyd-async @@ -91,7 +91,7 @@ filterwarnings = ["error", "ignore::DeprecationWarning"] # Doctest python code in docs, python code in src docstrings, test functions in tests testpaths = "docs src tests" markers = [ - "handler: marks tests that interact with the global handler object in handler.py", + "stomp: marks tests that require the stomp" ] asyncio_mode = "auto" diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 139fd579c..d0dbaa906 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -7,6 +7,7 @@ import click from bluesky.callbacks.best_effort import BestEffortCallback from bluesky_stomp.messaging import MessageContext, MessagingTemplate +from bluesky_stomp.models import Broker from pydantic import ValidationError from requests.exceptions import ConnectionError @@ -146,7 +147,9 @@ def listen_to_events(obj: dict) -> None: config: ApplicationConfig = obj["config"] if config.stomp is not None: event_bus_client = EventBusClient( - MessagingTemplate.autoconfigured(config.stomp) + MessagingTemplate.for_broker( + broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None) + ) ) else: raise RuntimeError("Message bus needs to be configured") diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 9cd953632..6331cbe3f 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -2,6 +2,7 @@ from concurrent.futures import Future from bluesky_stomp.messaging import MessageContext, MessagingTemplate +from bluesky_stomp.models import Broker from blueapi.config import ApplicationConfig from blueapi.core.bluesky_types import DataEvent @@ -39,7 +40,9 @@ def __init__( def from_config(cls, config: ApplicationConfig) -> "BlueapiClient": rest = BlueapiRestClient(config.api) if config.stomp is not None: - template = MessagingTemplate.autoconfigured(config.stomp) + template = MessagingTemplate.for_broker( + broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None) + ) events = EventBusClient(template) else: events = None diff --git a/src/blueapi/client/event_bus.py b/src/blueapi/client/event_bus.py index b6a2c2e7f..d3e6b17e8 100644 --- a/src/blueapi/client/event_bus.py +++ b/src/blueapi/client/event_bus.py @@ -1,6 +1,7 @@ from collections.abc import Callable from bluesky_stomp.messaging import MessageContext, MessagingTemplate +from bluesky_stomp.models import MessageQueue from blueapi.core import DataEvent from blueapi.worker import ProgressEvent, WorkerEvent @@ -33,7 +34,7 @@ def subscribe_to_all_events( ) -> None: try: self.app.subscribe( - self.app.destinations.topic("public.worker.event"), + MessageQueue(name="public.worker.event"), on_event, ) except Exception as err: diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 5962be67e..b4fbf2ae8 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -4,7 +4,7 @@ from typing import Any from bluesky_stomp.messaging import MessagingTemplate -from stomp import ConnectFailedException +from bluesky_stomp.models import Broker, DestinationBase, MessageQueue from blueapi.config import ApplicationConfig from blueapi.core.context import BlueskyContext @@ -52,10 +52,12 @@ def worker() -> TaskWorker: def messaging_template() -> MessagingTemplate | None: stomp_config = config().stomp if stomp_config is not None: - template = MessagingTemplate.autoconfigured(stomp_config) + template = MessagingTemplate.for_broker( + broker=Broker(host=stomp_config.host, port=stomp_config.port, auth=None) + ) task_worker = worker() - event_topic = template.destinations.topic("public.worker.event") + event_topic = MessageQueue(name="public.worker.event") _publish_event_streams( { @@ -67,7 +69,7 @@ def messaging_template() -> MessagingTemplate | None: try: template.connect() return template - except ConnectFailedException as ex: + except Exception as ex: logging.exception(msg="Failed to connect to message bus", exc_info=ex) return None else: @@ -95,15 +97,17 @@ def teardown() -> None: messaging_template.cache_clear() -def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -> None: +def _publish_event_streams( + streams_to_destinations: Mapping[EventStream, DestinationBase], +) -> None: for stream, destination in streams_to_destinations.items(): _publish_event_stream(stream, destination) -def _publish_event_stream(stream: EventStream, destination: str) -> None: +def _publish_event_stream(stream: EventStream, destination: DestinationBase) -> None: def forward_message(event: Any, correlation_id: str | None) -> None: if (template := messaging_template()) is not None: - template.send(destination, event, None, correlation_id) + template.send(destination, event, None, correlation_id=correlation_id) stream.subscribe(forward_message) @@ -160,7 +164,7 @@ def get_worker_state() -> WorkerState: return worker().state -def pause_worker(defer: bool | None) -> None: +def pause_worker(defer: bool) -> None: """Command the worker to pause""" worker().pause(defer) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 0a5c25070..364ed0c02 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -312,7 +312,10 @@ def set_state( and new_state in _ALLOWED_TRANSITIONS[current_state] ): if new_state == WorkerState.PAUSED: - runner.run(interface.pause_worker, state_change_request.defer) + if state_change_request.defer is not None: + runner.run(interface.pause_worker, state_change_request.defer) + else: + runner.run(interface.pause_worker, False) elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}: From 53ea6ffebaf03fdb53d61497a5304fdd4f421fb5 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Fri, 23 Aug 2024 13:28:49 +0000 Subject: [PATCH 04/22] updated ci to working command --- .github/actions/install_requirements/action.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/actions/install_requirements/action.yml b/.github/actions/install_requirements/action.yml index d33e08052..c5159b77a 100644 --- a/.github/actions/install_requirements/action.yml +++ b/.github/actions/install_requirements/action.yml @@ -26,7 +26,11 @@ runs: python-version: ${{ env.PYTHON_VERSION }} - name: Install packages - run: pip install ${{ inputs.pip-install }} + run: pip install $([ -f dev-requirements.txt ] && echo '-c dev-requirements.txt') -e .[dev] + shell: bash + + - name: Install bluesky_stomp + run: pip install git+https://github.com/DiamondLightSource/bluesky-stomp.git@main shell: bash - name: Report what was installed From c6117401b08def08beac6616d4c97c41920aeeeb Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Fri, 23 Aug 2024 13:56:37 +0000 Subject: [PATCH 05/22] change signature of subscribe callback --- src/blueapi/cli/cli.py | 2 +- src/blueapi/client/client.py | 2 +- src/blueapi/client/event_bus.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index d0dbaa906..1cd648b0b 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -155,8 +155,8 @@ def listen_to_events(obj: dict) -> None: raise RuntimeError("Message bus needs to be configured") def on_event( - context: MessageContext, event: WorkerEvent | ProgressEvent | DataEvent, + context: MessageContext, ) -> None: converted = json.dumps(event.dict(), indent=2) print(converted) diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 6331cbe3f..14acc4cc8 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -182,7 +182,7 @@ def run_task( complete: Future[WorkerEvent] = Future() - def inner_on_event(ctx: MessageContext, event: AnyEvent) -> None: + def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None: match event: case WorkerEvent(task_status=TaskStatus(task_id=test_id)): relates_to_task = test_id == task_id diff --git a/src/blueapi/client/event_bus.py b/src/blueapi/client/event_bus.py index d3e6b17e8..bee5a1ce6 100644 --- a/src/blueapi/client/event_bus.py +++ b/src/blueapi/client/event_bus.py @@ -30,7 +30,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback) -> None: def subscribe_to_all_events( self, - on_event: Callable[[MessageContext, AnyEvent], None], + on_event: Callable[[AnyEvent, MessageContext], None], ) -> None: try: self.app.subscribe( From 1dc116c0bc50b839e4e8c1f67ca4f3016bc64f50 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 27 Aug 2024 07:58:25 +0000 Subject: [PATCH 06/22] added auth to the broker constructor --- src/blueapi/cli/cli.py | 6 +++++- src/blueapi/client/client.py | 6 +++++- src/blueapi/service/interface.py | 4 +++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 1cd648b0b..3802e4d13 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -148,7 +148,11 @@ def listen_to_events(obj: dict) -> None: if config.stomp is not None: event_bus_client = EventBusClient( MessagingTemplate.for_broker( - broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None) + broker=Broker( + host=config.stomp.host, + port=config.stomp.port, + auth=config.stomp.auth, + ) ) ) else: diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 14acc4cc8..176f21350 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -41,7 +41,11 @@ def from_config(cls, config: ApplicationConfig) -> "BlueapiClient": rest = BlueapiRestClient(config.api) if config.stomp is not None: template = MessagingTemplate.for_broker( - broker=Broker(host=config.stomp.host, port=config.stomp.port, auth=None) + broker=Broker( + host=config.stomp.host, + port=config.stomp.port, + auth=config.stomp.auth, + ) ) events = EventBusClient(template) else: diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index b4fbf2ae8..9eb8729d5 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -53,7 +53,9 @@ def messaging_template() -> MessagingTemplate | None: stomp_config = config().stomp if stomp_config is not None: template = MessagingTemplate.for_broker( - broker=Broker(host=stomp_config.host, port=stomp_config.port, auth=None) + broker=Broker( + host=stomp_config.host, port=stomp_config.port, auth=stomp_config.auth + ) ) task_worker = worker() From c9167773a8b4631effc634ffa2b33eb0c6ac9909 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 27 Aug 2024 10:20:41 +0000 Subject: [PATCH 07/22] update test and runner --- src/blueapi/service/runner.py | 14 ++++++++------ tests/client/test_client.py | 18 ++++++++++-------- tests/core/fake_device_module.py | 2 +- tests/service/test_runner.py | 6 +++--- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/src/blueapi/service/runner.py b/src/blueapi/service/runner.py index 1e3b07ad6..feca3b908 100644 --- a/src/blueapi/service/runner.py +++ b/src/blueapi/service/runner.py @@ -7,6 +7,8 @@ from multiprocessing.pool import Pool as PoolClass from typing import Any, ParamSpec, TypeVar +from pydantic import TypeAdapter + from blueapi.config import ApplicationConfig from blueapi.service.interface import setup, teardown from blueapi.service.model import EnvironmentResponse @@ -143,13 +145,13 @@ def _rpc( mod.__dict__.get(function_name, None), function_name ) value = func(*args, **kwargs) - if expected_type is None or isinstance(value, expected_type): + return _valid_return(value, expected_type) + + +def _valid_return(value: Any, expected_type: type[T] | None = None) -> T: + if expected_type is None or expected_type is Any: return value - else: - raise TypeError( - f"{function_name} returned value of type {type(value)}" - + f" which is incompatible with expected {expected_type}" - ) + return TypeAdapter(expected_type).validate_python(value) def _validate_function(func: Any, function_name: str) -> Callable: diff --git a/tests/client/test_client.py b/tests/client/test_client.py index 5a541f0e4..c2f23a626 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -307,9 +307,11 @@ def test_run_task_sets_up_control( ): mock_rest.create_task.return_value = TaskResponse(task_id="foo") mock_rest.update_worker_task.return_value = TaskResponse(task_id="foo") + ctx = Mock() + ctx.correlation_id = "foo" + mock_events.subscribe_to_all_events = lambda on_event: on_event(COMPLETE_EVENT, ctx) client_with_events.run_task(Task(name="foo")) - mock_rest.create_task.assert_called_once_with(Task(name="foo")) mock_rest.update_worker_task.assert_called_once_with(WorkerTask(task_id="foo")) @@ -324,7 +326,7 @@ def test_run_task_fails_on_failing_event( ctx = Mock() ctx.correlation_id = "foo" - mock_events.subscribe_to_all_events = lambda on_event: on_event(ctx, FAILED_EVENT) + mock_events.subscribe_to_all_events = lambda on_event: on_event(FAILED_EVENT, ctx) on_event = Mock() with pytest.raises(BlueskyStreamingError): @@ -360,9 +362,9 @@ def test_run_task_calls_event_callback( ctx = Mock() ctx.correlation_id = "foo" - def callback(on_event: Callable[[MessageContext, AnyEvent], None]): - on_event(ctx, test_event) - on_event(ctx, COMPLETE_EVENT) + def callback(on_event: Callable[[AnyEvent, MessageContext], None]): + on_event(test_event, ctx) + on_event(COMPLETE_EVENT, ctx) mock_events.subscribe_to_all_events = callback # type: ignore @@ -399,9 +401,9 @@ def test_run_task_ignores_non_matching_events( ctx = Mock() ctx.correlation_id = "foo" - def callback(on_event: Callable[[MessageContext, AnyEvent], None]): - on_event(ctx, test_event) - on_event(ctx, COMPLETE_EVENT) + def callback(on_event: Callable[[AnyEvent, MessageContext], None]): + on_event(test_event, ctx) # type: ignore + on_event(COMPLETE_EVENT, ctx) mock_events.subscribe_to_all_events = callback diff --git a/tests/core/fake_device_module.py b/tests/core/fake_device_module.py index dfdcefbbb..d27af8480 100644 --- a/tests/core/fake_device_module.py +++ b/tests/core/fake_device_module.py @@ -33,7 +33,7 @@ def _mock_with_name(name: str) -> MagicMock: def wrong_return_type() -> int: - return "0" # type: ignore + return "str" # type: ignore fetchable_non_callable = NonCallableMock() diff --git a/tests/service/test_runner.py b/tests/service/test_runner.py index f107e1881..e3edde0c4 100644 --- a/tests/service/test_runner.py +++ b/tests/service/test_runner.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch import pytest +from pydantic import ValidationError from blueapi.service import interface from blueapi.service.model import EnvironmentResponse @@ -156,8 +157,7 @@ def test_clear_message_for_wrong_return(started_runner: WorkerDispatcher): from tests.core.fake_device_module import wrong_return_type with pytest.raises( - TypeError, - match="wrong_return_type returned value of type " - + " which is incompatible with expected ", + ValidationError, + match="1 validation error for int", ): started_runner.run(wrong_return_type) From 8e0410f56e4c365f52763b9c85d65c2b1daf4a11 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 27 Aug 2024 11:30:49 +0000 Subject: [PATCH 08/22] stop gap commit will be removed --- tests/test_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index ab725c005..c44eeb842 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -150,7 +150,7 @@ def test_valid_stomp_config_for_listener(runner: CliRunner): ) assert ( result.output - == "Subscribing to all bluesky events from localhost:61613\nPress enter to exit" + == 'Subscribing to all bluesky events from localhost:61613\nPress enter to exit{\n "state": "IDLE",\n "task_status": null,\n "errors": [],\n "warnings": []\n}\n' ) assert result.exit_code == 0 From 258c149642d933230fdf363a1815650207eb0824 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 27 Aug 2024 13:24:50 +0000 Subject: [PATCH 09/22] code review changes --- pyproject.toml | 2 +- src/blueapi/client/event_bus.py | 4 ++-- src/blueapi/service/interface.py | 4 ++-- src/blueapi/service/main.py | 7 +++---- tests/service/test_interface.py | 9 +++++++++ tests/test_cli.py | 4 ++-- 6 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 12d4cf07c..5098ea769 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,7 +91,7 @@ filterwarnings = ["error", "ignore::DeprecationWarning"] # Doctest python code in docs, python code in src docstrings, test functions in tests testpaths = "docs src tests" markers = [ - "stomp: marks tests that require the stomp" + "stomp: marks tests that require a message bus configured for stomp" ] asyncio_mode = "auto" diff --git a/src/blueapi/client/event_bus.py b/src/blueapi/client/event_bus.py index bee5a1ce6..94e374d4b 100644 --- a/src/blueapi/client/event_bus.py +++ b/src/blueapi/client/event_bus.py @@ -1,7 +1,7 @@ from collections.abc import Callable from bluesky_stomp.messaging import MessageContext, MessagingTemplate -from bluesky_stomp.models import MessageQueue +from bluesky_stomp.models import MessageTopic from blueapi.core import DataEvent from blueapi.worker import ProgressEvent, WorkerEvent @@ -34,7 +34,7 @@ def subscribe_to_all_events( ) -> None: try: self.app.subscribe( - MessageQueue(name="public.worker.event"), + MessageTopic(name="public.worker.event"), on_event, ) except Exception as err: diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 9eb8729d5..23890069a 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -4,7 +4,7 @@ from typing import Any from bluesky_stomp.messaging import MessagingTemplate -from bluesky_stomp.models import Broker, DestinationBase, MessageQueue +from bluesky_stomp.models import Broker, DestinationBase, MessageTopic from blueapi.config import ApplicationConfig from blueapi.core.context import BlueskyContext @@ -59,7 +59,7 @@ def messaging_template() -> MessagingTemplate | None: ) task_worker = worker() - event_topic = MessageQueue(name="public.worker.event") + event_topic = MessageTopic(name="public.worker.event") _publish_event_streams( { diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 364ed0c02..aac76e1cc 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -312,10 +312,9 @@ def set_state( and new_state in _ALLOWED_TRANSITIONS[current_state] ): if new_state == WorkerState.PAUSED: - if state_change_request.defer is not None: - runner.run(interface.pause_worker, state_change_request.defer) - else: - runner.run(interface.pause_worker, False) + if state_change_request.defer is None: + state_change_request.defer = False + runner.run(interface.pause_worker, state_change_request.defer) elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) elif new_state in {WorkerState.ABORTING, WorkerState.STOPPING}: diff --git a/tests/service/test_interface.py b/tests/service/test_interface.py index 6a08eafc2..a053f9242 100644 --- a/tests/service/test_interface.py +++ b/tests/service/test_interface.py @@ -1,3 +1,4 @@ +import logging import uuid from dataclasses import dataclass from unittest.mock import MagicMock, patch @@ -273,3 +274,11 @@ def test_get_task_by_id(context_mock: MagicMock): def test_stomp_config(): interface.set_config(ApplicationConfig(stomp=StompConfig())) assert interface.messaging_template() is not None + + +@pytest.mark.stomp +def test_stomp_connection_failure_with_invalid_config(caplog): + caplog.at_level(logging.exception) + interface.set_config(ApplicationConfig(stomp=StompConfig(host="Not Exists"))) + assert interface.messaging_template() is None + assert "Failed to connect to message bus" in caplog.text diff --git a/tests/test_cli.py b/tests/test_cli.py index c44eeb842..da09f5be5 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -149,8 +149,8 @@ def test_valid_stomp_config_for_listener(runner: CliRunner): input="\n", ) assert ( - result.output - == 'Subscribing to all bluesky events from localhost:61613\nPress enter to exit{\n "state": "IDLE",\n "task_status": null,\n "errors": [],\n "warnings": []\n}\n' + "Subscribing to all bluesky events from localhost:61613\nPress enter to exit" + in result.output ) assert result.exit_code == 0 From 594c1dc827bc00cda43fdb07ce6d12f04ccce8f3 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Wed, 28 Aug 2024 15:29:50 +0000 Subject: [PATCH 10/22] docker container change --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index dd1a2398f..7306daac4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,7 @@ FROM developer as build COPY . /context WORKDIR /context RUN touch dev-requirements.txt && pip install --upgrade pip && pip install -c dev-requirements.txt . +RUN pip install git+https://github.com/DiamondLightSource/bluesky-stomp.git@main # The runtime stage copies the built venv into a slim runtime container FROM python:${PYTHON_VERSION}-slim as runtime From 730191c98584f1fe1da8f1c9f0676281dece0dab Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 29 Aug 2024 12:50:49 +0100 Subject: [PATCH 11/22] pyproject dependency fix (#607) --- dev-requirements.txt | 87 +++++++++++++++++--------------------------- pyproject.toml | 12 +++--- 2 files changed, 40 insertions(+), 59 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index abe658dd2..62b7a6907 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,10 +1,10 @@ accessible-pygments==0.0.5 aioca==1.7 aiofiles==24.1.0 -aiohappyeyeballs==2.3.5 -aiohttp==3.10.3 +aiohappyeyeballs==2.4.0 +aiohttp==3.10.5 aiosignal==1.3.1 -alabaster==0.7.16 +alabaster==1.0.0 annotated-types==0.7.0 anyio==4.4.0 appdirs==1.4.4 @@ -14,17 +14,15 @@ attrs==24.2.0 babel==2.16.0 beautifulsoup4==4.12.3 bidict==0.23.1 -black==24.8.0 bluesky==1.13.0a4 bluesky-kafka==0.10.0 bluesky-live==0.0.8 bluesky-stomp @ git+https://github.com/DiamondLightSource/bluesky-stomp@4dbdb6b144b4b03243a1784f2f53dc13cbbef30e boltons==24.0.0 -cachetools==5.4.0 +cachetools==5.5.0 caproto==1.1.1 certifi==2024.7.4 cfgv==3.4.0 -chardet==5.2.0 charset-normalizer==3.3.2 click==8.1.7 cloudpickle==3.0.0 @@ -32,31 +30,28 @@ colorama==0.4.6 colorlog==6.8.2 comm==0.2.2 confluent-kafka==2.5.0 -contourpy==1.2.1 +contourpy==1.3.0 copier==9.3.1 coverage==7.6.1 cycler==0.12.1 -dask==2024.8.0 +dask==2024.8.1 databroker==1.2.5 dataclasses-json==0.6.7 decorator==5.1.1 deepmerge==1.1.1 -diff_cover==9.1.1 distlib==0.3.8 dls-bluesky-core==0.0.4 -dls-dodal==1.28.0 +dls-dodal==1.29.4 dnspython==2.6.1 docopt==0.6.2 doct==1.1.0 docutils==0.21.2 dunamai==1.22.0 -email_validator==2.2.0 entrypoints==0.4 epicscorelibs==7.0.7.99.0.2 event-model==1.20.0 executing==2.0.1 -fastapi==0.112.0 -fastapi-cli==0.0.5 +fastapi==0.112.2 fasteners==0.19 filelock==3.15.4 flexcache==0.3 @@ -73,32 +68,30 @@ h5py==3.11.0 HeapDict==1.0.1 historydict==1.2.6 httpcore==1.0.5 -httptools==0.6.1 -httpx==0.27.0 +httpx==0.27.2 humanize==4.10.0 identify==2.6.0 -idna==3.7 -imageio==2.34.2 +idna==3.8 +imageio==2.35.1 imagesize==1.4.1 -importlib_metadata==8.2.0 -importlib_resources==6.4.0 +importlib_metadata==8.4.0 +importlib_resources==6.4.4 iniconfig==2.0.0 intake==0.6.4 ipython==8.18.0 -ipywidgets==8.1.3 -itsdangerous==2.2.0 +ipywidgets==8.1.5 jedi==0.19.1 Jinja2==3.1.4 jinja2-ansible-filters==1.3.2 jsonschema==4.23.0 jsonschema-specifications==2023.12.1 -jupyterlab_widgets==3.0.11 +jupyterlab_widgets==3.0.13 kiwisolver==1.4.5 ldap3==2.9.1 locket==1.0.0 markdown-it-py==3.0.0 MarkupSafe==2.1.5 -marshmallow==3.21.3 +marshmallow==3.22.0 matplotlib==3.9.2 matplotlib-inline==0.1.7 mdit-py-plugins==0.4.1 @@ -109,7 +102,7 @@ mongoquery==1.4.2 msgpack==1.0.8 msgpack-numpy==0.4.8 multidict==6.0.5 -mypy==1.11.1 +mypy==1.11.2 mypy-extensions==1.0.0 myst-parser==4.0.0 networkx==3.3 @@ -117,10 +110,10 @@ nodeenv==1.9.1 nose2==0.15.1 nslsii==0.10.3 numcodecs==0.13.0 -numpy==2.0.1 +numpy==1.26.4 opencv-python-headless==4.10.0.84 ophyd==1.9.0 -ophyd-async==0.3.1 +ophyd-async==0.3.4 orjson==3.10.7 p4p==4.1.12 packaging==24.1 @@ -151,23 +144,19 @@ py==1.11.0 pyasn1==0.6.0 pycryptodome==3.20.0 pydantic==2.8.2 -pydantic-extra-types==2.9.0 pydantic-settings==2.4.0 pydantic_core==2.20.1 pydata-sphinx-theme==0.15.4 -pyepics==3.5.6 +pyepics==3.5.7 Pygments==2.18.0 pymongo==4.8.0 pyOlog==4.5.0 -pyparsing==3.1.2 -pyright==1.1.375 +pyparsing==3.1.4 pytest==8.3.2 -pytest-asyncio==0.23.8 +pytest-asyncio==0.24.0 pytest-cov==5.0.0 -pytest-random-order==1.1.1 python-dateutil==2.9.0.post0 python-dotenv==1.0.1 -python-multipart==0.0.9 pytz==2024.1 PyYAML==6.0.2 questionary==2.0.1 @@ -176,21 +165,18 @@ redis-json-dict==0.2.0 referencing==0.35.1 requests==2.32.3 responses==0.25.3 -rich==13.7.1 rpds-py==0.20.0 -ruff==0.5.7 -scanspec==0.7.1 -setuptools-dso==2.11a2 -shellingham==1.5.4 +ruff==0.6.2 +scanspec==0.7.2 +setuptools-dso==2.11 six==1.16.0 slicerator==1.1.0 smmap==5.0.1 sniffio==1.3.1 snowballstemmer==2.2.0 -soupsieve==2.5 -Sphinx==7.4.5 +soupsieve==2.6 +Sphinx==8.0.2 sphinx-autobuild==2024.4.16 -sphinx-autodoc-typehints==2.2.3 sphinx-click==6.0.0 sphinx-copybutton==0.5.2 sphinx_design==0.6.1 @@ -200,25 +186,22 @@ sphinxcontrib-devhelp==2.0.0 sphinxcontrib-htmlhelp==2.1.0 sphinxcontrib-httpdomain==1.8.1 sphinxcontrib-jsmath==1.0.1 -sphinxcontrib-mermaid==0.9.2 sphinxcontrib-openapi==0.8.4 sphinxcontrib-qthelp==2.0.0 sphinxcontrib-serializinghtml==2.0.0 stack-data==0.6.3 -starlette==0.37.2 +starlette==0.38.2 stomp-py==8.1.2 suitcase-mongo==0.6.0 suitcase-msgpack==0.3.0 suitcase-utils==0.5.4 super-state-machine==2.0.2 -tifffile==2024.8.10 +tifffile==2024.8.28 toolz==0.12.1 tox==3.28.0 tox-direct==0.4 tqdm==4.66.5 traitlets==5.14.3 -typer==0.12.3 -types-aiofiles==24.1.0.20240626 types-mock==5.1.0.20240425 types-PyYAML==6.0.12.20240808 types-requests==2.32.0.20240712 @@ -227,20 +210,18 @@ typing-inspect==0.9.0 typing_extensions==4.12.2 tzdata==2024.1 tzlocal==5.2 -ujson==5.10.0 urllib3==2.2.2 -uvicorn==0.30.5 -uvloop==0.19.0 +uvicorn==0.30.6 virtualenv==20.26.3 watchfiles==0.23.0 wcwidth==0.2.13 websocket-client==1.8.0 -websockets==12.0 -widgetsnbextension==4.0.11 +websockets==13.0 +widgetsnbextension==4.0.13 workflows==2.27 xarray==2024.7.0 yarl==1.9.4 zarr==2.18.2 zict==2.2.0 -zipp==3.20.0 -zocalo==1.0.0 +zipp==3.20.1 +zocalo==1.1.0 diff --git a/pyproject.toml b/pyproject.toml index 5098ea769..ce567eb49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,11 +18,11 @@ dependencies = [ "pyepics", "aioca", "pydantic>=2.0", - "scanspec>=0.7.1", + "scanspec>=0.7.2", "pydantic-settings", "stomp-py", "aiohttp", - "PyYAML", + "PyYAML>=6.0.2", "click", "fastapi>=0.112.0", "uvicorn", @@ -40,16 +40,16 @@ requires-python = ">=3.10" [project.optional-dependencies] dev = [ "copier", - "myst-parser", + "myst-parser>=4.0.0", "pipdeptree", - "pre-commit", - "pydata-sphinx-theme>=0.12", + "pre-commit>=3.8.0", + "pydata-sphinx-theme>=0.15.4", "mypy", "pytest-cov", "pytest-asyncio", "responses", "ruff", - "sphinx-autobuild", + "sphinx-autobuild>=2024.4.16", "sphinx-copybutton", "sphinx-click", "sphinx-design", From 3cd7945046032c0364bfd3b66a84a4c33be4d7e0 Mon Sep 17 00:00:00 2001 From: DiamondJoseph <53935796+DiamondJoseph@users.noreply.github.com> Date: Thu, 29 Aug 2024 13:27:49 +0100 Subject: [PATCH 12/22] Handle parameterised generics (#598) Closes #597 --------- Co-authored-by: Callum Forrester --- src/blueapi/service/runner.py | 5 ++- tests/service/test_runner.py | 82 ++++++++++++++++++++++++++++++++++- 2 files changed, 84 insertions(+), 3 deletions(-) diff --git a/src/blueapi/service/runner.py b/src/blueapi/service/runner.py index feca3b908..e122bf7d7 100644 --- a/src/blueapi/service/runner.py +++ b/src/blueapi/service/runner.py @@ -149,9 +149,10 @@ def _rpc( def _valid_return(value: Any, expected_type: type[T] | None = None) -> T: - if expected_type is None or expected_type is Any: + if expected_type is None: return value - return TypeAdapter(expected_type).validate_python(value) + else: + return TypeAdapter(expected_type).validate_python(value) def _validate_function(func: Any, function_name: str) -> Callable: diff --git a/tests/service/test_runner.py b/tests/service/test_runner.py index e3edde0c4..b0c66b745 100644 --- a/tests/service/test_runner.py +++ b/tests/service/test_runner.py @@ -1,8 +1,10 @@ +from typing import Any, Generic, TypeVar from unittest import mock from unittest.mock import MagicMock, patch import pytest -from pydantic import ValidationError +from ophyd import Callable +from pydantic import BaseModel, ValidationError from blueapi.service import interface from blueapi.service.model import EnvironmentResponse @@ -161,3 +163,81 @@ def test_clear_message_for_wrong_return(started_runner: WorkerDispatcher): match="1 validation error for int", ): started_runner.run(wrong_return_type) + + +T = TypeVar("T") + + +class SimpleModel(BaseModel): + a: int + b: str + + +class NestedModel(BaseModel): + nested: SimpleModel + c: bool + + +class GenericModel(BaseModel, Generic[T]): + a: T + b: str + + +def return_int() -> int: + return 1 + + +def return_str() -> str: + return "hello" + + +def return_list() -> list[int]: + return [1, 2, 3] + + +def return_dict() -> dict[str, int]: + return { + "test": 1, + "other_test": 2, + } + + +def return_simple_model() -> SimpleModel: + return SimpleModel(a=1, b="hi") + + +def return_nested_model() -> NestedModel: + return NestedModel(nested=return_simple_model(), c=False) + + +def return_unbound_generic_model() -> GenericModel: + return GenericModel(a="foo", b="bar") + + +def return_bound_generic_model() -> GenericModel[int]: + return GenericModel(a=1, b="hi") + + +def return_explicitly_bound_generic_model() -> GenericModel[int]: + return GenericModel[int](a=1, b="hi") + + +@pytest.mark.parametrize( + "rpc_function", + [ + return_int, + return_str, + return_list, + return_dict, + return_simple_model, + return_nested_model, + return_unbound_generic_model, + # https://github.com/pydantic/pydantic/issues/6870 return_bound_generic_model, + return_explicitly_bound_generic_model, + ], +) +def test_accepts_return_type( + started_runner: WorkerDispatcher, + rpc_function: Callable[[], Any], +): + started_runner.run(rpc_function) From ec685d1369c3239d4db4e725bfab011b602ce10f Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 29 Aug 2024 16:17:45 +0100 Subject: [PATCH 13/22] added code review changes --- .../actions/install_requirements/action.yml | 6 +- Dockerfile | 1 - dev-requirements.txt | 6 +- docs/reference/openapi.yaml | 804 +++++++++--------- pyproject.toml | 1 + src/blueapi/service/interface.py | 12 +- src/blueapi/service/main.py | 2 - src/blueapi/service/model.py | 2 +- tests/service/test_interface.py | 9 - 9 files changed, 415 insertions(+), 428 deletions(-) diff --git a/.github/actions/install_requirements/action.yml b/.github/actions/install_requirements/action.yml index c5159b77a..d33e08052 100644 --- a/.github/actions/install_requirements/action.yml +++ b/.github/actions/install_requirements/action.yml @@ -26,11 +26,7 @@ runs: python-version: ${{ env.PYTHON_VERSION }} - name: Install packages - run: pip install $([ -f dev-requirements.txt ] && echo '-c dev-requirements.txt') -e .[dev] - shell: bash - - - name: Install bluesky_stomp - run: pip install git+https://github.com/DiamondLightSource/bluesky-stomp.git@main + run: pip install ${{ inputs.pip-install }} shell: bash - name: Report what was installed diff --git a/Dockerfile b/Dockerfile index 7306daac4..dd1a2398f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,6 @@ FROM developer as build COPY . /context WORKDIR /context RUN touch dev-requirements.txt && pip install --upgrade pip && pip install -c dev-requirements.txt . -RUN pip install git+https://github.com/DiamondLightSource/bluesky-stomp.git@main # The runtime stage copies the built venv into a slim runtime container FROM python:${PYTHON_VERSION}-slim as runtime diff --git a/dev-requirements.txt b/dev-requirements.txt index 62b7a6907..317ce9015 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -17,7 +17,7 @@ bidict==0.23.1 bluesky==1.13.0a4 bluesky-kafka==0.10.0 bluesky-live==0.0.8 -bluesky-stomp @ git+https://github.com/DiamondLightSource/bluesky-stomp@4dbdb6b144b4b03243a1784f2f53dc13cbbef30e +bluesky-stomp==0.1.0 boltons==24.0.0 cachetools==5.5.0 caproto==1.1.1 @@ -213,10 +213,10 @@ tzlocal==5.2 urllib3==2.2.2 uvicorn==0.30.6 virtualenv==20.26.3 -watchfiles==0.23.0 +watchfiles==0.24.0 wcwidth==0.2.13 websocket-client==1.8.0 -websockets==13.0 +websockets==13.0.1 widgetsnbextension==4.0.13 workflows==2.27 xarray==2024.7.0 diff --git a/docs/reference/openapi.yaml b/docs/reference/openapi.yaml index 09f4b256c..64f5d4c9e 100644 --- a/docs/reference/openapi.yaml +++ b/docs/reference/openapi.yaml @@ -1,481 +1,289 @@ -components: - schemas: - DeviceModel: - additionalProperties: false - description: Representation of a device - properties: - name: - description: Name of the device - title: Name - type: string - protocols: - description: Protocols that a device conforms to, indicating its capabilities - items: - type: string - title: Protocols - type: array - required: - - name - - protocols - title: DeviceModel - type: object - DeviceResponse: - additionalProperties: false - description: Response to a query for devices - properties: - devices: - description: Devices available to use in plans - items: - $ref: '#/components/schemas/DeviceModel' - title: Devices - type: array - required: - - devices - title: DeviceResponse - type: object - EnvironmentResponse: - additionalProperties: false - description: State of internal environment. - properties: - error_message: - anyOf: - - minLength: 1 - type: string - - type: 'null' - description: If present - error loading context - title: Error Message - initialized: - description: blueapi context initialized - title: Initialized - type: boolean - required: - - initialized - title: EnvironmentResponse - type: object - HTTPValidationError: - properties: - detail: - items: - $ref: '#/components/schemas/ValidationError' - title: Detail - type: array - title: HTTPValidationError - type: object - PlanModel: - additionalProperties: false - description: Representation of a plan - properties: - description: - anyOf: - - type: string - - type: 'null' - description: Docstring of the plan - title: Description - name: - description: Name of the plan - title: Name - type: string - schema: - anyOf: - - type: object - - type: 'null' - description: Schema of the plan's parameters - title: Schema - required: - - name - title: PlanModel - type: object - PlanResponse: - additionalProperties: false - description: Response to a query for plans - properties: - plans: - description: Plans available to use by a worker - items: - $ref: '#/components/schemas/PlanModel' - title: Plans - type: array - required: - - plans - title: PlanResponse - type: object - StateChangeRequest: - additionalProperties: false - description: Request to change the state of the worker. - properties: - defer: - anyOf: - - type: boolean - - type: 'null' - default: false - description: Should worker defer Pausing until the next checkpoint - title: Defer - new_state: - $ref: '#/components/schemas/WorkerState' - reason: - anyOf: - - type: string - - type: 'null' - description: The reason for the current run to be aborted - title: Reason - required: - - new_state - title: StateChangeRequest - type: object - Task: - additionalProperties: false - description: Task that will run a plan - properties: - name: - description: Name of plan to run - title: Name - type: string - params: - description: Values for parameters to plan, if any - title: Params - type: object - required: - - name - title: Task - type: object - TaskResponse: - additionalProperties: false - description: Acknowledgement that a task has started, includes its ID - properties: - task_id: - description: Unique identifier for the task - title: Task Id - type: string - required: - - task_id - title: TaskResponse - type: object - TasksListResponse: - additionalProperties: false - description: Diagnostic information on the tasks - properties: - tasks: - description: List of tasks - items: - $ref: '#/components/schemas/TrackableTask' - title: Tasks - type: array - required: - - tasks - title: TasksListResponse - type: object - TrackableTask: - additionalProperties: false - description: A representation of a task that the worker recognizes - properties: - errors: - items: - type: string - title: Errors - type: array - is_complete: - default: false - title: Is Complete - type: boolean - is_pending: - default: true - title: Is Pending - type: boolean - task: - title: Task - task_id: - title: Task Id - type: string - required: - - task_id - - task - title: TrackableTask - type: object - ValidationError: - properties: - loc: - items: - anyOf: - - type: string - - type: integer - title: Location - type: array - msg: - title: Message - type: string - type: - title: Error Type - type: string - required: - - loc - - msg - - type - title: ValidationError - type: object - WorkerState: - description: The state of the Worker. - enum: - - IDLE - - RUNNING - - PAUSING - - PAUSED - - HALTING - - STOPPING - - ABORTING - - SUSPENDING - - PANICKED - - UNKNOWN - title: WorkerState - type: string - WorkerTask: - additionalProperties: false - description: Worker's active task ID, can be None - properties: - task_id: - anyOf: - - type: string - - type: 'null' - description: The ID of the current task, None if the worker is idle - title: Task Id - required: - - task_id - title: WorkerTask - type: object +openapi: 3.1.0 info: title: BlueAPI Control version: 0.0.5 -openapi: 3.1.0 paths: - /devices: - get: - description: Retrieve information about all available devices. - operationId: get_devices_devices_get - responses: - '200': - content: - application/json: - schema: - $ref: '#/components/schemas/DeviceResponse' - description: Successful Response - summary: Get Devices - /devices/{name}: + /environment: get: - description: Retrieve information about a devices by its (unique) name. - operationId: get_device_by_name_devices__name__get - parameters: - - in: path - name: name - required: true - schema: - title: Name - type: string + summary: Get Environment + description: Get the current state of the environment, i.e. initialization state. + operationId: get_environment_environment_get responses: '200': - content: - application/json: - schema: - $ref: '#/components/schemas/DeviceModel' description: Successful Response - '422': content: application/json: schema: - $ref: '#/components/schemas/HTTPValidationError' - description: Validation Error - summary: Get Device By Name - /environment: + $ref: '#/components/schemas/EnvironmentResponse' delete: - description: Delete the current environment, causing internal components to - be reloaded. + summary: Delete Environment + description: >- + Delete the current environment, causing internal components to be + reloaded. operationId: delete_environment_environment_delete responses: '200': - content: - application/json: - schema: - $ref: '#/components/schemas/EnvironmentResponse' description: Successful Response - summary: Delete Environment - get: - description: Get the current state of the environment, i.e. initialization state. - operationId: get_environment_environment_get - responses: - '200': content: application/json: schema: $ref: '#/components/schemas/EnvironmentResponse' - description: Successful Response - summary: Get Environment /plans: get: + summary: Get Plans description: Retrieve information about all available plans. operationId: get_plans_plans_get responses: '200': + description: Successful Response content: application/json: schema: $ref: '#/components/schemas/PlanResponse' - description: Successful Response - summary: Get Plans /plans/{name}: get: + summary: Get Plan By Name description: Retrieve information about a plan by its (unique) name. operationId: get_plan_by_name_plans__name__get parameters: - - in: path - name: name - required: true - schema: - title: Name - type: string + - name: name + in: path + required: true + schema: + type: string + title: Name responses: '200': + description: Successful Response content: application/json: schema: $ref: '#/components/schemas/PlanModel' - description: Successful Response '422': + description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - description: Validation Error - summary: Get Plan By Name - /tasks: + /devices: get: - description: 'Retrieve tasks based on their status. - - The status of a newly created task is ''unstarted''.' - operationId: get_tasks_tasks_get - parameters: - - in: query - name: task_status - required: false - schema: - anyOf: - - type: string - - type: 'null' - title: Task Status + summary: Get Devices + description: Retrieve information about all available devices. + operationId: get_devices_devices_get responses: '200': + description: Successful Response content: application/json: schema: - $ref: '#/components/schemas/TasksListResponse' + $ref: '#/components/schemas/DeviceResponse' + /devices/{name}: + get: + summary: Get Device By Name + description: Retrieve information about a devices by its (unique) name. + operationId: get_device_by_name_devices__name__get + parameters: + - name: name + in: path + required: true + schema: + type: string + title: Name + responses: + '200': description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DeviceModel' '422': + description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - description: Validation Error - summary: Get Tasks + /tasks: post: + summary: Submit Task description: Submit a task to the worker. operationId: submit_task_tasks_post requestBody: + required: true content: application/json: + schema: + $ref: '#/components/schemas/Task' example: name: count params: detectors: - - x - schema: - $ref: '#/components/schemas/Task' - required: true + - x responses: '201': + description: Successful Response content: application/json: schema: $ref: '#/components/schemas/TaskResponse' - description: Successful Response '422': + description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + get: + summary: Get Tasks + description: |- + Retrieve tasks based on their status. + The status of a newly created task is 'unstarted'. + operationId: get_tasks_tasks_get + parameters: + - name: task_status + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Task Status + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TasksListResponse' + '422': description: Validation Error - summary: Submit Task + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /tasks/{task_id}: delete: + summary: Delete Submitted Task operationId: delete_submitted_task_tasks__task_id__delete parameters: - - in: path - name: task_id - required: true - schema: - title: Task Id - type: string + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id responses: '200': + description: Successful Response content: application/json: schema: $ref: '#/components/schemas/TaskResponse' - description: Successful Response '422': + description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - description: Validation Error - summary: Delete Submitted Task get: + summary: Get Task description: Retrieve a task operationId: get_task_tasks__task_id__get parameters: - - in: path - name: task_id - required: true - schema: - title: Task Id - type: string + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id responses: '200': + description: Successful Response content: application/json: schema: $ref: '#/components/schemas/TrackableTask' - description: Successful Response '422': + description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /worker/task: + get: + summary: Get Active Task + operationId: get_active_task_worker_task_get + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerTask' + put: + summary: Set Active Task + description: >- + Set a task to active status, the worker should begin it as soon as + possible. + + This will return an error response if the worker is not idle. + operationId: set_active_task_worker_task_put + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerTask' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerTask' + '409': + description: Conflict + worker: already active + '422': description: Validation Error - summary: Get Task + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /worker/state: get: + summary: Get State description: Get the State of the Worker operationId: get_state_worker_state_get responses: '200': + description: Successful Response content: application/json: schema: $ref: '#/components/schemas/WorkerState' - description: Successful Response - summary: Get State put: - description: "Request that the worker is put into a particular state.\nReturns\ - \ the state of the worker at the end of the call.\n\n- **The following transitions\ - \ are allowed and return 202: Accepted**\n- If the worker is **PAUSED**, new_state\ - \ may be **RUNNING** to resume.\n- If the worker is **RUNNING**, new_state\ - \ may be **PAUSED** to pause:\n - If defer is False (default): pauses and\ - \ rewinds to the previous checkpoint\n - If defer is True: waits until\ - \ the next checkpoint to pause\n - **If the task has no checkpoints, the\ - \ task will instead be Aborted**\n- If the worker is **RUNNING/PAUSED**, new_state\ - \ may be **STOPPING** to stop.\n Stop marks any currently open Runs in\ - \ the Task as a success and ends the task.\n- If the worker is **RUNNING/PAUSED**,\ - \ new_state may be **ABORTING** to abort.\n Abort marks any currently open\ - \ Runs in the Task as a Failure and ends the task.\n - If reason is set,\ - \ the reason will be passed as the reason for the Run failure.\n- **All other\ - \ transitions return 400: Bad Request**" + summary: Set State + description: >- + Request that the worker is put into a particular state. + + Returns the state of the worker at the end of the call. + + + - **The following transitions are allowed and return 202: Accepted** + + - If the worker is **PAUSED**, new_state may be **RUNNING** to resume. + + - If the worker is **RUNNING**, new_state may be **PAUSED** to pause: + - If defer is False (default): pauses and rewinds to the previous checkpoint + - If defer is True: waits until the next checkpoint to pause + - **If the task has no checkpoints, the task will instead be Aborted** + - If the worker is **RUNNING/PAUSED**, new_state may be **STOPPING** to + stop. + Stop marks any currently open Runs in the Task as a success and ends the task. + - If the worker is **RUNNING/PAUSED**, new_state may be **ABORTING** to + abort. + Abort marks any currently open Runs in the Task as a Failure and ends the task. + - If reason is set, the reason will be passed as the reason for the Run failure. + - **All other transitions return 400: Bad Request** operationId: set_state_worker_state_put requestBody: content: @@ -485,59 +293,259 @@ paths: required: true responses: '202': + description: Successful Response content: application/json: schema: $ref: '#/components/schemas/WorkerState' - description: Successful Response detail: Transition requested '400': description: Bad Request detail: Transition not allowed '422': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' description: Validation Error - summary: Set State - /worker/task: - get: - operationId: get_active_task_worker_task_get - responses: - '200': - content: - application/json: - schema: - $ref: '#/components/schemas/WorkerTask' - description: Successful Response - summary: Get Active Task - put: - description: 'Set a task to active status, the worker should begin it as soon - as possible. - - This will return an error response if the worker is not idle.' - operationId: set_active_task_worker_task_put - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/WorkerTask' - required: true - responses: - '200': - content: - application/json: - schema: - $ref: '#/components/schemas/WorkerTask' - description: Successful Response - '409': - description: Conflict - worker: already active - '422': content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - description: Validation Error - summary: Set Active Task +components: + schemas: + DeviceModel: + properties: + name: + type: string + title: Name + description: Name of the device + protocols: + items: + type: string + type: array + title: Protocols + description: Protocols that a device conforms to, indicating its capabilities + additionalProperties: false + type: object + required: + - name + - protocols + title: DeviceModel + description: Representation of a device + DeviceResponse: + properties: + devices: + items: + $ref: '#/components/schemas/DeviceModel' + type: array + title: Devices + description: Devices available to use in plans + additionalProperties: false + type: object + required: + - devices + title: DeviceResponse + description: Response to a query for devices + EnvironmentResponse: + properties: + initialized: + type: boolean + title: Initialized + description: blueapi context initialized + error_message: + anyOf: + - type: string + minLength: 1 + - type: 'null' + title: Error Message + description: If present - error loading context + additionalProperties: false + type: object + required: + - initialized + title: EnvironmentResponse + description: State of internal environment. + HTTPValidationError: + properties: + detail: + items: + $ref: '#/components/schemas/ValidationError' + type: array + title: Detail + type: object + title: HTTPValidationError + PlanModel: + properties: + name: + type: string + title: Name + description: Name of the plan + description: + anyOf: + - type: string + - type: 'null' + title: Description + description: Docstring of the plan + schema: + anyOf: + - type: object + - type: 'null' + title: Schema + description: Schema of the plan's parameters + additionalProperties: false + type: object + required: + - name + title: PlanModel + description: Representation of a plan + PlanResponse: + properties: + plans: + items: + $ref: '#/components/schemas/PlanModel' + type: array + title: Plans + description: Plans available to use by a worker + additionalProperties: false + type: object + required: + - plans + title: PlanResponse + description: Response to a query for plans + StateChangeRequest: + properties: + new_state: + $ref: '#/components/schemas/WorkerState' + defer: + type: boolean + title: Defer + description: Should worker defer Pausing until the next checkpoint + default: false + reason: + anyOf: + - type: string + - type: 'null' + title: Reason + description: The reason for the current run to be aborted + additionalProperties: false + type: object + required: + - new_state + title: StateChangeRequest + description: Request to change the state of the worker. + Task: + properties: + name: + type: string + title: Name + description: Name of plan to run + params: + type: object + title: Params + description: Values for parameters to plan, if any + additionalProperties: false + type: object + required: + - name + title: Task + description: Task that will run a plan + TaskResponse: + properties: + task_id: + type: string + title: Task Id + description: Unique identifier for the task + additionalProperties: false + type: object + required: + - task_id + title: TaskResponse + description: Acknowledgement that a task has started, includes its ID + TasksListResponse: + properties: + tasks: + items: + $ref: '#/components/schemas/TrackableTask' + type: array + title: Tasks + description: List of tasks + additionalProperties: false + type: object + required: + - tasks + title: TasksListResponse + description: Diagnostic information on the tasks + TrackableTask: + properties: + task_id: + type: string + title: Task Id + task: + title: Task + is_complete: + type: boolean + title: Is Complete + default: false + is_pending: + type: boolean + title: Is Pending + default: true + errors: + items: + type: string + type: array + title: Errors + additionalProperties: false + type: object + required: + - task_id + - task + title: TrackableTask + description: A representation of a task that the worker recognizes + ValidationError: + properties: + loc: + items: + anyOf: + - type: string + - type: integer + type: array + title: Location + msg: + type: string + title: Message + type: + type: string + title: Error Type + type: object + required: + - loc + - msg + - type + title: ValidationError + WorkerState: + type: string + enum: + - IDLE + - RUNNING + - PAUSING + - PAUSED + - HALTING + - STOPPING + - ABORTING + - SUSPENDING + - PANICKED + - UNKNOWN + title: WorkerState + description: The state of the Worker. + WorkerTask: + properties: + task_id: + anyOf: + - type: string + - type: 'null' + title: Task Id + description: The ID of the current task, None if the worker is idle + additionalProperties: false + type: object + required: + - task_id + title: WorkerTask + description: Worker's active task ID, can be None diff --git a/pyproject.toml b/pyproject.toml index ce567eb49..526a898b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "dls-dodal>=1.24.0", "super-state-machine", # See GH issue 553 "GitPython", + "bluesky-stomp>=0.1.0" ] dynamic = ["version"] license.file = "LICENSE" diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 23890069a..f70bec1df 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -49,7 +49,7 @@ def worker() -> TaskWorker: @lru_cache -def messaging_template() -> MessagingTemplate | None: +def messaging_template() -> MessagingTemplate: stomp_config = config().stomp if stomp_config is not None: template = MessagingTemplate.for_broker( @@ -68,14 +68,8 @@ def messaging_template() -> MessagingTemplate | None: task_worker.data_events: event_topic, } ) - try: - template.connect() - return template - except Exception as ex: - logging.exception(msg="Failed to connect to message bus", exc_info=ex) - return None - else: - return None + template.connect() + return template def setup(config: ApplicationConfig) -> None: diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index aac76e1cc..0a5c25070 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -312,8 +312,6 @@ def set_state( and new_state in _ALLOWED_TRANSITIONS[current_state] ): if new_state == WorkerState.PAUSED: - if state_change_request.defer is None: - state_change_request.defer = False runner.run(interface.pause_worker, state_change_request.defer) elif new_state == WorkerState.RUNNING: runner.run(interface.resume_worker) diff --git a/src/blueapi/service/model.py b/src/blueapi/service/model.py index 8a7ffb899..8193a9d48 100644 --- a/src/blueapi/service/model.py +++ b/src/blueapi/service/model.py @@ -128,7 +128,7 @@ class StateChangeRequest(BlueapiBaseModel): """ new_state: WorkerState = Field() - defer: bool | None = Field( + defer: bool = Field( description="Should worker defer Pausing until the next checkpoint", default=False, ) diff --git a/tests/service/test_interface.py b/tests/service/test_interface.py index a053f9242..6a08eafc2 100644 --- a/tests/service/test_interface.py +++ b/tests/service/test_interface.py @@ -1,4 +1,3 @@ -import logging import uuid from dataclasses import dataclass from unittest.mock import MagicMock, patch @@ -274,11 +273,3 @@ def test_get_task_by_id(context_mock: MagicMock): def test_stomp_config(): interface.set_config(ApplicationConfig(stomp=StompConfig())) assert interface.messaging_template() is not None - - -@pytest.mark.stomp -def test_stomp_connection_failure_with_invalid_config(caplog): - caplog.at_level(logging.exception) - interface.set_config(ApplicationConfig(stomp=StompConfig(host="Not Exists"))) - assert interface.messaging_template() is None - assert "Failed to connect to message bus" in caplog.text From db6dcd2a7616dd918a15fb1731e5bb662bdb2cb4 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Fri, 30 Aug 2024 14:25:58 +0100 Subject: [PATCH 14/22] removed activemq and rabbitmq from --- .github/workflows/_test.yml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/.github/workflows/_test.yml b/.github/workflows/_test.yml index 9efe01979..ddf12cef8 100644 --- a/.github/workflows/_test.yml +++ b/.github/workflows/_test.yml @@ -24,18 +24,7 @@ jobs: run: runs-on: ${{ inputs.runs-on }} - services: - activemq: - image: rmohr/activemq:5.14.5-alpine - ports: - - 61613:61613 - steps: - - name: Start RabbitMQ - uses: namoshek/rabbitmq-github-action@v1 - with: - ports: "61614:61613" - plugins: rabbitmq_stomp - name: Checkout uses: actions/checkout@v4 From b820c131d6ab8eae67428583553cb5807417e95d Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Fri, 30 Aug 2024 16:00:59 +0100 Subject: [PATCH 15/22] delete stomp test --- pyproject.toml | 3 --- tests/service/test_interface.py | 7 ------- tests/test_cli.py | 19 ------------------- 3 files changed, 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 526a898b1..545ce0ae4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,9 +91,6 @@ addopts = """ filterwarnings = ["error", "ignore::DeprecationWarning"] # Doctest python code in docs, python code in src docstrings, test functions in tests testpaths = "docs src tests" -markers = [ - "stomp: marks tests that require a message bus configured for stomp" -] asyncio_mode = "auto" [tool.coverage.run] diff --git a/tests/service/test_interface.py b/tests/service/test_interface.py index 6a08eafc2..ab992e8d2 100644 --- a/tests/service/test_interface.py +++ b/tests/service/test_interface.py @@ -5,7 +5,6 @@ import pytest from ophyd.sim import SynAxis -from blueapi.config import ApplicationConfig, StompConfig from blueapi.core import MsgGenerator from blueapi.core.context import BlueskyContext from blueapi.service import interface @@ -267,9 +266,3 @@ def test_get_task_by_id(context_mock: MagicMock): is_pending=True, errors=[], ) - - -@pytest.mark.stomp -def test_stomp_config(): - interface.set_config(ApplicationConfig(stomp=StompConfig())) - assert interface.messaging_template() is not None diff --git a/tests/test_cli.py b/tests/test_cli.py index da09f5be5..950010bd9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -136,25 +136,6 @@ def test_cannot_run_plans_without_stomp_config(runner: CliRunner): ) -@pytest.mark.stomp -def test_valid_stomp_config_for_listener(runner: CliRunner): - result = runner.invoke( - main, - [ - "-c", - "tests/example_yaml/valid_stomp_config.yaml", - "controller", - "listen", - ], - input="\n", - ) - assert ( - "Subscribing to all bluesky events from localhost:61613\nPress enter to exit" - in result.output - ) - assert result.exit_code == 0 - - @responses.activate def test_get_env( runner: CliRunner, From ed41e6e75c048ee56bf93794d0e859ef82397992 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Mon, 2 Sep 2024 12:17:06 +0100 Subject: [PATCH 16/22] added stomp test --- tests/test_cli.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/test_cli.py b/tests/test_cli.py index 950010bd9..e6cfb8057 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -136,6 +136,21 @@ def test_cannot_run_plans_without_stomp_config(runner: CliRunner): ) +def test_valid_stomp_config_for_listener(runner: CliRunner): + result = runner.invoke( + main, + [ + "-c", + "tests/example_yaml/valid_stomp_config.yaml", + "controller", + "listen", + ], + input="\n", + ) + assert "Subscribing to all bluesky events from localhost:61613" in result.output + assert result.exit_code == 1 + + @responses.activate def test_get_env( runner: CliRunner, From cfba6ed99638241be837d342a3ddee6f10e921d6 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Mon, 2 Sep 2024 12:30:42 +0100 Subject: [PATCH 17/22] added rabbitmq to CI --- .github/workflows/_test.yml | 5 +++++ tests/test_cli.py | 7 +++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/_test.yml b/.github/workflows/_test.yml index ddf12cef8..c2fb1738b 100644 --- a/.github/workflows/_test.yml +++ b/.github/workflows/_test.yml @@ -25,6 +25,11 @@ jobs: runs-on: ${{ inputs.runs-on }} steps: + - name: Start RabbitMQ + uses: namoshek/rabbitmq-github-action@v1 + with: + ports: "61614:61613" + plugins: rabbitmq_stomp - name: Checkout uses: actions/checkout@v4 diff --git a/tests/test_cli.py b/tests/test_cli.py index e6cfb8057..ea1554843 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -147,8 +147,11 @@ def test_valid_stomp_config_for_listener(runner: CliRunner): ], input="\n", ) - assert "Subscribing to all bluesky events from localhost:61613" in result.output - assert result.exit_code == 1 + assert ( + "Subscribing to all bluesky events from localhost:61613/nPress enter to exit" + in result.output + ) + assert result.exit_code == 0 @responses.activate From d3ab4e75e946597c52e8bb7b99e6b64f5170ad49 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Mon, 2 Sep 2024 12:44:07 +0100 Subject: [PATCH 18/22] fixed typo --- tests/test_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index ea1554843..a55013a34 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -148,7 +148,7 @@ def test_valid_stomp_config_for_listener(runner: CliRunner): input="\n", ) assert ( - "Subscribing to all bluesky events from localhost:61613/nPress enter to exit" + "Subscribing to all bluesky events from localhost:61613\nPress enter to exit" in result.output ) assert result.exit_code == 0 From 8e45cff5090dea43728daa7a7e572f4410a77815 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 3 Sep 2024 10:15:10 +0100 Subject: [PATCH 19/22] added mock connection --- .github/workflows/_test.yml | 6 ------ tests/test_cli.py | 20 +++++++++++++++++++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/.github/workflows/_test.yml b/.github/workflows/_test.yml index c2fb1738b..8fc0c8ee3 100644 --- a/.github/workflows/_test.yml +++ b/.github/workflows/_test.yml @@ -25,12 +25,6 @@ jobs: runs-on: ${{ inputs.runs-on }} steps: - - name: Start RabbitMQ - uses: namoshek/rabbitmq-github-action@v1 - with: - ports: "61614:61613" - plugins: rabbitmq_stomp - - name: Checkout uses: actions/checkout@v4 with: diff --git a/tests/test_cli.py b/tests/test_cli.py index a55013a34..7a9297302 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -8,10 +8,12 @@ import pytest import responses +from bluesky_stomp.messaging import MessagingTemplate from click.testing import CliRunner from pydantic import BaseModel, ValidationError from requests.exceptions import ConnectionError from responses import matchers +from stomp.connect import StompConnection11 as Connection from blueapi import __version__ from blueapi.cli.cli import main @@ -28,6 +30,16 @@ ) +@pytest.fixture +def mock_connection() -> Mock: + return Mock(spec=Connection) + + +@pytest.fixture +def template(mock_connection: Mock) -> MessagingTemplate: + return MessagingTemplate(conn=mock_connection) + + @pytest.fixture def runner(): return CliRunner() @@ -136,7 +148,13 @@ def test_cannot_run_plans_without_stomp_config(runner: CliRunner): ) -def test_valid_stomp_config_for_listener(runner: CliRunner): +@patch("blueapi.cli.cli.MessagingTemplate.for_broker") +def test_valid_stomp_config_for_listener( + template: MessagingTemplate, + runner: CliRunner, + mock_connection: Mock, +): + mock_connection.is_connected.return_value = True result = runner.invoke( main, [ From 3d2d53b595e87479da6d31265b897e20c05ecedb Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 3 Sep 2024 10:48:46 +0100 Subject: [PATCH 20/22] changed patch --- tests/test_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 7a9297302..e9f71a69f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -148,7 +148,7 @@ def test_cannot_run_plans_without_stomp_config(runner: CliRunner): ) -@patch("blueapi.cli.cli.MessagingTemplate.for_broker") +@patch("blueapi.cli.cli.MessagingTemplate") def test_valid_stomp_config_for_listener( template: MessagingTemplate, runner: CliRunner, From 993270252e0170ee54451b5ce2e5fe939f51d5c0 Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Tue, 3 Sep 2024 11:11:43 +0100 Subject: [PATCH 21/22] added mock test for interface --- tests/service/test_interface.py | 25 ++++++++++++++++++++++++- tests/test_cli.py | 4 ++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/service/test_interface.py b/tests/service/test_interface.py index ab992e8d2..002d2d308 100644 --- a/tests/service/test_interface.py +++ b/tests/service/test_interface.py @@ -1,10 +1,13 @@ import uuid from dataclasses import dataclass -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch import pytest +from bluesky_stomp.messaging import MessagingTemplate from ophyd.sim import SynAxis +from stomp.connect import StompConnection11 as Connection +from blueapi.config import ApplicationConfig, StompConfig from blueapi.core import MsgGenerator from blueapi.core.context import BlueskyContext from blueapi.service import interface @@ -14,6 +17,18 @@ from blueapi.worker.task_worker import TrackableTask +@pytest.fixture +def mock_connection() -> Mock: + return Mock(spec=Connection) + + +@pytest.fixture +def template(mock_connection: Mock) -> MessagingTemplate: + template = MessagingTemplate(conn=mock_connection) + template.disconnect = MagicMock() + return template + + @pytest.fixture(autouse=True) def ensure_worker_stopped(): """This saves every test having to call this at the end. @@ -266,3 +281,11 @@ def test_get_task_by_id(context_mock: MagicMock): is_pending=True, errors=[], ) + + +def test_stomp_config(template: MessagingTemplate): + with patch( + "blueapi.service.interface.MessagingTemplate.for_broker", return_value=template + ): + interface.set_config(ApplicationConfig(stomp=StompConfig())) + assert interface.messaging_template() is not None diff --git a/tests/test_cli.py b/tests/test_cli.py index e9f71a69f..8c75cced7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -166,8 +166,8 @@ def test_valid_stomp_config_for_listener( input="\n", ) assert ( - "Subscribing to all bluesky events from localhost:61613\nPress enter to exit" - in result.output + result.output + == "Subscribing to all bluesky events from localhost:61613\nPress enter to exit" ) assert result.exit_code == 0 From 7437ea534236136e2cee1dbed1b95df87ba80b3a Mon Sep 17 00:00:00 2001 From: Zoheb Shaikh Date: Thu, 5 Sep 2024 11:21:02 +0100 Subject: [PATCH 22/22] made openapi yaml changes --- docs/reference/openapi.yaml | 802 +++++++++++++++---------------- src/blueapi/service/interface.py | 8 +- 2 files changed, 401 insertions(+), 409 deletions(-) diff --git a/docs/reference/openapi.yaml b/docs/reference/openapi.yaml index 64f5d4c9e..a9fdf63cc 100644 --- a/docs/reference/openapi.yaml +++ b/docs/reference/openapi.yaml @@ -1,289 +1,479 @@ -openapi: 3.1.0 +components: + schemas: + DeviceModel: + additionalProperties: false + description: Representation of a device + properties: + name: + description: Name of the device + title: Name + type: string + protocols: + description: Protocols that a device conforms to, indicating its capabilities + items: + type: string + title: Protocols + type: array + required: + - name + - protocols + title: DeviceModel + type: object + DeviceResponse: + additionalProperties: false + description: Response to a query for devices + properties: + devices: + description: Devices available to use in plans + items: + $ref: '#/components/schemas/DeviceModel' + title: Devices + type: array + required: + - devices + title: DeviceResponse + type: object + EnvironmentResponse: + additionalProperties: false + description: State of internal environment. + properties: + error_message: + anyOf: + - minLength: 1 + type: string + - type: 'null' + description: If present - error loading context + title: Error Message + initialized: + description: blueapi context initialized + title: Initialized + type: boolean + required: + - initialized + title: EnvironmentResponse + type: object + HTTPValidationError: + properties: + detail: + items: + $ref: '#/components/schemas/ValidationError' + title: Detail + type: array + title: HTTPValidationError + type: object + PlanModel: + additionalProperties: false + description: Representation of a plan + properties: + description: + anyOf: + - type: string + - type: 'null' + description: Docstring of the plan + title: Description + name: + description: Name of the plan + title: Name + type: string + schema: + anyOf: + - type: object + - type: 'null' + description: Schema of the plan's parameters + title: Schema + required: + - name + title: PlanModel + type: object + PlanResponse: + additionalProperties: false + description: Response to a query for plans + properties: + plans: + description: Plans available to use by a worker + items: + $ref: '#/components/schemas/PlanModel' + title: Plans + type: array + required: + - plans + title: PlanResponse + type: object + StateChangeRequest: + additionalProperties: false + description: Request to change the state of the worker. + properties: + defer: + default: false + description: Should worker defer Pausing until the next checkpoint + title: Defer + type: boolean + new_state: + $ref: '#/components/schemas/WorkerState' + reason: + anyOf: + - type: string + - type: 'null' + description: The reason for the current run to be aborted + title: Reason + required: + - new_state + title: StateChangeRequest + type: object + Task: + additionalProperties: false + description: Task that will run a plan + properties: + name: + description: Name of plan to run + title: Name + type: string + params: + description: Values for parameters to plan, if any + title: Params + type: object + required: + - name + title: Task + type: object + TaskResponse: + additionalProperties: false + description: Acknowledgement that a task has started, includes its ID + properties: + task_id: + description: Unique identifier for the task + title: Task Id + type: string + required: + - task_id + title: TaskResponse + type: object + TasksListResponse: + additionalProperties: false + description: Diagnostic information on the tasks + properties: + tasks: + description: List of tasks + items: + $ref: '#/components/schemas/TrackableTask' + title: Tasks + type: array + required: + - tasks + title: TasksListResponse + type: object + TrackableTask: + additionalProperties: false + description: A representation of a task that the worker recognizes + properties: + errors: + items: + type: string + title: Errors + type: array + is_complete: + default: false + title: Is Complete + type: boolean + is_pending: + default: true + title: Is Pending + type: boolean + task: + title: Task + task_id: + title: Task Id + type: string + required: + - task_id + - task + title: TrackableTask + type: object + ValidationError: + properties: + loc: + items: + anyOf: + - type: string + - type: integer + title: Location + type: array + msg: + title: Message + type: string + type: + title: Error Type + type: string + required: + - loc + - msg + - type + title: ValidationError + type: object + WorkerState: + description: The state of the Worker. + enum: + - IDLE + - RUNNING + - PAUSING + - PAUSED + - HALTING + - STOPPING + - ABORTING + - SUSPENDING + - PANICKED + - UNKNOWN + title: WorkerState + type: string + WorkerTask: + additionalProperties: false + description: Worker's active task ID, can be None + properties: + task_id: + anyOf: + - type: string + - type: 'null' + description: The ID of the current task, None if the worker is idle + title: Task Id + required: + - task_id + title: WorkerTask + type: object info: title: BlueAPI Control version: 0.0.5 +openapi: 3.1.0 paths: - /environment: + /devices: get: - summary: Get Environment - description: Get the current state of the environment, i.e. initialization state. - operationId: get_environment_environment_get + description: Retrieve information about all available devices. + operationId: get_devices_devices_get + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/DeviceResponse' + description: Successful Response + summary: Get Devices + /devices/{name}: + get: + description: Retrieve information about a devices by its (unique) name. + operationId: get_device_by_name_devices__name__get + parameters: + - in: path + name: name + required: true + schema: + title: Name + type: string responses: '200': + content: + application/json: + schema: + $ref: '#/components/schemas/DeviceModel' description: Successful Response + '422': content: application/json: schema: - $ref: '#/components/schemas/EnvironmentResponse' + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Get Device By Name + /environment: delete: - summary: Delete Environment - description: >- - Delete the current environment, causing internal components to be - reloaded. + description: Delete the current environment, causing internal components to + be reloaded. operationId: delete_environment_environment_delete responses: '200': + content: + application/json: + schema: + $ref: '#/components/schemas/EnvironmentResponse' description: Successful Response + summary: Delete Environment + get: + description: Get the current state of the environment, i.e. initialization state. + operationId: get_environment_environment_get + responses: + '200': content: application/json: schema: $ref: '#/components/schemas/EnvironmentResponse' + description: Successful Response + summary: Get Environment /plans: get: - summary: Get Plans description: Retrieve information about all available plans. operationId: get_plans_plans_get responses: '200': - description: Successful Response content: application/json: schema: $ref: '#/components/schemas/PlanResponse' + description: Successful Response + summary: Get Plans /plans/{name}: get: - summary: Get Plan By Name description: Retrieve information about a plan by its (unique) name. operationId: get_plan_by_name_plans__name__get parameters: - - name: name - in: path - required: true - schema: - type: string - title: Name + - in: path + name: name + required: true + schema: + title: Name + type: string responses: '200': - description: Successful Response content: application/json: schema: $ref: '#/components/schemas/PlanModel' + description: Successful Response '422': - description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /devices: - get: - summary: Get Devices - description: Retrieve information about all available devices. - operationId: get_devices_devices_get - responses: - '200': - description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/DeviceResponse' - /devices/{name}: + description: Validation Error + summary: Get Plan By Name + /tasks: get: - summary: Get Device By Name - description: Retrieve information about a devices by its (unique) name. - operationId: get_device_by_name_devices__name__get + description: 'Retrieve tasks based on their status. + + The status of a newly created task is ''unstarted''.' + operationId: get_tasks_tasks_get parameters: - - name: name - in: path - required: true - schema: - type: string - title: Name + - in: query + name: task_status + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Task Status responses: '200': - description: Successful Response content: application/json: schema: - $ref: '#/components/schemas/DeviceModel' + $ref: '#/components/schemas/TasksListResponse' + description: Successful Response '422': - description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' - /tasks: + description: Validation Error + summary: Get Tasks post: - summary: Submit Task description: Submit a task to the worker. operationId: submit_task_tasks_post requestBody: - required: true content: application/json: - schema: - $ref: '#/components/schemas/Task' example: name: count params: detectors: - - x + - x + schema: + $ref: '#/components/schemas/Task' + required: true responses: '201': - description: Successful Response content: application/json: schema: $ref: '#/components/schemas/TaskResponse' - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' - get: - summary: Get Tasks - description: |- - Retrieve tasks based on their status. - The status of a newly created task is 'unstarted'. - operationId: get_tasks_tasks_get - parameters: - - name: task_status - in: query - required: false - schema: - anyOf: - - type: string - - type: 'null' - title: Task Status - responses: - '200': description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/TasksListResponse' '422': - description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Submit Task /tasks/{task_id}: delete: - summary: Delete Submitted Task operationId: delete_submitted_task_tasks__task_id__delete parameters: - - name: task_id - in: path - required: true - schema: - type: string - title: Task Id + - in: path + name: task_id + required: true + schema: + title: Task Id + type: string responses: '200': - description: Successful Response content: application/json: schema: $ref: '#/components/schemas/TaskResponse' + description: Successful Response '422': - description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Delete Submitted Task get: - summary: Get Task description: Retrieve a task operationId: get_task_tasks__task_id__get parameters: - - name: task_id - in: path - required: true - schema: - type: string - title: Task Id + - in: path + name: task_id + required: true + schema: + title: Task Id + type: string responses: '200': - description: Successful Response content: application/json: schema: $ref: '#/components/schemas/TrackableTask' - '422': - description: Validation Error - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' - /worker/task: - get: - summary: Get Active Task - operationId: get_active_task_worker_task_get - responses: - '200': description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/WorkerTask' - put: - summary: Set Active Task - description: >- - Set a task to active status, the worker should begin it as soon as - possible. - - This will return an error response if the worker is not idle. - operationId: set_active_task_worker_task_put - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/WorkerTask' - required: true - responses: - '200': - description: Successful Response - content: - application/json: - schema: - $ref: '#/components/schemas/WorkerTask' - '409': - description: Conflict - worker: already active '422': - description: Validation Error content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + summary: Get Task /worker/state: get: - summary: Get State description: Get the State of the Worker operationId: get_state_worker_state_get responses: '200': - description: Successful Response content: application/json: schema: $ref: '#/components/schemas/WorkerState' + description: Successful Response + summary: Get State put: - summary: Set State - description: >- - Request that the worker is put into a particular state. - - Returns the state of the worker at the end of the call. - - - - **The following transitions are allowed and return 202: Accepted** - - - If the worker is **PAUSED**, new_state may be **RUNNING** to resume. - - - If the worker is **RUNNING**, new_state may be **PAUSED** to pause: - - If defer is False (default): pauses and rewinds to the previous checkpoint - - If defer is True: waits until the next checkpoint to pause - - **If the task has no checkpoints, the task will instead be Aborted** - - If the worker is **RUNNING/PAUSED**, new_state may be **STOPPING** to - stop. - Stop marks any currently open Runs in the Task as a success and ends the task. - - If the worker is **RUNNING/PAUSED**, new_state may be **ABORTING** to - abort. - Abort marks any currently open Runs in the Task as a Failure and ends the task. - - If reason is set, the reason will be passed as the reason for the Run failure. - - **All other transitions return 400: Bad Request** + description: "Request that the worker is put into a particular state.\nReturns\ + \ the state of the worker at the end of the call.\n\n- **The following transitions\ + \ are allowed and return 202: Accepted**\n- If the worker is **PAUSED**, new_state\ + \ may be **RUNNING** to resume.\n- If the worker is **RUNNING**, new_state\ + \ may be **PAUSED** to pause:\n - If defer is False (default): pauses and\ + \ rewinds to the previous checkpoint\n - If defer is True: waits until\ + \ the next checkpoint to pause\n - **If the task has no checkpoints, the\ + \ task will instead be Aborted**\n- If the worker is **RUNNING/PAUSED**, new_state\ + \ may be **STOPPING** to stop.\n Stop marks any currently open Runs in\ + \ the Task as a success and ends the task.\n- If the worker is **RUNNING/PAUSED**,\ + \ new_state may be **ABORTING** to abort.\n Abort marks any currently open\ + \ Runs in the Task as a Failure and ends the task.\n - If reason is set,\ + \ the reason will be passed as the reason for the Run failure.\n- **All other\ + \ transitions return 400: Bad Request**" operationId: set_state_worker_state_put requestBody: content: @@ -293,259 +483,59 @@ paths: required: true responses: '202': - description: Successful Response content: application/json: schema: $ref: '#/components/schemas/WorkerState' + description: Successful Response detail: Transition requested '400': description: Bad Request detail: Transition not allowed '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' description: Validation Error + summary: Set State + /worker/task: + get: + operationId: get_active_task_worker_task_get + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerTask' + description: Successful Response + summary: Get Active Task + put: + description: 'Set a task to active status, the worker should begin it as soon + as possible. + + This will return an error response if the worker is not idle.' + operationId: set_active_task_worker_task_put + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerTask' + required: true + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerTask' + description: Successful Response + '409': + description: Conflict + worker: already active + '422': content: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' -components: - schemas: - DeviceModel: - properties: - name: - type: string - title: Name - description: Name of the device - protocols: - items: - type: string - type: array - title: Protocols - description: Protocols that a device conforms to, indicating its capabilities - additionalProperties: false - type: object - required: - - name - - protocols - title: DeviceModel - description: Representation of a device - DeviceResponse: - properties: - devices: - items: - $ref: '#/components/schemas/DeviceModel' - type: array - title: Devices - description: Devices available to use in plans - additionalProperties: false - type: object - required: - - devices - title: DeviceResponse - description: Response to a query for devices - EnvironmentResponse: - properties: - initialized: - type: boolean - title: Initialized - description: blueapi context initialized - error_message: - anyOf: - - type: string - minLength: 1 - - type: 'null' - title: Error Message - description: If present - error loading context - additionalProperties: false - type: object - required: - - initialized - title: EnvironmentResponse - description: State of internal environment. - HTTPValidationError: - properties: - detail: - items: - $ref: '#/components/schemas/ValidationError' - type: array - title: Detail - type: object - title: HTTPValidationError - PlanModel: - properties: - name: - type: string - title: Name - description: Name of the plan - description: - anyOf: - - type: string - - type: 'null' - title: Description - description: Docstring of the plan - schema: - anyOf: - - type: object - - type: 'null' - title: Schema - description: Schema of the plan's parameters - additionalProperties: false - type: object - required: - - name - title: PlanModel - description: Representation of a plan - PlanResponse: - properties: - plans: - items: - $ref: '#/components/schemas/PlanModel' - type: array - title: Plans - description: Plans available to use by a worker - additionalProperties: false - type: object - required: - - plans - title: PlanResponse - description: Response to a query for plans - StateChangeRequest: - properties: - new_state: - $ref: '#/components/schemas/WorkerState' - defer: - type: boolean - title: Defer - description: Should worker defer Pausing until the next checkpoint - default: false - reason: - anyOf: - - type: string - - type: 'null' - title: Reason - description: The reason for the current run to be aborted - additionalProperties: false - type: object - required: - - new_state - title: StateChangeRequest - description: Request to change the state of the worker. - Task: - properties: - name: - type: string - title: Name - description: Name of plan to run - params: - type: object - title: Params - description: Values for parameters to plan, if any - additionalProperties: false - type: object - required: - - name - title: Task - description: Task that will run a plan - TaskResponse: - properties: - task_id: - type: string - title: Task Id - description: Unique identifier for the task - additionalProperties: false - type: object - required: - - task_id - title: TaskResponse - description: Acknowledgement that a task has started, includes its ID - TasksListResponse: - properties: - tasks: - items: - $ref: '#/components/schemas/TrackableTask' - type: array - title: Tasks - description: List of tasks - additionalProperties: false - type: object - required: - - tasks - title: TasksListResponse - description: Diagnostic information on the tasks - TrackableTask: - properties: - task_id: - type: string - title: Task Id - task: - title: Task - is_complete: - type: boolean - title: Is Complete - default: false - is_pending: - type: boolean - title: Is Pending - default: true - errors: - items: - type: string - type: array - title: Errors - additionalProperties: false - type: object - required: - - task_id - - task - title: TrackableTask - description: A representation of a task that the worker recognizes - ValidationError: - properties: - loc: - items: - anyOf: - - type: string - - type: integer - type: array - title: Location - msg: - type: string - title: Message - type: - type: string - title: Error Type - type: object - required: - - loc - - msg - - type - title: ValidationError - WorkerState: - type: string - enum: - - IDLE - - RUNNING - - PAUSING - - PAUSED - - HALTING - - STOPPING - - ABORTING - - SUSPENDING - - PANICKED - - UNKNOWN - title: WorkerState - description: The state of the Worker. - WorkerTask: - properties: - task_id: - anyOf: - - type: string - - type: 'null' - title: Task Id - description: The ID of the current task, None if the worker is idle - additionalProperties: false - type: object - required: - - task_id - title: WorkerTask - description: Worker's active task ID, can be None + description: Validation Error + summary: Set Active Task diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index f70bec1df..72d546831 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -49,7 +49,7 @@ def worker() -> TaskWorker: @lru_cache -def messaging_template() -> MessagingTemplate: +def messaging_template() -> MessagingTemplate | None: stomp_config = config().stomp if stomp_config is not None: template = MessagingTemplate.for_broker( @@ -70,6 +70,8 @@ def messaging_template() -> MessagingTemplate: ) template.connect() return template + else: + return None def setup(config: ApplicationConfig) -> None: @@ -160,9 +162,9 @@ def get_worker_state() -> WorkerState: return worker().state -def pause_worker(defer: bool) -> None: +def pause_worker(defer: bool | None) -> None: """Command the worker to pause""" - worker().pause(defer) + worker().pause(defer or False) def resume_worker() -> None: