Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Update resource failures w/ Finalizers set (#423) #5673

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var (
DefaultPodTemplateResync: config2.Duration{
Duration: 30 * time.Second,
},
UpdateBaseBackoffDuration: 10,
UpdateBackoffRetries: 5,
}

// K8sPluginConfigSection provides a singular top level config section for all plugins.
Expand Down Expand Up @@ -206,6 +208,12 @@ type K8sPluginConfig struct {

// SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events).
SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."`

// Initial delay in exponential backoff when updating a resource in milliseconds.
UpdateBaseBackoffDuration int `json:"update-base-backoff-duration" pflag:",Initial delay in exponential backoff when updating a resource in milliseconds."`

// Number of retries for exponential backoff when updating a resource.
UpdateBackoffRetries int `json:"update-backoff-retries" pflag:",Number of retries for exponential backoff when updating a resource."`
}

// FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 54 additions & 25 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"k8s.io/apimachinery/pkg/runtime/schema"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -92,9 +93,11 @@
kubeClient pluginsCore.KubeClient
metrics PluginMetrics
// Per namespace-resource
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
eventWatcher EventWatcher
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
eventWatcher EventWatcher
updateBaseBackoffDuration int
updateBackoffRetries int
}

func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
Expand Down Expand Up @@ -463,25 +466,48 @@
}
nsName = k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}

retryBackoff := wait.Backoff{
Duration: time.Duration(e.updateBaseBackoffDuration) * time.Millisecond,
Factor: 2.0,
Jitter: 0.1,
Steps: e.updateBackoffRetries,
}

// Attempt to cleanup finalizers so that the object may be deleted/garbage collected. We try to clear them for all
// objects, regardless of whether or not InjectFinalizer is configured to handle all cases where InjectFinalizer is
// enabled/disabled during object execution.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
if isK8sObjectNotExists(err) {
return nil
var lastErr error
_ = wait.ExponentialBackoff(retryBackoff, func() (bool, error) {
lastErr = nil
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
if isK8sObjectNotExists(err) {
return true, nil

Check warning on line 484 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L481-L484

Added lines #L481 - L484 were not covered by tests
}
lastErr = err

Check warning on line 486 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L486

Added line #L486 was not covered by tests
// This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a
// Pod does not exist error. This should be retried using the retry policy
logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err)
return true, err

Check warning on line 490 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L489-L490

Added lines #L489 - L490 were not covered by tests
}
// This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a
// Pod does not exist error. This should be retried using the retry policy
logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err)
return err
}

// This must happen after sending admin event. It's safe against partial failures because if the event failed, we will
// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
// the same event (idempotent) and then come here again...
err = e.clearFinalizers(ctx, o)
if err != nil {
errs.Append(err)
// This must happen after sending admin event. It's safe against partial failures because if the event failed, we will
// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
// the same event (idempotent) and then come here again...
if err := e.clearFinalizers(ctx, o); err != nil {
lastErr = err

Check warning on line 497 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L496-L497

Added lines #L496 - L497 were not covered by tests
// retry is if there is a conflict in case the informer cache is out of sync
if k8serrors.IsConflict(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v. Retrying..", nsName, err)
return false, nil

Check warning on line 501 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L499-L501

Added lines #L499 - L501 were not covered by tests
}
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v", nsName, err)
return true, err

Check warning on line 504 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L503-L504

Added lines #L503 - L504 were not covered by tests
}
return true, nil

Check warning on line 506 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L506

Added line #L506 was not covered by tests
})

if lastErr != nil {
errs.Append(lastErr)

Check warning on line 510 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L510

Added line #L510 was not covered by tests
}

// If we should delete the resource when finalize is called, do a best effort delete.
Expand Down Expand Up @@ -630,8 +656,9 @@
return nil, err
}

k8sConfig := config.GetK8sPluginConfig()
var eventWatcher EventWatcher
if config.GetK8sPluginConfig().SendObjectEvents {
if k8sConfig.SendObjectEvents {
eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset)
if err != nil {
return nil, err
Expand All @@ -645,13 +672,15 @@
rm.RunCollectorOnce(ctx)

return &PluginManager{
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
eventWatcher: eventWatcher,
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
eventWatcher: eventWatcher,
updateBaseBackoffDuration: k8sConfig.UpdateBaseBackoffDuration,
updateBackoffRetries: k8sConfig.UpdateBackoffRetries,
}, nil
}

Expand Down
Loading