Skip to content

Commit

Permalink
Merge pull request #300 from DiamondLightSource/298_disconnect_zocalo…
Browse files Browse the repository at this point in the history
…_on_unstage

#298 Disconnect from zocalo on unstaging results device, to clear subscriptions
  • Loading branch information
DominicOram authored Jan 22, 2024
2 parents b999320 + 78cfb35 commit 7815d43
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 15 deletions.
4 changes: 4 additions & 0 deletions src/dodal/devices/zocalo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
ZocaloTrigger,
)
from dodal.devices.zocalo.zocalo_results import (
NoResultsFromZocalo,
NoZocaloSubscription,
XrcResult,
ZocaloResults,
get_processing_result,
Expand All @@ -13,6 +15,8 @@
"ZocaloTrigger",
"get_processing_result",
"ZOCALO_READING_PLAN_NAME",
"NoResultsFromZocalo",
"NoZocaloSubscription",
]

ZOCALO_READING_PLAN_NAME = "zocalo reading"
51 changes: 40 additions & 11 deletions src/dodal/devices/zocalo/zocalo_results.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import asyncio
from collections import OrderedDict
from enum import Enum
from queue import Empty, Queue
from typing import Any, Generator, Sequence, Tuple, TypedDict, Union
from typing import Any, Generator, Optional, Sequence, Tuple, TypedDict, Union

import bluesky.plan_stubs as bps
import numpy as np
Expand All @@ -16,21 +17,26 @@
from dodal.devices.zocalo.zocalo_interaction import _get_zocalo_connection
from dodal.log import LOGGER

DEFAULT_TIMEOUT = 180


class NoResultsFromZocalo(Exception):
pass


class NoZocaloSubscription(Exception):
pass


class SortKeys(str, Enum):
max_count = "max_count"
total_count = "total_count"
n_voxels = "n_voxels"


DEFAULT_TIMEOUT = 180
DEFAULT_SORT_KEY = SortKeys.max_count
ZOCALO_READING_PLAN_NAME = "zocalo reading"
CLEAR_QUEUE_WAIT_S = 2.0
ZOCALO_STAGE_GROUP = "clear zocalo queue"


