Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Support running on multiple clusters #16016

Merged
merged 6 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 112 additions & 93 deletions src/lightning_app/runners/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import time
from dataclasses import dataclass
from pathlib import Path
from textwrap import dedent
from typing import Any, List, Optional, Union

import click
Expand All @@ -19,6 +18,7 @@
Externalv1LightningappInstance,
Gridv1ImageSpec,
V1BuildSpec,
V1ClusterType,
V1DependencyFileInfo,
V1Drive,
V1DriveSpec,
Expand Down Expand Up @@ -210,8 +210,6 @@ def dispatch(
# Determine the root of the project: Start at the entrypoint_file and look for nearby Lightning config files,
# going up the directory structure. The root of the project is where the Lightning config file is located.

# TODO: verify lightning version
# _verify_lightning_version()
config_file = _get_config_file(self.entrypoint_file)
app_config = AppConfig.load_from_file(config_file) if config_file.exists() else AppConfig()
root = Path(self.entrypoint_file).absolute().parent
Expand All @@ -228,10 +226,6 @@ def dispatch(
# Override the name if provided by the CLI
app_config.name = name

if cluster_id:
# Override the cluster ID if provided by the CLI
app_config.cluster_id = cluster_id

print(f"The name of the app is: {app_config.name}")

v1_env_vars = [V1EnvVar(name=k, value=v) for k, v in self.env_vars.items()]
Expand Down Expand Up @@ -293,17 +287,93 @@ def dispatch(
project = _get_project(self.backend.client)

try:
list_apps_resp = self.backend.client.lightningapp_v2_service_list_lightningapps_v2(
project_id=project.project_id, name=app_config.name
if cluster_id is not None:
# Verify that the cluster exists
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
if cluster_id not in cluster_ids:
msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist."
raise ValueError(msg)

self._ensure_cluster_project_binding(project.project_id, cluster_id)

# Resolve the app name, instance, and cluster ID
existing_instance = None
app_name = app_config.name

# List existing instances
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project.project_id
)
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved
if list_apps_resp.lightningapps:
# There can be only one app with unique project_id<>name pair
lit_app = list_apps_resp.lightningapps[0]
else:
app_body = Body7(name=app_config.name, can_download_source_code=True)

# Seach for instances whose name starts with the given name
instances = [
lightningapp
for lightningapp in find_instances_resp.lightningapps
if lightningapp.name.startswith(app_name)
tchaton marked this conversation as resolved.
Show resolved Hide resolved
]

# If instances exist and cluster is None, mimic cluster selection logic to choose a default
if cluster_id is None and len(instances) > 0:
# Determine the cluster ID
cluster_id = self._get_default_cluster(project.project_id)

# If an instance exists on the cluster with the base name - restart it
tchaton marked this conversation as resolved.
Show resolved Hide resolved
for instance in instances:
if instance.spec.cluster_id == cluster_id:
existing_instance = instance
break

# If instances exist but not on the cluster - choose a randomised name
if len(instances) > 0 and existing_instance is None:
tchaton marked this conversation as resolved.
Show resolved Hide resolved
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

name_exists = True
while name_exists:
random_name = app_name + "-" + "".join(random.sample(letters, 4))
name_exists = any([instance.name == random_name for instance in instances])

app_name = random_name
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved

# Create the app if it doesn't exist
if existing_instance is None:
app_body = Body7(name=app_name, can_download_source_code=True)
lit_app = self.backend.client.lightningapp_v2_service_create_lightningapp_v2(
project_id=project.project_id, body=app_body
)
app_id = lit_app.id
else:
app_id = existing_instance.spec.app_id

# check if user has sufficient credits to run an app
# if so set the desired state to running otherwise, create the app in stopped state,
# and open the admin ui to add credits and running the app.
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
app_release_desired_state = (
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
)
if not has_sufficient_credits:
logger.warn("You may need Lightning credits to run your apps on the cloud.")

# Stop the instance if it isn't stopped yet
if existing_instance and existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
# TODO(yurij): Implement release switching in the UI and remove this
# We can only switch release of the stopped instance
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
project_id=project.project_id,
id=existing_instance.id,
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
)
# wait for the instance to stop for up to 150 seconds
for _ in range(150):
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
project_id=project.project_id, id=existing_instance.id
)
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
break
time.sleep(1)
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
raise RuntimeError("Failed to stop the existing instance.")

network_configs: Optional[List[V1NetworkConfig]] = None
if enable_multiple_works_in_default_container():
Expand All @@ -318,90 +388,18 @@ def dispatch(
)
initial_port += 1

# check if user has sufficient credits to run an app
# if so set the desired state to running otherwise, create the app in stopped state,
# and open the admin ui to add credits and running the app.
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
app_release_desired_state = (
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
)
if not has_sufficient_credits:
logger.warn("You may need Lightning credits to run your apps on the cloud.")

# right now we only allow a single instance of the app
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project.project_id, app_id=lit_app.id
)

queue_server_type = V1QueueServerType.UNSPECIFIED
if CLOUD_QUEUE_TYPE == "http":
queue_server_type = V1QueueServerType.HTTP
elif CLOUD_QUEUE_TYPE == "redis":
queue_server_type = V1QueueServerType.REDIS

existing_instance: Optional[Externalv1LightningappInstance] = None
if find_instances_resp.lightningapps:
existing_instance = find_instances_resp.lightningapps[0]

