-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[App] Improve cluster creation / deletion experience (#15458)
Cluster creation and deletion can take a long time. Instead of having these long running operations happen in the background, they should happen in the foreground. The advantage is that failures are brought to the user's attention immediately, instead of the next time they decide to run `lightning list clusters`. While the CLI waits for the cluster to run / delete, it will display cluster status changes to the user. This PR also hides the `--enable-performance` and `--edit-before-creation` creation flags, as well as the `--force` deletion flag. They are either not frequently used (performance mode is expensive), or prone to misuse. Co-authored-by: Neven Miculinic <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Raphael Randschau <[email protected]> (cherry picked from commit 33e1f93)
- Loading branch information
Showing
7 changed files
with
251 additions
and
108 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,10 @@ | |
import time | ||
from datetime import datetime | ||
from textwrap import dedent | ||
from typing import Any, List | ||
from typing import Any, List, Union | ||
|
||
import click | ||
import lightning_cloud | ||
from lightning_cloud.openapi import ( | ||
Externalv1Cluster, | ||
V1AWSClusterDriverSpec, | ||
|
@@ -15,8 +16,10 @@ | |
V1ClusterState, | ||
V1ClusterType, | ||
V1CreateClusterRequest, | ||
V1GetClusterResponse, | ||
V1KubernetesClusterDriver, | ||
) | ||
from lightning_utilities.core.enums import StrEnum | ||
from rich.console import Console | ||
from rich.table import Table | ||
from rich.text import Text | ||
|
@@ -25,10 +28,26 @@ | |
from lightning_app.utilities.network import LightningClient | ||
from lightning_app.utilities.openapi import create_openapi_object, string2dict | ||
|
||
CLUSTER_STATE_CHECKING_TIMEOUT = 60 | ||
MAX_CLUSTER_WAIT_TIME = 5400 | ||
|
||
|
||
class ClusterState(StrEnum):
    """Cluster states in the short, lower-case form shown to the user."""

    UNSPECIFIED = "unspecified"
    QUEUED = "queued"
    PENDING = "pending"
    RUNNING = "running"
    FAILED = "error"
    DELETED = "deleted"

    def __str__(self) -> str:
        """Render the member as its plain string value."""
        return f"{self.value}"

    @classmethod
    def from_api(cls, status: V1ClusterState) -> "ClusterState":
        """Translate an API ``V1ClusterState`` into a ``ClusterState``.

        The API value is stringified and lower-cased, then split on at most
        two underscores; the last piece is handed to ``from_str`` for lookup.
        """
        pieces = str(status).lower().split("_", maxsplit=2)
        return cls.from_str(pieces[-1])
|
||
|
||
class ClusterList(Formatable): | ||
def __init__(self, clusters: List[Externalv1Cluster]): | ||
self.clusters = clusters | ||
|
@@ -86,7 +105,7 @@ def create( | |
region: str = "us-east-1", | ||
external_id: str = None, | ||
edit_before_creation: bool = False, | ||
wait: bool = False, | ||
do_async: bool = False, | ||
) -> None: | ||
"""request Lightning AI BYOC compute cluster creation. | ||
|
@@ -97,7 +116,7 @@ def create( | |
region: AWS region containing compute resources | ||
external_id: AWS IAM Role external ID | ||
edit_before_creation: Enables interactive editing of requests before submitting it to Lightning AI. | ||
wait: Waits for the cluster to be in a RUNNING state. Only use this for debugging. | ||
do_async: Triggers cluster creation in the background and exits | ||
""" | ||
performance_profile = V1ClusterPerformanceProfile.DEFAULT | ||
if cost_savings: | ||
|
@@ -130,22 +149,31 @@ def create( | |
click.echo("cluster unchanged") | ||
|
||
resp = self.api_client.cluster_service_create_cluster(body=new_body) | ||
if wait: | ||
_wait_for_cluster_state(self.api_client, resp.id, V1ClusterState.RUNNING) | ||
|
||
click.echo( | ||
dedent( | ||
f"""\ | ||
{resp.id} is now being created... This can take up to an hour. | ||
BYOC cluster creation triggered successfully! | ||
This can take up to an hour to complete. | ||
To view the status of your clusters use: | ||
`lightning list clusters` | ||
lightning list clusters | ||
To view cluster logs use: | ||
`lightning show cluster logs {resp.id}` | ||
""" | ||
lightning show cluster logs {cluster_name} | ||
To delete the cluster run: | ||
lightning delete cluster {cluster_name} | ||
""" | ||
) | ||
) | ||
background_message = "\nCluster will be created in the background!" | ||
if do_async: | ||
click.echo(background_message) | ||
else: | ||
try: | ||
_wait_for_cluster_state(self.api_client, resp.id, V1ClusterState.RUNNING) | ||
except KeyboardInterrupt: | ||
click.echo(background_message) | ||
|
||
def get_clusters(self) -> ClusterList: | ||
resp = self.api_client.cluster_service_list_clusters(phase_not_in=[V1ClusterState.DELETED]) | ||
|
@@ -156,7 +184,7 @@ def list(self) -> None: | |
console = Console() | ||
console.print(clusters.as_table()) | ||
|
||
def delete(self, cluster_id: str, force: bool = False, wait: bool = False) -> None: | ||
def delete(self, cluster_id: str, force: bool = False, do_async: bool = False) -> None: | ||
if force: | ||
click.echo( | ||
""" | ||
|
@@ -167,47 +195,86 @@ def delete(self, cluster_id: str, force: bool = False, wait: bool = False) -> No | |
) | ||
click.confirm("Do you want to continue?", abort=True) | ||
|
||
resp: V1GetClusterResponse = self.api_client.cluster_service_get_cluster(id=cluster_id) | ||
bucket_name = resp.spec.driver.kubernetes.aws.bucket_name | ||
|
||
self.api_client.cluster_service_delete_cluster(id=cluster_id, force=force) | ||
click.echo("Cluster deletion triggered successfully") | ||
click.echo( | ||
dedent( | ||
f"""\ | ||
Cluster deletion triggered successfully | ||
For safety purposes we will not delete anything in the S3 bucket associated with the cluster: | ||
{bucket_name} | ||
if wait: | ||
_wait_for_cluster_state(self.api_client, cluster_id, V1ClusterState.DELETED) | ||
You may want to delete it manually using the AWS CLI: | ||
aws s3 rb --force s3://{bucket_name} | ||
""" | ||
) | ||
) | ||
|
||
background_message = "\nCluster will be deleted in the background!" | ||
if do_async: | ||
click.echo(background_message) | ||
else: | ||
try: | ||
_wait_for_cluster_state(self.api_client, cluster_id, V1ClusterState.DELETED) | ||
except KeyboardInterrupt: | ||
click.echo(background_message) | ||
|
||
|
||
def _wait_for_cluster_state(
    api_client: LightningClient,
    cluster_id: str,
    target_state: V1ClusterState,
    timeout_seconds: int = MAX_CLUSTER_WAIT_TIME,
    poll_duration_seconds: int = 10,
) -> None:
    """_wait_for_cluster_state waits until the provided cluster has reached a desired state, or failed.

    Messages will be displayed to the user as the cluster changes state.
    We poll the API server for any changes

    Args:
        api_client: LightningClient used for polling
        cluster_id: Specifies the cluster to wait for
        target_state: Specifies the desired state the target cluster needs to meet
        timeout_seconds: Maximum duration to wait (in seconds)
        poll_duration_seconds: duration between polling for the cluster state (in seconds)

    Raises:
        click.ClickException: if the cluster has not reached ``target_state``
            within ``timeout_seconds``.
        lightning_cloud.openapi.rest.ApiException: any API error other than a
            404 received while waiting for deletion.
    """
    start = time.time()
    elapsed = 0

    click.echo(f"Waiting for cluster to be {ClusterState.from_api(target_state)}...")
    while elapsed < timeout_seconds:
        try:
            resp: V1GetClusterResponse = api_client.cluster_service_get_cluster(id=cluster_id)
            # Print the current status on every poll so the user sees progress.
            click.echo(_cluster_status_long(cluster=resp, desired_state=target_state, elapsed=elapsed))
            if resp.status.phase == target_state:
                break
            time.sleep(poll_duration_seconds)
            elapsed = int(time.time() - start)
        except lightning_cloud.openapi.rest.ApiException as e:
            # A 404 while waiting for deletion means the cluster is already gone.
            if e.status == 404 and target_state == V1ClusterState.DELETED:
                return
            raise
    else:
        # Only reached when the loop ran out of time (no ``break``/``return``).
        state_str = ClusterState.from_api(target_state)
        raise click.ClickException(
            dedent(
                f"""\
                The cluster has not entered the {state_str} state within {_format_elapsed_seconds(timeout_seconds)}.
                The cluster may eventually be {state_str} afterwards, please check its status using:
                lightning list clusters
                To view cluster logs use:
                lightning show cluster logs {cluster_id}
                Contact [email protected] for additional help
                """
            )
        )
|
||
|
||
def _check_cluster_name_is_valid(_ctx: Any, _param: Any, value: str) -> str: | ||
|
@@ -219,3 +286,76 @@ def _check_cluster_name_is_valid(_ctx: Any, _param: Any, value: str) -> str: | |
Provide a cluster name using valid characters and try again.""" | ||
) | ||
return value | ||
|
||
|
||
def _cluster_status_long(cluster: V1GetClusterResponse, desired_state: V1ClusterState, elapsed: float) -> str:
    """Builds a long-form status message for the user about the cluster state.

    Args:
        cluster: The cluster object
        desired_state: The state the caller is waiting for the cluster to reach
        elapsed: Seconds since we've started polling

    Returns:
        The message describing the current status of the cluster.

    Raises:
        click.ClickException: if the cluster has not failed and ``desired_state``
            is neither RUNNING nor DELETED.
    """

    cluster_name = cluster.name
    current_state = cluster.status.phase
    current_reason = cluster.status.reason
    bucket_name = cluster.spec.driver.kubernetes.aws.bucket_name

    duration = _format_elapsed_seconds(elapsed)

    # A failed cluster is reported first, regardless of the desired state.
    if current_state == V1ClusterState.FAILED:
        return dedent(
            f"""\
            The requested cluster operation for cluster {cluster_name} has errors:
            {current_reason}
            ---
            We are automatically retrying, and an automated alert has been created
            WARNING: Any non-deleted cluster may be using resources.
            To avoid incurring cost on your cloud provider, delete the cluster using the following command:
            lightning delete cluster {cluster_name}
            Contact [email protected] for additional help
            """
        )

    if desired_state == current_state == V1ClusterState.RUNNING:
        return dedent(
            f"""\
            Cluster {cluster_name} is now running and ready to use.
            To launch an app on this cluster use: lightning run app app.py --cloud --cluster-id {cluster_name}
            """
        )

    if desired_state == V1ClusterState.RUNNING:
        return f"Cluster {cluster_name} is being created [elapsed={duration}]"

    if desired_state == current_state == V1ClusterState.DELETED:
        return dedent(
            f"""\
            Cluster {cluster_name} has been successfully deleted, and almost all AWS resources have been removed
            For safety purposes we kept the S3 bucket associated with the cluster: {bucket_name}
            You may want to delete it manually using the AWS CLI:
            aws s3 rb --force s3://{bucket_name}
            """
        )

    if desired_state == V1ClusterState.DELETED:
        return f"Cluster {cluster_name} is being deleted [elapsed={duration}]"

    raise click.ClickException(f"Unknown cluster desired state {desired_state}")
|
||
|
||
def _format_elapsed_seconds(seconds: Union[float, int]) -> str: | ||
"""Turns seconds into a duration string. | ||
>>> _format_elapsed_seconds(5) | ||
'05s' | ||
>>> _format_elapsed_seconds(60) | ||
'01m00s' | ||
""" | ||
minutes, seconds = divmod(seconds, 60) | ||
return (f"{minutes:02}m" if minutes else "") + f"{seconds:02}s" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.