diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 109ef06ba1..eb19015586 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -64,6 +64,8 @@ var ( DefaultPodTemplateResync: config2.Duration{ Duration: 30 * time.Second, }, + UpdateBaseBackoffDuration: 10, + UpdateBackoffRetries: 5, } // K8sPluginConfigSection provides a singular top level config section for all plugins. @@ -206,6 +208,12 @@ type K8sPluginConfig struct { // SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events). SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."` + + // Initial delay in exponential backoff when updating a resource in milliseconds. + UpdateBaseBackoffDuration int `json:"update-base-backoff-duration" pflag:",Initial delay in exponential backoff when updating a resource in milliseconds."` + + // Number of retries for exponential backoff when updating a resource. + UpdateBackoffRetries int `json:"update-backoff-retries" pflag:",Number of retries for exponential backoff when updating a resource."` } // FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go index 7a3f1c951e..4652d0bfd4 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go @@ -67,5 +67,7 @@ func (cfg K8sPluginConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-name"), defaultK8sConfig.DefaultPodTemplateName, "Name of the PodTemplate to use as the base for all k8s pods created by FlytePropeller.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-resync"), defaultK8sConfig.DefaultPodTemplateResync.String(), "Frequency of resyncing default pod templates") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "send-object-events"), defaultK8sConfig.SendObjectEvents, "If true, will send k8s object events in TaskExecutionEvent updates.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "update-base-backoff-duration"), defaultK8sConfig.UpdateBaseBackoffDuration, "Initial delay in exponential backoff when updating a resource in milliseconds.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "update-backoff-retries"), defaultK8sConfig.UpdateBackoffRetries, "Number of retries for exponential backoff when updating a resource.") return cmdFlags } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go index 4d5918a3b5..cc46ffa466 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go @@ -337,4 +337,32 @@ func TestK8sPluginConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_update-base-backoff-duration", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("update-base-backoff-duration", testValue) + if vInt, err := cmdFlags.GetInt("update-base-backoff-duration"); err == nil { + testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vInt), &actual.UpdateBaseBackoffDuration) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_update-backoff-retries", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("update-backoff-retries", testValue) + if vInt, err := cmdFlags.GetInt("update-backoff-retries"); err == nil { + testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vInt), &actual.UpdateBackoffRetries) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index f9c3806ee6..42d3ad9b85 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" @@ -92,9 +93,11 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor - eventWatcher EventWatcher + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor + eventWatcher EventWatcher + updateBaseBackoffDuration int + updateBackoffRetries int } func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { @@ -463,25 +466,48 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu } nsName = k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} + retryBackoff := wait.Backoff{ + Duration: time.Duration(e.updateBaseBackoffDuration) * time.Millisecond, + Factor: 2.0, + Jitter: 0.1, + Steps: e.updateBackoffRetries, + } + // Attempt to cleanup finalizers so that the object may be deleted/garbage collected. We try to clear them for all // objects, regardless of whether or not InjectFinalizer is configured to handle all cases where InjectFinalizer is // enabled/disabled during object execution. - if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { - if isK8sObjectNotExists(err) { - return nil + var lastErr error + _ = wait.ExponentialBackoff(retryBackoff, func() (bool, error) { + lastErr = nil + if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { + if isK8sObjectNotExists(err) { + return true, nil + } + lastErr = err + // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a + // Pod does not exist error. This should be retried using the retry policy + logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err) + return true, err } - // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a - // Pod does not exist error. This should be retried using the retry policy - logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err) - return err - } - // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will - // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send - // the same event (idempotent) and then come here again... - err = e.clearFinalizers(ctx, o) - if err != nil { - errs.Append(err) + // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will + // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send + // the same event (idempotent) and then come here again... + if err := e.clearFinalizers(ctx, o); err != nil { + lastErr = err + // retry is if there is a conflict in case the informer cache is out of sync + if k8serrors.IsConflict(err) { + logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v. Retrying..", nsName, err) + return false, nil + } + logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v", nsName, err) + return true, err + } + return true, nil + }) + + if lastErr != nil { + errs.Append(lastErr) } // If we should delete the resource when finalize is called, do a best effort delete. @@ -630,8 +656,9 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } + k8sConfig := config.GetK8sPluginConfig() var eventWatcher EventWatcher - if config.GetK8sPluginConfig().SendObjectEvents { + if k8sConfig.SendObjectEvents { eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset) if err != nil { return nil, err @@ -645,13 +672,15 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: kubeClient, - resourceLevelMonitor: rm, - eventWatcher: eventWatcher, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, + eventWatcher: eventWatcher, + updateBaseBackoffDuration: k8sConfig.UpdateBaseBackoffDuration, + updateBackoffRetries: k8sConfig.UpdateBackoffRetries, }, nil }