Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZocaloResults: add parameter to use results from GPU #763

Merged
merged 22 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
22e7263
ZocaloResults: add parameter to use results from GPU
olliesilvester Aug 29, 2024
92af2e9
Warn if CPU results arrived before GPU results
olliesilvester Aug 29, 2024
90e8385
Update src/dodal/devices/zocalo/zocalo_results.py
olliesilvester Aug 29, 2024
4871d73
Correct typo
olliesilvester Aug 29, 2024
f181325
Update with new criteria and add tests
olliesilvester Sep 2, 2024
0632f5a
more tests for codecov
olliesilvester Sep 2, 2024
9b1b4c9
remove extra comments
olliesilvester Sep 3, 2024
99b8f49
Change toggle name
olliesilvester Sep 3, 2024
149c2bd
use deepdiff to get differences between gpu and cpu results
olliesilvester Sep 5, 2024
984a134
Review response and simplify deepdiff
olliesilvester Sep 6, 2024
560c600
Spell better
olliesilvester Sep 6, 2024
fc3bd5c
improve test
olliesilvester Sep 6, 2024
7d6c7f8
Merge branch 'main' into 559_zocalo_results_multiple_sources
olliesilvester Sep 6, 2024
bd35cab
better comment
olliesilvester Sep 6, 2024
71f6e13
Merge remote-tracking branch 'origin/main' into 559_zocalo_results_mu…
olliesilvester Sep 6, 2024
0c0b45d
fix linting
olliesilvester Sep 6, 2024
ad506a1
Move deepdiff to regular dependancy
olliesilvester Sep 6, 2024
362a89e
Always use GPU results
olliesilvester Sep 19, 2024
286b7bc
Merge remote-tracking branch 'origin/main' into 559_zocalo_results_mu…
olliesilvester Sep 19, 2024
de5cc4b
Add new test
olliesilvester Sep 19, 2024
ff848a6
Merge branch 'main' into 559_zocalo_results_multiple_sources
olliesilvester Sep 20, 2024
45a3bc1
Merge branch 'main' into 559_zocalo_results_multiple_sources
DominicOram Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies = [
"aiofiles",
"aiohttp",
"redis",
"deepdiff",
]

dynamic = ["version"]
Expand Down
124 changes: 114 additions & 10 deletions src/dodal/devices/zocalo/zocalo_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import workflows.recipe
import workflows.transport
from bluesky.protocols import Descriptor, Triggerable
from deepdiff import DeepDiff
from numpy.typing import NDArray
from ophyd_async.core import (
AsyncStatus,
Expand Down Expand Up @@ -37,6 +38,11 @@ class SortKeys(str, Enum):
n_voxels = "n_voxels"


class ZocaloSource(str, Enum):
CPU = "CPU"
GPU = "GPU"


DEFAULT_TIMEOUT = 180
DEFAULT_SORT_KEY = SortKeys.max_count
ZOCALO_READING_PLAN_NAME = "zocalo reading"
Expand All @@ -60,12 +66,50 @@ def bbox_size(result: XrcResult):
]


def get_dict_differences(
olliesilvester marked this conversation as resolved.
Show resolved Hide resolved
dict1: dict, dict1_source: str, dict2: dict, dict2_source: str
) -> str | None:
"""Returns a string containing dict1 and dict2 if there are differences between them, greater than a
1e-5 tolerance. If dictionaries are identical, return None"""

diff = DeepDiff(dict1, dict2, math_epsilon=1e-5, ignore_numeric_type_changes=True)

if diff:
return f"Zocalo results from {dict1_source} and {dict2_source} are not identical.\n Results from {dict1_source}: {dict1}\n Results from {dict2_source}: {dict2}"


def source_from_results(results):
return (
ZocaloSource.GPU.value
if results["recipe_parameters"].get("gpu")
else ZocaloSource.CPU.value
)


class ZocaloResults(StandardReadable, Triggerable):
"""An ophyd device which can wait for results from a Zocalo job. These jobs should
be triggered from a plan-subscribed callback using the run_start() and run_end()
methods on dodal.devices.zocalo.ZocaloTrigger.

See https://github.com/DiamondLightSource/dodal/wiki/How-to-Interact-with-Zocalo"""
See https://diamondlightsource.github.io/dodal/main/how-to/zocalo.html

Args:
name (str): Name of the device

zocalo_environment (str): How zocalo is configured. Defaults to i03's development configuration

channel (str): Name for the results Queue

sort_key (str): How results are ranked. Defaults to sorting by highest counts

timeout_s (float): Maximum time to wait for the Queue to be filled by an object, starting
from when the ZocaloResults device is triggered

prefix (str): EPICS PV prefix for the device

use_cpu_and_gpu (bool): When True, ZocaloResults will wait for results from the CPU and the GPU, compare them, and provide a warning if the results differ. When False, ZocaloResults will only use results from the CPU

