From 774c457f52c6fb710b116f34d0cd101bd76fafa3 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Fri, 27 May 2022 18:18:09 -0500 Subject: [PATCH] Restrict garbage collection with sharded propeller (#444) * refactored to isolate computing sharded label selector requirements Signed-off-by: Daniel Rammer * restricting garbage collection with sharded label selector requirements Signed-off-by: Daniel Rammer * fix lint issue Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/controller.go | 21 ++++++++++++--- .../pkg/controller/garbage_collector.go | 27 ++++++++++++------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index 62571c4bdb..09c5337188 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -453,8 +453,9 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter return controller, nil } -// SharedInformerOptions creates informer options to work with FlytePropeller Sharding -func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption { +// getShardedLabelSelectorRequirements computes the collection of LabelSelectorRequirements to +// satisfy sharding criteria. 
+func getShardedLabelSelectorRequirements(cfg *config.Config) []v1.LabelSelectorRequirement { selectors := []struct { label string operation v1.LabelSelectorOperator @@ -468,7 +469,7 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform {k8s.DomainLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeDomainLabel}, } - labelSelector := IgnoreCompletedWorkflowsLabelSelector() + var labelSelectorRequirements []v1.LabelSelectorRequirement for _, selector := range selectors { if len(selector.values) > 0 { labelSelectorRequirement := v1.LabelSelectorRequirement{ @@ -477,10 +478,22 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform Values: selector.values, } - labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorRequirement) + labelSelectorRequirements = append(labelSelectorRequirements, labelSelectorRequirement) } } + return labelSelectorRequirements +} + +// SharedInformerOptions creates informer options to work with FlytePropeller Sharding +func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption { + labelSelector := IgnoreCompletedWorkflowsLabelSelector() + + shardedLabelSelectorRequirements := getShardedLabelSelectorRequirements(cfg) + if len(shardedLabelSelectorRequirements) != 0 { + labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, shardedLabelSelectorRequirements...) 
+ } + opts := []informers.SharedInformerOption{ informers.WithTweakListOptions(func(options *v1.ListOptions) { options.LabelSelector = v1.FormatLabelSelector(labelSelector) diff --git a/flytepropeller/pkg/controller/garbage_collector.go b/flytepropeller/pkg/controller/garbage_collector.go index d349e612d1..3fbbc7b48a 100644 --- a/flytepropeller/pkg/controller/garbage_collector.go +++ b/flytepropeller/pkg/controller/garbage_collector.go @@ -28,18 +28,22 @@ type gcMetrics struct { // GarbageCollector is an active background cleanup service, that deletes all workflows that are completed and older // than the configured TTL type GarbageCollector struct { - wfClient v1alpha1.FlyteworkflowV1alpha1Interface - namespaceClient corev1.NamespaceInterface - ttlHours int - interval time.Duration - clk clock.Clock - metrics *gcMetrics - namespace string + wfClient v1alpha1.FlyteworkflowV1alpha1Interface + namespaceClient corev1.NamespaceInterface + ttlHours int + interval time.Duration + clk clock.Clock + metrics *gcMetrics + namespace string + labelSelectorRequirements []v1.LabelSelectorRequirement } // Issues a background deletion command with label selector for all completed workflows outside of the retention period func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error { s := CompletedWorkflowsSelectorOutsideRetentionPeriod(g.ttlHours, g.clk.Now()) + if len(g.labelSelectorRequirements) != 0 { + s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...) + } // Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each. 
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" { @@ -74,6 +78,9 @@ func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error { // Deprecated: Please use deleteWorkflows instead func (g *GarbageCollector) deprecatedDeleteWorkflows(ctx context.Context) error { s := DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod(g.ttlHours, g.clk.Now()) + if len(g.labelSelectorRequirements) != 0 { + s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...) + } // Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each. if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" { @@ -168,6 +175,7 @@ func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Cl } else { logger.Warningf(context.TODO(), "defaulting max ttl for workflows to 23 hours, since configured duration is larger than 23 [%d]", cfg.MaxTTLInHours) } + labelSelectorRequirements := getShardedLabelSelectorRequirements(cfg) return &GarbageCollector{ wfClient: wfClient, ttlHours: ttl, @@ -178,7 +186,8 @@ func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Cl gcRoundSuccess: labeled.NewCounter("gc_success", "successful executions of delete request", scope), gcRoundFailure: labeled.NewCounter("gc_failure", "failure to delete workflows", scope), }, - clk: clk, - namespace: cfg.LimitNamespace, + clk: clk, + namespace: cfg.LimitNamespace, + labelSelectorRequirements: labelSelectorRequirements, }, nil }