Skip to content

Commit

Permalink
[App] Add status endpoint, enable ready (#16075)
Browse files Browse the repository at this point in the history
Co-authored-by: thomas chaton <[email protected]>
  • Loading branch information
ethanwharris and tchaton authored Dec 19, 2022
1 parent f3157f3 commit 2a85d9b
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 58 deletions.
4 changes: 4 additions & 0 deletions examples/app_boring/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def __init__(self):
raise_exception=True,
)

@property
def ready(self) -> bool:
return self.dest_work.is_running

def run(self):
self.source_work.run()
if self.source_work.has_succeeded:
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/components/serve/streamlit.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class ServeStreamlit(LightningWork, abc.ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.ready = False

self._process = None

@property
Expand Down Expand Up @@ -58,6 +60,7 @@ def run(self) -> None:
],
env=env,
)
self.ready = True
self._process.wait()

def on_exit(self) -> None:
Expand Down
25 changes: 22 additions & 3 deletions src/lightning_app/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from lightning_app.core.queues import QueuingSystem
from lightning_app.storage import Drive
from lightning_app.utilities.app_helpers import InMemoryStateStore, Logger, StateStore
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.cloud import is_running_in_cloud
from lightning_app.utilities.component import _context
from lightning_app.utilities.enum import ComponentContext, OpenAPITags
Expand Down Expand Up @@ -66,18 +67,24 @@ class SessionMiddleware:
lock = Lock()

app_spec: Optional[List] = None
app_status: Optional[AppStatus] = None

# In the future, this would be abstracted to support horizontal scaling.
responses_store = {}

logger = Logger(__name__)


# This can be replaced with a consumer that publishes states in a kv-store
# in a serverless architecture


class UIRefresher(Thread):
def __init__(self, api_publish_state_queue, api_response_queue, refresh_interval: float = 0.1) -> None:
def __init__(
self,
api_publish_state_queue,
api_response_queue,
refresh_interval: float = 0.1,
) -> None:
super().__init__(daemon=True)
self.api_publish_state_queue = api_publish_state_queue
self.api_response_queue = api_response_queue
Expand All @@ -98,7 +105,8 @@ def run(self):

def run_once(self):
try:
state = self.api_publish_state_queue.get(timeout=0)
global app_status
state, app_status = self.api_publish_state_queue.get(timeout=0)
with lock:
global_app_state_store.set_app_state(TEST_SESSION_UUID, state)
except queue.Empty:
Expand Down Expand Up @@ -326,6 +334,17 @@ async def upload_file(response: Response, filename: str, uploaded_file: UploadFi
return f"Successfully uploaded '{filename}' to the Drive"


@fastapi_service.get("/api/v1/status", response_model=AppStatus)
async def get_status() -> AppStatus:
"""Get the current status of the app and works."""
global app_status
if app_status is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="App status hasn't been reported yet."
)
return app_status


@fastapi_service.get("/healthz", status_code=200)
async def healthz(response: Response):
"""Health check endpoint used in the cloud FastAPI servers to check the status periodically."""
Expand Down
38 changes: 26 additions & 12 deletions src/lightning_app/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
_should_dispatch_app,
Logger,
)
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.commands.base import _process_requests
from lightning_app.utilities.component import _convert_paths_after_init, _validate_root_flow
from lightning_app.utilities.enum import AppStage, CacheCallsKeys
Expand Down Expand Up @@ -140,6 +141,7 @@ def __init__(
self.exception = None
self.collect_changes: bool = True

self.status: Optional[AppStatus] = None
# TODO: Enable ready locally for opening the UI.
self.ready = False

Expand All @@ -150,6 +152,7 @@ def __init__(
self.checkpointing: bool = False

self._update_layout()
self._update_status()

self.is_headless: Optional[bool] = None

Expand Down Expand Up @@ -418,6 +421,7 @@ def run_once(self):

self._update_layout()
self._update_is_headless()
self._update_status()
self.maybe_apply_changes()

if self.checkpointing and self._should_snapshot():
Expand Down Expand Up @@ -485,19 +489,12 @@ def _run(self) -> bool:
self._original_state = deepcopy(self.state)
done = False

# TODO: Re-enable the `ready` property once issues are resolved
if not self.root.ready:
warnings.warn(
"One of your Flows returned `.ready` as `False`. "
"This feature is not yet enabled so this will be ignored.",
UserWarning,
)
self.ready = True
self.ready = self.root.ready

self._start_with_flow_works()

if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)
if self.should_publish_changes_to_api and self.api_publish_state_queue is not None:
self.api_publish_state_queue.put((self.state_vars, self.status))

self._reset_run_time_monitor()

Expand All @@ -506,8 +503,8 @@ def _run(self) -> bool:

self._update_run_time_monitor()

if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)
if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue is not None:
self.api_publish_state_queue.put((self.state_vars, self.status))

