Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent bug when launching apps on multiple clusters #15226

1 change: 1 addition & 0 deletions src/lightning_app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Fixed an issue when using the CLI without arguments ([#14877](https://github.com/Lightning-AI/lightning/pull/14877))
- Fixed a bug where the upload files endpoint would raise an error when running locally ([#14924](https://github.com/Lightning-AI/lightning/pull/14924))
- Fixed a bug when launching an app on multiple clusters ([#15226](https://github.com/Lightning-AI/lightning/pull/15226))

## [0.6.2] - 2022-09-21

Expand Down
37 changes: 24 additions & 13 deletions src/lightning_app/runners/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,6 @@ def dispatch(
if cluster_id is not None:
self._ensure_cluster_project_binding(project.project_id, cluster_id)

lightning_app_release = self.backend.client.lightningapp_v2_service_create_lightningapp_release(
project_id=project.project_id, app_id=lit_app.id, body=release_body
)

if cluster_id is not None:
logger.info(f"running app on {lightning_app_release.cluster_id}")

if lightning_app_release.source_upload_url == "":
raise RuntimeError("The source upload url is empty.")

repo.package()
repo.upload(url=lightning_app_release.source_upload_url)

# check if user has sufficient credits to run an app
# if so set the desired state to running otherwise, create the app in stopped state,
# and open the admin ui to add credits and running the app.
Expand All @@ -305,6 +292,15 @@ def dispatch(

if find_instances_resp.lightningapps:
existing_instance = find_instances_resp.lightningapps[0]

# TODO: support multiple instances / 1 instance per cluster
if existing_instance.spec.cluster_id != cluster_id:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@luca3rd I am confused about this. If the app already exists on a cluster, than it means the user is going to restart it when a new release.

raise ValueError(
f"Can not start app '{name}' on cluster '{cluster_id}' "
f"since this app already exists on '{existing_instance.spec.cluster_id}'. "
"To run it on another cluster, give it a new name with the --name option."
)

if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
# TODO(yurij): Implement release switching in the UI and remove this
# We can only switch release of the stopped instance
Expand All @@ -324,6 +320,21 @@ def dispatch(
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
raise RuntimeError("Failed to stop the existing instance.")

# create / upload the new app release / instace
lightning_app_release = self.backend.client.lightningapp_v2_service_create_lightningapp_release(
project_id=project.project_id, app_id=lit_app.id, body=release_body
)

if cluster_id is not None:
logger.info(f"running app on {lightning_app_release.cluster_id}")

if lightning_app_release.source_upload_url == "":
raise RuntimeError("The source upload url is empty.")

repo.package()
repo.upload(url=lightning_app_release.source_upload_url)

if find_instances_resp.lightningapps:
lightning_app_instance = (
self.backend.client.lightningapp_instance_service_update_lightningapp_instance_release(
project_id=project.project_id,
Expand Down
59 changes: 59 additions & 0 deletions tests/tests_app/runners/test_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,60 @@ def run(self):
class TestAppCreationClient:
"""Testing the calls made using GridRestClient to create the app."""

# TODO: remove this test once there is support for multiple instances
@mock.patch("lightning_app.runners.backends.cloud.LightningClient", mock.MagicMock())
def test_new_instance_on_different_cluster_fails(self, monkeypatch):
app_name = "test-app-name"
original_cluster = "cluster-001"
new_cluster = "cluster-002"

mock_client = mock.MagicMock()
mock_client.projects_service_list_memberships.return_value = V1ListMembershipsResponse(
memberships=[V1Membership(name="Default Project", project_id="default-project-id")]
)

cloud_backend = mock.MagicMock()
cloud_backend.client = mock_client
monkeypatch.setattr(cloud, "LocalSourceCodeDir", mock.MagicMock())
monkeypatch.setattr(cloud, "_prepare_lightning_wheels_and_requirements", mock.MagicMock())
monkeypatch.setattr(backends, "CloudBackend", mock.MagicMock(return_value=cloud_backend))

app = mock.MagicMock()
app.flows = []
app.frontend = {}

existing_instance = MagicMock()
existing_instance.status.phase = V1LightningappInstanceState.STOPPED
existing_instance.spec.cluster_id = original_cluster
mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = (
V1ListLightningappInstancesResponse(lightningapps=[existing_instance])
)

cloud_runtime = cloud.CloudRuntime(app=app, entrypoint_file="entrypoint.py")
cloud_runtime._check_uploaded_folder = mock.MagicMock()

# without requirements file
# setting is_file to False so requirements.txt existence check will return False
monkeypatch.setattr(Path, "is_file", lambda *args, **kwargs: False)
monkeypatch.setattr(cloud, "Path", Path)

# This is the main assertion:
# we have an existing instance on `cluster-001`
# but we want to run this app on `cluster-002`
with pytest.raises(ValueError) as exc:
cloud_runtime.dispatch(name=app_name, cluster_id=new_cluster)

assert exc.match(
f"Can not start app '{app_name}' on cluster '{new_cluster}' "
f"since this app already exists on '{original_cluster}'. "
"To run it on another cluster, give it a new name with the --name option."
)
cloud_runtime.backend.client.lightningapp_v2_service_create_lightningapp_release.assert_not_called()
cloud_runtime.backend.client.projects_service_create_project_cluster_binding.assert_called_once_with(
project_id="default-project-id",
body=V1ProjectClusterBinding(cluster_id=new_cluster, project_id="default-project-id"),
)

@mock.patch("lightning_app.runners.backends.cloud.LightningClient", mock.MagicMock())
def test_run_with_custom_flow_compute_config(self, monkeypatch):
mock_client = mock.MagicMock()
Expand Down Expand Up @@ -265,6 +319,7 @@ def test_call_with_work_app(self, lightningapps, monkeypatch, tmpdir):
mock_client = mock.MagicMock()
if lightningapps:
lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED
lightningapps[0].spec.cluster_id = None
mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = (
V1ListLightningappInstancesResponse(lightningapps=lightningapps)
)
Expand Down Expand Up @@ -420,6 +475,7 @@ def test_call_with_work_app_and_attached_drives(self, lightningapps, monkeypatch
mock_client = mock.MagicMock()
if lightningapps:
lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED
lightningapps[0].spec.cluster_id = None
mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = (
V1ListLightningappInstancesResponse(lightningapps=lightningapps)
)
Expand Down Expand Up @@ -547,6 +603,7 @@ def test_call_with_work_app_and_multiple_attached_drives(self, lightningapps, mo
mock_client = mock.MagicMock()
if lightningapps:
lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED
lightningapps[0].spec.cluster_id = None
mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = (
V1ListLightningappInstancesResponse(lightningapps=lightningapps)
)
Expand Down Expand Up @@ -737,6 +794,7 @@ def test_call_with_work_app_and_attached_mount_and_drive(self, lightningapps, mo
mock_client = mock.MagicMock()
if lightningapps:
lightningapps[0].status.phase = V1LightningappInstanceState.STOPPED
lightningapps[0].spec.cluster_id = None
mock_client.lightningapp_instance_service_list_lightningapp_instances.return_value = (
V1ListLightningappInstancesResponse(lightningapps=lightningapps)
)
Expand All @@ -747,6 +805,7 @@ def test_call_with_work_app_and_attached_mount_and_drive(self, lightningapps, mo
)
existing_instance = MagicMock()
existing_instance.status.phase = V1LightningappInstanceState.STOPPED
existing_instance.spec.cluster_id = None
mock_client.lightningapp_service_get_lightningapp = MagicMock(return_value=existing_instance)
cloud_backend = mock.MagicMock()
cloud_backend.client = mock_client
Expand Down