From 4f2ef5475dca17b7188799759f8b1af8226acc0a Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 12 Dec 2022 16:50:23 +0000 Subject: [PATCH 1/5] [App] Support randomising app name when running on multiple clusters --- src/lightning_app/runners/cloud.py | 205 ++++++++++-------- .../utilities/packaging/app_config.py | 5 +- tests/tests_app/runners/test_cloud.py | 127 +++++++---- 3 files changed, 201 insertions(+), 136 deletions(-) diff --git a/src/lightning_app/runners/cloud.py b/src/lightning_app/runners/cloud.py index 6ef7770124aae..5eda1fd6dc6fa 100644 --- a/src/lightning_app/runners/cloud.py +++ b/src/lightning_app/runners/cloud.py @@ -6,7 +6,6 @@ import time from dataclasses import dataclass from pathlib import Path -from textwrap import dedent from typing import Any, List, Optional, Union import click @@ -19,6 +18,7 @@ Externalv1LightningappInstance, Gridv1ImageSpec, V1BuildSpec, + V1ClusterType, V1DependencyFileInfo, V1Drive, V1DriveSpec, @@ -210,8 +210,6 @@ def dispatch( # Determine the root of the project: Start at the entrypoint_file and look for nearby Lightning config files, # going up the directory structure. The root of the project is where the Lightning config file is located. 
- # TODO: verify lightning version - # _verify_lightning_version() config_file = _get_config_file(self.entrypoint_file) app_config = AppConfig.load_from_file(config_file) if config_file.exists() else AppConfig() root = Path(self.entrypoint_file).absolute().parent @@ -228,10 +226,6 @@ def dispatch( # Override the name if provided by the CLI app_config.name = name - if cluster_id: - # Override the cluster ID if provided by the CLI - app_config.cluster_id = cluster_id - print(f"The name of the app is: {app_config.name}") v1_env_vars = [V1EnvVar(name=k, value=v) for k, v in self.env_vars.items()] @@ -293,17 +287,93 @@ def dispatch( project = _get_project(self.backend.client) try: - list_apps_resp = self.backend.client.lightningapp_v2_service_list_lightningapps_v2( - project_id=project.project_id, name=app_config.name + if cluster_id is not None: + # Verify that the cluster exists + list_clusters_resp = self.backend.client.cluster_service_list_clusters() + cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters] + if cluster_id not in cluster_ids: + msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist." 
+ raise ValueError(msg) + + self._ensure_cluster_project_binding(project.project_id, cluster_id) + + # Resolve the app name, instance, and cluster ID + existing_instance = None + app_name = app_config.name + + # List existing instances + find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances( + project_id=project.project_id ) - if list_apps_resp.lightningapps: - # There can be only one app with unique project_id<>name pair - lit_app = list_apps_resp.lightningapps[0] - else: - app_body = Body7(name=app_config.name, can_download_source_code=True) + + # Seach for instances whose name starts with the given name + instances = [ + lightningapp + for lightningapp in find_instances_resp.lightningapps + if lightningapp.name.startswith(app_name) + ] + + # If instances exist and cluster is None, mimic cluster selection logic to choose a default + if cluster_id is None and len(instances) > 0: + # Determine the cluster ID + cluster_id = self._get_default_cluster(project.project_id) + + # If an instance exists on the cluster with the base name - restart it + for instance in instances: + if instance.spec.cluster_id == cluster_id: + existing_instance = instance + break + + # If instances exist but not on the cluster - choose a randomised name + if len(instances) > 0 and existing_instance is None: + letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + + name_exists = True + while name_exists: + random_name = app_name + "-" + "".join(random.sample(letters, 4)) + name_exists = any([instance.name == random_name for instance in instances]) + + app_name = random_name + + # Create the app if it doesn't exist + if existing_instance is None: + app_body = Body7(name=app_name, can_download_source_code=True) lit_app = self.backend.client.lightningapp_v2_service_create_lightningapp_v2( project_id=project.project_id, body=app_body ) + app_id = lit_app.id + else: + app_id = existing_instance.spec.app_id + + # check if user has 
sufficient credits to run an app + # if so set the desired state to running otherwise, create the app in stopped state, + # and open the admin ui to add credits and running the app. + has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app) + app_release_desired_state = ( + V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED + ) + if not has_sufficient_credits: + logger.warn("You may need Lightning credits to run your apps on the cloud.") + + # Stop the instance if it isn't stopped yet + if existing_instance and existing_instance.status.phase != V1LightningappInstanceState.STOPPED: + # TODO(yurij): Implement release switching in the UI and remove this + # We can only switch release of the stopped instance + existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance( + project_id=project.project_id, + id=existing_instance.id, + body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)), + ) + # wait for the instance to stop for up to 150 seconds + for _ in range(150): + existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance( + project_id=project.project_id, id=existing_instance.id + ) + if existing_instance.status.phase == V1LightningappInstanceState.STOPPED: + break + time.sleep(1) + if existing_instance.status.phase != V1LightningappInstanceState.STOPPED: + raise RuntimeError("Failed to stop the existing instance.") network_configs: Optional[List[V1NetworkConfig]] = None if enable_multiple_works_in_default_container(): @@ -318,90 +388,18 @@ def dispatch( ) initial_port += 1 - # check if user has sufficient credits to run an app - # if so set the desired state to running otherwise, create the app in stopped state, - # and open the admin ui to add credits and running the app. 
- has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app) - app_release_desired_state = ( - V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED - ) - if not has_sufficient_credits: - logger.warn("You may need Lightning credits to run your apps on the cloud.") - - # right now we only allow a single instance of the app - find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances( - project_id=project.project_id, app_id=lit_app.id - ) - queue_server_type = V1QueueServerType.UNSPECIFIED if CLOUD_QUEUE_TYPE == "http": queue_server_type = V1QueueServerType.HTTP elif CLOUD_QUEUE_TYPE == "redis": queue_server_type = V1QueueServerType.REDIS - existing_instance: Optional[Externalv1LightningappInstance] = None - if find_instances_resp.lightningapps: - existing_instance = find_instances_resp.lightningapps[0] - - if not app_config.cluster_id: - # Re-run the app on the same cluster - app_config.cluster_id = existing_instance.spec.cluster_id - - if existing_instance.status.phase != V1LightningappInstanceState.STOPPED: - # TODO(yurij): Implement release switching in the UI and remove this - # We can only switch release of the stopped instance - existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance( - project_id=project.project_id, - id=existing_instance.id, - body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)), - ) - # wait for the instance to stop for up to 150 seconds - for _ in range(150): - existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance( - project_id=project.project_id, id=existing_instance.id - ) - if existing_instance.status.phase == V1LightningappInstanceState.STOPPED: - break - time.sleep(1) - if existing_instance.status.phase != V1LightningappInstanceState.STOPPED: - raise RuntimeError("Failed to stop the existing 
instance.") - - if app_config.cluster_id is not None: - # Verify that the cluster exists - list_clusters_resp = self.backend.client.cluster_service_list_clusters() - cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters] - if app_config.cluster_id not in cluster_ids: - if cluster_id: - msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist." - else: - msg = ( - f"Your app last ran on cluster {app_config.cluster_id}, but that cluster " - "doesn't exist anymore." - ) - raise ValueError(msg) - if existing_instance and existing_instance.spec.cluster_id != app_config.cluster_id: - raise ValueError( - dedent( - f"""\ - An app names {app_config.name} is already running on cluster {existing_instance.spec.cluster_id}, and you requested it to run on cluster {app_config.cluster_id}. - - In order to proceed, please either: - a. rename the app to run on {app_config.cluster_id} with the --name option - lightning run app {app_entrypoint_file} --name (new name) --cloud --cluster-id {app_config.cluster_id} - b. delete the app running on {existing_instance.spec.cluster_id} in the UI before running this command. 
- """ # noqa: E501 - ) - ) - - if app_config.cluster_id is not None: - self._ensure_cluster_project_binding(project.project_id, app_config.cluster_id) - release_body = Body8( app_entrypoint_file=app_spec.app_entrypoint_file, enable_app_server=app_spec.enable_app_server, flow_servers=app_spec.flow_servers, image_spec=app_spec.image_spec, - cluster_id=app_config.cluster_id, + cluster_id=cluster_id, network_config=network_configs, works=works, local_source=True, @@ -412,14 +410,13 @@ def dispatch( # create / upload the new app release lightning_app_release = self.backend.client.lightningapp_v2_service_create_lightningapp_release( - project_id=project.project_id, app_id=lit_app.id, body=release_body + project_id=project.project_id, app_id=app_id, body=release_body ) if lightning_app_release.source_upload_url == "": raise RuntimeError("The source upload url is empty.") if getattr(lightning_app_release, "cluster_id", None): - app_config.cluster_id = lightning_app_release.cluster_id logger.info(f"Running app on {lightning_app_release.cluster_id}") # Save the config for re-runs @@ -428,7 +425,7 @@ def dispatch( repo.package() repo.upload(url=lightning_app_release.source_upload_url) - if find_instances_resp.lightningapps: + if existing_instance is not None: lightning_app_instance = ( self.backend.client.lightningapp_instance_service_update_lightningapp_instance_release( project_id=project.project_id, @@ -452,12 +449,12 @@ def dispatch( lightning_app_instance = ( self.backend.client.lightningapp_v2_service_create_lightningapp_release_instance( project_id=project.project_id, - app_id=lit_app.id, + app_id=app_id, id=lightning_app_release.id, body=Body9( - cluster_id=app_config.cluster_id, + cluster_id=cluster_id, desired_state=app_release_desired_state, - name=lit_app.name, + name=app_name, env=v1_env_vars, queue_server_type=queue_server_type, ), @@ -490,6 +487,28 @@ def _ensure_cluster_project_binding(self, project_id: str, cluster_id: str): 
body=V1ProjectClusterBinding(cluster_id=cluster_id, project_id=project_id), ) + def _get_default_cluster(self, project_id: str) -> str: + """This utility implements a minimal version of the cluster selection logic used in the cloud. + + TODO: This should be requested directly from the platform. + """ + cluster_bindings = self.backend.client.projects_service_list_project_cluster_bindings( + project_id=project_id + ).clusters + + if len(cluster_bindings) == 1: + return cluster_bindings[0].cluster_id + + clusters = [ + self.backend.client.cluster_service_get_cluster(cluster_binding.cluster_id) + for cluster_binding in cluster_bindings + ] + + # Filter global clusters + clusters = [cluster for cluster in clusters if cluster.spec.cluster_type == V1ClusterType.GLOBAL] + + return random.choice(clusters).id + @staticmethod def _check_uploaded_folder(root: Path, repo: LocalSourceCodeDir) -> None: """This method is used to inform the users if their folder files are large and how to filter them.""" diff --git a/src/lightning_app/utilities/packaging/app_config.py b/src/lightning_app/utilities/packaging/app_config.py index c3e44159ffb4e..202ad91aec19f 100644 --- a/src/lightning_app/utilities/packaging/app_config.py +++ b/src/lightning_app/utilities/packaging/app_config.py @@ -1,6 +1,6 @@ import pathlib from dataclasses import asdict, dataclass, field -from typing import Optional, Union +from typing import Union import yaml @@ -18,7 +18,6 @@ class AppConfig: """ name: str = field(default_factory=get_unique_name) - cluster_id: Optional[str] = field(default=None) def save_to_file(self, path: Union[str, pathlib.Path]) -> None: """Save the configuration to the given file in YAML format.""" @@ -35,6 +34,8 @@ def load_from_file(cls, path: Union[str, pathlib.Path]) -> "AppConfig": """Load the configuration from the given file.""" with open(path) as file: config = yaml.safe_load(file) + # Ignore `cluster_id` without error for backwards compatibility. 
+ config.pop("cluster_id", None) return cls(**config) @classmethod diff --git a/tests/tests_app/runners/test_cloud.py b/tests/tests_app/runners/test_cloud.py index e89e1e8aa468d..1e5f3c9433efb 100644 --- a/tests/tests_app/runners/test_cloud.py +++ b/tests/tests_app/runners/test_cloud.py @@ -2,7 +2,6 @@ import os import re import sys -from contextlib import nullcontext as does_not_raise from copy import copy from pathlib import Path from unittest import mock @@ -17,12 +16,15 @@ Gridv1ImageSpec, IdGetBody, V1BuildSpec, + V1ClusterSpec, + V1ClusterType, V1DependencyFileInfo, V1Drive, V1DriveSpec, V1DriveStatus, V1DriveType, V1EnvVar, + V1GetClusterResponse, V1LightningappInstanceState, V1LightningappRelease, V1LightningworkDrives, @@ -30,6 +32,7 @@ V1ListClustersResponse, V1ListLightningappInstancesResponse, V1ListMembershipsResponse, + V1ListProjectClusterBindingsResponse, V1Membership, V1Metadata, V1NetworkConfig, @@ -158,25 +161,16 @@ def test_run_on_deleted_cluster(self, cloud_backend): # TODO: remove this test once there is support for multiple instances @pytest.mark.parametrize( - "old_cluster,new_cluster,expected_raise", + "old_cluster,new_cluster", [ - ( - "test", - "other", - pytest.raises( - ValueError, - match="already running on cluster", - ), - ), - ("test", "test", does_not_raise()), - (None, None, does_not_raise()), - (None, "litng-ai-03", does_not_raise()), - ("litng-ai-03", None, does_not_raise()), + ("test", "other"), + ("test", "test"), + (None, None), + (None, "litng-ai-03"), + ("litng-ai-03", None), ], ) - def test_new_instance_on_different_cluster( - self, cloud_backend, project_id, old_cluster, new_cluster, expected_raise - ): + def test_new_instance_on_different_cluster(self, cloud_backend, project_id, old_cluster, new_cluster): app_name = "test-app" mock_client = mock.MagicMock() @@ -197,6 +191,18 @@ def test_new_instance_on_different_cluster( ] ) + mock_client.projects_service_list_project_cluster_bindings.return_value = 
V1ListProjectClusterBindingsResponse( + clusters=[ + V1ProjectClusterBinding(cluster_id=old_cluster or DEFAULT_CLUSTER), + V1ProjectClusterBinding(cluster_id=new_cluster or DEFAULT_CLUSTER), + ] + ) + + # Mock all clusters as global clusters + mock_client.cluster_service_get_cluster.side_effect = lambda cluster_id: V1GetClusterResponse( + id=cluster_id, spec=V1ClusterSpec(cluster_type=V1ClusterType.GLOBAL) + ) + cloud_backend.client = mock_client app = mock.MagicMock() @@ -216,8 +222,15 @@ def test_new_instance_on_different_cluster( # This is the main assertion: # we have an existing instance on `cluster-001` # but we want to run this app on `cluster-002` - with expected_raise: - cloud_runtime.dispatch(name=app_name, cluster_id=new_cluster) + cloud_runtime.dispatch(name=app_name, cluster_id=new_cluster) + + if new_cluster != old_cluster and None not in (old_cluster, new_cluster): + # If we switched cluster, check that a new name was used which starts with the old name + mock_client.lightningapp_v2_service_create_lightningapp_release_instance.assert_called_once() + args = mock_client.lightningapp_v2_service_create_lightningapp_release_instance.call_args + assert args[1]["body"].name != app_name + assert args[1]["body"].name.startswith(app_name) + assert args[1]["body"].cluster_id == new_cluster @pytest.mark.parametrize("flow_cloud_compute", [None, CloudCompute(name="t2.medium")]) @mock.patch("lightning_app.runners.backends.cloud.LightningClient", mock.MagicMock()) @@ -326,9 +339,7 @@ def test_requirements_file(self, monkeypatch): mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( V1ListLightningappInstancesResponse(lightningapps=[]) ) - mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease( - cluster_id="test" - ) + mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease() mock_client.cluster_service_list_clusters.return_value = 
V1ListClustersResponse([Externalv1Cluster(id="test")]) cloud_backend = mock.MagicMock() cloud_backend.client = mock_client @@ -373,7 +384,6 @@ def test_requirements_file(self, monkeypatch): path="requirements.txt", ), ) - body.cluster_id = "test" cloud_runtime.backend.client.lightningapp_v2_service_create_lightningapp_release.assert_called_with( project_id="test-project-id", app_id=mock.ANY, body=body ) @@ -436,7 +446,7 @@ def test_no_cache(self, monkeypatch): def test_call_with_work_app(self, lightningapps, start_with_flow, monkeypatch, tmpdir): source_code_root_dir = Path(tmpdir / "src").absolute() source_code_root_dir.mkdir() - Path(source_code_root_dir / ".lightning").write_text("cluster_id: test\nname: myapp") + Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() @@ -447,9 +457,15 @@ def test_call_with_work_app(self, lightningapps, start_with_flow, monkeypatch, t mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( V1ListLightningappInstancesResponse(lightningapps=lightningapps) ) - mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease( - cluster_id="test" + mock_client.projects_service_list_project_cluster_bindings.return_value = V1ListProjectClusterBindingsResponse( + clusters=[ + V1ProjectClusterBinding(cluster_id="test"), + ] + ) + mock_client.cluster_service_get_cluster.side_effect = lambda cluster_id: V1GetClusterResponse( + id=cluster_id, spec=V1ClusterSpec(cluster_type=V1ClusterType.GLOBAL) ) + mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease() mock_client.cluster_service_list_clusters.return_value = V1ListClustersResponse([Externalv1Cluster(id="test")]) mock_client.lightningapp_v2_service_create_lightningapp_release_instance.return_value = MagicMock() existing_instance = MagicMock() @@ -551,9 +567,7 @@ def 
test_call_with_queue_server_type_specified(self, lightningapps, monkeypatch, mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( V1ListLightningappInstancesResponse(lightningapps=[]) ) - mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease( - cluster_id="test" - ) + mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease() mock_client.cluster_service_list_clusters.return_value = V1ListClustersResponse([Externalv1Cluster(id="test")]) cloud_backend = mock.MagicMock() cloud_backend.client = mock_client @@ -574,7 +588,6 @@ def test_call_with_queue_server_type_specified(self, lightningapps, monkeypatch, # calling with no env variable set body = IdGetBody( - cluster_id="test", desired_state=V1LightningappInstanceState.STOPPED, env=[], name=mock.ANY, @@ -590,7 +603,6 @@ def test_call_with_queue_server_type_specified(self, lightningapps, monkeypatch, cloud_runtime.backend.client.reset_mock() cloud_runtime.dispatch() body = IdGetBody( - cluster_id="test", desired_state=V1LightningappInstanceState.STOPPED, env=[], name=mock.ANY, @@ -606,7 +618,7 @@ def test_call_with_queue_server_type_specified(self, lightningapps, monkeypatch, def test_call_with_work_app_and_attached_drives(self, lightningapps, monkeypatch, tmpdir): source_code_root_dir = Path(tmpdir / "src").absolute() source_code_root_dir.mkdir() - Path(source_code_root_dir / ".lightning").write_text("cluster_id: test\nname: myapp") + Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() @@ -617,9 +629,15 @@ def test_call_with_work_app_and_attached_drives(self, lightningapps, monkeypatch mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( V1ListLightningappInstancesResponse(lightningapps=lightningapps) ) - 
mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease( - cluster_id="test" + mock_client.projects_service_list_project_cluster_bindings.return_value = V1ListProjectClusterBindingsResponse( + clusters=[ + V1ProjectClusterBinding(cluster_id="test"), + ] + ) + mock_client.cluster_service_get_cluster.side_effect = lambda cluster_id: V1GetClusterResponse( + id=cluster_id, spec=V1ClusterSpec(cluster_type=V1ClusterType.GLOBAL) ) + mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease() mock_client.cluster_service_list_clusters.return_value = V1ListClustersResponse([Externalv1Cluster(id="test")]) lightning_app_instance = MagicMock() mock_client.lightningapp_v2_service_create_lightningapp_release_instance = MagicMock( @@ -742,7 +760,7 @@ def test_call_with_work_app_and_attached_drives(self, lightningapps, monkeypatch def test_call_with_work_app_and_app_comment_command_execution_set(self, lightningapps, monkeypatch, tmpdir): source_code_root_dir = Path(tmpdir / "src").absolute() source_code_root_dir.mkdir() - Path(source_code_root_dir / ".lightning").write_text("cluster_id: test\nname: myapp") + Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() @@ -750,12 +768,20 @@ def test_call_with_work_app_and_app_comment_command_execution_set(self, lightnin if lightningapps: lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" + mock_client.projects_service_list_project_cluster_bindings.return_value = ( + V1ListProjectClusterBindingsResponse( + clusters=[ + V1ProjectClusterBinding(cluster_id="test"), + ] + ) + ) + mock_client.cluster_service_get_cluster.side_effect = lambda cluster_id: V1GetClusterResponse( + id=cluster_id, spec=V1ClusterSpec(cluster_type=V1ClusterType.GLOBAL) + ) 
mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( V1ListLightningappInstancesResponse(lightningapps=lightningapps) ) - mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease( - cluster_id="test" - ) + mock_client.lightningapp_v2_service_create_lightningapp_release.return_value = V1LightningappRelease() mock_client.cluster_service_list_clusters.return_value = V1ListClustersResponse([Externalv1Cluster(id="test")]) lightning_app_instance = MagicMock() mock_client.lightningapp_v2_service_create_lightningapp_release_instance = MagicMock( @@ -846,7 +872,6 @@ def test_call_with_work_app_and_app_comment_command_execution_set(self, lightnin app_id=mock.ANY, id=mock.ANY, body=Body9( - cluster_id="test", desired_state=V1LightningappInstanceState.STOPPED, name=mock.ANY, env=[V1EnvVar(name="ENABLE_APP_COMMENT_COMMAND_EXECUTION", value="1")], @@ -859,7 +884,7 @@ def test_call_with_work_app_and_app_comment_command_execution_set(self, lightnin def test_call_with_work_app_and_multiple_attached_drives(self, lightningapps, monkeypatch, tmpdir): source_code_root_dir = Path(tmpdir / "src").absolute() source_code_root_dir.mkdir() - Path(source_code_root_dir / ".lightning").write_text("cluster_id: test\nname: myapp") + Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() @@ -867,6 +892,16 @@ def test_call_with_work_app_and_multiple_attached_drives(self, lightningapps, mo if lightningapps: lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" + mock_client.projects_service_list_project_cluster_bindings.return_value = ( + V1ListProjectClusterBindingsResponse( + clusters=[ + V1ProjectClusterBinding(cluster_id="test"), + ] + ) + ) + mock_client.cluster_service_get_cluster.side_effect = lambda cluster_id: V1GetClusterResponse( + id=cluster_id, 
spec=V1ClusterSpec(cluster_type=V1ClusterType.GLOBAL) + ) mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( V1ListLightningappInstancesResponse(lightningapps=lightningapps) ) @@ -1062,7 +1097,7 @@ def test_call_with_work_app_and_multiple_attached_drives(self, lightningapps, mo def test_call_with_work_app_and_attached_mount_and_drive(self, lightningapps, monkeypatch, tmpdir): source_code_root_dir = Path(tmpdir / "src").absolute() source_code_root_dir.mkdir() - Path(source_code_root_dir / ".lightning").write_text("cluster_id: test\nname: myapp") + Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() @@ -1070,6 +1105,16 @@ def test_call_with_work_app_and_attached_mount_and_drive(self, lightningapps, mo if lightningapps: lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" + mock_client.projects_service_list_project_cluster_bindings.return_value = ( + V1ListProjectClusterBindingsResponse( + clusters=[ + V1ProjectClusterBinding(cluster_id="test"), + ] + ) + ) + mock_client.cluster_service_get_cluster.side_effect = lambda cluster_id: V1GetClusterResponse( + id=cluster_id, spec=V1ClusterSpec(cluster_type=V1ClusterType.GLOBAL) + ) mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( V1ListLightningappInstancesResponse(lightningapps=lightningapps) ) From f8f5dd79b4f7b159fe50228fc89b0184d4871bac Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 12 Dec 2022 17:13:29 +0000 Subject: [PATCH 2/5] Fix tests --- tests/tests_app/cli/test_cloud_cli.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/tests_app/cli/test_cloud_cli.py b/tests/tests_app/cli/test_cloud_cli.py index 169e82ab1f42f..598a90368da88 100644 --- a/tests/tests_app/cli/test_cloud_cli.py +++ b/tests/tests_app/cli/test_cloud_cli.py @@ -11,7 
+11,6 @@ from lightning_cloud.openapi import ( V1LightningappV2, V1ListLightningappInstancesResponse, - V1ListLightningappsV2Response, V1ListMembershipsResponse, V1Membership, ) @@ -102,8 +101,8 @@ def __init__(self, *args, create_response, **kwargs): super().__init__() self.create_response = create_response - def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs): - return V1ListLightningappsV2Response(lightningapps=[V1LightningappV2(id="my_app", name="app")]) + def lightningapp_v2_service_create_lightningapp_v2(self, *args, **kwargs): + return V1LightningappV2(id="my_app", name="app") def lightningapp_v2_service_create_lightningapp_release(self, project_id, app_id, body): assert project_id == "test-project-id" @@ -183,7 +182,7 @@ def __init__(self, *args, message, **kwargs): super().__init__() self.message = message - def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs): + def lightningapp_instance_service_list_lightningapp_instances(self, *args, **kwargs): raise ApiException( http_resp=HttpHeaderDict( data=self.message, From c001b045221ca52cb6cf4a62a4da13cd4c865d14 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 12 Dec 2022 17:30:59 +0000 Subject: [PATCH 3/5] Move out name randomisation --- src/lightning_app/runners/cloud.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/lightning_app/runners/cloud.py b/src/lightning_app/runners/cloud.py index 5eda1fd6dc6fa..e7ca8893b8c55 100644 --- a/src/lightning_app/runners/cloud.py +++ b/src/lightning_app/runners/cloud.py @@ -292,8 +292,7 @@ def dispatch( list_clusters_resp = self.backend.client.cluster_service_list_clusters() cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters] if cluster_id not in cluster_ids: - msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist." 
- raise ValueError(msg) + raise ValueError(f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist.") self._ensure_cluster_project_binding(project.project_id, cluster_id) @@ -326,11 +325,9 @@ def dispatch( # If instances exist but not on the cluster - choose a randomised name if len(instances) > 0 and existing_instance is None: - letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - name_exists = True while name_exists: - random_name = app_name + "-" + "".join(random.sample(letters, 4)) + random_name = self._randomise_name(app_name) name_exists = any([instance.name == random_name for instance in instances]) app_name = random_name @@ -496,6 +493,9 @@ def _get_default_cluster(self, project_id: str) -> str: project_id=project_id ).clusters + if not cluster_bindings: + raise ValueError(f"No clusters are bound to the project {project_id}.") + if len(cluster_bindings) == 1: return cluster_bindings[0].cluster_id @@ -509,6 +509,11 @@ def _get_default_cluster(self, project_id: str) -> str: return random.choice(clusters).id + @staticmethod + def _randomise_name(app_name: str) -> str: + letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + return app_name + "-" + "".join(random.sample(letters, 4)) + @staticmethod def _check_uploaded_folder(root: Path, repo: LocalSourceCodeDir) -> None: """This method is used to inform the users if their folder files are large and how to filter them.""" From 2921da784f17aea6e1fd5cf9c8c5d0a5f7a7d037 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 12 Dec 2022 18:00:53 +0000 Subject: [PATCH 4/5] Add pagination todo --- src/lightning_app/runners/cloud.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lightning_app/runners/cloud.py b/src/lightning_app/runners/cloud.py index e7ca8893b8c55..974953a8bbaa6 100644 --- a/src/lightning_app/runners/cloud.py +++ b/src/lightning_app/runners/cloud.py @@ -301,6 +301,7 @@ def dispatch( app_name = app_config.name # List existing 
instances + # TODO: Add pagination, otherwise this could break if users have a lot of apps. find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances( project_id=project.project_id ) From 1a99ea990c5a1703a0a23426784fba38139fda63 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 12 Dec 2022 20:24:39 +0000 Subject: [PATCH 5/5] Make safer --- src/lightning_app/runners/cloud.py | 8 +++++--- tests/tests_app/runners/test_cloud.py | 12 +++++++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/lightning_app/runners/cloud.py b/src/lightning_app/runners/cloud.py index 974953a8bbaa6..738833611603b 100644 --- a/src/lightning_app/runners/cloud.py +++ b/src/lightning_app/runners/cloud.py @@ -1,6 +1,7 @@ import fnmatch import json import random +import re import string import sys import time @@ -306,11 +307,12 @@ def dispatch( project_id=project.project_id ) - # Seach for instances whose name starts with the given name + # Search for instances with the given name (possibly with some random characters appended) + pattern = re.escape(f"{app_name}-") + ".{4}" instances = [ lightningapp for lightningapp in find_instances_resp.lightningapps - if lightningapp.name.startswith(app_name) + if lightningapp.name == app_name or (re.fullmatch(pattern, lightningapp.name) is not None) ] # If instances exist and cluster is None, mimic cluster selection logic to choose a default @@ -318,7 +320,7 @@ def dispatch( # Determine the cluster ID cluster_id = self._get_default_cluster(project.project_id) - # If an instance exists on the cluster with the base name - restart it + # If an instance exists on the cluster with the same base name - restart it for instance in instances: if instance.spec.cluster_id == cluster_id: existing_instance = instance diff --git a/tests/tests_app/runners/test_cloud.py b/tests/tests_app/runners/test_cloud.py index 1e5f3c9433efb..a331465e0fc2f 100644 --- a/tests/tests_app/runners/test_cloud.py +++
b/tests/tests_app/runners/test_cloud.py @@ -159,7 +159,6 @@ def test_run_on_deleted_cluster(self, cloud_backend): with pytest.raises(ValueError, match="that cluster doesn't exist"): cloud_runtime.dispatch(name=app_name, cluster_id="unknown-cluster") - # TODO: remove this test once there is support for multiple instances @pytest.mark.parametrize( "old_cluster,new_cluster", [ @@ -210,6 +209,7 @@ def test_new_instance_on_different_cluster(self, cloud_backend, project_id, old_ app.frontend = {} existing_instance = MagicMock() + existing_instance.name = app_name existing_instance.status.phase = V1LightningappInstanceState.STOPPED existing_instance.spec.cluster_id = old_cluster or DEFAULT_CLUSTER mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( @@ -449,9 +449,11 @@ def test_call_with_work_app(self, lightningapps, start_with_flow, monkeypatch, t Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() + (source_code_root_dir / "entrypoint.py").touch() mock_client = mock.MagicMock() if lightningapps: + lightningapps[0].name = "myapp" lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( @@ -621,9 +623,11 @@ def test_call_with_work_app_and_attached_drives(self, lightningapps, monkeypatch Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() + (source_code_root_dir / "entrypoint.py").touch() mock_client = mock.MagicMock() if lightningapps: + lightningapps[0].name = "myapp" lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = ( @@ -763,9 
+767,11 @@ def test_call_with_work_app_and_app_comment_command_execution_set(self, lightnin Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() + (source_code_root_dir / "entrypoint.py").touch() mock_client = mock.MagicMock() if lightningapps: + lightningapps[0].name = "myapp" lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" mock_client.projects_service_list_project_cluster_bindings.return_value = ( @@ -887,9 +893,11 @@ def test_call_with_work_app_and_multiple_attached_drives(self, lightningapps, mo Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() + (source_code_root_dir / "entrypoint.py").touch() mock_client = mock.MagicMock() if lightningapps: + lightningapps[0].name = "myapp" lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" mock_client.projects_service_list_project_cluster_bindings.return_value = ( @@ -1100,9 +1108,11 @@ def test_call_with_work_app_and_attached_mount_and_drive(self, lightningapps, mo Path(source_code_root_dir / ".lightning").write_text("name: myapp") requirements_file = Path(source_code_root_dir / "requirements.txt") Path(requirements_file).touch() + (source_code_root_dir / "entrypoint.py").touch() mock_client = mock.MagicMock() if lightningapps: + lightningapps[0].name = "myapp" lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED lightningapps[0].spec.cluster_id = "test" mock_client.projects_service_list_project_cluster_bindings.return_value = (