Skip to content

Commit

Permalink
moved tests and added data checks
Browse files Browse the repository at this point in the history
  • Loading branch information
ZohebShaikh committed Dec 10, 2024
1 parent cf12279 commit 1ac462a
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 1,110 deletions.
Empty file.
62 changes: 62 additions & 0 deletions tests/system_tests/test_data/data_profiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from collections.abc import Mapping
from dataclasses import dataclass
from pprint import pprint
from typing import Any

import h5py as h5
import numpy as np


@dataclass
class ExpectedDataset:
shape: tuple[int, ...]
value: np.ndarray | None = None


def validate_data(
actual_data: h5.HLObject,
expected: ExpectedDataset | Mapping[str, Any],
total: bool = False,
) -> None:
print(f"Validating {actual_data} against the following tree:")
pprint(expected)

if isinstance(expected, dict):
if total:
assert set(expected.keys()) == set(actual_data.keys()) # type: ignore
else:
assert set(expected.keys()).issubset(set(actual_data.keys())) # type: ignore
for name, dataset_or_group in actual_data.items(): # type: ignore
child = expected.get(name)
if name is not None:
validate_data(dataset_or_group, child) # type: ignore
elif total:
raise AssertionError(
f"{actual_data} has a child called {name} that is not expected"
)
elif isinstance(expected, ExpectedDataset):
name = actual_data.name
_assert_is_dataset(actual_data, expected)
assert (
actual_data.shape == expected.shape # type: ignore
), f"{name}: {actual_data.shape} should be {expected.shape}" # type: ignore
if expected.value is not None:
arr = np.array(actual_data)
assert np.equal(
arr, expected.value
), f"{name}: {arr} should be {expected.value}"


def _assert_is_dataset(
maybe_dataset: h5.HLObject, expected_dataset: ExpectedDataset
) -> None:
valid_types = [h5.Dataset]
name = maybe_dataset.name
for t in valid_types:
if isinstance(maybe_dataset, t):
return
raise AssertionError(
f"{maybe_dataset} of type {type(maybe_dataset)} is"
f" not one of the valid dataset types: {valid_types}."
f" The following was expected: {name}: {expected_dataset}"
)
42 changes: 42 additions & 0 deletions tests/system_tests/test_data/eventual_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import time
from contextlib import contextmanager
from pathlib import Path

import h5py as h5

# Currently the nexus writer has no way of letting us know when it is done with a file.
# This repeated try will have to do.


@contextmanager
def hdf5_file_with_backoff(
path: Path,
max_attempts: int = 5,
interval: float = 0.5,
):
f = _open_file_with_backoff(
path,
max_attempts,
interval,
)
try:
yield f
finally:
f.close()


def _open_file_with_backoff(
path: Path,
max_attempts: int = 5,
interval: float = 0.5,
) -> h5.File:
while max_attempts > 0:
try:
return h5.File(str(path))
except BlockingIOError as ex:
if max_attempts > 1:
max_attempts -= 1
time.sleep(0.5)
else:
raise ex
raise Exception("Failed to open file")
102 changes: 102 additions & 0 deletions tests/system_tests/test_plans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import os
from pathlib import Path

import pytest
from bluesky_stomp.models import BasicAuthentication

from blueapi.client.client import BlueapiClient
from blueapi.client.event_bus import AnyEvent
from blueapi.config import ApplicationConfig, RestConfig, StompConfig
from blueapi.worker.event import TaskStatus, WorkerEvent, WorkerState
from blueapi.worker.task import Task

BEAMLINE = os.environ.get("BEAMLINE", "p46")

DISABLE_SIDE_EFFECTS = bool(os.environ.get("DISABLE_SIDE_EFFECTS", 0))
DISABLE_SIDE_EFFECTS_MESSAGE = """
This test would cause side effects on the beamline, it has been disabled
so as not to interfere with operation. To run tests that may interfere with
the beamline export DISABLE_SIDE_EFFECTS=0
"""
disable_side_effects = pytest.mark.skipif(
DISABLE_SIDE_EFFECTS, reason=DISABLE_SIDE_EFFECTS_MESSAGE
)

VISIT_DIRECTORY: Path = {
"p46": Path("/exports/mybeamline/p46/data/2024/cm11111-1/"),
"p47": Path("/exports/mybeamline/p47/data/2024/cm11111-1/"),
}[BEAMLINE]

VISIT_NOT_MOUNTED = not (VISIT_DIRECTORY.exists() and VISIT_DIRECTORY.is_dir())

VISIT_NOT_MOUNTED_MESSAGE = f"""
This test inspects data so it has to run on a machine that mounts
{VISIT_DIRECTORY}
"""


@pytest.fixture
def training_rig_config() -> ApplicationConfig:
return ApplicationConfig(
stomp=StompConfig(
host="daq-rabbitmq",
auth=BasicAuthentication(username="guest", password="guest"), # type: ignore
),
api=RestConfig(host="p46-blueapi.diamond.ac.uk", port=443, protocol="https"),
)


@pytest.fixture
def client(training_rig_config) -> BlueapiClient:
return BlueapiClient.from_config(config=training_rig_config)


STEP_SCAN = Task(
name="plan_step_scan",
params={
"detectors": ["det"],
"motor": "sample_stage",
},
)


@disable_side_effects
def test_step_scan_task(client: BlueapiClient, plan: str = "plan_step_scan"):
assert client.get_plan(plan), f"In {plan} is available"

all_events: list[AnyEvent] = []

def on_event(event: AnyEvent):
all_events.append(event)

client.run_task(STEP_SCAN, on_event=on_event)
assert isinstance(all_events[0], WorkerEvent) and all_events[0].task_status
task_id = all_events[0].task_status.task_id
assert all_events == [
WorkerEvent(
state=WorkerState.RUNNING,
task_status=TaskStatus(
task_id=task_id,
task_complete=False,
task_failed=False,
),
),
WorkerEvent(
state=WorkerState.IDLE,
task_status=TaskStatus(
task_id=task_id,
task_complete=False,
task_failed=False,
),
),
WorkerEvent(
state=WorkerState.IDLE,
task_status=TaskStatus(
task_id=task_id,
task_complete=True,
task_failed=False,
),
),
]

assert client.get_state() is WorkerState.IDLE
31 changes: 0 additions & 31 deletions tests/system_tests_training_rigs/p4-devices.json

This file was deleted.

Loading

0 comments on commit 1ac462a

Please sign in to comment.