Skip to content

Commit

Permalink
Replace interface singleton class with cached functions
Browse files Browse the repository at this point in the history
Replace with lru_cached functions. These will return the same value
each time they are called. This simplifies the interface module.

The tests and checks for working initialisation have been removed as
they are no longer appropriate.
  • Loading branch information
joeshannon committed Jul 31, 2024
1 parent 0a4abfe commit 37a6530
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 219 deletions.
189 changes: 80 additions & 109 deletions src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import logging
from collections.abc import Mapping
from functools import lru_cache
from typing import Any

from blueapi.config import ApplicationConfig
from blueapi.core.context import BlueskyContext
from blueapi.core.event import EventStream
from blueapi.messaging.base import MessagingTemplate
from blueapi.messaging.stomptemplate import StompMessagingTemplate
from blueapi.service.model import (
DeviceModel,
PlanModel,
WorkerTask,
)
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import TaskStatusEnum, WorkerState
from blueapi.worker.reworker import TaskWorker
from blueapi.worker.task import Task
Expand All @@ -20,61 +18,77 @@
context and worker"""


class InitialisationException(Exception):
pass
_CONFIG: ApplicationConfig = ApplicationConfig()


class _Singleton:
context: BlueskyContext
worker: Worker
messaging_template: MessagingTemplate | None = None
initialized = False
def config() -> ApplicationConfig:
return _CONFIG


def start_worker(
config: ApplicationConfig,
bluesky_context: BlueskyContext | None = None,
worker: TaskWorker | None = None,
) -> None:
"""Creates and starts a worker with supplied config"""
if _Singleton.initialized:
raise InitialisationException(
"Worker is already running. To reload call stop first"
)
if bluesky_context is None:
_Singleton.context = BlueskyContext()
_Singleton.context.with_config(config.env)
else:
_Singleton.context = bluesky_context
def set_config(new_config: ApplicationConfig):
global _CONFIG

if worker is None:
_Singleton.worker = TaskWorker(
_Singleton.context,
broadcast_statuses=config.env.events.broadcast_status_events,
)
else:
_Singleton.worker = worker
if config.stomp is not None:
_Singleton.messaging_template = StompMessagingTemplate.autoconfigured(
config.stomp
)
_CONFIG = new_config

# Start worker and setup events
_Singleton.worker.start()
if _Singleton.messaging_template is not None:
event_topic = _Singleton.messaging_template.destinations.topic(
"public.worker.event"
)

@lru_cache
def context() -> BlueskyContext:
ctx = BlueskyContext()
ctx.with_config(config().env)
return ctx


@lru_cache
def worker() -> Worker:
worker = TaskWorker(
context(),
broadcast_statuses=config().env.events.broadcast_status_events,
)
worker.start()
return worker


@lru_cache
def messaging_template() -> MessagingTemplate | None:
stomp_config = config().stomp
if stomp_config is not None:
template = StompMessagingTemplate.autoconfigured(stomp_config)

task_worker = worker()
event_topic = template.destinations.topic("public.worker.event")

_publish_event_streams(
{
_Singleton.worker.worker_events: event_topic,
_Singleton.worker.progress_events: event_topic,
_Singleton.worker.data_events: event_topic,
task_worker.worker_events: event_topic,
task_worker.progress_events: event_topic,
task_worker.data_events: event_topic,
}
)
_Singleton.messaging_template.connect()
_Singleton.initialized = True
template.connect()
return template
else:
return None


def setup(config: ApplicationConfig) -> None:
"""Creates and starts a worker with supplied config"""

set_config(config)

# Eagerly initialize worker and messaging connection

logging.basicConfig(level=config.logging.level)
worker()
messaging_template()


def teardown() -> None:
worker().stop()
if (template := messaging_template()) is not None:
template.disconnect()
context.cache_clear()
worker.cache_clear()
messaging_template.cache_clear()


def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -> None:
Expand All @@ -84,130 +98,87 @@ def _publish_event_streams(streams_to_destinations: Mapping[EventStream, str]) -

def _publish_event_stream(stream: EventStream, destination: str) -> None:
def forward_message(event: Any, correlation_id: str | None) -> None:
if _Singleton.messaging_template is not None:
_Singleton.messaging_template.send(destination, event, None, correlation_id)
if (template := messaging_template()) is not None:
template.send(destination, event, None, correlation_id)

stream.subscribe(forward_message)


def stop_worker() -> None:
if not _Singleton.initialized:
raise InitialisationException(
"Cannot stop worker as it hasn't been started yet"
)
_Singleton.initialized = False
_Singleton.worker.stop()
if (
_Singleton.messaging_template is not None
and _Singleton.messaging_template.is_connected()
):
_Singleton.messaging_template.disconnect()


def get_plans() -> list[PlanModel]:
"""Get all available plans in the BlueskyContext"""
_ensure_worker_started()
return [PlanModel.from_plan(plan) for plan in _Singleton.context.plans.values()]
return [PlanModel.from_plan(plan) for plan in context().plans.values()]


def get_plan(name: str) -> PlanModel:
"""Get plan by name from the BlueskyContext"""
_ensure_worker_started()
return PlanModel.from_plan(_Singleton.context.plans[name])
return PlanModel.from_plan(context().plans[name])


def get_devices() -> list[DeviceModel]:
"""Get all available devices in the BlueskyContext"""
_ensure_worker_started()
return [
DeviceModel.from_device(device)
for device in _Singleton.context.devices.values()
]
return [DeviceModel.from_device(device) for device in context().devices.values()]


def get_device(name: str) -> DeviceModel:
"""Retrieve device by name from the BlueskyContext"""
_ensure_worker_started()
return DeviceModel.from_device(_Singleton.context.devices[name])
return DeviceModel.from_device(context().devices[name])


def submit_task(task: Task) -> str:
"""Submit a task to be run on begin_task"""
_ensure_worker_started()
return _Singleton.worker.submit_task(task)
return worker().submit_task(task)


def clear_task(task_id: str) -> str:
"""Remove a task from the worker"""
_ensure_worker_started()
return _Singleton.worker.clear_task(task_id)
return worker().clear_task(task_id)


def begin_task(task: WorkerTask) -> WorkerTask:
"""Trigger a task. Will fail if the worker is busy"""
_ensure_worker_started()
if task.task_id is not None:
_Singleton.worker.begin_task(task.task_id)
worker().begin_task(task.task_id)
return task


def get_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]:
"""Retrieve a list of tasks based on their status."""
_ensure_worker_started()
return _Singleton.worker.get_tasks_by_status(status)
return worker().get_tasks_by_status(status)


def get_active_task() -> TrackableTask | None:
"""Task the worker is currently running"""
_ensure_worker_started()
return _Singleton.worker.get_active_task()
return worker().get_active_task()


def get_worker_state() -> WorkerState:
"""State of the worker"""
_ensure_worker_started()
return _Singleton.worker.state
return worker().state


def pause_worker(defer: bool | None) -> None:
"""Command the worker to pause"""
_ensure_worker_started()
_Singleton.worker.pause(defer)
worker().pause(defer)


def resume_worker() -> None:
"""Command the worker to resume"""
_ensure_worker_started()
_Singleton.worker.resume()
worker().resume()


def cancel_active_task(failure: bool, reason: str | None) -> str:
"""Remove the currently active task from the worker if there is one
Returns the task_id of the active task"""
_ensure_worker_started()
return _Singleton.worker.cancel_active_task(failure, reason)
return worker().cancel_active_task(failure, reason)


def get_tasks() -> list[TrackableTask]:
"""Return a list of all tasks on the worker,
any one of which can be triggered with begin_task"""
_ensure_worker_started()
return _Singleton.worker.get_tasks()
return worker().get_tasks()


def get_task_by_id(task_id: str) -> TrackableTask | None:
"""Returns a task matching the task ID supplied,
if the worker knows of it"""
_ensure_worker_started()
return _Singleton.worker.get_task_by_id(task_id)


def get_state() -> bool:
"""Initialization state"""
return _Singleton.initialized


def _ensure_worker_started() -> None:
if _Singleton.initialized:
return
raise InitialisationException("Worker must be stared before it is used")
return worker().get_task_by_id(task_id)
6 changes: 1 addition & 5 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,8 @@ async def delete_environment(
) -> EnvironmentResponse:
"""Delete the current environment, causing internal components to be reloaded."""

def restart_runner(runner: WorkerDispatcher):
runner.stop()
runner.start()

if runner.state.initialized or runner.state.error_message is not None:
background_tasks.add_task(restart_runner, runner)
background_tasks.add_task(runner.reload)
return EnvironmentResponse(initialized=False)


Expand Down
Loading

0 comments on commit 37a6530

Please sign in to comment.