Skip to content

Commit

Permalink
Run BlueskyContext and Worker in subprocess (#343)
Browse files Browse the repository at this point in the history
Create SubprocessHandler as additional BlueskyHandler implementation to
forward calls to a subprocess. This is to allow a new REST endpoint with
the result of reloading the plans and devices without having to restart
the service.

Additionally the API version is incremented due to new schema and a
reference to delete_task replaced with clear_pending_task which was
missed in a previous PR.

Further improvements required in later PRs.
  • Loading branch information
joeshannon authored and ZohebShaikh committed May 7, 2024
1 parent 2f10be1 commit 99762fd
Show file tree
Hide file tree
Showing 10 changed files with 494 additions and 10 deletions.
34 changes: 33 additions & 1 deletion docs/user/reference/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ components:
- devices
title: DeviceResponse
type: object
EnvironmentResponse:
additionalProperties: false
description: State of internal environment.
properties:
initialized:
description: blueapi context initialized
title: Initialized
type: boolean
required:
- initialized
title: EnvironmentResponse
type: object
HTTPValidationError:
properties:
detail:
Expand Down Expand Up @@ -197,7 +209,7 @@ components:
type: object
info:
title: BlueAPI Control
version: 0.0.4
version: 0.0.5
openapi: 3.0.2
paths:
/devices:
Expand Down Expand Up @@ -237,6 +249,26 @@ paths:
$ref: '#/components/schemas/HTTPValidationError'
description: Validation Error
summary: Get Device By Name
/environment:
delete:
operationId: delete_environment_environment_delete
responses:
'200':
content:
application/json:
schema: {}
description: Successful Response
summary: Delete Environment
get:
operationId: get_environment_environment_get
responses:
'200':
content:
application/json:
schema:
$ref: '#/components/schemas/EnvironmentResponse'
description: Successful Response
summary: Get Environment
/plans:
get:
description: Retrieve information about all available plans.
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/cli/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def create_task(self, task: RunPlan) -> TaskResponse:
data=task.dict(),
)

def delete_task(self, task_id: str) -> TaskResponse:
def clear_pending_task(self, task_id: str) -> TaskResponse:
return self._request_and_deserialize(
f"/tasks/{task_id}", TaskResponse, method="DELETE"
)
Expand Down
8 changes: 7 additions & 1 deletion src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Handler(BlueskyHandler):
_worker: Worker
_config: ApplicationConfig
_messaging_template: MessagingTemplate
_initialized: bool = False

def __init__(
self,
Expand Down Expand Up @@ -64,6 +65,7 @@ def start(self) -> None:
)

self._messaging_template.connect()
self._initialized = True

def _publish_event_streams(
self, streams_to_destinations: Mapping[EventStream, str]
Expand All @@ -79,6 +81,7 @@ def _publish_event_stream(self, stream: EventStream, destination: str) -> None:
)

def stop(self) -> None:
self._initialized = False
self._worker.stop()
if self._messaging_template.is_connected():
self._messaging_template.disconnect()
Expand Down Expand Up @@ -134,6 +137,10 @@ def pending_tasks(self) -> List[TrackableTask]:
def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
return self._worker.get_pending_task(task_id)

@property
def initialized(self) -> bool:
return self._initialized


HANDLER: Optional[Handler] = None

Expand All @@ -145,7 +152,6 @@ def setup_handler(

provider = None
plan_wrappers = []

if config:
visit_service_client: VisitServiceClientBase
if config.env.data_writing.visit_service_url is not None:
Expand Down
18 changes: 18 additions & 0 deletions src/blueapi/service/handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,21 @@ def pending_tasks(self) -> List[TrackableTask]:
def get_pending_task(self, task_id: str) -> Optional[TrackableTask]:
"""Returns a task matching the task ID supplied,
if the worker knows of it"""

@abstractmethod
def start(self):
"""Start the handler"""

@abstractmethod
def stop(self):
"""Stop the handler"""

@property
@abstractmethod
def initialized(self) -> bool:
"""Handler initialization state"""


class HandlerNotStartedError(Exception):
def __init__(self, message):
super().__init__(message)
64 changes: 59 additions & 5 deletions src/blueapi/service/main.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,61 @@
from contextlib import asynccontextmanager
from typing import Dict, Set

from fastapi import Body, Depends, FastAPI, HTTPException, Request, Response, status
from typing import Dict, Optional, Set

from fastapi import (
BackgroundTasks,
Body,
Depends,
FastAPI,
HTTPException,
Request,
Response,
status,
)
from pydantic import ValidationError
from starlette.responses import JSONResponse
from super_state_machine.errors import TransitionError

from blueapi.config import ApplicationConfig
from blueapi.worker import RunPlan, TrackableTask, WorkerState

from .handler import get_handler, setup_handler, teardown_handler
from .handler_base import BlueskyHandler
from .model import (
DeviceModel,
DeviceResponse,
EnvironmentResponse,
PlanModel,
PlanResponse,
StateChangeRequest,
TaskResponse,
WorkerTask,
)
from .subprocess_handler import SubprocessHandler

REST_API_VERSION = "0.0.5"

HANDLER: Optional[BlueskyHandler] = None


def get_handler() -> BlueskyHandler:
if HANDLER is None:
raise ValueError()
return HANDLER


def setup_handler(config: Optional[ApplicationConfig] = None):
global HANDLER
handler = SubprocessHandler(config)
handler.start()

HANDLER = handler

REST_API_VERSION = "0.0.4"

def teardown_handler():
global HANDLER
if HANDLER is None:
return
HANDLER.stop()
HANDLER = None


@asynccontextmanager
Expand Down Expand Up @@ -49,6 +83,26 @@ async def on_key_error_404(_: Request, __: KeyError):
)


@app.get("/environment", response_model=EnvironmentResponse)
def get_environment(
handler: BlueskyHandler = Depends(get_handler),
) -> EnvironmentResponse:
return EnvironmentResponse(initialized=handler.initialized)


@app.delete("/environment")
async def delete_environment(
background_tasks: BackgroundTasks,
handler: BlueskyHandler = Depends(get_handler),
):
def restart_handler(handler: BlueskyHandler):
handler.stop()
handler.start()

if handler.initialized:
background_tasks.add_task(restart_handler, handler)


@app.get("/plans", response_model=PlanResponse)
def get_plans(handler: BlueskyHandler = Depends(get_handler)):
"""Retrieve information about all available plans."""
Expand Down
8 changes: 8 additions & 0 deletions src/blueapi/service/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,11 @@ class StateChangeRequest(BlueapiBaseModel):
description="The reason for the current run to be aborted",
default=None,
)


class EnvironmentResponse(BlueapiBaseModel):
"""
State of internal environment.
"""

initialized: bool = Field(description="blueapi context initialized")
Loading

0 comments on commit 99762fd

Please sign in to comment.