self._has_updated = False

Expand All @@ -532,6 +529,23 @@ def _update_is_headless(self) -> None:
# This ensures support for apps which dynamically add a UI at runtime.
_handle_is_headless(self)

def _update_status(self) -> None:
old_status = self.status

work_statuses = {}
for work in breadth_first(self.root, types=(lightning_app.LightningWork,)):
work_statuses[work.name] = work.status

self.status = AppStatus(
is_ui_ready=self.ready,
work_statuses=work_statuses,
)

# If the work statuses changed, the state delta will trigger an update.
# If ready has changed, we trigger an update manually.
if self.status != old_status:
self._has_updated = True

def _apply_restarting(self) -> bool:
self._reset_original_state()
# apply stage after restoring the original state.
Expand Down
5 changes: 1 addition & 4 deletions src/lightning_app/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ def __getattr__(self, item):

@property
def ready(self) -> bool:
"""Not currently enabled.
Override to customize when your App should be ready.
"""
"""Override to customize when your App should be ready."""
flows = self.flows
return all(flow.ready for flow in flows.values()) if flows else True

Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/core/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
from lightning_app.storage.drive import _maybe_create_drive, Drive
from lightning_app.storage.payload import Payload
from lightning_app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, is_overridden
from lightning_app.utilities.app_status import WorkStatus
from lightning_app.utilities.component import _is_flow_context, _sanitize_state
from lightning_app.utilities.enum import (
CacheCallsKeys,
make_status,
WorkFailureReasons,
WorkStageStatus,
WorkStatus,
WorkStopReasons,
)
from lightning_app.utilities.exceptions import LightningWorkException
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/runners/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def terminate(self) -> None:
self._add_stopped_status_to_work(work)

# Publish the updated state and wait for the frontend to update.
self.app.api_publish_state_queue.put(self.app.state)
self.app.api_publish_state_queue.put((self.app.state, self.app.status))

for thread in self.threads + self.app.threads:
thread.join(timeout=0)
Expand Down
29 changes: 29 additions & 0 deletions src/lightning_app/utilities/app_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel


class WorkStatus(BaseModel):
"""The ``WorkStatus`` captures the status of a work according to the app."""

stage: str
timestamp: float
reason: Optional[str] = None
message: Optional[str] = None
count: int = 1

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)

assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10)


class AppStatus(BaseModel):
"""The ``AppStatus`` captures the current status of the app and its components."""

# ``True`` when the app UI is ready to be viewed
is_ui_ready: bool

# The statuses of ``LightningWork`` objects currently associated with this app
work_statuses: Dict[str, WorkStatus]
13 changes: 0 additions & 13 deletions src/lightning_app/utilities/enum.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import enum
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

Expand Down Expand Up @@ -47,18 +46,6 @@ class WorkStageStatus:
FAILED = "failed"


@dataclass
class WorkStatus:
stage: WorkStageStatus
timestamp: float
reason: Optional[str] = None
message: Optional[str] = None
count: int = 1

def __post_init__(self):
assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10)


