diff --git a/src/blueapi/service/handler_base.py b/src/blueapi/service/handler_base.py index faecda6e6..7bd292387 100644 --- a/src/blueapi/service/handler_base.py +++ b/src/blueapi/service/handler_base.py @@ -83,3 +83,11 @@ def pending_tasks(self) -> List[TrackableTask]: def get_pending_task(self, task_id: str) -> Optional[TrackableTask]: """Returns a task matching the task ID supplied, if the worker knows of it""" + + @abstractmethod + def start(self): + """Start the handler""" + + @abstractmethod + def stop(self): + """Stop the handler""" diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 7440c124b..6901b9e6f 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -1,5 +1,5 @@ from contextlib import asynccontextmanager -from typing import Dict, Set +from typing import Dict, Optional, Set from fastapi import Body, Depends, FastAPI, HTTPException, Request, Response, status from pydantic import ValidationError @@ -9,7 +9,6 @@ from blueapi.config import ApplicationConfig from blueapi.worker import RunPlan, TrackableTask, WorkerState -from .handler import get_handler, setup_handler, teardown_handler from .handler_base import BlueskyHandler from .model import ( DeviceModel, @@ -20,9 +19,34 @@ TaskResponse, WorkerTask, ) +from .subprocess_handler import SubprocessHandler REST_API_VERSION = "0.0.4" +HANDLER: Optional[BlueskyHandler] = None + + +def get_handler() -> BlueskyHandler: + if HANDLER is None: + raise ValueError() + return HANDLER + + +def setup_handler(config: Optional[ApplicationConfig] = None): + global HANDLER + handler = SubprocessHandler(config) + handler.start() + + HANDLER = handler + + +def teardown_handler(): + global HANDLER + if HANDLER is None: + return + HANDLER.stop() + HANDLER = None + @asynccontextmanager async def lifespan(app: FastAPI): @@ -49,6 +73,13 @@ async def on_key_error_404(_: Request, __: KeyError): ) +@app.put("/reload") +def update_task(handler: BlueskyHandler = Depends(get_handler)): + handler.stop() + handler.start() + print("Reloaded") + + @app.get("/plans", response_model=PlanResponse) def get_plans(handler: BlueskyHandler = Depends(get_handler)): """Retrieve information about all available plans.""" diff --git a/src/blueapi/service/subprocess_handler.py b/src/blueapi/service/subprocess_handler.py new file mode 100644 index 000000000..a9ff2d494 --- /dev/null +++ b/src/blueapi/service/subprocess_handler.py @@ -0,0 +1,143 @@ +from multiprocessing import Pool +from typing import List, Optional + +from blueapi.config import ApplicationConfig +from blueapi.service.facade import BlueskyHandler +from blueapi.service.handler import get_handler, setup_handler, teardown_handler +from blueapi.service.model import DeviceModel, PlanModel, WorkerTask +from blueapi.worker.event import WorkerState +from blueapi.worker.task import RunPlan +from blueapi.worker.worker import TrackableTask + + +class SubprocessHandler(BlueskyHandler): + _config: ApplicationConfig + _subprocess: Pool + + def __init__( + self, + config: Optional[ApplicationConfig] = None, + ) -> None: + self._config = config or ApplicationConfig() + self._subprocess = None + + def start(self): + if self._subprocess is not None: + raise ValueError("Subprocess already running") + self._subprocess = Pool(processes=1) + self._subprocess.apply(setup_handler, [self._config]) + + def stop(self): + self._subprocess.apply(teardown_handler) + self._subprocess.terminate() + self._subprocess = None + + def reload_context(self): + self.stop() + self.start() + + @property + def plans(self) -> List[PlanModel]: + return self._subprocess.apply(plans) + + def get_plan(self, name: str) -> PlanModel: + return self._subprocess.apply(get_plan, [name]) + + @property + def devices(self) -> List[DeviceModel]: + return self._subprocess.apply(devices) + + def get_device(self, name: str) -> DeviceModel: + return self._subprocess.apply(get_device, [name]) + + def submit_task(self, task: RunPlan) -> str: + return self._subprocess.apply(submit_task, [task]) + + def delete_task(self, task_id: str) -> str: + return self._subprocess.apply(delete_task, [task_id]) + + def begin_task(self, task: WorkerTask) -> WorkerTask: + return self._subprocess.apply(begin_task, [task]) + + @property + def active_task(self) -> Optional[TrackableTask]: + return self._subprocess.apply(active_task) + + @property + def state(self) -> WorkerState: + return self._subprocess.apply(state) + + def pause_worker(self, defer: Optional[bool]) -> None: + return self._subprocess.apply(pause_worker, [defer]) + + def resume_worker(self) -> None: + return self._subprocess.apply(resume_worker) + + def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None: + return self._subprocess.apply(cancel_active_task, [failure, reason]) + + @property + def pending_tasks(self) -> List[TrackableTask]: + return self._subprocess.apply(pending_tasks) + + def get_pending_task(self, task_id: str) -> Optional[TrackableTask]: + return self._subprocess.apply(get_pending_task, [task_id]) + + +# Free functions (passed to subprocess) for each of the methods required by Handler + + +def plans() -> List[PlanModel]: + return get_handler().plans + + +def get_plan(name: str): + return get_handler().get_plan(name) + + +def devices() -> List[DeviceModel]: + return get_handler().devices + + +def get_device(name: str) -> DeviceModel: + return get_handler().get_device(name) + + +def submit_task(task: RunPlan) -> str: + return get_handler().submit_task(task) + + +def delete_task(task_id: str) -> str: + return get_handler().delete_task(task_id) + + +def begin_task(task: WorkerTask) -> WorkerTask: + return get_handler().begin_task(task) + + +def active_task() -> Optional[TrackableTask]: + return get_handler().active_task + + +def state() -> WorkerState: + return get_handler().state + + +def pause_worker(defer: Optional[bool]) -> None: + return get_handler().pause_worker(defer) + + +def resume_worker() -> None: + return get_handler().resume_worker() + + +def cancel_active_task(failure: bool, reason: Optional[str]) -> None: + return get_handler().cancel_active_task() + + +def pending_tasks() -> List[TrackableTask]: + return get_handler().pending_tasks + + +def get_pending_task(task_id: str) -> Optional[TrackableTask]: + return get_handler().get_pending_task()