From e4b65548a22ad94ab9e670459303888da7de9ee2 Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Fri, 24 Mar 2023 15:48:19 -0400 Subject: [PATCH] helm-operator: reduce cache memory footprint Make use of label selectors used by informers for both the primary CR and for chart manifest objects Signed-off-by: Joe Lanford --- .../helm-operator-cache-selectors.yaml | 9 ++ go.mod | 2 +- go.sum | 4 +- internal/cmd/helm-operator/run/cmd.go | 113 +++++++++++++----- internal/helm/client/client.go | 42 +++++++ internal/helm/controller/controller.go | 42 +------ internal/helm/controller/controller_test.go | 39 ------ internal/helm/release/manager_factory.go | 4 + 8 files changed, 149 insertions(+), 106 deletions(-) create mode 100644 changelog/fragments/helm-operator-cache-selectors.yaml delete mode 100644 internal/helm/controller/controller_test.go diff --git a/changelog/fragments/helm-operator-cache-selectors.yaml b/changelog/fragments/helm-operator-cache-selectors.yaml new file mode 100644 index 00000000000..f6e6228e9c2 --- /dev/null +++ b/changelog/fragments/helm-operator-cache-selectors.yaml @@ -0,0 +1,9 @@ +entries: + - description: > + (helm): Use informer cache label selectors to reduce memory consumption. + kind: bugfix + breaking: false + - description: > + (helm): Fix bug with detection of owner reference support when setting up dynamic watches + kind: bugfix + breaking: false diff --git a/go.mod b/go.mod index 60210295aa3..b08bdae68b0 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( k8s.io/client-go v0.26.2 k8s.io/kubectl v0.26.2 k8s.io/utils v0.0.0-20230711102312-30195339c3c7 - sigs.k8s.io/controller-runtime v0.14.5 + sigs.k8s.io/controller-runtime v0.14.6 sigs.k8s.io/controller-tools v0.11.3 sigs.k8s.io/kubebuilder/v3 v3.9.1 sigs.k8s.io/yaml v1.3.0 diff --git a/go.sum b/go.sum index 8449088f3de..89ade6b2d8e 100644 --- a/go.sum +++ b/go.sum @@ -1658,8 +1658,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo= -sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s= -sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= +sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA= +sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0= sigs.k8s.io/controller-tools v0.11.3 h1:T1xzLkog9saiyQSLz1XOImu4OcbdXWytc5cmYsBeBiE= sigs.k8s.io/controller-tools v0.11.3/go.mod h1:qcfX7jfcfYD/b7lAhvqAyTbt/px4GpvN88WKLFFv7p8= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= diff --git a/internal/cmd/helm-operator/run/cmd.go b/internal/cmd/helm-operator/run/cmd.go index 50931ed1d44..0cd7997f95f 100644 --- a/internal/cmd/helm-operator/run/cmd.go +++ b/internal/cmd/helm-operator/run/cmd.go @@ -25,6 +25,7 @@ import ( "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apimachruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -45,6 +46,10 @@ import ( "github.com/operator-framework/operator-sdk/internal/helm/watches" "github.com/operator-framework/operator-sdk/internal/util/k8sutil" sdkVersion "github.com/operator-framework/operator-sdk/internal/version" + "helm.sh/helm/v3/pkg/chart/loader" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" ) var log = logf.Log.WithName("cmd") @@ -136,6 +141,23 @@ func run(cmd *cobra.Command, f *flags.Flags) { // Set default manager options options = f.ToManagerOptions(options) + if options.Scheme == nil { + options.Scheme = apimachruntime.NewScheme() + } + + ws, err := watches.Load(f.WatchesFile) + if err != nil { + log.Error(err, "Failed to load watches file.") + os.Exit(1) + } + + watchNamespaces := getWatchNamespaces(options.Namespace) + options.NewCache, err = buildNewCacheFunc(watchNamespaces, ws, options.Scheme) + if err != nil { + log.Error(err, "Failed to create NewCache function for manager.") + os.Exit(1) + } + if options.NewClient == nil { options.NewClient = func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) { // Create the Client for Write operations. @@ -152,27 +174,6 @@ func run(cmd *cobra.Command, f *flags.Flags) { }) } } - namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar) - log = log.WithValues("Namespace", namespace) - if found { - log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar)) - if namespace == metav1.NamespaceAll { - log.Info("Watching all namespaces.") - options.Namespace = metav1.NamespaceAll - } else { - if strings.Contains(namespace, ",") { - log.Info("Watching multiple namespaces.") - options.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(namespace, ",")) - } else { - log.Info("Watching single namespace.") - options.Namespace = namespace - } - } - } else if options.Namespace == "" { - log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+ - "Watching all namespaces.", k8sutil.WatchNamespaceEnvVar)) - options.Namespace = metav1.NamespaceAll - } mgr, err := manager.New(cfg, options) if err != nil { @@ -189,11 +190,6 @@ func run(cmd *cobra.Command, f *flags.Flags) { os.Exit(1) } - ws, err := watches.Load(f.WatchesFile) - if err != nil { - log.Error(err, "Failed to create new manager factories.") - os.Exit(1) - } acg, err := helmClient.NewActionConfigGetter(mgr.GetConfig(), mgr.GetRESTMapper(), mgr.GetLogger()) if err != nil { log.Error(err, "Failed to create Helm action config getter") @@ -207,7 +203,6 @@ func run(cmd *cobra.Command, f *flags.Flags) { } err := controller.Add(mgr, controller.WatchOptions{ - Namespace: namespace, GVK: w.GroupVersionKind, ManagerFactory: release.NewManagerFactory(mgr, acg, w.ChartDir), ReconcilePeriod: reconcilePeriod, @@ -250,3 +245,67 @@ func exitIfUnsupported(options manager.Options) { os.Exit(1) } } + +func getWatchNamespaces(defaultNamespace string) []string { + namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar) + log = log.WithValues("Namespace", namespace) + if found { + log.V(1).Info(fmt.Sprintf("Setting namespace with value in %s", k8sutil.WatchNamespaceEnvVar)) + if namespace == metav1.NamespaceAll { + log.Info("Watching all namespaces.") + return []string{metav1.NamespaceAll} + } + if strings.Contains(namespace, ",") { + log.Info("Watching multiple namespaces.") + return strings.Split(namespace, ",") + } + log.Info("Watching single namespace.") + return []string{namespace} + } + if defaultNamespace == "" { + log.Info(fmt.Sprintf("Watch namespaces not configured by environment variable %s or file. "+ + "Watching all namespaces.", k8sutil.WatchNamespaceEnvVar)) + return []string{metav1.NamespaceAll} + } + return []string{defaultNamespace} +} + +func buildNewCacheFunc(watchNamespaces []string, ws []watches.Watch, sch *apimachruntime.Scheme) (cache.NewCacheFunc, error) { + selectorsByObject := cache.SelectorsByObject{} + chartNames := make([]string, 0, len(ws)) + for _, w := range ws { + sch.AddKnownTypeWithName(w.GroupVersionKind, &unstructured.Unstructured{}) + + crObj := &unstructured.Unstructured{} + crObj.SetGroupVersionKind(w.GroupVersionKind) + sel, err := metav1.LabelSelectorAsSelector(&w.Selector) + if err != nil { + return nil, fmt.Errorf("unable to parse watch selector for %s: %v", w.GroupVersionKind, err) + } + selectorsByObject[crObj] = cache.ObjectSelector{Label: sel} + + chrt, err := loader.LoadDir(w.ChartDir) + if err != nil { + return nil, fmt.Errorf("unable to load chart for %s: %v", w.GroupVersionKind, err) + } + chartNames = append(chartNames, chrt.Name()) + + } + req, err := labels.NewRequirement("helm.sdk.operatorframework.io/chart", selection.In, chartNames) + if err != nil { + return nil, fmt.Errorf("unable to create label requirement for cache default selector: %v", err) + } + defaultSelector := labels.NewSelector().Add(*req) + + return func(config *rest.Config, opts cache.Options) (cache.Cache, error) { + opts.SelectorsByObject = selectorsByObject + opts.DefaultSelector = cache.ObjectSelector{Label: defaultSelector} + if len(watchNamespaces) > 1 { + return cache.MultiNamespacedCacheBuilder(watchNamespaces)(config, opts) + } + if len(watchNamespaces) == 1 { + opts.Namespace = watchNamespaces[0] + } + return cache.New(config, opts) + }, nil +} diff --git a/internal/helm/client/client.go b/internal/helm/client/client.go index 62e18f541a9..8cfebe2b029 100644 --- a/internal/helm/client/client.go +++ b/internal/helm/client/client.go @@ -105,3 +105,45 @@ func containsResourcePolicyKeep(annotations map[string]string) bool { resourcePolicyType = strings.ToLower(strings.TrimSpace(resourcePolicyType)) return resourcePolicyType == kube.KeepPolicy } + +type labelInjectingClient struct { + kube.Interface + labels map[string]string +} + +func NewLabelInjectingClient(base kube.Interface, labels map[string]string) kube.Interface { + return &labelInjectingClient{ + Interface: base, + labels: labels, + } +} + +func (c *labelInjectingClient) Build(reader io.Reader, validate bool) (kube.ResourceList, error) { + resourceList, err := c.Interface.Build(reader, validate) + if err != nil { + return resourceList, err + } + err = resourceList.Visit(func(r *resource.Info, err error) error { + if err != nil { + return err + } + objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(r.Object) + if err != nil { + return err + } + u := &unstructured.Unstructured{Object: objMap} + labels := u.GetLabels() + if labels == nil { + labels = map[string]string{} + } + for k, v := range c.labels { + labels[k] = v + } + u.SetLabels(labels) + return nil + }) + if err != nil { + return nil, err + } + return resourceList, nil +} diff --git a/internal/helm/controller/controller.go b/internal/helm/controller/controller.go index 0b3e6f6e1ed..e2e7fb9f099 100644 --- a/internal/helm/controller/controller.go +++ b/internal/helm/controller/controller.go @@ -16,7 +16,6 @@ package controller import ( "fmt" - "reflect" "strings" "sync" "time" @@ -31,7 +30,6 @@ import ( crthandler "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" - ctrlpredicate "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" "sigs.k8s.io/yaml" @@ -46,7 +44,6 @@ var log = logf.Log.WithName("helm.controller") // WatchOptions contains the necessary values to create a new controller that // manages helm releases in a particular namespace based on a GVK watch. type WatchOptions struct { - Namespace string GVK schema.GroupVersionKind ManagerFactory release.ManagerFactory ReconcilePeriod time.Duration @@ -71,10 +68,6 @@ func Add(mgr manager.Manager, options WatchOptions) error { SuppressOverrideValues: options.SuppressOverrideValues, } - // Register the GVK with the schema - mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{}) - metav1.AddToGroupVersion(mgr.GetScheme(), options.GVK.GroupVersion()) - c, err := controller.New(controllerName, mgr, controller.Options{ Reconciler: r, MaxConcurrentReconciles: options.MaxConcurrentReconciles, @@ -86,18 +79,7 @@ func Add(mgr manager.Manager, options WatchOptions) error { o := &unstructured.Unstructured{} o.SetGroupVersionKind(options.GVK) - var preds []ctrlpredicate.Predicate - p, err := parsePredicateSelector(options.Selector) - - if err != nil { - return err - } - - if p != nil { - preds = append(preds, p) - } - - if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}, preds...); err != nil { + if err := c.Watch(&source.Kind{Type: o}, &libhandler.InstrumentedEnqueueRequestForObject{}); err != nil { return err } @@ -106,33 +88,19 @@ func Add(mgr manager.Manager, options WatchOptions) error { } log.Info("Watching resource", "apiVersion", options.GVK.GroupVersion(), "kind", - options.GVK.Kind, "namespace", options.Namespace, "reconcilePeriod", options.ReconcilePeriod.String()) + options.GVK.Kind, "reconcilePeriod", options.ReconcilePeriod.String()) return nil } -// parsePredicateSelector parses the selector in the WatchOptions and creates a predicate -// that is used to filter resources based on the specified selector -func parsePredicateSelector(selector metav1.LabelSelector) (ctrlpredicate.Predicate, error) { - // If a selector has been specified in watches.yaml, add it to the watch's predicates. - if !reflect.ValueOf(selector).IsZero() { - p, err := ctrlpredicate.LabelSelectorPredicate(selector) - if err != nil { - return nil, fmt.Errorf("error constructing predicate from watches selector: %v", err) - } - return p, nil - } - return nil, nil -} - // watchDependentResources adds a release hook function to the HelmOperatorReconciler // that adds watches for resources in released Helm charts. func watchDependentResources(mgr manager.Manager, r *HelmOperatorReconciler, c controller.Controller) { - owner := &unstructured.Unstructured{} - owner.SetGroupVersionKind(r.GVK) - var m sync.RWMutex watches := map[schema.GroupVersionKind]struct{}{} releaseHook := func(release *rpb.Release) error { + owner := &unstructured.Unstructured{} + owner.SetGroupVersionKind(r.GVK) + owner.SetNamespace(release.Namespace) resources := releaseutil.SplitManifests(release.Manifest) for _, resource := range resources { var u unstructured.Unstructured diff --git a/internal/helm/controller/controller_test.go b/internal/helm/controller/controller_test.go deleted file mode 100644 index 968fdef1140..00000000000 --- a/internal/helm/controller/controller_test.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2021 The Operator-SDK Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package controller - -import ( - "testing" - - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func TestFilterPredicate(t *testing.T) { - matchLabelPass := make(map[string]string) - matchLabelPass["testKey"] = "testValue" - selectorPass := metav1.LabelSelector{ - MatchLabels: matchLabelPass, - } - noSelector := metav1.LabelSelector{} - - passPredicate, err := parsePredicateSelector(selectorPass) - assert.Equal(t, nil, err, "Verify that no error is thrown on a valid populated selector") - assert.NotEqual(t, nil, passPredicate, "Verify that a predicate is constructed using a valid selector") - - nilPredicate, err := parsePredicateSelector(noSelector) - assert.Equal(t, nil, err, "Verify that no error is thrown on a valid unpopulated selector") - assert.Equal(t, nil, nilPredicate, "Verify correct parsing of an unpopulated selector") -} diff --git a/internal/helm/release/manager_factory.go b/internal/helm/release/manager_factory.go index cb1857fd0b7..d0eeb4704ff 100644 --- a/internal/helm/release/manager_factory.go +++ b/internal/helm/release/manager_factory.go @@ -58,6 +58,10 @@ func (f managerFactory) NewManager(cr *unstructured.Unstructured, overrideValues return nil, fmt.Errorf("failed to load chart dir: %w", err) } + actionConfig.KubeClient = client.NewLabelInjectingClient(actionConfig.KubeClient, map[string]string{ + "helm.sdk.operatorframework.io/chart": crChart.Name(), + }) + releaseName, err := getReleaseName(actionConfig.Releases, crChart.Name(), cr) if err != nil { return nil, fmt.Errorf("failed to get helm release name: %w", err)