"""

def __init__(
self,
Expand All @@ -75,6 +119,7 @@ def __init__(
sort_key: str = DEFAULT_SORT_KEY.value,
timeout_s: float = DEFAULT_TIMEOUT,
prefix: str = "",
use_cpu_and_gpu: bool = False,
) -> None:
self.zocalo_environment = zocalo_environment
self.sort_key = SortKeys[sort_key]
Expand All @@ -83,6 +128,7 @@ def __init__(
self._prefix = prefix
self._raw_results_received: Queue = Queue()
self.transport: CommonTransport | None = None
self.use_cpu_and_gpu = use_cpu_and_gpu

self.results, self._results_setter = soft_signal_r_and_setter(
list[XrcResult], name="results"
Expand Down Expand Up @@ -111,14 +157,14 @@ def __init__(
)
super().__init__(name)

async def _put_results(self, results: Sequence[XrcResult], ispyb_ids):
async def _put_results(self, results: Sequence[XrcResult], recipe_parameters):
self._results_setter(list(results))
centres_of_mass = np.array([r["centre_of_mass"] for r in results])
bbox_sizes = np.array([bbox_size(r) for r in results])
self._com_setter(centres_of_mass)
self._bbox_setter(bbox_sizes)
self._ispyb_dcid_setter(ispyb_ids["dcid"])
self._ispyb_dcgid_setter(ispyb_ids["dcgid"])
self._ispyb_dcid_setter(recipe_parameters["dcid"])
self._ispyb_dcgid_setter(recipe_parameters["dcgid"])

def _clear_old_results(self):
LOGGER.info("Clearing queue")
Expand All @@ -127,7 +173,7 @@ def _clear_old_results(self):
@AsyncStatus.wrap
async def stage(self):
"""Stages the Zocalo device by: subscribing to the queue, doing a background
sleep for a few seconds to wait for any stale messages to be recieved, then
sleep for a few seconds to wait for any stale messages to be received, then
clearing the queue. Plans using this device should wait on ZOCALO_STAGE_GROUP
before triggering processing for the experiment"""

Expand Down Expand Up @@ -169,15 +215,65 @@ async def trigger(self):
)

raw_results = self._raw_results_received.get(timeout=self.timeout_s)
LOGGER.info(f"Zocalo: found {len(raw_results['results'])} crystals.")
source_of_first_results = source_from_results(raw_results)

# Wait for results from CPU and GPU, warn and continue if only GPU times out. Error if CPU times out
if self.use_cpu_and_gpu:
if source_of_first_results == ZocaloSource.CPU:
LOGGER.warning("Received zocalo results from CPU before GPU")
raw_results_two_sources = [raw_results]
try:
raw_results_two_sources.append(
self._raw_results_received.get(timeout=self.timeout_s / 2)
)
source_of_second_results = source_from_results(
raw_results_two_sources[1]
)

# Compare results from both sources and warn if they aren't the same
differences_str = get_dict_differences(
raw_results_two_sources[0]["results"][0],
source_of_first_results,
raw_results_two_sources[1]["results"][0],
source_of_second_results,
)
if differences_str:
LOGGER.warning(differences_str)

# Always use CPU results
raw_results = (
raw_results_two_sources[0]
if source_of_first_results == ZocaloSource.CPU
else raw_results_two_sources[1]
)

except Empty as err:
source_of_missing_results = (
ZocaloSource.CPU.value
if source_of_first_results == ZocaloSource.GPU.value
else ZocaloSource.GPU.value
)
if source_of_missing_results == ZocaloSource.GPU.value:
LOGGER.warning(
f"Zocalo results from {source_of_missing_results} timed out. Using results from {source_of_first_results}"
)
else:
LOGGER.error(
f"Zocalo results from {source_of_missing_results} timed out and GPU results not yet reliable"
)
raise err

LOGGER.info(
f"Zocalo results from {ZocaloSource.CPU.value} processing: found {len(raw_results['results'])} crystals."
)
# Sort from strongest to weakest in case of multiple crystals
await self._put_results(
sorted(
raw_results["results"],
key=lambda d: d[self.sort_key.value],
reverse=True,
),
raw_results["ispyb_ids"],
raw_results["recipe_parameters"],
)
except Empty as timeout_exception:
LOGGER.warning("Timed out waiting for zocalo results!")
Expand Down Expand Up @@ -241,9 +337,17 @@ def _receive_result(
self.transport.ack(header) # type: ignore # we create transport here

results = message.get("results", [])
self._raw_results_received.put(
{"results": results, "ispyb_ids": recipe_parameters}
)

if self.use_cpu_and_gpu:
self._raw_results_received.put(
{"results": results, "recipe_parameters": recipe_parameters}
)
else:
# Only add to queue if results are from CPU
if not recipe_parameters.get("gpu"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should: A unit test covering this would be good.

self._raw_results_received.put(
{"results": results, "recipe_parameters": recipe_parameters}
)

subscription = workflows.recipe.wrap_subscribe(
self.transport,
Expand Down
Loading
Loading