Skip to content

Commit

Permalink
Run BlueskyContext and Worker in subprocess
Browse files Browse the repository at this point in the history
This approach is possibly not the best design as it results in four
definitions of all the functions.
  • Loading branch information
joeshannon committed Dec 13, 2023
1 parent c5bf8f9 commit 16bbbb9
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/blueapi/service/handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,11 @@ def pending_tasks(self) -> List[TrackableTask]:
def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
"""Returns a task matching the task ID supplied,
if the worker knows of it"""

@abstractmethod
def start(self):
"""Start the handler"""

@abstractmethod
def stop(self):
"""Stop the handler"""
35 changes: 33 additions & 2 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from contextlib import asynccontextmanager
from typing import Dict, Set
from typing import Dict, Optional, Set

from fastapi import Body, Depends, FastAPI, HTTPException, Request, Response, status
from pydantic import ValidationError
Expand All @@ -9,7 +9,6 @@
from blueapi.config import ApplicationConfig
from blueapi.worker import RunPlan, TrackableTask, WorkerState

from .handler import get_handler, setup_handler, teardown_handler
from .handler_base import BlueskyHandler
from .model import (
DeviceModel,
Expand All @@ -20,9 +19,34 @@
TaskResponse,
WorkerTask,
)
from .subprocess_handler import SubprocessHandler

REST_API_VERSION = "0.0.4"

HANDLER: Optional[BlueskyHandler] = None


def get_handler() -> BlueskyHandler:
if HANDLER is None:
raise ValueError()
return HANDLER


def setup_handler(config: Optional[ApplicationConfig] = None):
global HANDLER
handler = SubprocessHandler(config)
handler.start()

HANDLER = handler


def teardown_handler():
global HANDLER
if HANDLER is None:
return
HANDLER.stop()
HANDLER = None


@asynccontextmanager
async def lifespan(app: FastAPI):
Expand All @@ -49,6 +73,13 @@ async def on_key_error_404(_: Request, __: KeyError):
)


@app.put("/reload")
def update_task(handler: BlueskyHandler = Depends(get_handler)):
handler.stop()
handler.start()
print("Reloaded")


@app.get("/plans", response_model=PlanResponse)
def get_plans(handler: BlueskyHandler = Depends(get_handler)):
"""Retrieve information about all available plans."""
Expand Down
143 changes: 143 additions & 0 deletions src/blueapi/service/subprocess_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from multiprocessing import Pool
from typing import List, Optional

from blueapi.config import ApplicationConfig
from blueapi.service.facade import BlueskyHandler
from blueapi.service.handler import get_handler, setup_handler, teardown_handler
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import WorkerState
from blueapi.worker.task import RunPlan
from blueapi.worker.worker import TrackableTask


class SubprocessHandler(BlueskyHandler):
_config: ApplicationConfig
_subprocess: Pool

def __init__(
self,
config: Optional[ApplicationConfig] = None,
) -> None:
self._config = config or ApplicationConfig()
self._subprocess = None

def start(self):
if self._subprocess is not None:
raise ValueError("Subprocess already running")
self._subprocess = Pool(processes=1)
self._subprocess.apply(setup_handler, [self._config])

def stop(self):
self._subprocess.apply(teardown_handler)
self._subprocess.terminate()
self._subprocess = None

def reload_context(self):
self.stop()
self.start()

@property
def plans(self) -> List[PlanModel]:
return self._subprocess.apply(plans)

def get_plan(self, name: str) -> PlanModel:
return self._subprocess.apply(get_plan, [name])

@property
def devices(self) -> List[DeviceModel]:
return self._subprocess.apply(devices)

def get_device(self, name: str) -> DeviceModel:
return self._subprocess.apply(get_device, [name])

def submit_task(self, task: RunPlan) -> str:
return self._subprocess.apply(submit_task, [task])

def delete_task(self, task_id: str) -> str:
return self._subprocess.apply(delete_task, [task_id])

def begin_task(self, task: WorkerTask) -> WorkerTask:
return self._subprocess.apply(begin_task, [task])

@property
def active_task(self) -> Optional[TrackableTask]:
return self._subprocess.apply(active_task)

@property
def state(self) -> WorkerState:
return self._subprocess.apply(state)

def pause_worker(self, defer: Optional[bool]) -> None:
return self._subprocess.apply(pause_worker, [defer])

def resume_worker(self) -> None:
return self._subprocess.apply(resume_worker)

def cancel_active_task(self, failure: bool, reason: Optional[str]) -> None:
return self._subprocess.apply(cancel_active_task, [failure, reason])

@property
def pending_tasks(self) -> List[TrackableTask]:
return self._subprocess.apply(pending_tasks)

def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
return self._subprocess.apply(get_pending_task, [task_id])


# Free functions (passed to subprocess) for each of the methods required by Handler


def plans() -> List[PlanModel]:
return get_handler().plans


def get_plan(name: str):
return get_handler().get_plan(name)


def devices() -> List[DeviceModel]:
return get_handler().devices


def get_device(name: str) -> DeviceModel:
return get_handler().get_device(name)


def submit_task(task: RunPlan) -> str:
return get_handler().submit_task(task)


def delete_task(task_id: str) -> str:
return get_handler().delete_task(task_id)


def begin_task(task: WorkerTask) -> WorkerTask:
return get_handler().begin_task(task)


def active_task() -> Optional[TrackableTask]:
return get_handler().active_task


def state() -> WorkerState:
return get_handler().state


def pause_worker(defer: Optional[bool]) -> None:
return get_handler().pause_worker(defer)


def resume_worker() -> None:
return get_handler().resume_worker()


def cancel_active_task(failure: bool, reason: Optional[str]) -> None:
return get_handler().cancel_active_task()


def pending_tasks() -> List[TrackableTask]:
return get_handler().pending_tasks


def get_pending_task(task_id: str) -> Optional[TrackableTask]:
return get_handler().get_pending_task()

0 comments on commit 16bbbb9

Please sign in to comment.