diff --git a/pyproject.toml b/pyproject.toml index 8f16fe6bc15c8..6587db0a2c80e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -138,6 +138,7 @@ module = [ "lightning_app.utilities.packaging.cloud_compute", "lightning_app.utilities.packaging.docker", "lightning_app.utilities.packaging.lightning_utils", + "lightning_app.utilities.port", "lightning_app.utilities.proxies", "lightning_app.utilities.scheduler", "lightning_app.utilities.state", diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index 370a523d7c823..971d0ee9b19d7 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -13,6 +13,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Show a message when `BuildConfig(requirements=[...])` is passed but a `requirements.txt` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799)) - Show a message when `BuildConfig(dockerfile="...")` is passed but a `Dockerfile` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799)) +- Added a CloudMultiProcessBackend which enables running a child App from within the Flow in the cloud ([#15800](https://github.com/Lightning-AI/lightning/pull/15800)) + + ### Changed - `lightning add ssh-key` CLI command has been transitioned to `lightning create ssh-key` with the same calling signature ([#15761](https://github.com/Lightning-AI/lightning/pull/15761)) diff --git a/src/lightning_app/cli/lightning_cli.py b/src/lightning_app/cli/lightning_cli.py index 905581490972d..9f6682bce6a75 100644 --- a/src/lightning_app/cli/lightning_cli.py +++ b/src/lightning_app/cli/lightning_cli.py @@ -54,7 +54,7 @@ def get_app_url(runtime_type: RuntimeType, *args: Any, need_credits: bool = Fals action = "?action=add_credits" if need_credits else "" return f"{get_lightning_cloud_url()}/me/apps/{lit_app.id}{action}" else: - return "http://127.0.0.1:7501/view" + return os.getenv("APP_SERVER_HOST", "http://127.0.0.1:7501/view") def main() -> None: diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index 128b6cfb2980f..ce1e81d57d9ea 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -138,6 +138,7 @@ def __init__( self._schedules: Dict[str, Dict] = {} self.threads: List[threading.Thread] = [] self.exception = None + self.collect_changes: bool = True # NOTE: Checkpointing is disabled by default for the time being. We # will enable it when resuming from full checkpoint is supported. Also, @@ -362,11 +363,14 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque delta.raise_errors = False return deltas - def maybe_apply_changes(self) -> bool: + def maybe_apply_changes(self) -> None: """Get the deltas from both the flow queue and the work queue, merge the two deltas and update the state.""" self._send_flow_to_work_deltas(self.state) + if not self.collect_changes: + return None + deltas = self._collect_deltas_from_ui_and_work_queues() if not deltas: diff --git a/src/lightning_app/core/constants.py b/src/lightning_app/core/constants.py index 4a9cfece63590..4038c85e7fc1e 100644 --- a/src/lightning_app/core/constants.py +++ b/src/lightning_app/core/constants.py @@ -3,6 +3,8 @@ import lightning_cloud.env +from lightning_app.utilities.port import _find_lit_app_port + def get_lightning_cloud_url() -> str: # DO NOT CHANGE! @@ -19,7 +21,8 @@ def get_lightning_cloud_url() -> str: FLOW_DURATION_SAMPLES = 5 APP_SERVER_HOST = os.getenv("LIGHTNING_APP_STATE_URL", "http://127.0.0.1") -APP_SERVER_PORT = 7501 +APP_SERVER_IN_CLOUD = "http://lightningapp" in APP_SERVER_HOST +APP_SERVER_PORT = _find_lit_app_port(7501) APP_STATE_MAX_SIZE_BYTES = 1024 * 1024 # 1 MB CLOUD_QUEUE_TYPE = os.getenv("LIGHTNING_CLOUD_QUEUE_TYPE", None) @@ -52,9 +55,6 @@ def get_lightning_cloud_url() -> str: # EXPERIMENTAL: ENV VARIABLES TO ENABLE MULTIPLE WORKS IN THE SAME MACHINE DEFAULT_NUMBER_OF_EXPOSED_PORTS = int(os.getenv("DEFAULT_NUMBER_OF_EXPOSED_PORTS", "50")) -ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER = bool( - int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")) -) # Note: This is disabled for the time being. ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER = bool( int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER", "0")) ) # This isn't used in the cloud yet. @@ -71,3 +71,7 @@ def get_lightning_cloud_url() -> str: ) ENABLE_STATE_WEBSOCKET = bool(int(os.getenv("ENABLE_STATE_WEBSOCKET", "0"))) ENABLE_UPLOAD_ENDPOINT = bool(int(os.getenv("ENABLE_UPLOAD_ENDPOINT", "1"))) + + +def enable_multiple_works_in_default_container() -> bool: + return bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))) diff --git a/src/lightning_app/runners/backends/__init__.py b/src/lightning_app/runners/backends/__init__.py index d5bfb0e780c50..51d68a678a5d5 100644 --- a/src/lightning_app/runners/backends/__init__.py +++ b/src/lightning_app/runners/backends/__init__.py @@ -1,9 +1,10 @@ from enum import Enum +from lightning_app.core.constants import APP_SERVER_IN_CLOUD from lightning_app.runners.backends.backend import Backend from lightning_app.runners.backends.cloud import CloudBackend from lightning_app.runners.backends.docker import DockerBackend -from lightning_app.runners.backends.mp_process import MultiProcessingBackend +from lightning_app.runners.backends.mp_process import CloudMultiProcessingBackend, MultiProcessingBackend class BackendType(Enum): @@ -13,6 +14,8 @@ class BackendType(Enum): def get_backend(self, entrypoint_file: str) -> "Backend": if self == BackendType.MULTIPROCESSING: + if APP_SERVER_IN_CLOUD: + return CloudMultiProcessingBackend(entrypoint_file) return MultiProcessingBackend(entrypoint_file) elif self == BackendType.DOCKER: return DockerBackend(entrypoint_file) diff --git a/src/lightning_app/runners/backends/mp_process.py b/src/lightning_app/runners/backends/mp_process.py index 3ad83aab55341..36a067d0bfd80 100644 --- a/src/lightning_app/runners/backends/mp_process.py +++ b/src/lightning_app/runners/backends/mp_process.py @@ -6,6 +6,7 @@ from lightning_app.runners.backends.backend import Backend, WorkManager from lightning_app.utilities.enum import WorkStageStatus from lightning_app.utilities.network import _check_service_url_is_ready +from lightning_app.utilities.port import disable_port, enable_port from lightning_app.utilities.proxies import ProxyWorkRun, WorkRunner @@ -83,3 +84,24 @@ def resolve_url(self, app, base_url: Optional[str] = None) -> None: def stop_work(self, app, work: "lightning_app.LightningWork") -> None: work_manager: MultiProcessWorkManager = app.processes[work.name] work_manager.kill() + + +class CloudMultiProcessingBackend(MultiProcessingBackend): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # Note: Track the open ports to close them on termination. + self.ports = [] + + def create_work(self, app, work) -> None: + work._host = "0.0.0.0" + nc = enable_port() + self.ports.append(nc.port) + work._port = nc.port + work._future_url = f"https://{nc.host}" + return super().create_work(app, work) + + def stop_work(self, app, work: "lightning_app.LightningWork") -> None: + disable_port(work._port) + self.ports = [port for port in self.ports if port != work._port] + return super().stop_work(app, work) diff --git a/src/lightning_app/runners/cloud.py b/src/lightning_app/runners/cloud.py index e6a474ac921ce..0752a3b5be8a8 100644 --- a/src/lightning_app/runners/cloud.py +++ b/src/lightning_app/runners/cloud.py @@ -51,7 +51,7 @@ DISABLE_DEPENDENCY_CACHE, DOT_IGNORE_FILENAME, ENABLE_APP_COMMENT_COMMAND_EXECUTION, - ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER, + enable_multiple_works_in_default_container, ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER, ENABLE_PULLING_STATE_ENDPOINT, ENABLE_PUSHING_STATE_ENDPOINT, @@ -243,7 +243,7 @@ def dispatch( if self.run_app_comment_commands or ENABLE_APP_COMMENT_COMMAND_EXECUTION: v1_env_vars.append(V1EnvVar(name="ENABLE_APP_COMMENT_COMMAND_EXECUTION", value="1")) - if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER: + if enable_multiple_works_in_default_container(): v1_env_vars.append(V1EnvVar(name="ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", value="1")) if ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER: @@ -303,7 +303,7 @@ def dispatch( ) network_configs: Optional[List[V1NetworkConfig]] = None - if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER: + if enable_multiple_works_in_default_container(): network_configs = [] initial_port = 8080 + 1 + len(frontend_specs) for _ in range(DEFAULT_NUMBER_OF_EXPOSED_PORTS): diff --git a/src/lightning_app/runners/multiprocess.py b/src/lightning_app/runners/multiprocess.py index 8abd0a443ac32..1f7e0b906ba4b 100644 --- a/src/lightning_app/runners/multiprocess.py +++ b/src/lightning_app/runners/multiprocess.py @@ -5,6 +5,7 @@ from lightning_app.api.http_methods import _add_tags_to_api, _validate_api from lightning_app.core.api import start_server +from lightning_app.core.constants import APP_SERVER_IN_CLOUD from lightning_app.runners.backends import Backend from lightning_app.runners.runtime import Runtime from lightning_app.storage.orchestrator import StorageOrchestrator @@ -13,6 +14,7 @@ from lightning_app.utilities.component import _set_flow_context, _set_frontend_context from lightning_app.utilities.load_app import extract_metadata_from_app from lightning_app.utilities.network import find_free_network_port +from lightning_app.utilities.port import disable_port @dataclass @@ -31,6 +33,9 @@ def dispatch(self, *args: Any, on_before_run: Optional[Callable] = None, **kwarg try: _set_flow_context() + # Note: In case the runtime is used in the cloud. + self.host = "0.0.0.0" if APP_SERVER_IN_CLOUD else self.host + self.app.backend = self.backend self.backend._prepare_queues(self.app) self.backend.resolve_url(self.app, "http://127.0.0.1") @@ -109,3 +114,11 @@ def dispatch(self, *args: Any, on_before_run: Optional[Callable] = None, **kwarg raise finally: self.terminate() + + def terminate(self): + if APP_SERVER_IN_CLOUD: + # Close all the ports open for the App within the App. + ports = [self.port] + getattr(self.backend, "ports", []) + for port in ports: + disable_port(port) + super().terminate() diff --git a/src/lightning_app/utilities/port.py b/src/lightning_app/utilities/port.py new file mode 100644 index 0000000000000..45877dfd85930 --- /dev/null +++ b/src/lightning_app/utilities/port.py @@ -0,0 +1,143 @@ +import os +from typing import Optional + +from lightning_cloud.openapi import AppinstancesIdBody, Externalv1LightningappInstance, V1NetworkConfig + +from lightning_app.utilities.network import LightningClient + + +def _find_lit_app_port(default_port: int) -> int: + """Make a request to the cloud controlplane to find a disabled port of the flow, enable it and return it.""" + + app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None) + project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None) + enable_multiple_works_in_default_container = bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))) + + if not app_id or not project_id or not enable_multiple_works_in_default_container: + return default_port + + client = LightningClient() + list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id) + lit_app: Optional[Externalv1LightningappInstance] = None + + for lightningapp in list_apps_resp.lightningapps: + if lightningapp.id == app_id: + lit_app = lightningapp + + if not lit_app: + raise RuntimeError( + "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues." + ) + + found_nc = None + + for nc in lit_app.spec.network_config: + if not nc.enable: + found_nc = nc + nc.enable = True + break + + client.lightningapp_instance_service_update_lightningapp_instance( + project_id=project_id, + id=lit_app.id, + body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec), + ) + + if not found_nc: + raise RuntimeError( + "No available port was found. Please open an issue at https://github.com/lightning-AI/lightning/issues." + ) + + # Note: This is required for the framework to know we need to use the CloudMultiProcessRuntime. + os.environ["APP_SERVER_HOST"] = f"https://{found_nc.host}" + + return found_nc.port + + +def enable_port() -> V1NetworkConfig: + """Make a request to the cloud controlplane to open a port of the flow.""" + app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None) + project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None) + + if not app_id or not project_id: + raise Exception("The app_id and project_id should be defined.") + + client = LightningClient() + list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id) + lit_app: Optional[Externalv1LightningappInstance] = None + + for lightningapp in list_apps_resp.lightningapps: + if lightningapp.id == app_id: + lit_app = lightningapp + + if not lit_app: + raise RuntimeError( + "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues." + ) + + found_nc = None + + for nc in lit_app.spec.network_config: + if not nc.enable: + found_nc = nc + nc.enable = True + break + + client.lightningapp_instance_service_update_lightningapp_instance( + project_id=project_id, + id=lit_app.id, + body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec), + ) + + if not found_nc: + raise RuntimeError( + "No available port was found. Please open an issue at https://github.com/lightning-AI/lightning/issues." + ) + + return found_nc + + +def disable_port(port: int, ignore_disabled: bool = True) -> None: + """Make a request to the cloud controlplane to close a port of the flow.""" + + app_id = os.getenv("LIGHTNING_CLOUD_APP_ID", None) + project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID", None) + + if not app_id or not project_id: + raise Exception("The app_id and project_id should be defined.") + + client = LightningClient() + list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id) + lit_app: Optional[Externalv1LightningappInstance] = None + + for lightningapp in list_apps_resp.lightningapps: + if lightningapp.id == app_id: + lit_app = lightningapp + + if not lit_app: + raise RuntimeError( + "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues." + ) + + found_nc = None + + for nc in lit_app.spec.network_config: + if nc.port == port: + if not nc.enable and not ignore_disabled: + raise RuntimeError(f"The port {port} was already disabled.") + + nc.enable = False + found_nc = nc + break + + client.lightningapp_instance_service_update_lightningapp_instance( + project_id=project_id, + id=lit_app.id, + body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec), + ) + + if not found_nc: + ports = [nc.port for nc in lit_app.spec.network_config] + raise ValueError(f"The provided port doesn't exists. Available ports are {ports}.") + + assert found_nc diff --git a/tests/tests_app/utilities/test_port.py b/tests/tests_app/utilities/test_port.py new file mode 100644 index 0000000000000..c371fe057e1bd --- /dev/null +++ b/tests/tests_app/utilities/test_port.py @@ -0,0 +1,109 @@ +from unittest.mock import MagicMock + +import pytest +from lightning_cloud.openapi import V1NetworkConfig + +from lightning_app.utilities import port +from lightning_app.utilities.port import _find_lit_app_port, disable_port, enable_port + + +def test_find_lit_app_port(monkeypatch): + client = MagicMock() + monkeypatch.setattr(port, "LightningClient", MagicMock(return_value=client)) + + assert 5701 == _find_lit_app_port(5701) + + resp = MagicMock() + lit_app = MagicMock() + lit_app.id = "a" + lit_app.spec.network_config = [ + V1NetworkConfig(host="a", port=0, enable=True), + V1NetworkConfig(host="a", port=1, enable=False), + ] + resp.lightningapps = [lit_app] + client.lightningapp_instance_service_list_lightningapp_instances.return_value = resp + + monkeypatch.setenv("LIGHTNING_CLOUD_APP_ID", "a") + monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "a") + monkeypatch.setenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "1") + + assert _find_lit_app_port(5701) == 1 + + lit_app.spec.network_config = [ + V1NetworkConfig(host="a", port=0, enable=True), + V1NetworkConfig(host="a", port=1, enable=True), + ] + + with pytest.raises(RuntimeError, match="No available port was found. Please"): + _find_lit_app_port(5701) + + +def test_enable_port(monkeypatch): + client = MagicMock() + monkeypatch.setattr(port, "LightningClient", MagicMock(return_value=client)) + + assert 5701 == _find_lit_app_port(5701) + + resp = MagicMock() + lit_app = MagicMock() + lit_app.id = "a" + lit_app.spec.network_config = [ + V1NetworkConfig(host="a", port=0, enable=True), + V1NetworkConfig(host="a", port=1, enable=False), + ] + resp.lightningapps = [lit_app] + client.lightningapp_instance_service_list_lightningapp_instances.return_value = resp + + monkeypatch.setenv("LIGHTNING_CLOUD_APP_ID", "a") + monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "a") + monkeypatch.setenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "1") + + assert enable_port() + + lit_app.spec.network_config = [ + V1NetworkConfig(host="a", port=0, enable=True), + V1NetworkConfig(host="a", port=1, enable=True), + ] + + with pytest.raises(RuntimeError, match="No available port was found. Please"): + assert enable_port() + + +def test_disable_port(monkeypatch): + client = MagicMock() + monkeypatch.setattr(port, "LightningClient", MagicMock(return_value=client)) + + assert 5701 == _find_lit_app_port(5701) + + resp = MagicMock() + lit_app = MagicMock() + lit_app.id = "a" + lit_app.spec.network_config = [ + V1NetworkConfig(host="a", port=0, enable=True), + V1NetworkConfig(host="a", port=1, enable=False), + ] + resp.lightningapps = [lit_app] + client.lightningapp_instance_service_list_lightningapp_instances.return_value = resp + + monkeypatch.setenv("LIGHTNING_CLOUD_APP_ID", "a") + monkeypatch.setenv("LIGHTNING_CLOUD_PROJECT_ID", "a") + monkeypatch.setenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "1") + + disable_port(0) + assert not lit_app.spec.network_config[0].enable + + lit_app.spec.network_config = [ + V1NetworkConfig(host="a", port=0, enable=True), + V1NetworkConfig(host="a", port=1, enable=False), + ] + + with pytest.raises(RuntimeError, match="The port 1 was already disabled."): + disable_port(1, ignore_disabled=False) + + lit_app.spec.network_config = [ + V1NetworkConfig(host="a", port=0, enable=True), + V1NetworkConfig(host="a", port=1, enable=False), + ] + + with pytest.raises(ValueError, match="[0, 1]"): + assert disable_port(10)