From c855ee4c8b69b8788c2241ceb5119d4bca811d32 Mon Sep 17 00:00:00 2001 From: Yi Chen Date: Thu, 19 Sep 2024 20:40:29 +0800 Subject: [PATCH] Fix: spark application does not respect time to live seconds (#2165) * Add time to live seconds example spark application Signed-off-by: Yi Chen * fix: spark application does not respect time to live seconds Signed-off-by: Yi Chen --------- Signed-off-by: Yi Chen --- examples/spark-pi-ttl.yaml | 37 ++++++ .../controller/sparkapplication/controller.go | 125 ++++++++---------- pkg/util/sparkapplication.go | 6 + 3 files changed, 95 insertions(+), 73 deletions(-) create mode 100644 examples/spark-pi-ttl.yaml diff --git a/examples/spark-pi-ttl.yaml b/examples/spark-pi-ttl.yaml new file mode 100644 index 000000000..68e6dd413 --- /dev/null +++ b/examples/spark-pi-ttl.yaml @@ -0,0 +1,37 @@ +# +# Copyright 2024 The Kubeflow authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + name: spark-pi-ttl + namespace: default +spec: + type: Scala + mode: cluster + image: spark:3.5.2 + imagePullPolicy: IfNotPresent + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar + sparkVersion: 3.5.2 + timeToLiveSeconds: 30 + driver: + cores: 1 + memory: 512m + serviceAccount: spark-operator-spark + executor: + instances: 1 + cores: 1 + memory: 512m diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 6031f700e..e257b37f5 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -371,9 +371,11 @@ func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req c if err := r.updateSparkApplicationState(ctx, app); err != nil { return err } + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { return err } + return nil }, ) @@ -529,85 +531,62 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c } func (r *Reconciler) reconcileCompletedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - key := req.NamespacedName - retryErr := retry.RetryOnConflict( - retry.DefaultRetry, - func() error { - old, err := r.getSparkApplication(key) - if err != nil { - return err - } - if old.Status.AppState.State != v1beta2.ApplicationStateCompleted { - return nil - } - app := old.DeepCopy() - - if util.IsExpired(app) { - logger.Info("Deleting expired SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) - if err := r.client.Delete(ctx, app); err != nil { - return err - } - return nil - } - if err := r.updateExecutorState(ctx, app); err != nil { - return err - } - if err := r.updateSparkApplicationStatus(ctx, app); err != nil { - return err - } - if err := 
r.cleanUpOnTermination(old, app); err != nil { - logger.Error(err, "Failed to clean up resources for SparkApplication", "name", old.Name, "namespace", old.Namespace, "state", old.Status.AppState.State) - return err - } - return nil - }, - ) - if retryErr != nil { - logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) - return ctrl.Result{}, retryErr - } - return ctrl.Result{}, nil + return r.reconcileTerminatedSparkApplication(ctx, req) } func (r *Reconciler) reconcileFailedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + return r.reconcileTerminatedSparkApplication(ctx, req) +} + +func (r *Reconciler) reconcileTerminatedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { key := req.NamespacedName - retryErr := retry.RetryOnConflict( - retry.DefaultRetry, - func() error { - old, err := r.getSparkApplication(key) - if err != nil { - return err - } - if old.Status.AppState.State != v1beta2.ApplicationStateFailed { - return nil - } - app := old.DeepCopy() + old, err := r.getSparkApplication(key) + if err != nil { + return ctrl.Result{Requeue: true}, err + } - if util.IsExpired(app) { - logger.Info("Deleting expired SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) - if err := r.client.Delete(ctx, app); err != nil { - return err - } - return nil - } - if err := r.updateExecutorState(ctx, app); err != nil { - return err - } - if err := r.updateSparkApplicationStatus(ctx, app); err != nil { - return err - } - if err := r.cleanUpOnTermination(old, app); err != nil { - logger.Error(err, "Failed to clean up resources for SparkApplication", "name", old.Name, "namespace", old.Namespace, "state", old.Status.AppState.State) - return err - } - return nil - }, - ) - if retryErr != nil { - logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) - return 
ctrl.Result{}, retryErr + app := old.DeepCopy() + if !util.IsTerminated(app) { + return ctrl.Result{}, nil } - return ctrl.Result{}, nil + + if util.IsExpired(app) { + logger.Info("Deleting expired SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + if err := r.client.Delete(ctx, app); err != nil { + return ctrl.Result{Requeue: true}, err + } + return ctrl.Result{}, nil + } + + if err := r.updateExecutorState(ctx, app); err != nil { + return ctrl.Result{Requeue: true}, err + } + + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { + return ctrl.Result{Requeue: true}, err + } + + if err := r.cleanUpOnTermination(old, app); err != nil { + logger.Error(err, "Failed to clean up resources for SparkApplication", "name", old.Name, "namespace", old.Namespace, "state", old.Status.AppState.State) + return ctrl.Result{Requeue: true}, err + } + + // If the termination time is not set or no positive TTL is configured, do not requeue the application. + if app.Status.TerminationTime.IsZero() || app.Spec.TimeToLiveSeconds == nil || *app.Spec.TimeToLiveSeconds <= 0 { + return ctrl.Result{}, nil + } + + // Otherwise, requeue the application for subsequent deletion. + now := time.Now() + ttl := time.Duration(*app.Spec.TimeToLiveSeconds) * time.Second + survival := now.Sub(app.Status.TerminationTime.Time) + + // If the survival time has reached or exceeded the TTL, requeue the application immediately. + if survival >= ttl { + return ctrl.Result{Requeue: true}, nil + } + // Otherwise, requeue the application after (TTL - survival) seconds. 
+ return ctrl.Result{RequeueAfter: ttl - survival}, nil } func (r *Reconciler) reconcileUnknownSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go index aaa8b9456..29b8dab81 100644 --- a/pkg/util/sparkapplication.go +++ b/pkg/util/sparkapplication.go @@ -51,6 +51,12 @@ func GetApplicationState(app *v1beta2.SparkApplication) v1beta2.ApplicationState return app.Status.AppState.State } +// IsTerminated returns whether the given SparkApplication is terminated. +func IsTerminated(app *v1beta2.SparkApplication) bool { + return app.Status.AppState.State == v1beta2.ApplicationStateCompleted || + app.Status.AppState.State == v1beta2.ApplicationStateFailed +} + // IsExpired returns whether the given SparkApplication is expired. func IsExpired(app *v1beta2.SparkApplication) bool { // The application has no TTL defined and will never expire.