Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Obs trace WIP do not merge #586

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
// Make sure SELinux does not disable with access to host filesystems like tmp
"--security-opt=label=disable"
],
"mounts": [
"source=/scratch/athena/observability-utils,target=/scratch/athena/observability-utils,type=bind,consistency=cached"
],
// Mount the parent as /workspaces so we can pip install peers as editable
"workspaceMount": "source=${localWorkspaceFolder}/..,target=/workspaces,type=bind",
// After the container is created, install the python project in editable form
Expand Down
2 changes: 1 addition & 1 deletion .github/pages/make_switcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_versions(ref: str, add: str | None) -> list[str]:
return versions


def write_json(path: Path, repository: str, versions: str):
def write_json(path: Path, repository: str, versions: list[str]):
org, repo_name = repository.split("/")
struct = [
{"version": version, "url": f"https://{org}.github.io/{repo_name}/{version}/"}
Expand Down
7 changes: 7 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
RUN python -m venv /venv
ENV PATH=/venv/bin:$PATH

# enable opentelemetry support
ENV OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf
ENV OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318
ENV OTEL_EXPORTER_OTLP_INSECURE=true
ENV OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST=".*"
ENV OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE=".*"

# The build stage installs the context into the venv
FROM developer as build
COPY . /context
Expand Down
25 changes: 23 additions & 2 deletions helm/blueapi/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,34 @@ listener:
resources: {}

# Additional envVars to mount to the pod as a String
extraEnvVars: []
extraEnvVars: |
- name: OTEL_EXPORTER_OTLP_TRACES_PROTOCOL
value: {{ .Values.jaeger.otlp.protocol }}
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "{{ .Values.jaeger.otlp.host }}:{{ .Values.jaeger.otlp.port }}"
- name: OTEL_EXPORTER_OTLP_INSECURE
value: "{{ .Values.jaeger.otlp.insecure }}"
- name: OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST
value: {{ .Values.jaeger.otlp.request.headers }}
- name: OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE
value: {{ .Values.jaeger.otlp.response.headers }}
# - name: RABBITMQ_PASSWORD
# valueFrom:
# secretKeyRef:
# name: rabbitmq-password
# key: rabbitmq-password

jaeger:
otlp:
protocol: http/protobuf
insecure: true
host: http://localhost
port: 4318
request:
headers: ".*"
response:
headers: ".*"

# Config for the worker goes here, will be mounted into a config file
worker:
api:
Expand Down Expand Up @@ -110,6 +131,6 @@ worker:
# - name: "dodal"
# remote_url: https://github.com/DiamondLightSource/dodal.git

# Mount path for scratch area from host machine, setting
# Mount path for scratch area from host machine, setting
# this effectively enables scratch area management
scratchHostPath: "" # example: /usr/local/blueapi-software-scratch
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"dls-dodal>=1.24.0",
"super-state-machine", # See GH issue 553
"GitPython",
# "observability-utils",
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
4 changes: 4 additions & 0 deletions src/blueapi/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import click
from bluesky.callbacks.best_effort import BestEffortCallback
from observability_utils.tracing import setup_tracing
from pydantic import ValidationError
from requests.exceptions import ConnectionError

Expand Down Expand Up @@ -84,6 +85,8 @@ def schema(output: Path | None = None, update: bool = False) -> None:
@click.pass_obj
def start_application(obj: dict):
"""Run a worker that accepts plans to run"""

setup_tracing("BlueAPI") # initialise TracerProvider for server app
config: ApplicationConfig = obj["config"]

start(config)
Expand All @@ -100,6 +103,7 @@ def start_application(obj: dict):
def controller(ctx: click.Context, output: str) -> None:
"""Client utility for controlling and introspecting the worker"""

setup_tracing("BlueAPICLI") # initialise TracerProvider for controller app
if ctx.invoked_subcommand is None:
print("Please invoke subcommand!")
return
Expand Down
25 changes: 25 additions & 0 deletions src/blueapi/client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import time
from concurrent.futures import Future

from observability_utils.tracing import (
get_tracer,
start_as_current_span,
)

from blueapi.config import ApplicationConfig
from blueapi.core.bluesky_types import DataEvent
from blueapi.messaging import MessageContext, StompMessagingTemplate
Expand All @@ -19,6 +24,8 @@
from .event_bus import AnyEvent, BlueskyStreamingError, EventBusClient, OnAnyEvent
from .rest import BlueapiRestClient, BlueskyRemoteControlError

TRACER = get_tracer("client")


class BlueapiClient:
"""Unified client for controlling blueapi"""
Expand All @@ -44,6 +51,7 @@ def from_config(cls, config: ApplicationConfig) -> "BlueapiClient":
events = None
return cls(rest, events)

@start_as_current_span(TRACER)
def get_plans(self) -> PlanResponse:
"""
List plans available
Expand All @@ -53,6 +61,7 @@ def get_plans(self) -> PlanResponse:
"""
return self._rest.get_plans()

@start_as_current_span(TRACER, "name")
def get_plan(self, name: str) -> PlanModel:
"""
Get details of a single plan
Expand All @@ -65,6 +74,7 @@ def get_plan(self, name: str) -> PlanModel:
"""
return self._rest.get_plan(name)

@start_as_current_span(TRACER)
def get_devices(self) -> DeviceResponse:
"""
List devices available
Expand All @@ -75,6 +85,7 @@ def get_devices(self) -> DeviceResponse:

return self._rest.get_devices()

@start_as_current_span(TRACER, "name")
def get_device(self, name: str) -> DeviceModel:
"""
Get details of a single device
Expand All @@ -88,6 +99,7 @@ def get_device(self, name: str) -> DeviceModel:

return self._rest.get_device(name)

@start_as_current_span(TRACER)
def get_state(self) -> WorkerState:
"""
Get current state of the blueapi worker
Expand All @@ -98,6 +110,7 @@ def get_state(self) -> WorkerState:

return self._rest.get_state()

@start_as_current_span(TRACER, "defer")
def pause(self, defer: bool = False) -> WorkerState:
"""
Pause execution of the current task, if any
Expand All @@ -113,6 +126,7 @@ def pause(self, defer: bool = False) -> WorkerState:

return self._rest.set_state(WorkerState.PAUSED, defer=defer)

@start_as_current_span(TRACER)
def resume(self) -> WorkerState:
"""
Resume plan execution if previously paused
Expand All @@ -124,6 +138,7 @@ def resume(self) -> WorkerState:

return self._rest.set_state(WorkerState.RUNNING, defer=False)

@start_as_current_span(TRACER, "task_id")
def get_task(self, task_id: str) -> TrackableTask[Task]:
"""
Get a task stored by the worker
Expand All @@ -137,6 +152,7 @@ def get_task(self, task_id: str) -> TrackableTask[Task]:

return self._rest.get_task(task_id)

@start_as_current_span(TRACER)
def get_active_task(self) -> WorkerTask:
"""
Get the currently active task, if any
Expand All @@ -148,6 +164,7 @@ def get_active_task(self) -> WorkerTask:

return self._rest.get_active_task()

@start_as_current_span(TRACER, "task", "timeout")
def run_task(
self,
task: Task,
Expand Down Expand Up @@ -232,6 +249,7 @@ def create_and_start_task(self, task: Task) -> TaskResponse:
f"but {worker_response.task_id} was started instead"
)

@start_as_current_span(TRACER, "task")
def create_task(self, task: Task) -> TaskResponse:
"""
Create a new task, does not start execution
Expand All @@ -245,6 +263,7 @@ def create_task(self, task: Task) -> TaskResponse:

return self._rest.create_task(task)

@start_as_current_span(TRACER)
def clear_task(self, task_id: str) -> TaskResponse:
"""
Delete a stored task on the worker
Expand All @@ -258,6 +277,7 @@ def clear_task(self, task_id: str) -> TaskResponse:

return self._rest.clear_task(task_id)

@start_as_current_span(TRACER, "task")
def start_task(self, task: WorkerTask) -> WorkerTask:
"""
Instruct the worker to start a stored task immediately
Expand All @@ -271,6 +291,7 @@ def start_task(self, task: WorkerTask) -> WorkerTask:

return self._rest.update_worker_task(task)

@start_as_current_span(TRACER, "reason")
def abort(self, reason: str | None = None) -> WorkerState:
"""
Abort the plan currently being executed, if any.
Expand All @@ -291,6 +312,7 @@ def abort(self, reason: str | None = None) -> WorkerState:
reason=reason,
)

@start_as_current_span(TRACER)
def stop(self) -> WorkerState:
"""
Stop execution of the current plan early.
Expand All @@ -304,6 +326,7 @@ def stop(self) -> WorkerState:

return self._rest.cancel_current_task(WorkerState.STOPPING)

@start_as_current_span(TRACER)
def get_environment(self) -> EnvironmentResponse:
"""
Get details of the worker environment
Expand All @@ -315,6 +338,7 @@ def get_environment(self) -> EnvironmentResponse:

return self._rest.get_environment()

@start_as_current_span(TRACER, "timeout", "polling_interval")
def reload_environment(
self,
timeout: float | None = None,
Expand Down Expand Up @@ -347,6 +371,7 @@ def reload_environment(
polling_interval,
)

@start_as_current_span(TRACER, "timeout", "polling_interval")
def _wait_for_reload(
self,
status: EnvironmentResponse,
Expand Down
11 changes: 10 additions & 1 deletion src/blueapi/client/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
from typing import Any, Literal, TypeVar

import requests
from observability_utils.tracing import (
get_context_propagator,
get_tracer,
start_as_current_span,
)
from pydantic import parse_obj_as

from blueapi.config import RestConfig
Expand All @@ -18,6 +23,8 @@

T = TypeVar("T")

TRACER = get_tracer("rest")


class BlueskyRemoteControlError(Exception):
def __init__(self, message: str) -> None:
Expand Down Expand Up @@ -114,6 +121,7 @@ def delete_environment(self) -> EnvironmentResponse:
"/environment", EnvironmentResponse, method="DELETE"
)

@start_as_current_span(TRACER, "method", "data", "suffix")
def _request_and_deserialize(
self,
suffix: str,
Expand All @@ -123,8 +131,9 @@ def _request_and_deserialize(
get_exception: Callable[[requests.Response], Exception | None] = _exception,
) -> T:
url = self._url(suffix)
carr = get_context_propagator()
if data:
response = requests.request(method, url, json=data)
response = requests.request(method, url, json=data, headers=carr)
else:
response = requests.request(method, url)
exception = get_exception(response)
Expand Down
8 changes: 4 additions & 4 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class BasicAuthentication(BaseModel):
i.e. ${foo} or ${FOO} are replaced with the value of FOO
"""

username: str = "guest"
passcode: str = "guest"
username: str = "test" # "guest"
passcode: str = "test" # "guest"

@validator("username", "passcode")
def get_from_env(cls, v: str):
Expand All @@ -48,7 +48,7 @@ class StompConfig(BaseModel):

host: str = "localhost"
port: int = 61613
auth: BasicAuthentication | None = None
auth: BasicAuthentication | None = BasicAuthentication()


class WorkerEventConfig(BlueapiBaseModel):
Expand Down Expand Up @@ -101,7 +101,7 @@ class ApplicationConfig(BlueapiBaseModel):
config tree.
"""

stomp: StompConfig | None = None
stomp: StompConfig | None = StompConfig()
env: EnvironmentConfig = Field(default_factory=EnvironmentConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)
api: RestConfig = Field(default_factory=RestConfig)
Expand Down
Loading
Loading