From 4bafc5962f9109d179a942e211ab08da3f70d1c1 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Thu, 15 Dec 2022 15:34:17 +0900 Subject: [PATCH 01/34] wip clean up autoscaler ui --- examples/app_server_with_auto_scaler/app.py | 14 ++---- src/lightning_app/components/auto_scaler.py | 51 ++++++++++++++++++++- 2 files changed, 54 insertions(+), 11 deletions(-) diff --git a/examples/app_server_with_auto_scaler/app.py b/examples/app_server_with_auto_scaler/app.py index 70799827776a8..453db2424b404 100644 --- a/examples/app_server_with_auto_scaler/app.py +++ b/examples/app_server_with_auto_scaler/app.py @@ -1,5 +1,5 @@ # ! pip install torch torchvision -from typing import Any, List +from typing import List import torch import torchvision @@ -8,16 +8,12 @@ import lightning as L -class RequestModel(BaseModel): - image: str # bytecode - - class BatchRequestModel(BaseModel): - inputs: List[RequestModel] + inputs: List[L.app.components.Image] class BatchResponse(BaseModel): - outputs: List[Any] + outputs: List[L.app.components.Number] class PyTorchServer(L.app.components.PythonServer): @@ -81,8 +77,8 @@ def scale(self, replicas: int, metrics: dict) -> int: max_replicas=4, autoscale_interval=10, endpoint="predict", - input_type=RequestModel, - output_type=Any, + input_type=L.app.components.Image, + output_type=L.app.components.Number, timeout_batching=1, max_batch_size=8, ) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index 13948ba50af89..d68b4e04ea336 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -280,6 +280,8 @@ async def update_servers(servers: List[str], authenticated: bool = Depends(authe async def balance_api(inputs: self._input_type): return await self.process_request(inputs) + logger.info(f"Your load balancer has started. The endpoint is 'http://{self.host}:{self.port}{self.endpoint}'") + uvicorn.run( fastapi_app, host=self.host, @@ -332,6 +334,51 @@ def send_request_to_update_servers(self, servers: List[str]): response = requests.put(f"{self.url}/system/update-servers", json=servers, headers=headers, timeout=10) response.raise_for_status() + @staticmethod + def _get_sample_dict_from_datatype(datatype: Any) -> dict: + if hasattr(datatype, "_get_sample_data"): + return datatype._get_sample_data() + + datatype_props = datatype.schema()["properties"] + out: Dict[str, Any] = {} + for k, v in datatype_props.items(): + if v["type"] == "string": + out[k] = "data string" + elif v["type"] == "number": + out[k] = 0.0 + elif v["type"] == "integer": + out[k] = 0 + elif v["type"] == "boolean": + out[k] = False + else: + raise TypeError("Unsupported type") + return out + + def configure_layout(self) -> None: + try: + from lightning_api_access import APIAccessFrontend + except ModuleNotFoundError: + logger.warn("APIAccessFrontend not found. Please install lightning-api-access to enable the UI") + return + + try: + request = self._get_sample_dict_from_datatype(self._input_type) + response = self._get_sample_dict_from_datatype(self._output_type) + except (AttributeError, TypeError): + return + + return APIAccessFrontend( + apis=[ + { + "name": self.__class__.__name__, + "url": f"{self.url}{self.endpoint}", + "method": "POST", + "request": request, + "response": response, + } + ] + ) + class AutoScaler(LightningFlow): """The ``AutoScaler`` can be used to automatically change the number of replicas of the given server in @@ -574,5 +621,5 @@ def autoscale(self) -> None: self._last_autoscale = time.time() def configure_layout(self): - tabs = [{"name": "Swagger", "content": self.load_balancer.url}] - return tabs + layout = self.load_balancer.configure_layout() + return layout if layout else super().configure_layout() From 1cefa05920e3063ef3632e74fa6710ba4d27e3f2 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Thu, 15 Dec 2022 16:32:38 +0900 Subject: [PATCH 02/34] Revert "wip clean up autoscaler ui" This reverts commit 065db7e122a1cee1b4ff650cd31a7c8591e3e916. --- examples/app_server_with_auto_scaler/app.py | 14 ++++-- src/lightning_app/components/auto_scaler.py | 51 +-------------------- 2 files changed, 11 insertions(+), 54 deletions(-) diff --git a/examples/app_server_with_auto_scaler/app.py b/examples/app_server_with_auto_scaler/app.py index 453db2424b404..70799827776a8 100644 --- a/examples/app_server_with_auto_scaler/app.py +++ b/examples/app_server_with_auto_scaler/app.py @@ -1,5 +1,5 @@ # ! pip install torch torchvision -from typing import List +from typing import Any, List import torch import torchvision @@ -8,12 +8,16 @@ import lightning as L +class RequestModel(BaseModel): + image: str # bytecode + + class BatchRequestModel(BaseModel): - inputs: List[L.app.components.Image] + inputs: List[RequestModel] class BatchResponse(BaseModel): - outputs: List[L.app.components.Number] + outputs: List[Any] class PyTorchServer(L.app.components.PythonServer): @@ -77,8 +81,8 @@ def scale(self, replicas: int, metrics: dict) -> int: max_replicas=4, autoscale_interval=10, endpoint="predict", - input_type=L.app.components.Image, - output_type=L.app.components.Number, + input_type=RequestModel, + output_type=Any, timeout_batching=1, max_batch_size=8, ) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index d68b4e04ea336..13948ba50af89 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -280,8 +280,6 @@ async def update_servers(servers: List[str], authenticated: bool = Depends(authe async def balance_api(inputs: self._input_type): return await self.process_request(inputs) - logger.info(f"Your load balancer has started. The endpoint is 'http://{self.host}:{self.port}{self.endpoint}'") - uvicorn.run( fastapi_app, host=self.host, @@ -334,51 +332,6 @@ def send_request_to_update_servers(self, servers: List[str]): response = requests.put(f"{self.url}/system/update-servers", json=servers, headers=headers, timeout=10) response.raise_for_status() - @staticmethod - def _get_sample_dict_from_datatype(datatype: Any) -> dict: - if hasattr(datatype, "_get_sample_data"): - return datatype._get_sample_data() - - datatype_props = datatype.schema()["properties"] - out: Dict[str, Any] = {} - for k, v in datatype_props.items(): - if v["type"] == "string": - out[k] = "data string" - elif v["type"] == "number": - out[k] = 0.0 - elif v["type"] == "integer": - out[k] = 0 - elif v["type"] == "boolean": - out[k] = False - else: - raise TypeError("Unsupported type") - return out - - def configure_layout(self) -> None: - try: - from lightning_api_access import APIAccessFrontend - except ModuleNotFoundError: - logger.warn("APIAccessFrontend not found. Please install lightning-api-access to enable the UI") - return - - try: - request = self._get_sample_dict_from_datatype(self._input_type) - response = self._get_sample_dict_from_datatype(self._output_type) - except (AttributeError, TypeError): - return - - return APIAccessFrontend( - apis=[ - { - "name": self.__class__.__name__, - "url": f"{self.url}{self.endpoint}", - "method": "POST", - "request": request, - "response": response, - } - ] - ) - class AutoScaler(LightningFlow): """The ``AutoScaler`` can be used to automatically change the number of replicas of the given server in @@ -621,5 +574,5 @@ def autoscale(self) -> None: self._last_autoscale = time.time() def configure_layout(self): - layout = self.load_balancer.configure_layout() - return layout if layout else super().configure_layout() + tabs = [{"name": "Swagger", "content": self.load_balancer.url}] + return tabs From 1e69092882fba2a0869660cc13663310820adf32 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Thu, 15 Dec 2022 16:38:13 +0900 Subject: [PATCH 03/34] Apply sherin's suggestion --- src/lightning_app/components/auto_scaler.py | 106 +++++++++++++++++--- 1 file changed, 91 insertions(+), 15 deletions(-) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index 13948ba50af89..a02854595bd81 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -6,7 +6,7 @@ import uuid from base64 import b64encode from itertools import cycle -from typing import Any, Dict, List, Tuple, Type +from typing import Any, Dict, List, Optional, Tuple, Type import requests import uvicorn @@ -15,13 +15,14 @@ from fastapi.responses import RedirectResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel +from starlette.staticfiles import StaticFiles from starlette.status import HTTP_401_UNAUTHORIZED -from lightning_app.core.flow import LightningFlow -from lightning_app.core.work import LightningWork -from lightning_app.utilities.app_helpers import Logger -from lightning_app.utilities.imports import _is_aiohttp_available, requires -from lightning_app.utilities.packaging.cloud_compute import CloudCompute +from lightning.app.core.flow import LightningFlow +from lightning.app.core.work import LightningWork +from lightning.app.utilities.app_helpers import Logger +from lightning.app.utilities.imports import _is_aiohttp_available, requires +from lightning.app.utilities.packaging.cloud_compute import CloudCompute if _is_aiohttp_available(): import aiohttp @@ -120,12 +121,12 @@ class _LoadBalancer(LightningWork): @requires(["aiohttp"]) def __init__( self, - input_type: BaseModel, - output_type: BaseModel, + input_type: Type[BaseModel], + output_type: Type[BaseModel], endpoint: str, max_batch_size: int = 8, # all timeout args are in seconds - timeout_batching: int = 1, + timeout_batching: float = 1, timeout_keep_alive: int = 60, timeout_inference_request: int = 60, **kwargs: Any, @@ -280,6 +281,12 @@ async def update_servers(servers: List[str], authenticated: bool = Depends(authe async def balance_api(inputs: self._input_type): return await self.process_request(inputs) + endpoint_info_page = self._get_endpoint_info_page() + if endpoint_info_page: + fastapi_app.mount( + "/endpoint-info", StaticFiles(directory=endpoint_info_page.serve_dir, html=True), name="static" + ) + uvicorn.run( fastapi_app, host=self.host, @@ -332,6 +339,70 @@ def send_request_to_update_servers(self, servers: List[str]): response = requests.put(f"{self.url}/system/update-servers", json=servers, headers=headers, timeout=10) response.raise_for_status() + @staticmethod + def _get_sample_dict_from_datatype(datatype: Any) -> dict: + if hasattr(datatype, "_get_sample_data"): + return datatype._get_sample_data() + + datatype_props = datatype.schema()["properties"] + out: Dict[str, Any] = {} + for k, v in datatype_props.items(): + if v["type"] == "string": + out[k] = "data string" + elif v["type"] == "number": + out[k] = 0.0 + elif v["type"] == "integer": + out[k] = 0 + elif v["type"] == "boolean": + out[k] = False + else: + raise TypeError("Unsupported type") + return out + + def get_code_sample(self, url: str) -> Optional[str]: + input_type: Any = self._input_type + output_type: Any = self._output_type + + if not (hasattr(input_type, "request_code_sample") and hasattr(output_type, "response_code_sample")): + return None + return f"{input_type.request_code_sample(url)}\n{output_type.response_code_sample()}" + + def _get_endpoint_info_page(self) -> Optional["APIAccessFrontend"]: # noqa: F821 + try: + from lightning_api_access import APIAccessFrontend + except ModuleNotFoundError: + logger.warn("APIAccessFrontend not found. Please install lightning-api-access to enable the UI") + return + + # TODO - change this name and url path + class_name = "Loadbalanced" + # TODO - make it work for local host too + url = f"{self._future_url}/predict" + + # TODO - sample data below cannot be None + + try: + request = self._get_sample_dict_from_datatype(self._input_type) + except TypeError: + request = None + try: + response = self._get_sample_dict_from_datatype(self._output_type) + except TypeError: + response = None + + frontend_objects = { + "name": class_name, + "url": url, + "method": "POST", + "request": request, + "response": response, + } + code_samples = self.get_code_sample(url) + if code_samples: + frontend_objects["code_sample"] = self.get_code_sample(url) + + return APIAccessFrontend(apis=[frontend_objects]) + class AutoScaler(LightningFlow): """The ``AutoScaler`` can be used to automatically change the number of replicas of the given server in @@ -403,8 +474,8 @@ def __init__( max_batch_size: int = 8, timeout_batching: float = 1, endpoint: str = "api/predict", - input_type: BaseModel = Dict, - output_type: BaseModel = Dict, + input_type: Type[BaseModel] = Dict, + output_type: Type[BaseModel] = Dict, *work_args: Any, **work_kwargs: Any, ) -> None: @@ -511,9 +582,11 @@ def scale(self, replicas: int, metrics: dict) -> int: The target number of running works. The value will be adjusted after this method runs so that it satisfies ``min_replicas<=replicas<=max_replicas``. """ - pending_requests_per_running_or_pending_work = metrics["pending_requests"] / ( - replicas + metrics["pending_works"] - ) + pending_requests_per_running_or_pending_work = 0 + if replicas: + pending_requests_per_running_or_pending_work = metrics["pending_requests"] / ( + replicas + metrics["pending_works"] + ) # scale out if the number of pending requests exceeds max batch size. max_requests_per_work = self.max_batch_size @@ -574,5 +647,8 @@ def autoscale(self) -> None: self._last_autoscale = time.time() def configure_layout(self): - tabs = [{"name": "Swagger", "content": self.load_balancer.url}] + tabs = [ + {"name": "Endpoint Info", "content": f"{self.load_balancer}/endpoint-info"}, + {"name": "Swagger", "content": self.load_balancer.url}, + ] return tabs From f9406ccb695d71d38b1ff478de3dab7a6ab62eef Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Thu, 15 Dec 2022 16:48:11 +0900 Subject: [PATCH 04/34] update example --- examples/app_server_with_auto_scaler/app.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/examples/app_server_with_auto_scaler/app.py b/examples/app_server_with_auto_scaler/app.py index 70799827776a8..453db2424b404 100644 --- a/examples/app_server_with_auto_scaler/app.py +++ b/examples/app_server_with_auto_scaler/app.py @@ -1,5 +1,5 @@ # ! pip install torch torchvision -from typing import Any, List +from typing import List import torch import torchvision @@ -8,16 +8,12 @@ import lightning as L -class RequestModel(BaseModel): - image: str # bytecode - - class BatchRequestModel(BaseModel): - inputs: List[RequestModel] + inputs: List[L.app.components.Image] class BatchResponse(BaseModel): - outputs: List[Any] + outputs: List[L.app.components.Number] class PyTorchServer(L.app.components.PythonServer): @@ -81,8 +77,8 @@ def scale(self, replicas: int, metrics: dict) -> int: max_replicas=4, autoscale_interval=10, endpoint="predict", - input_type=RequestModel, - output_type=Any, + input_type=L.app.components.Image, + output_type=L.app.components.Number, timeout_batching=1, max_batch_size=8, ) From 694627fd2723624f77f032808a0675f52661b48d Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Thu, 15 Dec 2022 16:48:27 +0900 Subject: [PATCH 05/34] print endpoint in the log --- src/lightning_app/components/auto_scaler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index a02854595bd81..0ae379e2d36e9 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -287,6 +287,8 @@ async def balance_api(inputs: self._input_type): "/endpoint-info", StaticFiles(directory=endpoint_info_page.serve_dir, html=True), name="static" ) + logger.info(f"Your load balancer has started. The endpoint is 'http://{self.host}:{self.port}{self.endpoint}'") + uvicorn.run( fastapi_app, host=self.host, From 96b77ea9f11b7add21f9400297929905b22b0c79 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Thu, 15 Dec 2022 16:49:07 +0900 Subject: [PATCH 06/34] Fix import --- src/lightning_app/components/auto_scaler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index 0ae379e2d36e9..30258ef8a4efa 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -18,11 +18,11 @@ from starlette.staticfiles import StaticFiles from starlette.status import HTTP_401_UNAUTHORIZED -from lightning.app.core.flow import LightningFlow -from lightning.app.core.work import LightningWork -from lightning.app.utilities.app_helpers import Logger -from lightning.app.utilities.imports import _is_aiohttp_available, requires -from lightning.app.utilities.packaging.cloud_compute import CloudCompute +from lightning_app.core.flow import LightningFlow +from lightning_app.core.work import LightningWork +from lightning_app.utilities.app_helpers import Logger +from lightning_app.utilities.imports import _is_aiohttp_available, requires +from lightning_app.utilities.packaging.cloud_compute import CloudCompute if _is_aiohttp_available(): import aiohttp From 44cbec29492431310b348c6d4438d268979b8ac0 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Thu, 15 Dec 2022 22:55:41 +0900 Subject: [PATCH 07/34] revert irrelevant change --- src/lightning_app/components/auto_scaler.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index 30258ef8a4efa..4792ddd27fefb 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -584,11 +584,9 @@ def scale(self, replicas: int, metrics: dict) -> int: The target number of running works. The value will be adjusted after this method runs so that it satisfies ``min_replicas<=replicas<=max_replicas``. """ - pending_requests_per_running_or_pending_work = 0 - if replicas: - pending_requests_per_running_or_pending_work = metrics["pending_requests"] / ( - replicas + metrics["pending_works"] - ) + pending_requests_per_running_or_pending_work = metrics["pending_requests"] / ( + replicas + metrics["pending_works"] + ) # scale out if the number of pending requests exceeds max batch size. max_requests_per_work = self.max_batch_size From 82fef89236e61d16cbe115f2e725cb99e036d769 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 15:35:41 +0000 Subject: [PATCH 08/34] Update src/lightning_app/components/auto_scaler.py Co-authored-by: Jirka Borovec <6035284+Borda@users.noreply.github.com> --- src/lightning_app/components/auto_scaler.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py index 4792ddd27fefb..01e0bbb144f56 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/auto_scaler.py @@ -348,17 +348,11 @@ def _get_sample_dict_from_datatype(datatype: Any) -> dict: datatype_props = datatype.schema()["properties"] out: Dict[str, Any] = {} + lut = {"string": "data string", "number": 0.0, "integer": 0, "boolean": False} for k, v in datatype_props.items(): - if v["type"] == "string": - out[k] = "data string" - elif v["type"] == "number": - out[k] = 0.0 - elif v["type"] == "integer": - out[k] = 0 - elif v["type"] == "boolean": - out[k] = False - else: + if v["type"] not in lut: raise TypeError("Unsupported type") + out[k] = lut[v["type"]] return out def get_code_sample(self, url: str) -> Optional[str]: From d8f47781b676a234ccb3ae5563981d628571860c Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 15:49:17 +0000 Subject: [PATCH 09/34] clean up --- src/lightning_app/components/__init__.py | 3 +- .../components/serve/__init__.py | 4 +- .../components/{ => serve}/auto_scaler.py | 47 +++++++++---------- 3 files changed, 25 insertions(+), 29 deletions(-) rename src/lightning_app/components/{ => serve}/auto_scaler.py (95%) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index ca47c36071dae..95b2a4c6b9eec 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -1,4 +1,3 @@ -from lightning_app.components.auto_scaler import AutoScaler from lightning_app.components.database.client import DatabaseClient from lightning_app.components.database.server import Database from lightning_app.components.multi_node import ( @@ -10,7 +9,7 @@ from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import Image, Number, PythonServer +from lightning_app.components.serve.python_server import Image, Number, PythonServer, AutoScaler from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index cb46a71bf9ea5..fa984bc3f9b68 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,5 +1,5 @@ from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import Image, Number, PythonServer +from lightning_app.components.serve.python_server import Image, Number, PythonServer, AutoScaler from lightning_app.components.serve.streamlit import ServeStreamlit -__all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number"] +__all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "AutoScaler"] diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py similarity index 95% rename from src/lightning_app/components/auto_scaler.py rename to src/lightning_app/components/serve/auto_scaler.py index 01e0bbb144f56..70efd4f6352c4 100644 --- a/src/lightning_app/components/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -23,6 +23,8 @@ from lightning_app.utilities.app_helpers import Logger from lightning_app.utilities.imports import _is_aiohttp_available, requires from lightning_app.utilities.packaging.cloud_compute import CloudCompute +from lightning_app.utilities.cloud import is_running_in_cloud + if _is_aiohttp_available(): import aiohttp @@ -115,7 +117,7 @@ class _LoadBalancer(LightningWork): requests to be batched. In any case, requests are processed as soon as `max_batch_size` is reached. timeout_keep_alive: The number of seconds until it closes Keep-Alive connections if no new data is received. timeout_inference_request: The number of seconds to wait for inference. - \**kwargs: Arguments passed to :func:`LightningWork.init` like ``CloudCompute``, ``BuildConfig``, etc. + **kwargs: Arguments passed to :func:`LightningWork.init` like ``CloudCompute``, ``BuildConfig``, etc. """ @requires(["aiohttp"]) @@ -129,6 +131,7 @@ def __init__( timeout_batching: float = 1, timeout_keep_alive: int = 60, timeout_inference_request: int = 60, + work_name: Optional[str] = "API", # used for displaying the name in the UI **kwargs: Any, ) -> None: super().__init__(cloud_compute=CloudCompute("default"), **kwargs) @@ -143,6 +146,7 @@ def __init__( self._batch = [] self._responses = {} # {request_id: response} self._last_batch_sent = 0 + self._work_name = work_name if not endpoint.startswith("/"): endpoint = "/" + endpoint @@ -370,33 +374,25 @@ def _get_endpoint_info_page(self) -> Optional["APIAccessFrontend"]: # noqa: F82 logger.warn("APIAccessFrontend not found. Please install lightning-api-access to enable the UI") return - # TODO - change this name and url path - class_name = "Loadbalanced" - # TODO - make it work for local host too - url = f"{self._future_url}/predict" - - # TODO - sample data below cannot be None + if is_running_in_cloud(): + url = f"{self._future_url}{self.endpoint}" + else: + url = f"http://localhost:{self.port}{self.endpoint}" - try: - request = self._get_sample_dict_from_datatype(self._input_type) - except TypeError: - request = None - try: - response = self._get_sample_dict_from_datatype(self._output_type) - except TypeError: - response = None - - frontend_objects = { - "name": class_name, - "url": url, - "method": "POST", - "request": request, - "response": response, - } + frontend_objects = {"name": self._work_name, "url": url, "method": "POST", "request": None, "response": None} code_samples = self.get_code_sample(url) if code_samples: - frontend_objects["code_sample"] = self.get_code_sample(url) - + frontend_objects["code_samples"] = code_samples + # TODO also set request/response for JS UI + else: + try: + request = self._get_sample_dict_from_datatype(self._input_type) + response = self._get_sample_dict_from_datatype(self._output_type) + except TypeError: + return None + else: + frontend_objects["request"] = request + frontend_objects["response"] = response return APIAccessFrontend(apis=[frontend_objects]) @@ -505,6 +501,7 @@ def __init__( timeout_batching=timeout_batching, cache_calls=True, parallel=True, + work_name=self._work_cls.__name__ ) for _ in range(min_replicas): work = self.create_work() From c7443b677187fc7c22824af304a0165b307e4a65 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Dec 2022 15:50:39 +0000 Subject: [PATCH 10/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning_app/components/__init__.py | 2 +- src/lightning_app/components/serve/__init__.py | 2 +- src/lightning_app/components/serve/auto_scaler.py | 5 ++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 95b2a4c6b9eec..73afc2a52113b 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -9,7 +9,7 @@ from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import Image, Number, PythonServer, AutoScaler +from lightning_app.components.serve.python_server import AutoScaler, Image, Number, PythonServer from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index fa984bc3f9b68..acce6a2b27e23 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,5 +1,5 @@ from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import Image, Number, PythonServer, AutoScaler +from lightning_app.components.serve.python_server import AutoScaler, Image, Number, PythonServer from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "AutoScaler"] diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 70efd4f6352c4..1fce191d7603f 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -21,10 +21,9 @@ from lightning_app.core.flow import LightningFlow from lightning_app.core.work import LightningWork from lightning_app.utilities.app_helpers import Logger +from lightning_app.utilities.cloud import is_running_in_cloud from lightning_app.utilities.imports import _is_aiohttp_available, requires from lightning_app.utilities.packaging.cloud_compute import CloudCompute -from lightning_app.utilities.cloud import is_running_in_cloud - if _is_aiohttp_available(): import aiohttp @@ -501,7 +500,7 @@ def __init__( timeout_batching=timeout_batching, cache_calls=True, parallel=True, - work_name=self._work_cls.__name__ + work_name=self._work_cls.__name__, ) for _ in range(min_replicas): work = self.create_work() From 26f5f4bd62024bafab0d4812a74539f322a49237 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 16:03:59 +0000 Subject: [PATCH 11/34] test rename --- tests/tests_app/components/{ => serve}/test_auto_scaler.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/tests_app/components/{ => serve}/test_auto_scaler.py (100%) diff --git a/tests/tests_app/components/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py similarity index 100% rename from tests/tests_app/components/test_auto_scaler.py rename to tests/tests_app/components/serve/test_auto_scaler.py From 4f3365c25e40309abb493d8c920aa27866802884 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 16:07:03 +0000 Subject: [PATCH 12/34] Changelog --- src/lightning_app/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index 132706e539837..e15e4791b067a 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -15,6 +15,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added partial support for fastapi `Request` annotation in `configure_api` handlers ([#16047](https://github.com/Lightning-AI/lightning/pull/16047)) +- Added a nicer UI with URL and examples for the autoscaler component ([#16063](https://github.com/Lightning-AI/lightning/pull/16063)) + ### Changed From 9e0cb747b1e22d3f24071d5a50862ca148dfaa4f Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 16:18:32 +0000 Subject: [PATCH 13/34] adding up/down scale interval arguments --- .../components/serve/auto_scaler.py | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 1fce191d7603f..f3cbcc4f4d29f 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -403,7 +403,8 @@ class AutoScaler(LightningFlow): Args: min_replicas: The number of works to start when app initializes. max_replicas: The max number of works to spawn to handle the incoming requests. - autoscale_interval: The number of seconds to wait before checking whether to upscale or downscale the works. + autoscale_up_interval: The number of seconds to wait before checking whether to upscale the server + autoscale_down_interval: The number of seconds to wait before checking whether to downscale the server endpoint: Provide the REST API path. max_batch_size: (auto-batching) The number of requests to process at once. timeout_batching: (auto-batching) The number of seconds to wait before sending the requests to process. @@ -461,7 +462,8 @@ def __init__( work_cls: Type[LightningWork], min_replicas: int = 1, max_replicas: int = 4, - autoscale_interval: int = 10, + autoscale_up_interval: int = 10, + autoscale_down_interval: int = 10, max_batch_size: int = 8, timeout_batching: float = 1, endpoint: str = "api/predict", @@ -480,7 +482,8 @@ def __init__( self._input_type = input_type self._output_type = output_type - self.autoscale_interval = autoscale_interval + self.autoscale_up_interval = autoscale_up_interval + self.autoscale_down_interval = autoscale_down_interval self.max_batch_size = max_batch_size if max_replicas < min_replicas: @@ -602,11 +605,6 @@ def num_pending_works(self) -> int: def autoscale(self) -> None: """Adjust the number of works based on the target number returned by ``self.scale``.""" - if time.time() - self._last_autoscale < self.autoscale_interval: - return - - self.load_balancer.update_servers(self.workers) - metrics = { "pending_requests": self.num_pending_requests, "pending_works": self.num_pending_works, @@ -619,19 +617,21 @@ def autoscale(self) -> None: ) # upscale - num_workers_to_add = num_target_workers - self.num_replicas - for _ in range(num_workers_to_add): - logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}") - work = self.create_work() - new_work_id = self.add_work(work) - logger.info(f"Work created: '{new_work_id}'") + if time.time() - self._last_autoscale > self.autoscale_up_interval: + num_workers_to_add = num_target_workers - self.num_replicas + for _ in range(num_workers_to_add): + logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}") + work = self.create_work() + new_work_id = self.add_work(work) + logger.info(f"Work created: '{new_work_id}'") # downscale - num_workers_to_remove = self.num_replicas - num_target_workers - for _ in range(num_workers_to_remove): - logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}") - removed_work_id = self.remove_work(self.num_replicas - 1) - logger.info(f"Work removed: '{removed_work_id}'") + if time.time() - self._last_autoscale > self.autoscale_down_interval: + num_workers_to_remove = self.num_replicas - num_target_workers + for _ in range(num_workers_to_remove): + logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}") + removed_work_id = self.remove_work(self.num_replicas - 1) + logger.info(f"Work removed: '{removed_work_id}'") self.load_balancer.update_servers(self.workers) self._last_autoscale = time.time() From debb133f1b21906d4ddfd1f7d9eeadb1f674031f Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 16:20:25 +0000 Subject: [PATCH 14/34] changelog --- src/lightning_app/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index e15e4791b067a..6e53d0230b8a6 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -17,6 +17,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added a nicer UI with URL and examples for the autoscaler component ([#16063](https://github.com/Lightning-AI/lightning/pull/16063)) +- Enabled users to have more control over scaling up/down interval ([#16093](https://github.com/Lightning-AI/lightning/pull/16093)) + ### Changed From bdeb1f54bde2f08f317df0fe3b20c6bf5db283c7 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 17:19:00 +0000 Subject: [PATCH 15/34] Update src/lightning_app/components/serve/__init__.py --- src/lightning_app/components/serve/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index 6a125ac3fda04..dbeb7f8372766 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,4 +1,5 @@ from lightning_app.components.serve.python_server import Category, Image, Number, Text, PythonServer, AutoScaler +from lightning_app.components.serve.gradio import ServeGradio from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] From 3d3b5929eca0fbdf360fa5d9a6923b579690b160 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Dec 2022 17:21:49 +0000 Subject: [PATCH 16/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning_app/components/__init__.py | 2 +- src/lightning_app/components/serve/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 9c94e7dfe009f..8182fa4f57826 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -9,7 +9,7 @@ from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import Category, Image, Number, Text, PythonServer, AutoScaler +from lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index dbeb7f8372766..7ac1d79b05659 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,5 +1,5 @@ -from lightning_app.components.serve.python_server import Category, Image, Number, Text, PythonServer, AutoScaler from lightning_app.components.serve.gradio import ServeGradio +from lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] From c38768a9ba80304262bfa43a6bafcb7969743a0c Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Fri, 16 Dec 2022 17:51:31 +0000 Subject: [PATCH 17/34] test --- src/lightning_app/components/__init__.py | 3 ++- src/lightning_app/components/serve/__init__.py | 3 ++- .../components/serve/auto_scaler.py | 4 ++++ .../components/serve/test_auto_scaler.py | 18 +++++++++++++++++- 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 8182fa4f57826..801365c6acc63 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -9,7 +9,8 @@ from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index 7ac1d79b05659..34785f820c2eb 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,5 +1,6 @@ from lightning_app.components.serve.gradio import ServeGradio -from lightning_app.components.serve.python_server import AutoScaler, Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 1fce191d7603f..6027249de850f 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -346,6 +346,10 @@ def send_request_to_update_servers(self, servers: List[str]): @staticmethod def _get_sample_dict_from_datatype(datatype: Any) -> dict: + if not hasattr(datatype, "schema"): + # not a pydantic model + raise TypeError(f"datatype must be a pydantic model, for the UI to be generated. but got {datatype}") + if hasattr(datatype, "_get_sample_data"): return datatype._get_sample_data() diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index 672b05bbc9a15..2e6fa84d6ccc6 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -1,10 +1,11 @@ import time +from unittest import mock from unittest.mock import patch import pytest from lightning_app import CloudCompute, LightningWork -from lightning_app.components import AutoScaler +from lightning_app.components import AutoScaler, Text class EmptyWork(LightningWork): @@ -98,3 +99,18 @@ def test_create_work_cloud_compute_cloned(): auto_scaler = AutoScaler(EmptyWork, cloud_compute=cloud_compute) _ = auto_scaler.create_work() assert auto_scaler._work_kwargs["cloud_compute"] is not cloud_compute + + +fastapi_mock = mock.MagicMock() +mocked_fastapi_creater = mock.MagicMock(return_value=fastapi_mock) + + +@patch("lightning_app.components.serve.auto_scaler._create_fastapi", mocked_fastapi_creater) +@patch("lightning_app.components.serve.auto_scaler.uvicorn.run", mock.MagicMock()) +def test_API_ACCESS_ENDPOINT_creation(): + auto_scaler = AutoScaler(EmptyWork, input_type=Text, output_type=Text) + assert auto_scaler.load_balancer._work_name == "EmptyWork" + + auto_scaler.load_balancer.run() + fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static") + From 98ba462dfa78c5ece64a6842d827327e86ad99eb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Dec 2022 17:53:03 +0000 Subject: [PATCH 18/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning_app/components/__init__.py | 2 +- src/lightning_app/components/serve/__init__.py | 2 +- tests/tests_app/components/serve/test_auto_scaler.py | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 801365c6acc63..5fd8af6b055de 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -8,9 +8,9 @@ ) from lightning_app.components.python.popen import PopenPythonScript from lightning_app.components.python.tracer import Code, TracerPythonScript +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.gradio import ServeGradio from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text -from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.serve import ModelInferenceAPI from lightning_app.components.serve.streamlit import ServeStreamlit from lightning_app.components.training import LightningTrainerScript, PyTorchLightningScriptRunner diff --git a/src/lightning_app/components/serve/__init__.py b/src/lightning_app/components/serve/__init__.py index 34785f820c2eb..ac02e69c4f2ab 100644 --- a/src/lightning_app/components/serve/__init__.py +++ b/src/lightning_app/components/serve/__init__.py @@ -1,6 +1,6 @@ +from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.gradio import ServeGradio from lightning_app.components.serve.python_server import Category, Image, Number, PythonServer, Text -from lightning_app.components.serve.auto_scaler import AutoScaler from lightning_app.components.serve.streamlit import ServeStreamlit __all__ = ["ServeGradio", "ServeStreamlit", "PythonServer", "Image", "Number", "Category", "Text", "AutoScaler"] diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index 2e6fa84d6ccc6..d1adb1c5aacd2 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -113,4 +113,3 @@ def test_API_ACCESS_ENDPOINT_creation(): auto_scaler.load_balancer.run() fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static") - From 2520758957e894148bd83e25577a8f1fd1046649 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 07:26:08 +0000 Subject: [PATCH 19/34] fix imports --- src/lightning_app/components/serve/auto_scaler.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 6027249de850f..36ae720ae8943 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -185,15 +185,13 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): async def consumer(self): while True: await asyncio.sleep(0.05) - batch = self._batch[: self.max_batch_size] - while batch and ( - (len(batch) == self.max_batch_size) or ((time.time() - self._last_batch_sent) > self.timeout_batching) - ): + is_batch_ready = len(batch) == self.max_batch_size + is_batch_timeout = time.time() - self._last_batch_sent > self.timeout_batching + if batch and (is_batch_ready or is_batch_timeout): asyncio.create_task(self.send_batch(batch)) - - self._batch = self._batch[self.max_batch_size :] - batch = self._batch[: self.max_batch_size] + # resetting the batch array, TODO - not locking the array + self._batch = self._batch[len(batch) :] self._last_batch_sent = time.time() async def process_request(self, data: BaseModel): From eb87ee805dd91fd2cc665a66c59c0d4e36ee598e Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 07:34:25 +0000 Subject: [PATCH 20/34] mypy --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8611ef9323deb..4461d956634c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,8 +79,8 @@ module = [ "lightning_app.components.serve.types.image", "lightning_app.components.serve.types.type", "lightning_app.components.serve.python_server", + "lightning_app.components.serve.auto_scaler", "lightning_app.components.training", - "lightning_app.components.auto_scaler", "lightning_app.core.api", "lightning_app.core.app", "lightning_app.core.flow", From bbceaef21b374cba23a0274fad204f9352d7a850 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 07:37:14 +0000 Subject: [PATCH 21/34] revert --- src/lightning_app/components/serve/auto_scaler.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 36ae720ae8943..6027249de850f 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -185,13 +185,15 @@ async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): async def consumer(self): while True: await asyncio.sleep(0.05) + batch = self._batch[: self.max_batch_size] - is_batch_ready = len(batch) == self.max_batch_size - is_batch_timeout = time.time() - self._last_batch_sent > self.timeout_batching - if batch and (is_batch_ready or is_batch_timeout): + while batch and ( + (len(batch) == self.max_batch_size) or ((time.time() - self._last_batch_sent) > self.timeout_batching) + ): asyncio.create_task(self.send_batch(batch)) - # resetting the batch array, TODO - not locking the array - self._batch = self._batch[len(batch) :] + + self._batch = self._batch[self.max_batch_size :] + batch = self._batch[: self.max_batch_size] self._last_batch_sent = time.time() async def process_request(self, data: BaseModel): From 8b65680216c00c2fea877022f76be8aad01b4495 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 08:13:34 +0000 Subject: [PATCH 22/34] testfix --- tests/tests_app/components/serve/test_auto_scaler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index d1adb1c5aacd2..6bd5aa958b6bf 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -33,8 +33,8 @@ def test_num_replicas_after_init(): @patch("uvicorn.run") -@patch("lightning_app.components.auto_scaler._LoadBalancer.url") -@patch("lightning_app.components.auto_scaler.AutoScaler.num_pending_requests") +@patch("lightning_app.components.serve.auto_scaler._LoadBalancer.url") +@patch("lightning_app.components.serve.auto_scaler.AutoScaler.num_pending_requests") def test_num_replicas_not_above_max_replicas(*_): """Test self.num_replicas doesn't exceed max_replicas.""" max_replicas = 6 @@ -53,8 +53,8 @@ def test_num_replicas_not_above_max_replicas(*_): @patch("uvicorn.run") -@patch("lightning_app.components.auto_scaler._LoadBalancer.url") -@patch("lightning_app.components.auto_scaler.AutoScaler.num_pending_requests") +@patch("lightning_app.components.serve.auto_scaler._LoadBalancer.url") +@patch("lightning_app.components.serve.auto_scaler.AutoScaler.num_pending_requests") def test_num_replicas_not_belo_min_replicas(*_): """Test self.num_replicas doesn't exceed max_replicas.""" min_replicas = 1 From a5ffaae684a9de7ebf904ad3aa3b9318a054fb4e Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 08:50:28 +0000 Subject: [PATCH 23/34] docs fix --- docs/source-app/api_references.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source-app/api_references.rst b/docs/source-app/api_references.rst index 2272f7bf13c41..931a9864d261f 100644 --- a/docs/source-app/api_references.rst +++ b/docs/source-app/api_references.rst @@ -45,7 +45,7 @@ ___________________ ~multi_node.lite.LiteMultiNode ~multi_node.pytorch_spawn.PyTorchSpawnMultiNode ~multi_node.trainer.LightningTrainerMultiNode - ~auto_scaler.AutoScaler + ~serve.auto_scaler.AutoScaler ---- From cfb02a336c8936b47bd322ebf376f13a56e4327f Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 10:52:39 +0000 Subject: [PATCH 24/34] Update src/lightning_app/components/serve/auto_scaler.py Co-authored-by: Akihiro Nitta --- src/lightning_app/components/serve/auto_scaler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index d3679efabf811..225dc3aaaec46 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -407,8 +407,8 @@ class AutoScaler(LightningFlow): Args: min_replicas: The number of works to start when app initializes. max_replicas: The max number of works to spawn to handle the incoming requests. - autoscale_up_interval: The number of seconds to wait before checking whether to upscale the server - autoscale_down_interval: The number of seconds to wait before checking whether to downscale the server + scale_out_interval: The number of seconds to wait before checking whether to upscale the server + scale_in_interval: The number of seconds to wait before checking whether to downscale the server endpoint: Provide the REST API path. max_batch_size: (auto-batching) The number of requests to process at once. timeout_batching: (auto-batching) The number of seconds to wait before sending the requests to process. From 7f4848761583e5672c25639e5f0ec89bfd92fb85 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 11:15:22 +0000 Subject: [PATCH 25/34] arg change\ --- src/lightning_app/components/serve/auto_scaler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 0dcc7594918b6..8b7207777f9f8 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -621,7 +621,7 @@ def autoscale(self) -> None: ) # upscale - if time.time() - self._last_autoscale > self.autoscale_up_interval: + if time.time() - self._last_autoscale > self.scale_out_interval: num_workers_to_add = num_target_workers - self.num_replicas for _ in range(num_workers_to_add): logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}") @@ -630,7 +630,7 @@ def autoscale(self) -> None: logger.info(f"Work created: '{new_work_id}'") # downscale - if time.time() - self._last_autoscale > self.autoscale_down_interval: + if time.time() - self._last_autoscale > self.scale_in_interval: num_workers_to_remove = self.num_replicas - num_target_workers for _ in range(num_workers_to_remove): logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}") From 2ca55d38ada4abafbaeb850342e0bb527f96e622 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 11:49:53 +0000 Subject: [PATCH 26/34] tests --- .../components/serve/test_auto_scaler.py | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index 6bd5aa958b6bf..706ec472b9f16 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -113,3 +113,43 @@ def test_API_ACCESS_ENDPOINT_creation(): auto_scaler.load_balancer.run() fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static") + + +def test_autoscaler_scale_up(monkeypatch): + monkeypatch.setattr(AutoScaler, "num_pending_works", 0) + monkeypatch.setattr(AutoScaler, "num_pending_requests", 100) + monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=1)) + monkeypatch.setattr(AutoScaler, "create_work", mock.MagicMock()) + monkeypatch.setattr(AutoScaler, "add_work", mock.MagicMock()) + + auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_out_interval=0.001) + + # Mocking the attributes + auto_scaler._last_autoscale = time.time() - 100000 + auto_scaler.num_replicas = 0 + + # triggering scale up + auto_scaler.autoscale() + auto_scaler.scale.assert_called_once() + auto_scaler.create_work.assert_called_once() + auto_scaler.add_work.assert_called_once() + + +def test_autoscaler_scale_down(monkeypatch): + monkeypatch.setattr(AutoScaler, "num_pending_works", 0) + monkeypatch.setattr(AutoScaler, "num_pending_requests", 0) + monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=0)) + monkeypatch.setattr(AutoScaler, "remove_work", mock.MagicMock()) + monkeypatch.setattr(AutoScaler, "workers", mock.MagicMock()) + + auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_in_interval=0.001) + + # Mocking the attributes + auto_scaler._last_autoscale = time.time() - 100000 + auto_scaler.num_replicas = 1 + auto_scaler.__dict__["load_balancer"] = mock.MagicMock() + + # triggering scale up + auto_scaler.autoscale() + auto_scaler.scale.assert_called_once() + auto_scaler.remove_work.assert_called_once() From f64fc70ed6c32652c63e54cce1cdb517f7373362 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 12:10:43 +0000 Subject: [PATCH 27/34] review --- src/lightning_app/components/serve/auto_scaler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 8b7207777f9f8..85743b833414f 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -628,6 +628,8 @@ def autoscale(self) -> None: work = self.create_work() new_work_id = self.add_work(work) logger.info(f"Work created: '{new_work_id}'") + if num_workers_to_add > 0: + self._last_autoscale = time.time() # downscale if time.time() - self._last_autoscale > self.scale_in_interval: @@ -636,9 +638,10 @@ def autoscale(self) -> None: logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}") removed_work_id = self.remove_work(self.num_replicas - 1) logger.info(f"Work removed: '{removed_work_id}'") + if num_workers_to_remove > 0: + self._last_autoscale = time.time() self.load_balancer.update_servers(self.workers) - self._last_autoscale = time.time() def configure_layout(self): tabs = [ From b188ad9ef84ec876af71d11b1d8f2f38f09b9bf3 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 12:12:01 +0000 Subject: [PATCH 28/34] review --- src/lightning_app/components/serve/auto_scaler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 85743b833414f..507d2e19678b5 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -626,6 +626,7 @@ def autoscale(self) -> None: for _ in range(num_workers_to_add): logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}") work = self.create_work() + # TODO: move works into structures new_work_id = self.add_work(work) logger.info(f"Work created: '{new_work_id}'") if num_workers_to_add > 0: From 7d6aba7df57715508de4cf7d1f7fe9d7a065461d Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 12:55:07 +0000 Subject: [PATCH 29/34] fixing comments --- src/lightning_app/components/serve/auto_scaler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 507d2e19678b5..51163a4c3cdaa 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -620,11 +620,11 @@ def autoscale(self) -> None: min(self.max_replicas, self.scale(self.num_replicas, metrics)), ) - # upscale + # scale-out if time.time() - self._last_autoscale > self.scale_out_interval: num_workers_to_add = num_target_workers - self.num_replicas for _ in range(num_workers_to_add): - logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}") + logger.info(f"Scaling out from {self.num_replicas} to {self.num_replicas + 1}") work = self.create_work() # TODO: move works into structures new_work_id = self.add_work(work) @@ -632,11 +632,11 @@ def autoscale(self) -> None: if num_workers_to_add > 0: self._last_autoscale = time.time() - # downscale + # scale-in if time.time() - self._last_autoscale > self.scale_in_interval: num_workers_to_remove = self.num_replicas - num_target_workers for _ in range(num_workers_to_remove): - logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}") + logger.info(f"Scaling in from {self.num_replicas} to {self.num_replicas - 1}") removed_work_id = self.remove_work(self.num_replicas - 1) logger.info(f"Work removed: '{removed_work_id}'") if num_workers_to_remove > 0: From 66396b1b34f34baaacc8b595f6c900bbc3efa229 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 13:24:29 +0000 Subject: [PATCH 30/34] name change --- tests/tests_app/components/serve/test_auto_scaler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index 476afced651ed..c3cfa99c9d69b 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -42,7 +42,8 @@ def test_num_replicas_not_above_max_replicas(*_): EmptyWork, min_replicas=1, max_replicas=max_replicas, - autoscale_interval=0.001, + scale_out_interval=0.001, + scale_in_interval=0.001, ) for _ in range(max_replicas + 1): @@ -62,7 +63,8 @@ def test_num_replicas_not_belo_min_replicas(*_): EmptyWork, min_replicas=min_replicas, max_replicas=4, - autoscale_interval=0.001, + scale_out_interval=0.001, + scale_in_interval=0.001, ) for _ in range(3): From 11058ca9ecf59fd58c1b92e63a03157c1ed9ff99 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 13:25:12 +0000 Subject: [PATCH 31/34] Update src/lightning_app/components/serve/auto_scaler.py Co-authored-by: thomas chaton --- src/lightning_app/components/serve/auto_scaler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index ce665d7705df8..1b76f60c2f58d 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -410,7 +410,7 @@ class AutoScaler(LightningFlow): min_replicas: The number of works to start when app initializes. max_replicas: The max number of works to spawn to handle the incoming requests. scale_out_interval: The number of seconds to wait before checking whether to upscale the server - scale_in_interval: The number of seconds to wait before checking whether to downscale the server + scale_in_interval: The number of seconds to wait before checking whether to decrease the number of servers. endpoint: Provide the REST API path. max_batch_size: (auto-batching) The number of requests to process at once. timeout_batching: (auto-batching) The number of seconds to wait before sending the requests to process. From 778daa5e0bc97f0cf88aec077c849699e98ef92b Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 13:25:20 +0000 Subject: [PATCH 32/34] Update src/lightning_app/components/serve/auto_scaler.py Co-authored-by: thomas chaton --- src/lightning_app/components/serve/auto_scaler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 1b76f60c2f58d..ca215026b0a99 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -409,7 +409,7 @@ class AutoScaler(LightningFlow): Args: min_replicas: The number of works to start when app initializes. max_replicas: The max number of works to spawn to handle the incoming requests. - scale_out_interval: The number of seconds to wait before checking whether to upscale the server + scale_out_interval: The number of seconds to wait before checking whether to increase the number of servers. scale_in_interval: The number of seconds to wait before checking whether to decrease the number of servers. endpoint: Provide the REST API path. max_batch_size: (auto-batching) The number of requests to process at once. From 2a7e6a2c78327c870cfc3f74bd0eabd24799d9aa Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 13:25:57 +0000 Subject: [PATCH 33/34] args change --- examples/app_server_with_auto_scaler/app.py | 3 ++- src/lightning_app/components/serve/auto_scaler.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/app_server_with_auto_scaler/app.py b/examples/app_server_with_auto_scaler/app.py index 453db2424b404..2c8fb744c4fcf 100644 --- a/examples/app_server_with_auto_scaler/app.py +++ b/examples/app_server_with_auto_scaler/app.py @@ -75,7 +75,8 @@ def scale(self, replicas: int, metrics: dict) -> int: # autoscaler specific args min_replicas=1, max_replicas=4, - autoscale_interval=10, + scale_out_interval=10, + scale_in_interval=10, endpoint="predict", input_type=L.app.components.Image, output_type=L.app.components.Number, diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index ce665d7705df8..43e7a79256fb5 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -427,7 +427,8 @@ class AutoScaler(LightningFlow): MyPythonServer, min_replicas=1, max_replicas=8, - autoscale_interval=10, + scale_out_interval=10, + scale_in_interval=10, ) ) @@ -456,7 +457,8 @@ def scale(self, replicas: int, metrics: dict) -> int: MyPythonServer, min_replicas=1, max_replicas=8, - autoscale_interval=10, + scale_out_interval=10, + scale_in_interval=10, max_batch_size=8, # for auto batching timeout_batching=1, # for auto batching ) From 8177a8393cfb3a276fbf56e5298252dc7e389f4c Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 13:27:09 +0000 Subject: [PATCH 34/34] Update src/lightning_app/CHANGELOG.md --- src/lightning_app/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index 7f1baae6e2fb3..4ee679fcd6493 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -16,7 +16,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added a nicer UI with URL and examples for the autoscaler component ([#16063](https://github.com/Lightning-AI/lightning/pull/16063)) -- Enabled users to have more control over scaling up/down interval ([#16093](https://github.com/Lightning-AI/lightning/pull/16093)) +- Enabled users to have more control over scaling out/in interval ([#16093](https://github.com/Lightning-AI/lightning/pull/16093)) - Added more datatypes to serving component ([#16018](https://github.com/Lightning-AI/lightning/pull/16018))