class XrcResult(TypedDict):
Expand Down Expand Up @@ -71,7 +77,7 @@ def __init__(
self.timeout_s = timeout_s
self._prefix = prefix
self._raw_results_received: Queue = Queue()
self._subscription_run: bool = False
self.subscription: Optional[int] = None

self.results = create_soft_signal_r(list[XrcResult], "results", self.name)
self.centres_of_mass = create_soft_signal_r(
Expand Down Expand Up @@ -106,17 +112,40 @@ def _clear_old_results(self):
LOGGER.info("Clearing queue")
self._raw_results_received = Queue()

def stage(self):
@AsyncStatus.wrap
async def stage(self):
"""Stages the Zocalo device by: subscribing to the queue, doing a background
sleep for a few seconds to wait for any stale messages to be recieved, then
clearing the queue. Plans using this device should wait on ZOCALO_STAGE_GROUP
before triggering processing for the experiment"""

LOGGER.info("Subscribing to results queue")
self._subscribe_to_results()
await asyncio.sleep(CLEAR_QUEUE_WAIT_S)
self._clear_old_results()

@AsyncStatus.wrap
async def unstage(self):
transport = _get_zocalo_connection(self.zocalo_environment)
if self.subscription:
LOGGER.info("Disconnecting from Zocalo")
transport.disconnect()
self.subscription = None

@AsyncStatus.wrap
async def trigger(self):
"""Returns an AsyncStatus waiting for results to be received from Zocalo."""
LOGGER.info("Zocalo trigger called")
if not self._subscription_run:
LOGGER.info("subscription not initialised, subscribing to queue")
self._subscribe_to_results()
self._subscription_run = True
msg = (
"This device must be staged to subscribe to the Zocalo queue, and "
"unstaged at the end of the experiment to avoid consuming results not "
"meant for it"
)
if not self.subscription:
LOGGER.warning(
msg # AsyncStatus exception messages are poorly propagated, remove after https://github.com/bluesky/ophyd-async/issues/103
)
raise NoZocaloSubscription(msg)

try:
LOGGER.info(
Expand Down Expand Up @@ -200,14 +229,14 @@ def _receive_result(
{"results": results, "ispyb_ids": recipe_parameters}
)

subscription = workflows.recipe.wrap_subscribe(
self.subscription = workflows.recipe.wrap_subscribe(
transport,
self.channel,
_receive_result,
acknowledgement=True,
allow_non_recipe_messages=False,
)
LOGGER.info(f"Made zocalo queue subscription: {bool(subscription)}.")
LOGGER.info(f"Made zocalo queue subscription: {self.subscription}.")


def get_processing_result(
Expand Down
61 changes: 60 additions & 1 deletion tests/devices/system_tests/test_zocalo_results.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
import asyncio

import bluesky.plan_stubs as bps
import pytest
import pytest_asyncio
from bluesky.preprocessors import stage_decorator
from bluesky.run_engine import RunEngine
from bluesky.utils import FailedStatus

from dodal.devices.zocalo import XrcResult, ZocaloResults, ZocaloTrigger
import dodal.devices.zocalo.zocalo_results
from dodal.devices.zocalo import (
NoZocaloSubscription,
XrcResult,
ZocaloResults,
ZocaloTrigger,
)

TEST_RESULT_LARGE: XrcResult = {
"centre_of_mass": [1, 2, 3],
Expand All @@ -25,6 +35,7 @@ async def zocalo_device():
@pytest.mark.s03
@pytest.mark.asyncio
async def test_read_results_from_fake_zocalo(zocalo_device: ZocaloResults):
zocalo_device._subscribe_to_results()
zc = ZocaloTrigger("dev_artemis")
zc.run_start(0)
zc.run_end(0)
Expand All @@ -40,3 +51,51 @@ def plan():

results = await zocalo_device.read()
assert results["zocalo-results"]["value"][0] == TEST_RESULT_LARGE


@pytest.mark.s03
@pytest.mark.asyncio
async def test_stage_unstage_controls_read_results_from_fake_zocalo(
zocalo_device: ZocaloResults,
):
dodal.devices.zocalo.zocalo_results.CLEAR_QUEUE_WAIT_S = 0.05
zc = ZocaloTrigger("dev_artemis")
zocalo_device.timeout_s = 5

def plan():
yield from bps.open_run()
zc.run_start(0)
zc.run_end(0)
yield from bps.sleep(0.15)
yield from bps.trigger_and_read([zocalo_device])
yield from bps.close_run()

@stage_decorator([zocalo_device])
def plan_with_stage():
yield from plan()

# With stage, the plan should run normally
RE = RunEngine()
RE(plan_with_stage())
assert not zocalo_device.subscription
# Without stage, the plan should run fail because we didn't connect to Zocalo
with pytest.raises(FailedStatus) as e:
RE(plan())
assert isinstance(e.value.__cause__, NoZocaloSubscription)
# And the results generated by triggering in plan() shouldn't make it to the zocalo device
assert zocalo_device._raw_results_received.empty()

# But we triggered it, so the results should be in the RMQ queue
zocalo_device._subscribe_to_results()
await asyncio.sleep(1)

results = await zocalo_device.read()
assert results["zocalo-results"]["value"][0] == TEST_RESULT_LARGE
await zocalo_device.unstage()

# Generating some more results leaves them at RMQ
with pytest.raises(FailedStatus) as e:
RE(plan())
# But waiting for stage should clear them
RE(bps.stage(zocalo_device, wait=True))
assert zocalo_device._raw_results_received.empty()
9 changes: 6 additions & 3 deletions tests/devices/unit_tests/test_zocalo_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from dodal.devices.zocalo.zocalo_results import (
ZOCALO_READING_PLAN_NAME,
NoResultsFromZocalo,
NoZocaloSubscription,
XrcResult,
ZocaloResults,
get_processing_result,
Expand Down Expand Up @@ -176,11 +176,12 @@ def plan():
RE(plan())


@pytest.mark.asyncio
@patch(
"dodal.devices.zocalo.zocalo_results.workflows.recipe.wrap_subscribe", autospec=True
)
@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
def test_subscribe_only_called_once_on_first_trigger(
async def test_subscribe_only_on_called_stage(
mock_connection: MagicMock,
mock_wrap_subscribe: MagicMock,
):
Expand All @@ -189,6 +190,8 @@ def test_subscribe_only_called_once_on_first_trigger(
name="zocalo", zocalo_environment="dev_artemis", timeout_s=2
)
mock_wrap_subscribe.assert_not_called()
await zocalo_results.stage()
mock_wrap_subscribe.assert_called_once()
zocalo_results._raw_results_received.put([])
RE(bps.trigger(zocalo_results))
mock_wrap_subscribe.assert_called_once()
Expand All @@ -212,4 +215,4 @@ async def test_when_exception_caused_by_zocalo_message_then_exception_propagated
with pytest.raises(FailedStatus) as e:
RE(bps.trigger(zocalo_results, wait=True))

assert isinstance(e.value.__cause__, NoResultsFromZocalo)
assert isinstance(e.value.__cause__, NoZocaloSubscription)

0 comments on commit 7815d43

Please sign in to comment.