if not app_config.cluster_id:
# Re-run the app on the same cluster
app_config.cluster_id = existing_instance.spec.cluster_id

if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
# TODO(yurij): Implement release switching in the UI and remove this
# We can only switch release of the stopped instance
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
project_id=project.project_id,
id=existing_instance.id,
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
)
# wait for the instance to stop for up to 150 seconds
for _ in range(150):
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
project_id=project.project_id, id=existing_instance.id
)
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
break
time.sleep(1)
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
raise RuntimeError("Failed to stop the existing instance.")

if app_config.cluster_id is not None:
# Verify that the cluster exists
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
if app_config.cluster_id not in cluster_ids:
if cluster_id:
msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist."
else:
msg = (
f"Your app last ran on cluster {app_config.cluster_id}, but that cluster "
"doesn't exist anymore."
)
raise ValueError(msg)
if existing_instance and existing_instance.spec.cluster_id != app_config.cluster_id:
raise ValueError(
dedent(
f"""\
An app names {app_config.name} is already running on cluster {existing_instance.spec.cluster_id}, and you requested it to run on cluster {app_config.cluster_id}.

In order to proceed, please either:
a. rename the app to run on {app_config.cluster_id} with the --name option
lightning run app {app_entrypoint_file} --name (new name) --cloud --cluster-id {app_config.cluster_id}
b. delete the app running on {existing_instance.spec.cluster_id} in the UI before running this command.
""" # noqa: E501
)
)

if app_config.cluster_id is not None:
self._ensure_cluster_project_binding(project.project_id, app_config.cluster_id)

release_body = Body8(
app_entrypoint_file=app_spec.app_entrypoint_file,
enable_app_server=app_spec.enable_app_server,
flow_servers=app_spec.flow_servers,
image_spec=app_spec.image_spec,
cluster_id=app_config.cluster_id,
cluster_id=cluster_id,
network_config=network_configs,
works=works,
local_source=True,
Expand All @@ -412,14 +410,13 @@ def dispatch(

# create / upload the new app release
lightning_app_release = self.backend.client.lightningapp_v2_service_create_lightningapp_release(
project_id=project.project_id, app_id=lit_app.id, body=release_body
project_id=project.project_id, app_id=app_id, body=release_body
)

if lightning_app_release.source_upload_url == "":
raise RuntimeError("The source upload url is empty.")

if getattr(lightning_app_release, "cluster_id", None):
app_config.cluster_id = lightning_app_release.cluster_id
logger.info(f"Running app on {lightning_app_release.cluster_id}")

# Save the config for re-runs
Expand All @@ -428,7 +425,7 @@ def dispatch(
repo.package()
repo.upload(url=lightning_app_release.source_upload_url)

if find_instances_resp.lightningapps:
if existing_instance is not None:
lightning_app_instance = (
self.backend.client.lightningapp_instance_service_update_lightningapp_instance_release(
project_id=project.project_id,
Expand All @@ -452,12 +449,12 @@ def dispatch(
lightning_app_instance = (
self.backend.client.lightningapp_v2_service_create_lightningapp_release_instance(
project_id=project.project_id,
app_id=lit_app.id,
app_id=app_id,
id=lightning_app_release.id,
body=Body9(
cluster_id=app_config.cluster_id,
cluster_id=cluster_id,
desired_state=app_release_desired_state,
name=lit_app.name,
name=app_name,
env=v1_env_vars,
queue_server_type=queue_server_type,
),
Expand Down Expand Up @@ -490,6 +487,28 @@ def _ensure_cluster_project_binding(self, project_id: str, cluster_id: str):
body=V1ProjectClusterBinding(cluster_id=cluster_id, project_id=project_id),
)

def _get_default_cluster(self, project_id: str) -> str:
"""This utility implements a minimal version of the cluster selection logic used in the cloud.

TODO: This should be requested directly from the platform.
"""
cluster_bindings = self.backend.client.projects_service_list_project_cluster_bindings(
project_id=project_id
).clusters

if len(cluster_bindings) == 1:
return cluster_bindings[0].cluster_id

clusters = [
self.backend.client.cluster_service_get_cluster(cluster_binding.cluster_id)
for cluster_binding in cluster_bindings
]

# Filter global clusters
clusters = [cluster for cluster in clusters if cluster.spec.cluster_type == V1ClusterType.GLOBAL]

return random.choice(clusters).id

@staticmethod
def _check_uploaded_folder(root: Path, repo: LocalSourceCodeDir) -> None:
"""This method is used to inform the users if their folder files are large and how to filter them."""
Expand Down
5 changes: 3 additions & 2 deletions src/lightning_app/utilities/packaging/app_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pathlib
from dataclasses import asdict, dataclass, field
from typing import Optional, Union
from typing import Union

import yaml

Expand All @@ -18,7 +18,6 @@ class AppConfig:
"""

name: str = field(default_factory=get_unique_name)
cluster_id: Optional[str] = field(default=None)

def save_to_file(self, path: Union[str, pathlib.Path]) -> None:
"""Save the configuration to the given file in YAML format."""
Expand All @@ -35,6 +34,8 @@ def load_from_file(cls, path: Union[str, pathlib.Path]) -> "AppConfig":
"""Load the configuration from the given file."""
with open(path) as file:
config = yaml.safe_load(file)
# Ignore `cluster_id` without error for backwards compatibility.
config.pop("cluster_id", None)
return cls(**config)

@classmethod
Expand Down
Loading