Skip to content

Commit

Permalink
Restrict garbage collection with sharded propeller (flyteorg#444)
Browse files Browse the repository at this point in the history
* refactored to isolate computing sharded label selector requirements

Signed-off-by: Daniel Rammer <[email protected]>

* restricting garbage collection with sharded label selector requirements

Signed-off-by: Daniel Rammer <[email protected]>

* fix lint issue

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored May 27, 2022
1 parent bc060ea commit 774c457
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
21 changes: 17 additions & 4 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,9 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return controller, nil
}

// SharedInformerOptions creates informer options to work with FlytePropeller Sharding
func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption {
// getShardedLabelSelectorRequirements computes the collection of LabelSelectorReuquirements to
// satisfy sharding criteria.
func getShardedLabelSelectorRequirements(cfg *config.Config) []v1.LabelSelectorRequirement {
selectors := []struct {
label string
operation v1.LabelSelectorOperator
Expand All @@ -468,7 +469,7 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform
{k8s.DomainLabel, v1.LabelSelectorOpNotIn, cfg.ExcludeDomainLabel},
}

labelSelector := IgnoreCompletedWorkflowsLabelSelector()
var labelSelectorRequirements []v1.LabelSelectorRequirement
for _, selector := range selectors {
if len(selector.values) > 0 {
labelSelectorRequirement := v1.LabelSelectorRequirement{
Expand All @@ -477,10 +478,22 @@ func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []inform
Values: selector.values,
}

labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorRequirement)
labelSelectorRequirements = append(labelSelectorRequirements, labelSelectorRequirement)
}
}

return labelSelectorRequirements
}

// SharedInformerOptions creates informer options to work with FlytePropeller Sharding
func SharedInformerOptions(cfg *config.Config, defaultNamespace string) []informers.SharedInformerOption {
labelSelector := IgnoreCompletedWorkflowsLabelSelector()

shardedLabelSelectorRequirements := getShardedLabelSelectorRequirements(cfg)
if len(shardedLabelSelectorRequirements) != 0 {
labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, shardedLabelSelectorRequirements...)
}

opts := []informers.SharedInformerOption{
informers.WithTweakListOptions(func(options *v1.ListOptions) {
options.LabelSelector = v1.FormatLabelSelector(labelSelector)
Expand Down
27 changes: 18 additions & 9 deletions flytepropeller/pkg/controller/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@ type gcMetrics struct {
// GarbageCollector is an active background cleanup service, that deletes all workflows that are completed and older
// than the configured TTL
type GarbageCollector struct {
wfClient v1alpha1.FlyteworkflowV1alpha1Interface
namespaceClient corev1.NamespaceInterface
ttlHours int
interval time.Duration
clk clock.Clock
metrics *gcMetrics
namespace string
wfClient v1alpha1.FlyteworkflowV1alpha1Interface
namespaceClient corev1.NamespaceInterface
ttlHours int
interval time.Duration
clk clock.Clock
metrics *gcMetrics
namespace string
labelSelectorRequirements []v1.LabelSelectorRequirement
}

// Issues a background deletion command with label selector for all completed workflows outside of the retention period
func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error {
s := CompletedWorkflowsSelectorOutsideRetentionPeriod(g.ttlHours, g.clk.Now())
if len(g.labelSelectorRequirements) != 0 {
s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
}

// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
Expand Down Expand Up @@ -74,6 +78,9 @@ func (g *GarbageCollector) deleteWorkflows(ctx context.Context) error {
// Deprecated: Please use deleteWorkflows instead
func (g *GarbageCollector) deprecatedDeleteWorkflows(ctx context.Context) error {
s := DeprecatedCompletedWorkflowsSelectorOutsideRetentionPeriod(g.ttlHours, g.clk.Now())
if len(g.labelSelectorRequirements) != 0 {
s.MatchExpressions = append(s.MatchExpressions, g.labelSelectorRequirements...)
}

// Delete doesn't support 'all' namespaces. Let's fetch namespaces and loop over each.
if g.namespace == "" || strings.ToLower(g.namespace) == "all" || strings.ToLower(g.namespace) == "all-namespaces" {
Expand Down Expand Up @@ -168,6 +175,7 @@ func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Cl
} else {
logger.Warningf(context.TODO(), "defaulting max ttl for workflows to 23 hours, since configured duration is larger than 23 [%d]", cfg.MaxTTLInHours)
}
labelSelectorRequirements := getShardedLabelSelectorRequirements(cfg)
return &GarbageCollector{
wfClient: wfClient,
ttlHours: ttl,
Expand All @@ -178,7 +186,8 @@ func NewGarbageCollector(cfg *config.Config, scope promutils.Scope, clk clock.Cl
gcRoundSuccess: labeled.NewCounter("gc_success", "successful executions of delete request", scope),
gcRoundFailure: labeled.NewCounter("gc_failure", "failure to delete workflows", scope),
},
clk: clk,
namespace: cfg.LimitNamespace,
clk: clk,
namespace: cfg.LimitNamespace,
labelSelectorRequirements: labelSelectorRequirements,
}, nil
}

0 comments on commit 774c457

Please sign in to comment.