diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 79b85ff41..b66046bc2 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -5,6 +5,10 @@ "dockerfile": "../Dockerfile", "target": "developer" }, + "containerEnv": { + "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL": "http/protobuf", + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://127.0.0.1:4318", + }, "remoteEnv": { // Allow X11 apps to run inside the container "DISPLAY": "${localEnv:DISPLAY}" diff --git a/.github/pages/make_switcher.py b/.github/pages/make_switcher.py index 6d90f4905..14577cce6 100755 --- a/.github/pages/make_switcher.py +++ b/.github/pages/make_switcher.py @@ -52,7 +52,7 @@ def get_versions(ref: str, add: str | None) -> list[str]: return versions -def write_json(path: Path, repository: str, versions: str): +def write_json(path: Path, repository: str, versions: list[str]): org, repo_name = repository.split("/") struct = [ {"version": version, "url": f"https://{org}.github.io/{repo_name}/{version}/"} diff --git a/.vscode/launch.json b/.vscode/launch.json index 12992fef3..8291038c7 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -25,9 +25,10 @@ "request": "launch", "justMyCode": false, "module": "blueapi", - "args": [ - "serve" - ] + "args": "serve", + "env": { + "OTLP_EXPORT_ENABLED": "false" + }, }, { "name": "Blueapi Server (Custom Config)", @@ -35,7 +36,10 @@ "request": "launch", "justMyCode": false, "module": "blueapi", - "args": "--config ${input:config_path} serve" + "args": "--config ${input:config_path} serve", + "env": { + "OTLP_EXPORT_ENABLED": "false" + }, }, { "name": "Blueapi Controller", @@ -43,7 +47,21 @@ "request": "launch", "justMyCode": false, "module": "blueapi", - "args": "controller ${input:controller_args}" + "args": "controller ${input:controller_args}", + "env": { + "OTLP_EXPORT_ENABLED": "false" + }, + }, + { + "name": "Blueapi Controller (Custom Config)", + "type": "debugpy", + "request": "launch", + "justMyCode": false, + "module": "blueapi", + "args": "-c ${input:config_path} controller ${input:controller_args}", + "env": { + "OTLP_EXPORT_ENABLED": "false" + }, }, ], "inputs": [ @@ -56,7 +74,8 @@ { "id": "config_path", "type": "promptString", - "description": "Server config file path", - } + "description": "Path to configuration YAML file", + "default": "" + }, ] } \ No newline at end of file diff --git a/dev-requirements.txt b/dev-requirements.txt index 24ff1d64c..cdcad8fd0 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -9,8 +9,11 @@ annotated-types==0.7.0 anyio==4.6.2.post1 appdirs==1.4.4 asciitree==0.3.3 +asgiref==3.8.1 asttokens==2.4.1 +async-timeout==4.0.3 attrs==24.2.0 +autodoc_pydantic==2.2.0 babel==2.16.0 beautifulsoup4==4.12.3 bidict==0.23.1 @@ -20,6 +23,7 @@ bluesky-kafka==0.10.0 bluesky-live==0.0.8 bluesky-stomp==0.1.2 boltons==24.0.0 +bump-pydantic==0.8.0 cachetools==5.5.0 caproto==1.1.1 certifi==2024.8.30 @@ -53,11 +57,14 @@ docopt==0.6.2 doct==1.1.0 docutils==0.21.2 dunamai==1.22.0 +email_validator==2.2.0 entrypoints==0.4 epicscorelibs==7.0.7.99.1.1 event-model==1.21.0 +exceptiongroup==1.2.2 executing==2.1.0 fastapi==0.115.3 +fastapi-cli==0.0.5 fasteners==0.19 filelock==3.16.1 flexcache==0.3 @@ -68,12 +75,15 @@ fsspec==2024.10.0 funcy==2.0 gitdb==4.0.11 GitPython==3.1.43 +googleapis-common-protos==1.65.0 graypy==2.1.0 +grpcio==1.66.2 h11==0.14.0 h5py==3.12.1 HeapDict==1.0.1 historydict==1.2.6 httpcore==1.0.6 +httptools==0.6.1 httpx==0.27.2 humanize==4.11.0 identify==2.6.1 @@ -86,14 +96,18 @@ iniconfig==2.0.0 intake==0.6.4 ipython==8.18.0 ipywidgets==8.1.5 +itsdangerous==2.2.0 jedi==0.19.1 Jinja2==3.1.4 jinja2-ansible-filters==1.3.2 +jsonpointer==3.0.0 jsonschema==4.23.0 jsonschema-specifications==2024.10.1 jupyterlab_widgets==3.0.13 kiwisolver==1.4.7 ldap3==2.9.1 +libcst==1.4.0 +livereload==2.7.0 locket==1.0.0 lz4==4.3.3 markdown-it-py==3.0.0 @@ -118,8 +132,21 @@ nose2==0.15.1 nslsii==0.10.5 numcodecs==0.13.1 numpy==1.26.4 +observability-utils==0.1.2 opencv-python-headless==4.10.0.84 opentelemetry-api==1.27.0 +opentelemetry-distro==0.48b0 +opentelemetry-exporter-otlp==1.27.0 +opentelemetry-exporter-otlp-proto-common==1.27.0 +opentelemetry-exporter-otlp-proto-grpc==1.27.0 +opentelemetry-exporter-otlp-proto-http==1.27.0 +opentelemetry-instrumentation==0.48b0 +opentelemetry-instrumentation-asgi==0.48b0 +opentelemetry-instrumentation-fastapi==0.48b0 +opentelemetry-proto==1.27.0 +opentelemetry-sdk==1.27.0 +opentelemetry-semantic-conventions==0.48b0 +opentelemetry-util-http==0.48b0 ophyd==1.9.0 ophyd-async==0.6.0 orderly-set==5.2.2 @@ -146,6 +173,7 @@ pre_commit==4.0.1 prettytable==3.11.0 prompt-toolkit==3.0.36 propcache==0.2.0 +protobuf==4.25.5 psutil==6.1.0 ptyprocess==0.7.0 pure_eval==0.2.3 @@ -154,6 +182,7 @@ py==1.11.0 pyasn1==0.6.1 pycryptodome==3.21.0 pydantic==2.9.2 +pydantic-extra-types==2.9.0 pydantic-settings==2.6.0 pydantic_core==2.23.4 pydantic_numpy==5.0.2 @@ -170,22 +199,25 @@ pytest-cov==5.0.0 pytest-random-order==1.1.1 python-dateutil==2.9.0.post0 python-dotenv==1.0.1 +python-multipart==0.0.9 pytz==2024.2 PyYAML==6.0.2 +pyyaml-include==2.1 questionary==2.0.1 redis==5.2.0 redis-json-dict==0.2.1 referencing==0.35.1 requests==2.32.3 responses==0.25.3 +rich==13.7.1 rpds-py==0.20.0 ruamel.yaml==0.18.6 ruamel.yaml.clib==0.2.12 ruff==0.7.1 scanspec==0.7.6 semver==3.0.2 -setuptools==75.2.0 setuptools-dso==2.11 +shellingham==1.5.4 six==1.16.0 slicerator==1.1.0 smmap==5.0.1 @@ -197,6 +229,8 @@ sphinx-autobuild==2024.10.3 sphinx-autodoc-typehints==2.3.0 sphinx-click==6.0.0 sphinx-copybutton==0.5.2 +sphinx-jsonschema==1.19.1 +sphinx-pydantic==0.1.1 sphinx_design==0.6.1 sphinx_mdinclude==0.6.2 sphinxcontrib-applehelp==2.0.0 @@ -216,11 +250,14 @@ suitcase-msgpack==0.3.0 suitcase-utils==0.5.4 super-state-machine==2.0.2 tifffile==2024.9.20 +tomli==2.0.1 toolz==1.0.0 +tornado==6.4.1 tox==3.28.0 tox-direct==0.4 tqdm==4.66.5 traitlets==5.14.3 +typer==0.12.4 types-aiofiles==24.1.0.20240626 types-mock==5.1.0.20240425 types-PyYAML==6.0.12.20240917 @@ -230,8 +267,10 @@ typing-inspect==0.9.0 typing_extensions==4.12.2 tzdata==2024.2 tzlocal==5.2 +ujson==5.10.0 urllib3==2.2.3 uvicorn==0.32.0 +uvloop==0.19.0 virtualenv==20.27.0 watchfiles==0.24.0 wcwidth==0.2.13 diff --git a/docs/reference/openapi.yaml b/docs/reference/openapi.yaml index a9fdf63cc..57a4d9352 100644 --- a/docs/reference/openapi.yaml +++ b/docs/reference/openapi.yaml @@ -179,6 +179,10 @@ components: default: true title: Is Pending type: boolean + request_id: + default: '' + title: Request Id + type: string task: title: Task task_id: diff --git a/helm/blueapi/values.yaml b/helm/blueapi/values.yaml index d32e9f1eb..fe35c33e2 100644 --- a/helm/blueapi/values.yaml +++ b/helm/blueapi/values.yaml @@ -76,13 +76,26 @@ listener: resources: {} # Additional envVars to mount to the pod as a String -extraEnvVars: [] +extraEnvVars: | + - name: OTLP_EXPORT_ENABLED + value: {{ .Values.tracing.otlp.export_enabled }} + - name: OTEL_EXPORTER_OTLP_TRACES_PROTOCOL + value: {{ .Values.tracing.otlp.protocol }} + - name: OTEL_EXPORTER_OTLP_ENDPOINT + value: "{{ .Values.tracing.otlp.host }}:{{ .Values.tracing.otlp.port }}" # - name: RABBITMQ_PASSWORD # valueFrom: # secretKeyRef: # name: rabbitmq-password # key: rabbitmq-password +tracing: + otlp: + export_enabled: false + protocol: http/protobuf + host: https://daq-services-jaeger # replace with central instance + port: 4318 + # Config for the worker goes here, will be mounted into a config file worker: api: diff --git a/pyproject.toml b/pyproject.toml index c55616ba4..e356d52e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dependencies = [ "pyepics", "aioca", "pydantic>=2.0", - "scanspec>=0.7.2", + "scanspec>=0.7.6", "pydantic-settings", "stomp-py", "aiohttp", @@ -27,12 +27,15 @@ dependencies = [ "fastapi>=0.112.0", "uvicorn", "requests", - "dls-bluesky-core", #requires ophyd-async + "dls-bluesky-core", #requires ophyd-async "dls-dodal>=1.31.0", "super-state-machine", # https://github.com/DiamondLightSource/blueapi/issues/553 "GitPython", "bluesky-stomp>=0.1.2", "event-model==1.21", # https://github.com/DiamondLightSource/blueapi/issues/684 + "opentelemetry-distro>=0.48b0", + "opentelemetry-instrumentation-fastapi>=0.48b0", + "observability-utils>=0.1.2", ] dynamic = ["version"] license.file = "LICENSE" @@ -80,7 +83,9 @@ write_to = "src/blueapi/_version.py" [tool.mypy] ignore_missing_imports = true # Ignore missing stubs in imported modules -namespace_packages = false # rely only on __init__ files to determine fully qualified module names. + +# necessary for tracing sdk to work with mypy, set false once migraion to pyright complete +namespace_packages = true [tool.pytest.ini_options] # Run pytest with all our checkers, and don't spam us with massive tracebacks on error diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index e898c5b81..c98800c20 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -9,6 +9,7 @@ from bluesky.callbacks.best_effort import BestEffortCallback from bluesky_stomp.messaging import MessageContext, StompClient from bluesky_stomp.models import Broker +from observability_utils.tracing import setup_tracing from pydantic import ValidationError from requests.exceptions import ConnectionError @@ -18,14 +19,7 @@ from blueapi.client.event_bus import AnyEvent, BlueskyStreamingError, EventBusClient from blueapi.client.rest import BlueskyRemoteControlError from blueapi.config import ApplicationConfig, ConfigLoader -from blueapi.core import DataEvent -from blueapi.service.main import start -from blueapi.service.openapi import ( - DOCS_SCHEMA_LOCATION, - generate_schema, - print_schema_as_yaml, - write_schema_as_yaml, -) +from blueapi.core import OTLP_EXPORT_ENABLED, DataEvent from blueapi.worker import ProgressEvent, Task, WorkerEvent from .scratch import setup_scratch @@ -72,6 +66,16 @@ def main(ctx: click.Context, config: Path | None | tuple[Path, ...]) -> None: help="[Development only] update the schema in the documentation", ) def schema(output: Path | None = None, update: bool = False) -> None: + """Only import the service functions when starting the service or generating + the schema, not the controller as a new FastAPI app will be started each time. + """ + from blueapi.service.openapi import ( + DOCS_SCHEMA_LOCATION, + generate_schema, + print_schema_as_yaml, + write_schema_as_yaml, + ) + """Generate the schema for the REST API""" schema = generate_schema() @@ -89,6 +93,16 @@ def start_application(obj: dict): """Run a worker that accepts plans to run""" config: ApplicationConfig = obj["config"] + """Only import the service functions when starting the service or generating + the schema, not the controller as a new FastAPI app will be started each time. + """ + from blueapi.service.main import start + + """ + Set up basic automated instrumentation for the FastAPI app, creating the + observability context. + """ + setup_tracing("BlueAPI", OTLP_EXPORT_ENABLED) start(config) @@ -103,6 +117,7 @@ def start_application(obj: dict): def controller(ctx: click.Context, output: str) -> None: """Client utility for controlling and introspecting the worker""" + setup_tracing("BlueAPICLI", OTLP_EXPORT_ENABLED) if ctx.invoked_subcommand is None: print("Please invoke subcommand!") return diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 4468bf10e..486f3b86d 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -3,6 +3,10 @@ from bluesky_stomp.messaging import MessageContext, StompClient from bluesky_stomp.models import Broker +from observability_utils.tracing import ( + get_tracer, + start_as_current_span, +) from blueapi.config import ApplicationConfig from blueapi.core.bluesky_types import DataEvent @@ -22,6 +26,8 @@ from .event_bus import AnyEvent, BlueskyStreamingError, EventBusClient, OnAnyEvent from .rest import BlueapiRestClient, BlueskyRemoteControlError +TRACER = get_tracer("client") + class BlueapiClient: """Unified client for controlling blueapi""" @@ -53,6 +59,7 @@ def from_config(cls, config: ApplicationConfig) -> "BlueapiClient": events = None return cls(rest, events) + @start_as_current_span(TRACER) def get_plans(self) -> PlanResponse: """ List plans available @@ -62,6 +69,7 @@ def get_plans(self) -> PlanResponse: """ return self._rest.get_plans() + @start_as_current_span(TRACER, "name") def get_plan(self, name: str) -> PlanModel: """ Get details of a single plan @@ -74,6 +82,7 @@ def get_plan(self, name: str) -> PlanModel: """ return self._rest.get_plan(name) + @start_as_current_span(TRACER) def get_devices(self) -> DeviceResponse: """ List devices available @@ -84,6 +93,7 @@ def get_devices(self) -> DeviceResponse: return self._rest.get_devices() + @start_as_current_span(TRACER, "name") def get_device(self, name: str) -> DeviceModel: """ Get details of a single device @@ -97,6 +107,7 @@ def get_device(self, name: str) -> DeviceModel: return self._rest.get_device(name) + @start_as_current_span(TRACER) def get_state(self) -> WorkerState: """ Get current state of the blueapi worker @@ -107,6 +118,7 @@ def get_state(self) -> WorkerState: return self._rest.get_state() + @start_as_current_span(TRACER, "defer") def pause(self, defer: bool = False) -> WorkerState: """ Pause execution of the current task, if any @@ -122,6 +134,7 @@ def pause(self, defer: bool = False) -> WorkerState: return self._rest.set_state(WorkerState.PAUSED, defer=defer) + @start_as_current_span(TRACER) def resume(self) -> WorkerState: """ Resume plan execution if previously paused @@ -133,6 +146,7 @@ def resume(self) -> WorkerState: return self._rest.set_state(WorkerState.RUNNING, defer=False) + @start_as_current_span(TRACER, "task_id") def get_task(self, task_id: str) -> TrackableTask[Task]: """ Get a task stored by the worker @@ -146,6 +160,7 @@ def get_task(self, task_id: str) -> TrackableTask[Task]: assert task_id, "Task ID not provided!" return self._rest.get_task(task_id) + @start_as_current_span(TRACER) def get_all_tasks(self) -> TasksListResponse: """ Get a list of all task stored by the worker @@ -156,6 +171,7 @@ def get_all_tasks(self) -> TasksListResponse: return self._rest.get_all_tasks() + @start_as_current_span(TRACER) def get_active_task(self) -> WorkerTask: """ Get the currently active task, if any @@ -167,6 +183,7 @@ def get_active_task(self) -> WorkerTask: return self._rest.get_active_task() + @start_as_current_span(TRACER, "task", "timeout") def run_task( self, task: Task, @@ -229,6 +246,7 @@ def inner_on_event(event: AnyEvent, ctx: MessageContext) -> None: self.start_task(WorkerTask(task_id=task_id)) return complete.result(timeout=timeout) + @start_as_current_span(TRACER, "task") def create_and_start_task(self, task: Task) -> TaskResponse: """ Create a new task and instruct the worker to start it @@ -251,6 +269,7 @@ def create_and_start_task(self, task: Task) -> TaskResponse: f"but {worker_response.task_id} was started instead" ) + @start_as_current_span(TRACER, "task") def create_task(self, task: Task) -> TaskResponse: """ Create a new task, does not start execution @@ -264,6 +283,7 @@ def create_task(self, task: Task) -> TaskResponse: return self._rest.create_task(task) + @start_as_current_span(TRACER) def clear_task(self, task_id: str) -> TaskResponse: """ Delete a stored task on the worker @@ -277,6 +297,7 @@ def clear_task(self, task_id: str) -> TaskResponse: return self._rest.clear_task(task_id) + @start_as_current_span(TRACER, "task") def start_task(self, task: WorkerTask) -> WorkerTask: """ Instruct the worker to start a stored task immediately @@ -290,6 +311,7 @@ def start_task(self, task: WorkerTask) -> WorkerTask: return self._rest.update_worker_task(task) + @start_as_current_span(TRACER, "reason") def abort(self, reason: str | None = None) -> WorkerState: """ Abort the plan currently being executed, if any. @@ -310,6 +332,7 @@ def abort(self, reason: str | None = None) -> WorkerState: reason=reason, ) + @start_as_current_span(TRACER) def stop(self) -> WorkerState: """ Stop execution of the current plan early. @@ -323,6 +346,7 @@ def stop(self) -> WorkerState: return self._rest.cancel_current_task(WorkerState.STOPPING) + @start_as_current_span(TRACER) def get_environment(self) -> EnvironmentResponse: """ Get details of the worker environment @@ -334,6 +358,7 @@ def get_environment(self) -> EnvironmentResponse: return self._rest.get_environment() + @start_as_current_span(TRACER, "timeout", "polling_interval") def reload_environment( self, timeout: float | None = None, @@ -366,6 +391,7 @@ def reload_environment( polling_interval, ) + @start_as_current_span(TRACER, "timeout", "polling_interval") def _wait_for_reload( self, status: EnvironmentResponse, diff --git a/src/blueapi/client/rest.py b/src/blueapi/client/rest.py index 2ec60a1c7..e6837fec6 100644 --- a/src/blueapi/client/rest.py +++ b/src/blueapi/client/rest.py @@ -2,6 +2,11 @@ from typing import Any, Literal, TypeVar import requests +from observability_utils.tracing import ( + get_context_propagator, + get_tracer, + start_as_current_span, +) from pydantic import TypeAdapter from blueapi.config import RestConfig @@ -19,6 +24,8 @@ T = TypeVar("T") +TRACER = get_tracer("rest") + class BlueskyRemoteControlError(Exception): def __init__(self, message: str) -> None: @@ -118,6 +125,7 @@ def delete_environment(self) -> EnvironmentResponse: "/environment", EnvironmentResponse, method="DELETE" ) + @start_as_current_span(TRACER, "method", "data", "suffix") def _request_and_deserialize( self, suffix: str, @@ -127,10 +135,12 @@ def _request_and_deserialize( get_exception: Callable[[requests.Response], Exception | None] = _exception, ) -> T: url = self._url(suffix) + # Get the trace context to propagate to the REST API + carr = get_context_propagator() if data: - response = requests.request(method, url, json=data) + response = requests.request(method, url, json=data, headers=carr) else: - response = requests.request(method, url) + response = requests.request(method, url, headers=carr) exception = get_exception(response) if exception is not None: raise exception diff --git a/src/blueapi/core/__init__.py b/src/blueapi/core/__init__.py index 7b2306b0f..15e3b2602 100644 --- a/src/blueapi/core/__init__.py +++ b/src/blueapi/core/__init__.py @@ -1,3 +1,5 @@ +from os import environ + from .bluesky_event_loop import configure_bluesky_event_loop from .bluesky_types import ( BLUESKY_PROTOCOLS, @@ -14,6 +16,8 @@ from .context import BlueskyContext from .event import EventPublisher, EventStream +OTLP_EXPORT_ENABLED = environ.get("OTLP_EXPORT_ENABLED") == "true" + __all__ = [ "Plan", "PlanGenerator", diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 2f4651f6a..80c0a330e 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -1,3 +1,4 @@ +from collections.abc import Awaitable, Callable from contextlib import asynccontextmanager from fastapi import ( @@ -10,6 +11,15 @@ Response, status, ) +from observability_utils.tracing import ( + add_span_attributes, + get_tracer, + start_as_current_span, +) +from opentelemetry.context import attach +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.propagate import get_global_textmap +from opentelemetry.trace import get_tracer_provider from pydantic import ValidationError from starlette.responses import JSONResponse from super_state_machine.errors import TransitionError @@ -36,6 +46,8 @@ RUNNER: WorkerDispatcher | None = None +CONTEXT_HEADER = "traceparent" + def _runner() -> WorkerDispatcher: """Intended to be used only with FastAPI Depends""" @@ -75,6 +87,8 @@ async def lifespan(app: FastAPI): version=REST_API_VERSION, ) +TRACER = get_tracer("interface") + @app.exception_handler(KeyError) async def on_key_error_404(_: Request, __: KeyError): @@ -85,6 +99,7 @@ async def on_key_error_404(_: Request, __: KeyError): @app.get("/environment", response_model=EnvironmentResponse) +@start_as_current_span(TRACER, "runner") def get_environment( runner: WorkerDispatcher = Depends(_runner), ) -> EnvironmentResponse: @@ -105,6 +120,7 @@ async def delete_environment( @app.get("/plans", response_model=PlanResponse) +@start_as_current_span(TRACER) def get_plans(runner: WorkerDispatcher = Depends(_runner)): """Retrieve information about all available plans.""" return PlanResponse(plans=runner.run(interface.get_plans)) @@ -114,12 +130,14 @@ def get_plans(runner: WorkerDispatcher = Depends(_runner)): "/plans/{name}", response_model=PlanModel, ) +@start_as_current_span(TRACER, "name") def get_plan_by_name(name: str, runner: WorkerDispatcher = Depends(_runner)): """Retrieve information about a plan by its (unique) name.""" return runner.run(interface.get_plan, name) @app.get("/devices", response_model=DeviceResponse) +@start_as_current_span(TRACER) def get_devices(runner: WorkerDispatcher = Depends(_runner)): """Retrieve information about all available devices.""" return DeviceResponse(devices=runner.run(interface.get_devices)) @@ -129,6 +147,7 @@ def get_devices(runner: WorkerDispatcher = Depends(_runner)): "/devices/{name}", response_model=DeviceModel, ) +@start_as_current_span(TRACER, "name") def get_device_by_name(name: str, runner: WorkerDispatcher = Depends(_runner)): """Retrieve information about a devices by its (unique) name.""" return runner.run(interface.get_device, name) @@ -142,6 +161,7 @@ def get_device_by_name(name: str, runner: WorkerDispatcher = Depends(_runner)): response_model=TaskResponse, status_code=status.HTTP_201_CREATED, ) +@start_as_current_span(TRACER, "request", "task.name", "task.params") def submit_task( request: Request, response: Response, @@ -171,6 +191,7 @@ def submit_task( @app.delete("/tasks/{task_id}", status_code=status.HTTP_200_OK) +@start_as_current_span(TRACER, "task_id") def delete_submitted_task( task_id: str, runner: WorkerDispatcher = Depends(_runner), @@ -178,6 +199,7 @@ def delete_submitted_task( return TaskResponse(task_id=runner.run(interface.clear_task, task_id)) +@start_as_current_span(TRACER, "v") def validate_task_status(v: str) -> TaskStatusEnum: v_upper = v.upper() if v_upper not in TaskStatusEnum.__members__: @@ -186,6 +208,7 @@ def validate_task_status(v: str) -> TaskStatusEnum: @app.get("/tasks", response_model=TasksListResponse, status_code=status.HTTP_200_OK) +@start_as_current_span(TRACER) def get_tasks( task_status: str | None = None, runner: WorkerDispatcher = Depends(_runner), @@ -196,6 +219,7 @@ def get_tasks( """ tasks = [] if task_status: + add_span_attributes({"status": task_status}) try: desired_status = validate_task_status(task_status) except ValueError as e: @@ -215,6 +239,7 @@ def get_tasks( response_model=WorkerTask, responses={status.HTTP_409_CONFLICT: {"worker": "already active"}}, ) +@start_as_current_span(TRACER, "task.task_id") def set_active_task( task: WorkerTask, runner: WorkerDispatcher = Depends(_runner), @@ -234,6 +259,7 @@ def set_active_task( "/tasks/{task_id}", response_model=TrackableTask, ) +@start_as_current_span(TRACER, "task_id") def get_task( task_id: str, runner: WorkerDispatcher = Depends(_runner), @@ -246,6 +272,7 @@ def get_task( @app.get("/worker/task") +@start_as_current_span(TRACER) def get_active_task(runner: WorkerDispatcher = Depends(_runner)) -> WorkerTask: active = runner.run(interface.get_active_task) if active is not None: @@ -255,6 +282,7 @@ def get_active_task(runner: WorkerDispatcher = Depends(_runner)) -> WorkerTask: @app.get("/worker/state") +@start_as_current_span(TRACER) def get_state(runner: WorkerDispatcher = Depends(_runner)) -> WorkerState: """Get the State of the Worker""" return runner.run(interface.get_worker_state) @@ -283,6 +311,7 @@ def get_state(runner: WorkerDispatcher = Depends(_runner)) -> WorkerState: status.HTTP_202_ACCEPTED: {"detail": "Transition requested"}, }, ) +@start_as_current_span(TRACER, "state_change_request.new_state") def set_state( state_change_request: StateChangeRequest, response: Response, @@ -307,6 +336,7 @@ def set_state( """ current_state = runner.run(interface.get_worker_state) new_state = state_change_request.new_state + add_span_attributes({"current_state": current_state}) if ( current_state in _ALLOWED_TRANSITIONS and new_state in _ALLOWED_TRANSITIONS[current_state] @@ -330,6 +360,7 @@ def set_state( return runner.run(interface.get_worker_state) +@start_as_current_span(TRACER, "config") def start(config: ApplicationConfig): import uvicorn from uvicorn.config import LOGGING_CONFIG @@ -341,6 +372,12 @@ def start(config: ApplicationConfig): "%(asctime)s %(levelprefix)s %(client_addr)s" + " - '%(request_line)s' %(status_code)s" ) + FastAPIInstrumentor().instrument_app( + app, + tracer_provider=get_tracer_provider(), + http_capture_headers_server_request=[",*"], + http_capture_headers_server_response=[",*"], + ) app.state.config = config uvicorn.run(app, host=config.api.host, port=config.api.port) @@ -350,3 +387,19 @@ async def add_api_version_header(request: Request, call_next): response = await call_next(request) response.headers["X-API-Version"] = REST_API_VERSION return response + + +@app.middleware("http") +async def inject_propagated_observability_context( + request: Request, call_next: Callable[[Request], Awaitable[Response]] +) -> Response: + """Middleware to extract the any prorpagated observability context from the + HTTP headers and attatch it to the local one. + """ + if CONTEXT_HEADER in request.headers: + ctx = get_global_textmap().extract( + {CONTEXT_HEADER: request.headers[CONTEXT_HEADER]} + ) + attach(ctx) + response = await call_next(request) + return response diff --git a/src/blueapi/service/runner.py b/src/blueapi/service/runner.py index e122bf7d7..658fe8a5e 100644 --- a/src/blueapi/service/runner.py +++ b/src/blueapi/service/runner.py @@ -7,6 +7,14 @@ from multiprocessing.pool import Pool as PoolClass from typing import Any, ParamSpec, TypeVar +from observability_utils.tracing import ( + add_span_attributes, + get_context_propagator, + get_tracer, + start_as_current_span, +) +from opentelemetry.context import attach +from opentelemetry.propagate import get_global_textmap from pydantic import TypeAdapter from blueapi.config import ApplicationConfig @@ -17,6 +25,7 @@ set_start_method("spawn", force=True) LOGGER = logging.getLogger(__name__) +TRACER = get_tracer("runner") P = ParamSpec("P") T = TypeVar("T") @@ -50,13 +59,18 @@ def __init__( initialized=False, ) + @start_as_current_span(TRACER) def reload(self): """Reload the subprocess to account for any changes in python modules""" self.stop() self.start() LOGGER.info("Runner reloaded") + @start_as_current_span(TRACER) def start(self): + add_span_attributes( + {"_use_subprocess": self._use_subprocess, "_config": self._config} + ) try: if self._use_subprocess: self._subprocess = Pool(initializer=_init_worker, processes=1) @@ -69,6 +83,7 @@ def start(self): ) LOGGER.exception(e) + @start_as_current_span(TRACER) def stop(self): try: self.run(teardown) @@ -86,18 +101,33 @@ def stop(self): ) LOGGER.exception(e) - def run(self, function: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T: + @start_as_current_span(TRACER, "function", "args", "kwargs") + def run( + self, + function: Callable[P, T], + *args: P.args, + **kwargs: P.kwargs, + ) -> T: + """Calls the supplied function, which is modified to accept a dict as it's new + first param, before being passed to the subprocess runner, or just run in place. + """ + add_span_attributes({"use_subprocess": self._use_subprocess}) if self._use_subprocess: return self._run_in_subprocess(function, *args, **kwargs) else: return function(*args, **kwargs) + @start_as_current_span(TRACER, "function", "args", "kwargs") def _run_in_subprocess( self, function: Callable[P, T], *args: P.args, **kwargs: P.kwargs, ) -> T: + """Call the supplied function, passing the current Span ID, if one + exists,from the observability context inro the _rpc caller function. + When this is deserialized in and run by the subprocess, this will allow + its functions to use the corresponding span as their parent span.""" if self._subprocess is None: raise InvalidRunnerStateError("Subprocess runner has not been started") if not (hasattr(function, "__name__") and hasattr(function, "__module__")): @@ -115,6 +145,7 @@ def _run_in_subprocess( function.__module__, function.__name__, return_type, + get_context_propagator(), *args, ), kwargs, @@ -137,9 +168,13 @@ def _rpc( module_name: str, function_name: str, expected_type: type[T] | None, + carrier: dict[str, Any] | None, *args: Any, **kwargs: Any, ) -> T: + if carrier: + ctx = get_global_textmap().extract(carrier) + attach(ctx) mod = import_module(module_name) func: Callable[P, T] = _validate_function( mod.__dict__.get(function_name, None), function_name diff --git a/src/blueapi/worker/task_worker.py b/src/blueapi/worker/task_worker.py index 3f41bd22f..2c8ba35d7 100644 --- a/src/blueapi/worker/task_worker.py +++ b/src/blueapi/worker/task_worker.py @@ -9,10 +9,21 @@ from typing import Any, Generic, TypeVar from bluesky.protocols import Status +from observability_utils.tracing import ( + add_span_attributes, + get_trace_context, + get_tracer, + setup_tracing, + start_as_current_span, +) +from opentelemetry.baggage import get_baggage +from opentelemetry.context import Context +from opentelemetry.trace import SpanKind from pydantic import Field from super_state_machine.errors import TransitionError from blueapi.core import ( + OTLP_EXPORT_ENABLED, BlueskyContext, DataEvent, EventPublisher, @@ -36,8 +47,11 @@ from .worker_errors import WorkerAlreadyStartedError, WorkerBusyError LOGGER = logging.getLogger(__name__) +TRACER = get_tracer("task_worker") +""" Initialise a Tracer for this module provided by the app's global TracerProvider. """ DEFAULT_START_STOP_TIMEOUT: float = 30.0 +WORKER_THREAD_STATE = "worker thread state" T = TypeVar("T") @@ -49,6 +63,7 @@ class TrackableTask(BlueapiBaseModel, Generic[T]): task_id: str task: T + request_id: str = "" is_complete: bool = False is_pending: bool = True errors: list[str] = Field(default_factory=list) @@ -83,6 +98,7 @@ class TaskWorker: _started: Event _stopping: Event _stopped: Event + _current_task_otel_context: Context | None def __init__( self, @@ -111,11 +127,15 @@ def __init__( self._stopped = Event() self._stopped.set() self._broadcast_statuses = broadcast_statuses + self._current_task_otel_context = None + setup_tracing("BlueAPIWorker", OTLP_EXPORT_ENABLED) + @start_as_current_span(TRACER, "task_id") def clear_task(self, task_id: str) -> str: task = self._tasks.pop(task_id) return task.task_id + @start_as_current_span(TRACER) def cancel_active_task( self, failure: bool = False, @@ -127,16 +147,21 @@ def cancel_active_task( raise TransitionError("Attempted to cancel while no active Task") if failure: self._ctx.run_engine.abort(reason) + add_span_attributes({"Task aborted": reason}) else: self._ctx.run_engine.stop() + add_span_attributes({"Task stopped": reason}) return self._current.task_id + @start_as_current_span(TRACER) def get_tasks(self) -> list[TrackableTask]: return list(self._tasks.values()) + @start_as_current_span(TRACER, "task_id") def get_task_by_id(self, task_id: str) -> TrackableTask | None: return self._tasks.get(task_id) + @start_as_current_span(TRACER, "status") def get_tasks_by_status(self, status: TaskStatusEnum) -> list[TrackableTask]: if status == TaskStatusEnum.RUNNING: return [ @@ -150,9 +175,14 @@ def get_tasks_by_status(self, status: TaskStatusEnum) -> list[TrackableTask]: return [task for task in self._tasks.values() if task.is_complete] return [] + @start_as_current_span(TRACER) def get_active_task(self) -> TrackableTask[Task] | None: - return self._current + current = self._current + if current is not None: + add_span_attributes({"Active Task": current.task_id}) + return current + @start_as_current_span(TRACER, "task_id") def begin_task(self, task_id: str) -> None: task = self._tasks.get(task_id) if task is not None: @@ -160,13 +190,23 @@ def begin_task(self, task_id: str) -> None: else: raise KeyError(f"No pending task with ID {task_id}") + @start_as_current_span(TRACER, "task.name", "task.params") def submit_task(self, task: Task) -> str: task.prepare_params(self._ctx) # Will raise if parameters are invalid task_id: str = str(uuid.uuid4()) - trackable_task = TrackableTask(task_id=task_id, task=task) + add_span_attributes({"TaskId": task_id}) + trackable_task = TrackableTask( + task_id=task_id, request_id=str(get_baggage("correlation_id")), task=task + ) self._tasks[task_id] = trackable_task return task_id + @start_as_current_span( + TRACER, + "trackable_task.task_id", + "trackable_task.task.name", + "trackable_task.task.params", + ) def _submit_trackable_task(self, trackable_task: TrackableTask) -> None: if self.state is not WorkerState.IDLE: raise WorkerBusyError(f"Worker is in state {self.state}") @@ -182,7 +222,9 @@ def mark_task_as_started(event: WorkerEvent, _: str | None) -> None: LOGGER.info(f"Submitting: {trackable_task}") try: + self._current_task_otel_context = get_trace_context() sub = self.worker_events.subscribe(mark_task_as_started) + """ Cache the current trace context as the one for this task id """ self._task_channel.put_nowait(trackable_task) task_started.wait(timeout=5.0) if not task_started.is_set(): @@ -193,13 +235,17 @@ def mark_task_as_started(event: WorkerEvent, _: str | None) -> None: finally: self.worker_events.unsubscribe(sub) + @start_as_current_span(TRACER) def start(self) -> None: if self._started.is_set(): raise WorkerAlreadyStartedError("Worker is already running") self._wait_until_stopped() - run_worker_in_own_thread(self) + fut = run_worker_in_own_thread(self) self._wait_until_started() + add_span_attributes({WORKER_THREAD_STATE: fut._state}) + + @start_as_current_span(TRACER) def stop(self) -> None: LOGGER.info("Attempting to stop worker") @@ -208,15 +254,18 @@ def stop(self) -> None: self._task_channel.put(KillSignal()) else: LOGGER.info("Stopping worker: nothing to do") - LOGGER.info("Stopped") self._wait_until_stopped() + add_span_attributes({WORKER_THREAD_STATE: "STOPPED"}) + LOGGER.info("Stopped") + @start_as_current_span(TRACER) def _wait_until_started(self) -> None: if not self._started.wait(timeout=self._start_stop_timeout): raise TimeoutError( f"Worker did not start within {self._start_stop_timeout} seconds" ) + @start_as_current_span(TRACER) def _wait_until_stopped(self) -> None: if not self._stopped.wait(timeout=self._start_stop_timeout): raise TimeoutError( @@ -227,6 +276,7 @@ def _wait_until_stopped(self) -> None: def state(self) -> WorkerState: return self._state + @start_as_current_span(TRACER) def run(self) -> None: LOGGER.info("Worker starting") self._ctx.run_engine.state_hook = self._on_state_change @@ -242,38 +292,58 @@ def run(self) -> None: self._stopping.clear() self._stopped.set() + @start_as_current_span(TRACER, "defer") def pause(self, defer=False): LOGGER.info("Requesting to pause the worker") self._ctx.run_engine.request_pause(defer) + @start_as_current_span(TRACER) def resume(self): LOGGER.info("Requesting to resume the worker") self._ctx.run_engine.resume() + @start_as_current_span(TRACER) def _cycle_with_error_handling(self) -> None: try: self._cycle() except Exception as ex: self._report_error(ex) + @start_as_current_span(TRACER) def _cycle(self) -> None: try: LOGGER.info("Awaiting task") next_task: TrackableTask | KillSignal = self._task_channel.get() if isinstance(next_task, TrackableTask): - LOGGER.info(f"Got new task: {next_task}") - self._current = next_task # Informing mypy that the task is not None - self._current.is_pending = False - self._current.task.do_task(self._ctx) + if self._current_task_otel_context is not None: + with TRACER.start_as_current_span( + "_cycle", + context=self._current_task_otel_context, + kind=SpanKind.SERVER, + ): + LOGGER.info(f"Got new task: {next_task}") + self._current = ( + next_task # Informing mypy that the task is not None + ) + + self._current_task_otel_context = get_trace_context() + add_span_attributes({"next_task.task_id": next_task.task_id}) + + self._current.is_pending = False + self._current.task.do_task(self._ctx) elif isinstance(next_task, KillSignal): # If we receive a kill signal we begin to shut the worker down. # Note that the kill signal is explicitly not a type of task as we don't # want it to be part of the worker's public API self._stopping.set() + add_span_attributes({"server shutting down": "true"}) else: raise KeyError(f"Unknown command: {next_task}") except Exception as err: self._report_error(err) + finally: + if self._current_task_otel_context is not None: + self._current_task_otel_context = None if self._current is not None: self._current.is_complete = True @@ -294,6 +364,7 @@ def progress_events(self) -> EventStream[ProgressEvent, int]: def data_events(self) -> EventStream[DataEvent, int]: return self._data_events + @start_as_current_span(TRACER, "raw_new_state", "raw_old_state") def _on_state_change( self, raw_new_state: RawRunEngineState, @@ -314,6 +385,7 @@ def _report_error(self, err: Exception) -> None: self._current.errors.append(str(err)) self._errors.append(str(err)) + @start_as_current_span(TRACER) def _report_status( self, ) -> None: @@ -327,6 +399,13 @@ def _report_status( task_failed=bool(self._current.errors), ) correlation_id = self._current.task_id + add_span_attributes( + { + "task_id": self._current.task_id, + "task_complete": self._current.is_complete, + "task_failed": self._current.errors, + } + ) else: task_status = None correlation_id = None @@ -341,10 +420,35 @@ def _report_status( def _on_document(self, name: str, document: Mapping[str, Any]) -> None: if self._current is not None: - correlation_id = self._current.task_id - self._data_events.publish( - DataEvent(name=name, doc=document), correlation_id - ) + if self._current_task_otel_context is not None: + with TRACER.start_as_current_span( + "_on_document", + context=self._current_task_otel_context, + kind=SpanKind.PRODUCER, + ): + """ + Start a new span but inject the context cached when the current task + was created. This will make the documents received part of the same + trace. + """ + add_span_attributes( + { + "task_id": self._current.task_id, + "name": name, + "document": str(document), + } + ) + + correlation_id = self._current.request_id + self._data_events.publish( + DataEvent(name=name, doc=document), correlation_id + ) + else: + raise ValueError( + "There is no context set for tracing despite the fact that a task" + " is running, something has gone wrong..." + ) + else: raise KeyError( "Trying to emit a document despite the fact that the RunEngine is idle" diff --git a/tests/conftest.py b/tests/conftest.py index 838d4b219..006dc6a57 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,16 @@ import asyncio +from typing import cast # Based on https://docs.pytest.org/en/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option # noqa: E501 import pytest from bluesky import RunEngine from bluesky.run_engine import TransitionError +from observability_utils.tracing import setup_tracing +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import get_tracer_provider + +from tests.unit_tests.utils.test_tracing import JsonObjectSpanExporter @pytest.fixture(scope="function") @@ -24,3 +31,17 @@ def clean_event_loop(): request.addfinalizer(clean_event_loop) return RE + + +@pytest.fixture +def provider() -> TracerProvider: + setup_tracing("test", False) + return cast(TracerProvider, get_tracer_provider()) + + +@pytest.fixture +def exporter(provider: TracerProvider) -> JsonObjectSpanExporter: + exporter = JsonObjectSpanExporter() + processor = BatchSpanProcessor(exporter) + provider.add_span_processor(processor) + return exporter diff --git a/tests/system_tests/plans.json b/tests/system_tests/plans.json index b5d0f76e2..d0bba9319 100644 --- a/tests/system_tests/plans.json +++ b/tests/system_tests/plans.json @@ -141,7 +141,7 @@ }, "radius": { "description": "Radius of the circle", - "exclusiveMinimum": 0, + "exclusiveMinimum": 0.0, "title": "Radius", "type": "number" }, @@ -170,19 +170,11 @@ "description": "Abstract baseclass for a combination of two regions, left and right.", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -207,19 +199,11 @@ "description": "Concatenate two Specs together, running one after the other.\n\nEach Dimension of left and right must contain the same axes. Typically\nformed using `Spec.concat`.\n\n.. example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"x\", 1, 3, 3).concat(Line(\"x\", 4, 5, 5))", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The left-hand Spec to Concat, midpoints will appear earlier" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The right-hand Spec to Concat, midpoints will appear later" }, "gap": { @@ -256,19 +240,11 @@ "description": "A point is in DifferenceOf(a, b) if in a and not in b.\n\nTypically created with the ``-`` operator.\n\n>>> r = Range(\"x\", 0.5, 2.5) - Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, True, False, False, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -312,18 +288,18 @@ }, "x_radius": { "description": "The radius along the x axis of the ellipse", - "exclusiveMinimum": 0, + "exclusiveMinimum": 0.0, "title": "X Radius", "type": "number" }, "y_radius": { "description": "The radius along the y axis of the ellipse", - "exclusiveMinimum": 0, + "exclusiveMinimum": 0.0, "title": "Y Radius", "type": "number" }, "angle": { - "default": 0, + "default": 0.0, "description": "The angle of the ellipse (degrees)", "title": "Angle", "type": "number" @@ -354,19 +330,11 @@ "description": "A point is in IntersectionOf(a, b) if in both a and b.\n\nTypically created with the ``&`` operator.\n\n>>> r = Range(\"x\", 0.5, 2.5) & Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, False, True, False, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -434,19 +402,11 @@ "description": "Restrict Spec to only midpoints that fall inside the given Region.\n\nTypically created with the ``&`` operator. It also pushes down the\n``& | ^ -`` operators to its `Region` to avoid the need for brackets on\ncombinations of Regions.\n\nIf a Region spans multiple Frames objects, they will be squashed together.\n\n.. example_spec::\n\n from scanspec.regions import Circle\n from scanspec.specs import Line\n\n spec = Line(\"y\", 1, 3, 3) * Line(\"x\", 3, 5, 5) & Circle(\"x\", \"y\", 4, 2, 1.2)\n\nSee Also: `why-squash-can-change-path`", "properties": { "spec": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The Spec containing the source midpoints" }, "region": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The Region that midpoints will be inside" }, "check_path_changes": { @@ -526,19 +486,11 @@ "description": "Outer product of two Specs, nesting inner within outer.\n\nThis means that inner will run in its entirety at each point in outer.\n\n.. example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"y\", 1, 2, 3) * Line(\"x\", 3, 4, 12)", "properties": { "outer": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "Will be executed once" }, "inner": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "Will be executed len(outer) times" }, "type": { @@ -627,7 +579,7 @@ "type": "number" }, "angle": { - "default": 0, + "default": 0.0, "description": "Clockwise rotation angle of the rectangle", "title": "Angle", "type": "number" @@ -739,11 +691,7 @@ "description": "Run the Spec in reverse on every other iteration when nested.\n\nTypically created with the ``~`` operator.\n\n.. example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"y\", 1, 3, 3) * ~Line(\"x\", 3, 5, 5)", "properties": { "spec": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The Spec to run in reverse every other iteration" }, "type": { @@ -850,7 +798,7 @@ "type": "integer" }, "rotate": { - "default": 0, + "default": 0.0, "description": "How much to rotate the angle of the spiral", "title": "Rotate", "type": "number" @@ -882,11 +830,7 @@ "description": "Squash a stack of Frames together into a single expanded Frames object.\n\nSee Also:\n `why-squash-can-change-path`\n\n.. example_spec::\n\n from scanspec.specs import Line, Squash\n\n spec = Squash(Line(\"y\", 1, 2, 3) * Line(\"x\", 0, 1, 4))", "properties": { "spec": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The Spec to squash the dimensions of" }, "check_path_changes": { @@ -953,19 +897,11 @@ "description": "A point is in SymmetricDifferenceOf(a, b) if in either a or b, but not both.\n\nTypically created with the ``^`` operator.\n\n>>> r = Range(\"x\", 0.5, 2.5) ^ Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, True, False, True, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -990,19 +926,11 @@ "description": "A point is in UnionOf(a, b) if in either a or b.\n\nTypically created with the ``|`` operator\n\n>>> r = Range(\"x\", 0.5, 2.5) | Range(\"x\", 1.5, 3.5)\n>>> r.mask({\"x\": np.array([0, 1, 2, 3, 4])})\narray([False, True, True, True, False])", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The left-hand Region to combine" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Region" - } - ], + "$ref": "#/$defs/Region", "description": "The right-hand Region to combine" }, "type": { @@ -1027,19 +955,11 @@ "description": "Run two Specs in parallel, merging their midpoints together.\n\nTypically formed using `Spec.zip`.\n\nStacks of Frames are merged by:\n\n- If right creates a stack of a single Frames object of size 1, expand it to\n the size of the fastest Frames object created by left\n- Merge individual Frames objects together from fastest to slowest\n\nThis means that Zipping a Spec producing stack [l2, l1] with a Spec\nproducing stack [r1] will assert len(l1)==len(r1), and produce\nstack [l2, l1.zip(r1)].\n\n.. example_spec::\n\n from scanspec.specs import Line\n\n spec = Line(\"z\", 1, 2, 3) * Line(\"y\", 3, 4, 5).zip(Line(\"x\", 4, 5, 5))", "properties": { "left": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The left-hand Spec to Zip, will appear earlier in axes" }, "right": { - "allOf": [ - { - "$ref": "#/$defs/Spec" - } - ], + "$ref": "#/$defs/Spec", "description": "The right-hand Spec to Zip, will appear later in axes" }, "type": { diff --git a/tests/unit_tests/client/test_client.py b/tests/unit_tests/client/test_client.py index f2ea986fd..fc110a1b6 100644 --- a/tests/unit_tests/client/test_client.py +++ b/tests/unit_tests/client/test_client.py @@ -3,6 +3,10 @@ import pytest from bluesky_stomp.messaging import MessageContext +from tests.unit_tests.utils.test_tracing import ( + JsonObjectSpanExporter, + asserting_span_exporter, +) from blueapi.client.client import BlueapiClient from blueapi.client.event_bus import AnyEvent, BlueskyStreamingError, EventBusClient @@ -426,3 +430,149 @@ def callback(on_event: Callable[[AnyEvent, MessageContext], None]): client_with_events.run_task(Task(name="foo"), on_event=mock_on_event) mock_on_event.assert_called_once_with(COMPLETE_EVENT) + + +def test_get_plans_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with asserting_span_exporter(exporter, "get_plans"): + client.get_plans() + + +def test_get_plan_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with asserting_span_exporter(exporter, "get_plan", "name"): + client.get_plan("foo") + + +def test_get_devices_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with asserting_span_exporter(exporter, "get_devices"): + client.get_devices() + + +def test_get_device_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with asserting_span_exporter(exporter, "get_device", "name"): + client.get_device("foo") + + +def test_get_state_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with asserting_span_exporter(exporter, "get_state"): + client.get_state() + + +def test_get_task_span_ok(exporter: JsonObjectSpanExporter, client: BlueapiClient): + with asserting_span_exporter(exporter, "get_task", "task_id"): + client.get_task("foo") + + +def test_get_all_tasks_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, +): + with asserting_span_exporter(exporter, "get_all_tasks"): + client.get_all_tasks() + + +def test_create_task_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "create_task", "task"): + client.create_task(task=Task(name="foo")) + + +def test_clear_task_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "clear_task"): + client.clear_task(task_id="foo") + + +def test_get_active_task_span_ok( + exporter: JsonObjectSpanExporter, client: BlueapiClient +): + with asserting_span_exporter(exporter, "get_active_task"): + client.get_active_task() + + +def test_start_task_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "start_task", "task"): + client.start_task(task=WorkerTask(task_id="bar")) + + +def test_create_and_start_task_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + mock_rest.create_task.return_value = TaskResponse(task_id="baz") + mock_rest.update_worker_task.return_value = TaskResponse(task_id="baz") + with asserting_span_exporter(exporter, "create_and_start_task", "task"): + client.create_and_start_task(Task(name="baz")) + + +def test_get_environment_span_ok( + exporter: JsonObjectSpanExporter, client: BlueapiClient +): + with asserting_span_exporter(exporter, "get_environment"): + client.get_environment() + + +def test_reload_environment_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "reload_environment"): + client.reload_environment() + + +def test_abort_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "abort", "reason"): + client.abort(reason="foo") + + +def test_stop_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "stop"): + client.stop() + + +def test_pause_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "pause"): + client.pause(defer=True) + + +def test_resume_span_ok( + exporter: JsonObjectSpanExporter, + client: BlueapiClient, + mock_rest: Mock, +): + with asserting_span_exporter(exporter, "resume"): + client.resume() + + +def test_cannot_run_task_span_ok( + exporter: JsonObjectSpanExporter, client: BlueapiClient +): + with pytest.raises( + RuntimeError, + match="Cannot run plans without Stomp configuration to track progress", + ): + with asserting_span_exporter(exporter, "grun_task"): + client.run_task(Task(name="foo")) diff --git a/tests/unit_tests/service/test_interface.py b/tests/unit_tests/service/test_interface.py index 36c6b69e9..654881916 100644 --- a/tests/unit_tests/service/test_interface.py +++ b/tests/unit_tests/service/test_interface.py @@ -1,6 +1,6 @@ import uuid from dataclasses import dataclass -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import ANY, MagicMock, Mock, patch import pytest from bluesky_stomp.messaging import StompClient @@ -274,8 +274,9 @@ def test_get_task_by_id(context_mock: MagicMock): task_id = interface.submit_task(Task(name="my_plan")) - assert interface.get_task_by_id(task_id) == TrackableTask( + assert interface.get_task_by_id(task_id) == TrackableTask.model_construct( task_id=task_id, + request_id=ANY, task=Task(name="my_plan", params={}), is_complete=False, is_pending=True, diff --git a/tests/unit_tests/service/test_rest_api.py b/tests/unit_tests/service/test_rest_api.py index 6bb08cb01..decf2a483 100644 --- a/tests/unit_tests/service/test_rest_api.py +++ b/tests/unit_tests/service/test_rest_api.py @@ -252,6 +252,7 @@ def test_get_tasks(get_tasks_mock: MagicMock, client: TestClient) -> None: "errors": [], "is_complete": False, "is_pending": True, + "request_id": "", "task": {"name": "sleep", "params": {"time": 0.0}}, "task_id": "0", }, @@ -259,6 +260,7 @@ def test_get_tasks(get_tasks_mock: MagicMock, client: TestClient) -> None: "errors": [], "is_complete": False, "is_pending": True, + "request_id": "", "task": {"name": "first_task", "params": {}}, "task_id": "1", }, @@ -288,6 +290,7 @@ def test_get_tasks_by_status( "errors": [], "is_complete": True, "is_pending": False, + "request_id": "", "task": {"name": "third_task", "params": {}}, "task_id": "3", } @@ -379,6 +382,7 @@ def test_get_task(get_task_by_id: MagicMock, client: TestClient): "errors": [], "is_complete": False, "is_pending": True, + "request_id": "", "task": {"name": "third_task", "params": {}}, "task_id": f"{task_id}", } @@ -404,6 +408,7 @@ def test_get_all_tasks(get_all_tasks: MagicMock, client: TestClient): "task": {"name": "third_task", "params": {}}, "is_complete": False, "is_pending": True, + "request_id": "", "errors": [], } ] diff --git a/tests/unit_tests/service/test_runner.py b/tests/unit_tests/service/test_runner.py index af65e9b0a..82e06bc24 100644 --- a/tests/unit_tests/service/test_runner.py +++ b/tests/unit_tests/service/test_runner.py @@ -5,6 +5,10 @@ import pytest from ophyd import Callable from pydantic import BaseModel, ValidationError +from tests.unit_tests.utils.test_tracing import ( + JsonObjectSpanExporter, + asserting_span_exporter, +) from blueapi.service import interface from blueapi.service.model import EnvironmentResponse @@ -244,3 +248,10 @@ def test_accepts_return_type( rpc_function: Callable[[], Any], ): started_runner.run(rpc_function) + + +def test_run_span_ok( + exporter: JsonObjectSpanExporter, started_runner: WorkerDispatcher +): + with asserting_span_exporter(exporter, "run", "function", "args", "kwargs"): + started_runner.run(interface.get_plans) diff --git a/tests/unit_tests/utils/test_tracing.py b/tests/unit_tests/utils/test_tracing.py new file mode 100644 index 000000000..7cd83640c --- /dev/null +++ b/tests/unit_tests/utils/test_tracing.py @@ -0,0 +1,52 @@ +from collections.abc import Callable, Sequence +from concurrent.futures import Future +from contextlib import contextmanager +from typing import IO + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import ( + SpanExporter, + SpanExportResult, +) + + +class JsonObjectSpanExporter(SpanExporter): + """A custom span exporter to allow spans created by open telemetry tracing code to + be examined and verified during normal testing + """ + + def __init__( + self, + service_name: str | None = "Test", + out: IO | None = None, + formatter: Callable[[ReadableSpan], str] | None = None, + ): + self.service_name = service_name + self.top_span: Future = Future() + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + if self.top_span is not None and not self.top_span.done(): + self.top_span.set_result(spans[-1]) + return SpanExportResult.SUCCESS + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + +@contextmanager +def asserting_span_exporter( + exporter: JsonObjectSpanExporter, func_name: str, *span_args: str +): + """Use as a with block around the function under test decorated with + start_as_current_span to check span creation and content. + + params: + func_name: The name of the function being tested + span_args: The arguments specified in its start_as_current_span decorator + """ + yield + if exporter.top_span is not None: + span = exporter.top_span.result(timeout=5.0) + assert span.name == func_name + for param in span_args: + assert param in span.attributes.keys() diff --git a/tests/unit_tests/worker/test_task_worker.py b/tests/unit_tests/worker/test_task_worker.py index 96777db9b..9d9028a18 100644 --- a/tests/unit_tests/worker/test_task_worker.py +++ b/tests/unit_tests/worker/test_task_worker.py @@ -4,9 +4,13 @@ from concurrent.futures import Future from queue import Full from typing import Any, TypeVar -from unittest.mock import MagicMock, patch +from unittest.mock import ANY, MagicMock, patch import pytest +from tests.unit_tests.utils.test_tracing import ( + JsonObjectSpanExporter, + asserting_span_exporter, +) from blueapi.config import EnvironmentConfig, Source, SourceKind from blueapi.core import BlueskyContext, EventStream, MsgGenerator @@ -111,20 +115,34 @@ def test_multi_start(inert_worker: TaskWorker) -> None: inert_worker.stop() -def test_submit_task(worker: TaskWorker) -> None: +def test_submit_task( + worker: TaskWorker, +) -> None: assert worker.get_tasks() == [] task_id = worker.submit_task(_SIMPLE_TASK) - assert worker.get_tasks() == [TrackableTask(task_id=task_id, task=_SIMPLE_TASK)] + assert worker.get_tasks() == [ + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) + ] def test_submit_multiple_tasks(worker: TaskWorker) -> None: assert worker.get_tasks() == [] task_id_1 = worker.submit_task(_SIMPLE_TASK) - assert worker.get_tasks() == [TrackableTask(task_id=task_id_1, task=_SIMPLE_TASK)] + assert worker.get_tasks() == [ + TrackableTask.model_construct( + task_id=task_id_1, request_id=ANY, task=_SIMPLE_TASK + ) + ] task_id_2 = worker.submit_task(_LONG_TASK) assert worker.get_tasks() == [ - TrackableTask(task_id=task_id_1, task=_SIMPLE_TASK), - TrackableTask(task_id=task_id_2, task=_LONG_TASK), + TrackableTask.model_construct( + task_id=task_id_1, request_id=ANY, task=_SIMPLE_TASK + ), + TrackableTask.model_construct( + task_id=task_id_2, request_id=ANY, task=_LONG_TASK + ), ] @@ -136,27 +154,43 @@ def test_stop_with_task_pending(inert_worker: TaskWorker) -> None: def test_restart_leaves_task_pending(worker: TaskWorker) -> None: task_id = worker.submit_task(_SIMPLE_TASK) - assert worker.get_tasks() == [TrackableTask(task_id=task_id, task=_SIMPLE_TASK)] + assert worker.get_tasks() == [ + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) + ] worker.stop() worker.start() - assert worker.get_tasks() == [TrackableTask(task_id=task_id, task=_SIMPLE_TASK)] + assert worker.get_tasks() == [ + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) + ] def test_submit_before_start_pending(inert_worker: TaskWorker) -> None: task_id = inert_worker.submit_task(_SIMPLE_TASK) inert_worker.start() assert inert_worker.get_tasks() == [ - TrackableTask(task_id=task_id, task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] inert_worker.stop() assert inert_worker.get_tasks() == [ - TrackableTask(task_id=task_id, task=_SIMPLE_TASK) + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) ] def test_clear_task(worker: TaskWorker) -> None: task_id = worker.submit_task(_SIMPLE_TASK) - assert worker.get_tasks() == [TrackableTask(task_id=task_id, task=_SIMPLE_TASK)] + assert worker.get_tasks() == [ + TrackableTask.model_construct( + task_id=task_id, request_id=ANY, task=_SIMPLE_TASK + ) + ] assert worker.clear_task(task_id) assert worker.get_tasks() == [] @@ -487,3 +521,36 @@ def test_get_tasks_by_status(worker: TaskWorker, status, expected_task_ids): result_ids = [task_id for task_id, task in worker._tasks.items() if task in result] assert result_ids == expected_task_ids + + +def test_start_span_ok( + exporter: JsonObjectSpanExporter, inert_worker: TaskWorker +) -> None: + with asserting_span_exporter(exporter, "start"): + inert_worker.start() + inert_worker.stop() + + +def test_stop_span_ok( + exporter: JsonObjectSpanExporter, inert_worker: TaskWorker +) -> None: + inert_worker.start() + with asserting_span_exporter(exporter, "stop"): + inert_worker.stop() + + +def test_submit_task_span_ok( + exporter: JsonObjectSpanExporter, + worker: TaskWorker, +) -> None: + assert worker.get_tasks() == [] + with asserting_span_exporter(exporter, "submit_task", "task.name", "task.params"): + worker.submit_task(_SIMPLE_TASK) + + +def test_clear_task_span_ok( + exporter: JsonObjectSpanExporter, worker: TaskWorker +) -> None: + with pytest.raises(KeyError): + with asserting_span_exporter(exporter, "clear_task", "task_id"): + worker.clear_task("foo")