def make_status(stage: str, message: Optional[str] = None, reason: Optional[str] = None):
status = {
"stage": stage,
Expand Down
8 changes: 6 additions & 2 deletions tests/tests_app/core/test_lightning_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from lightning_app.runners import MultiProcessRuntime
from lightning_app.storage.drive import Drive
from lightning_app.testing.helpers import _MockQueue
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.component import _set_frontend_context, _set_work_context
from lightning_app.utilities.enum import AppStage
from lightning_app.utilities.load_app import extract_metadata_from_app
Expand Down Expand Up @@ -195,7 +196,7 @@ def test_update_publish_state_and_maybe_refresh_ui():
publish_state_queue = _MockQueue("publish_state_queue")
api_response_queue = _MockQueue("api_response_queue")

publish_state_queue.put(app.state_with_changes)
publish_state_queue.put((app.state_with_changes, None))

thread = UIRefresher(publish_state_queue, api_response_queue)
thread.run_once()
Expand Down Expand Up @@ -226,7 +227,7 @@ def get(self, timeout: int = 0):
has_started_queue = _MockQueue("has_started_queue")
api_response_queue = _MockQueue("api_response_queue")
state = app.state_with_changes
publish_state_queue.put(state)
publish_state_queue.put((state, AppStatus(is_ui_ready=True, work_statuses={})))
spec = extract_metadata_from_app(app)
ui_refresher = start_server(
publish_state_queue,
Expand Down Expand Up @@ -284,6 +285,9 @@ def get(self, timeout: int = 0):
{"name": "main_4", "content": "https://te"},
]

response = await client.get("/api/v1/status")
assert response.json() == {"is_ui_ready": True, "work_statuses": {}}

response = await client.post("/api/v1/state", json={"state": new_state}, headers=headers)
assert change_state_queue._queue[1].to_dict() == {
"values_changed": {"root['vars']['counter']": {"new_value": 1}}
Expand Down
34 changes: 25 additions & 9 deletions tests/tests_app/core/test_lightning_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dataclasses import dataclass
from functools import partial
from time import time
from unittest.mock import ANY, MagicMock
from unittest.mock import ANY

import pytest
from deepdiff import DeepDiff, Delta
Expand All @@ -19,7 +19,7 @@
from lightning_app.storage.path import _storage_root_dir
from lightning_app.structures import Dict as LDict
from lightning_app.structures import List as LList
from lightning_app.testing.helpers import EmptyFlow, EmptyWork
from lightning_app.testing.helpers import _MockQueue, EmptyFlow, EmptyWork
from lightning_app.utilities.app_helpers import (
_delta_to_app_state_delta,
_LightningAppRef,
Expand Down Expand Up @@ -891,21 +891,37 @@ def run(self):


def test_flow_ready():
"""This test validates the api publish state queue is populated only once ready is True."""
"""This test validates that the app status queue is populated correctly."""

mock_queue = _MockQueue("api_publish_state_queue")

def run_patch(method):
app.api_publish_state_queue = MagicMock()
app.should_publish_changes_to_api = False
app.should_publish_changes_to_api = True
app.api_publish_state_queue = mock_queue
method()

state = {"done": False}

def lagged_run_once(method):
"""Ensure that the full loop is run after the app exits."""
new_done = method()
if state["done"]:
return True
state["done"] = new_done
return False

app = LightningApp(FlowReady())
app._run = partial(run_patch, method=app._run)
app.run_once = partial(lagged_run_once, method=app.run_once)
MultiProcessRuntime(app, start_server=False).dispatch()

# Validates the state has been added only when ready was true.
state = app.api_publish_state_queue.put._mock_call_args[0][0]
call_hash = state["works"]["w"]["calls"]["latest_call_hash"]
assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded"
_, first_status = mock_queue.get()
assert not first_status.is_ui_ready

_, last_status = mock_queue.get()
while len(mock_queue) > 0:
_, last_status = mock_queue.get()
assert last_status.is_ui_ready


def test_structures_register_work_cloudcompute():
Expand Down
Loading

0 comments on commit 2a85d9b

Please sign in to comment.