From fe7bb014d0d3cd30cc6b284e2a75848fa8ba8796 Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Thu, 26 Sep 2024 16:33:37 -0700 Subject: [PATCH] fix: remediator missing custom resource events Prior to this change, the remediator only started watches for new custom resources after the apply attempt had fully completed. This left a window after the object was applied during which the remediator could miss events made by third parties. Normally, this would be fine, because the remediator would revert any change after the watch was started. But if a DELETE event was missed, the object wouldn't be recreated until the next apply attempt. This change adds a CRD Controller to the remediator that watches CRDs and executes any registered handlers when the CRD is established, unestablished, or deleted. The remediator now registers CRD handlers for each resource type it watches, starting watchers as soon as possible, without waiting for the next apply attempt. --- cmd/reconciler-manager/main.go | 31 ++- e2e/testcases/custom_resources_test.go | 17 +- manifests/base/kustomization.yaml | 1 + ...reconciler-cluster-scope-cluster-role.yaml | 27 +++ .../root-reconciler-base-cluster-role.yaml | 3 + pkg/reconciler/reconciler.go | 10 +- .../controllers/build_names.go | 14 +- .../controllers/crd_controller.go | 212 ++++++++++-------- .../controllers/garbage_collector.go | 4 +- .../controllers/reconciler_base.go | 29 +++ .../controllers/reposync_controller.go | 25 ++- .../controllers/rootsync_controller.go | 47 ++-- pkg/remediator/remediator.go | 4 +- pkg/remediator/watch/manager.go | 49 +++- pkg/remediator/watch/manager_test.go | 7 +- pkg/util/customresource/customresource.go | 32 +++ .../customresource/customresource_test.go | 74 ++++++ 17 files changed, 425 insertions(+), 161 deletions(-) create mode 100644 manifests/ns-reconciler-cluster-scope-cluster-role.yaml create mode 100644 pkg/util/customresource/customresource.go create mode 100644 pkg/util/customresource/customresource_test.go diff --git a/cmd/reconciler-manager/main.go b/cmd/reconciler-manager/main.go index 6f4a55bbd5..0f48e638b8 100644 --- a/cmd/reconciler-manager/main.go +++ b/cmd/reconciler-manager/main.go @@ -21,6 +21,7 @@ import ( "os" "github.com/go-logr/logr" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +34,7 @@ import ( "kpt.dev/configsync/pkg/profiler" "kpt.dev/configsync/pkg/reconcilermanager" "kpt.dev/configsync/pkg/reconcilermanager/controllers" + "kpt.dev/configsync/pkg/util/customresource" "kpt.dev/configsync/pkg/util/log" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -95,9 +97,10 @@ func main() { } watchFleetMembership := fleetMembershipCRDExists(dynamicClient, mgr.GetRESTMapper(), &setupLog) - crdController := controllers.NewCRDReconciler( + crdController := &controllers.CRDController{} + crdMetaController := controllers.NewCRDMetaController(crdController, mgr.GetCache(), textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD")) - if err := crdController.Register(mgr); err != nil { + if err := crdMetaController.Register(mgr); err != nil { setupLog.Error(err, "failed to register controller", "controller", "CRD") os.Exit(1) } @@ -108,11 +111,15 @@ func main() { mgr.GetClient(), watcher, dynamicClient,
textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RepoSyncKind), mgr.GetScheme()) - crdController.SetCRDHandler(configsync.RepoSyncCRDName, func() error { - if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil { - return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err) + crdController.SetReconciler(kinds.RepoSyncV1Beta1().GroupKind(), func(_ context.Context, crd *apiextensionsv1.CustomResourceDefinition) error { + if customresource.IsEstablished(crd) { + if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil { + return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err) + } + setupLog.Info("RepoSync controller registration successful") } - setupLog.Info("RepoSync controller registration successful") + // Don't stop the RepoSync controller when its CRD is deleted, + // otherwise we may miss RepoSync object deletion events. return nil }) setupLog.Info("RepoSync controller registration scheduled") @@ -122,11 +129,15 @@ func main() { mgr.GetClient(), watcher, dynamicClient, textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RootSyncKind), mgr.GetScheme()) - crdController.SetCRDHandler(configsync.RootSyncCRDName, func() error { - if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil { - return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err) + crdController.SetReconciler(kinds.RootSyncV1Beta1().GroupKind(), func(_ context.Context, crd *apiextensionsv1.CustomResourceDefinition) error { + if customresource.IsEstablished(crd) { + if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil { + return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err) + } + setupLog.Info("RootSync controller registration successful") } - setupLog.Info("RootSync controller registration successful") + // Don't stop the RootSync controller when its CRD is deleted, + // otherwise we may miss RootSync object deletion events. return nil }) setupLog.Info("RootSync controller registration scheduled") diff --git a/e2e/testcases/custom_resources_test.go b/e2e/testcases/custom_resources_test.go index bf25744770..3d13e42c1f 100644 --- a/e2e/testcases/custom_resources_test.go +++ b/e2e/testcases/custom_resources_test.go @@ -98,13 +98,16 @@ func TestCRDDeleteBeforeRemoveCustomResourceV1(t *testing.T) { nt.T.Fatal(err) } - // Resource Conflict errors from the remediator are not exposed as errors - // in the RootSync status. Instead, the error is recorded as a metric and - // logged as a warning. Then the object is refreshed from the server and - // re-enqueued for remediation. + // Resource conflict errors are recorded as status errors when the + // remediator watches are updated after an apply succeeds, but not when + // watches are updated before the apply attempt or from watch events handled + // by the remediator. So we don't expect to see a resource conflict error + // in the RootSync status until after the next apply attempt fails, which + // won't happen until the next automatic re-sync (1hr default). // - // Validate that deleting the CRD of a managed CR causes at least of of the - // following errors: + // However, we do expect the remediator to get a deletion event for each + // Anvil object after the Anvil CRD is deleted. 
This can be surfaced as one + // of the following errors: // - NoResourceMatchError // - NoKindMatchError // - ObjectNotFound @@ -117,7 +120,7 @@ func TestCRDDeleteBeforeRemoveCustomResourceV1(t *testing.T) { // TODO: distinguish between management conflict (unexpected manager annotation) and resource conflict (resource version change) nt.Must(nomostest.ValidateMetrics(nt, nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, firstCommitHash, metrics.ErrorSummary{ - Conflicts: 1, + Conflicts: 1, // at least 1 }))) // Reset discovery client to invalidate the cached Anvil CRD diff --git a/manifests/base/kustomization.yaml b/manifests/base/kustomization.yaml index 1897fe6574..d467cc33c8 100644 --- a/manifests/base/kustomization.yaml +++ b/manifests/base/kustomization.yaml @@ -19,6 +19,7 @@ resources: # Applying hierarchyconfig-crd.yaml allows client-side validation of the HierarchyConfig resources. - ../hierarchyconfig-crd.yaml - ../namespace-selector-crd.yaml +- ../ns-reconciler-cluster-scope-cluster-role.yaml - ../ns-reconciler-base-cluster-role.yaml - ../root-reconciler-base-cluster-role.yaml - ../otel-agent-cm.yaml diff --git a/manifests/ns-reconciler-cluster-scope-cluster-role.yaml b/manifests/ns-reconciler-cluster-scope-cluster-role.yaml new file mode 100644 index 0000000000..123340633e --- /dev/null +++ b/manifests/ns-reconciler-cluster-scope-cluster-role.yaml @@ -0,0 +1,27 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This ClusterRole is used by both root-reconcilers and ns-reconcilers. +# It includes read access for cluster-scope resources. 
+apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: configsync.gke.io:ns-reconciler:cluster-scope + labels: + configmanagement.gke.io/system: "true" + configmanagement.gke.io/arch: "csmr" +rules: +- apiGroups: ["apiextensions.k8s.io"] + resources: ["customresourcedefinitions"] + verbs: ["get","list","watch"] diff --git a/manifests/root-reconciler-base-cluster-role.yaml b/manifests/root-reconciler-base-cluster-role.yaml index 2dd1cc9e95..cf78fec6db 100644 --- a/manifests/root-reconciler-base-cluster-role.yaml +++ b/manifests/root-reconciler-base-cluster-role.yaml @@ -32,3 +32,6 @@ rules: - apiGroups: ["kpt.dev"] resources: ["resourcegroups/status"] verbs: ["*"] +- apiGroups: ["apiextensions.k8s.io"] + resources: ["customresourcedefinitions"] + verbs: ["get","list","watch"] diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index e94083085c..78f2c08336 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -37,6 +37,7 @@ import ( "kpt.dev/configsync/pkg/parse/events" "kpt.dev/configsync/pkg/reconciler/finalizer" "kpt.dev/configsync/pkg/reconciler/namespacecontroller" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/remediator" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/watch" @@ -219,10 +220,11 @@ func Run(opts Options) { klog.Fatalf("Error creating rest config for the remediator: %v", err) } + crdController := &controllers.CRDController{} conflictHandler := conflict.NewHandler() fightHandler := fight.NewHandler() - rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, decls, opts.NumWorkers) + rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, crdController, decls, opts.NumWorkers) if err != nil { klog.Fatalf("Instantiating Remediator: %v", err) } @@ -331,6 +333,12 @@ func Run(opts Options) { klog.Fatalf("Instantiating Controller Manager: %v", err) } + crdMetaController := controllers.NewCRDMetaController(crdController, mgr.GetCache(), + textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD")) + if err := crdMetaController.Register(mgr); err != nil { + klog.Fatalf("Instantiating CRD Controller: %v", err) + } + // This cancelFunc will be used by the Finalizer to stop all the other // controllers (Parser & Remediator). ctx, stopControllers := context.WithCancel(signalCtx) diff --git a/pkg/reconcilermanager/controllers/build_names.go b/pkg/reconcilermanager/controllers/build_names.go index 58025a1f83..005391c9e2 100644 --- a/pkg/reconcilermanager/controllers/build_names.go +++ b/pkg/reconcilermanager/controllers/build_names.go @@ -22,15 +22,25 @@ import ( ) const ( + // RepoSyncClusterScopeClusterRoleName is the name of the ClusterRole with + // cluster-scoped read permissions for the namespace reconciler. + // e.g. configsync.gke.io:ns-reconciler:cluster-scope + RepoSyncClusterScopeClusterRoleName = configsync.GroupName + ":" + core.NsReconcilerPrefix + ":cluster-scope" // RepoSyncBaseClusterRoleName is the namespace reconciler permissions name. // e.g. configsync.gke.io:ns-reconciler RepoSyncBaseClusterRoleName = configsync.GroupName + ":" + core.NsReconcilerPrefix // RootSyncBaseClusterRoleName is the root reconciler base ClusterRole name. // e.g. 
configsync.gke.io:root-reconciler RootSyncBaseClusterRoleName = configsync.GroupName + ":" + core.RootReconcilerPrefix + // RepoSyncClusterScopeClusterRoleBindingName is the name of the default + // ClusterRoleBinding created for RepoSync objects. This contains basic + // cluster-scoped permissions for RepoSync reconcilers + // (e.g. CustomResourceDefinition watch). + RepoSyncClusterScopeClusterRoleBindingName = RepoSyncClusterScopeClusterRoleName // RepoSyncBaseRoleBindingName is the name of the default RoleBinding created - for RepoSync objects. This contains basic permissions for RepoSync reconcilers //(e.g. RepoSync status update). + for RepoSync objects. This contains basic namespace-scoped permissions + for RepoSync reconcilers + // (e.g. RepoSync status update). RepoSyncBaseRoleBindingName = RepoSyncBaseClusterRoleName // RootSyncLegacyClusterRoleBindingName is the name of the legacy ClusterRoleBinding created // for RootSync objects. It is always bound to cluster-admin. diff --git a/pkg/reconcilermanager/controllers/crd_controller.go b/pkg/reconcilermanager/controllers/crd_controller.go index 46fdf157cb..36dc5063e8 100644 --- a/pkg/reconcilermanager/controllers/crd_controller.go +++ b/pkg/reconcilermanager/controllers/crd_controller.go @@ -21,125 +21,153 @@ import ( "github.com/go-logr/logr" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -// CRDHandler is called by the CRDReconciler to handle establishment of a CRD. -type CRDHandler func() error +// CRDReconcileFunc is called by the CRDMetaController to handle CRD updates. +type CRDReconcileFunc func(context.Context, *apiextensionsv1.CustomResourceDefinition) error -var _ reconcile.Reconciler = &CRDReconciler{} - -// CRDReconciler watches CRDs and calls handlers once they are established. -type CRDReconciler struct { - loggingController - - registerLock sync.Mutex - handlers map[string]CRDHandler - handledCRDs map[string]struct{} +// CRDController keeps track of CRDReconcileFuncs and calls them when the +// CRD changes. Only one reconciler is allowed per GroupKind. +type CRDController struct { + lock sync.RWMutex + reconcilers map[schema.GroupKind]CRDReconcileFunc } -// NewCRDReconciler constructs a new CRDReconciler. -func NewCRDReconciler(log logr.Logger) *CRDReconciler { - return &CRDReconciler{ - loggingController: loggingController{ - log: log, - }, - handlers: make(map[string]CRDHandler), - handledCRDs: make(map[string]struct{}), - } -} +// SetReconciler sets the reconciler for the specified CRD. +// The reconciler will be called when the CRD becomes established. +// If the reconciler errors, it will be retried with backoff until success. +// New reconcilers will replace the old reconciler for the same GroupKind. +func (s *CRDController) SetReconciler(gk schema.GroupKind, crdHandler CRDReconcileFunc) { + s.lock.Lock() + defer s.lock.Unlock() -// SetCRDHandler adds an handler for the specified CRD. -// The handler will be called when the CRD becomes established.
-// If the handler errors, it will be retried with backoff until success. -// One the handler succeeds, it will not be called again, unless SetCRDHandler -// is called again. -func (r *CRDReconciler) SetCRDHandler(crdName string, crdHandler CRDHandler) { - r.registerLock.Lock() - defer r.registerLock.Unlock() - - r.handlers[crdName] = crdHandler - delete(r.handledCRDs, crdName) + if s.reconcilers == nil { + s.reconcilers = make(map[schema.GroupKind]CRDReconcileFunc) + } + s.reconcilers[gk] = crdHandler } -// Reconcile the otel ConfigMap and update the Deployment annotation. -func (r *CRDReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - crdName := req.Name - ctx = r.setLoggerValues(ctx, "crd", crdName) +// DeleteReconciler removes the reconciler for the specified CRD. +func (s *CRDController) DeleteReconciler(gk schema.GroupKind) { + s.lock.Lock() + defer s.lock.Unlock() - r.registerLock.Lock() - defer r.registerLock.Unlock() + if s.reconcilers != nil { + delete(s.reconcilers, gk) + } +} - handler, found := r.handlers[crdName] - if !found { +// Reconcile calls the CRDReconcileFunc registered for this CRD by GroupKind. +func (s *CRDController) Reconcile(ctx context.Context, gk schema.GroupKind, crd *apiextensionsv1.CustomResourceDefinition) error { + handler := s.getHandler(gk) + if handler == nil { // No handler for this CRD - return reconcile.Result{}, nil + return nil } - if _, handled := r.handledCRDs[crdName]; handled { - // Already handled - return reconcile.Result{}, nil + // Handler must not be called inside the lock or it could deadlock. + if err := handler(ctx, crd); err != nil { + return fmt.Errorf("reconciling CRD: %s: %w", gk, err) } + return nil +} - r.logger(ctx).V(3).Info("reconciling CRD", "crd", crdName) +func (s *CRDController) getHandler(gk schema.GroupKind) CRDReconcileFunc { + s.lock.RLock() + defer s.lock.RUnlock() - if err := handler(); err != nil { - // Retry with backoff - return reconcile.Result{}, - fmt.Errorf("reconciling CRD %s: %w", crdName, err) + handler, found := s.reconcilers[gk] + if !found { + return nil } - // Mark CRD as handled - r.handledCRDs[crdName] = struct{}{} - - r.logger(ctx).V(3).Info("reconciling CRD successful", "crd", crdName) - - return controllerruntime.Result{}, nil + return handler } -// Register the CRD controller with reconciler-manager. -func (r *CRDReconciler) Register(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr). - For(&apiextensionsv1.CustomResourceDefinition{}, - builder.WithPredicates( - ignoreDeletesPredicate(), - crdIsEstablishedPredicate())). - Complete(r) +// CRDMetaController watches CRDs and delegates reconciliation to a CRDController. +type CRDMetaController struct { + loggingController + cache cache.Cache + delegate *CRDController + observedResources map[schema.GroupResource]schema.GroupKind } -// ignoreDeletesPredicate returns a predicate that handles CREATE, UPDATE, and -// GENERIC events, but not DELETE events. -func ignoreDeletesPredicate() predicate.Predicate { - return predicate.Funcs{ - DeleteFunc: func(_ event.DeleteEvent) bool { - return false +var _ reconcile.Reconciler = &CRDMetaController{} + +// NewCRDMetaController constructs a new CRDMetaController.
+func NewCRDMetaController(delegate *CRDController, cache cache.Cache, log logr.Logger) *CRDMetaController { + return &CRDMetaController{ + loggingController: loggingController{ + log: log, }, + cache: cache, + delegate: delegate, + observedResources: make(map[schema.GroupResource]schema.GroupKind), } } -// crdIsEstablishedPredicate returns a predicate that only processes events for -// established CRDs. -func crdIsEstablishedPredicate() predicate.Predicate { - return predicate.NewPredicateFuncs(func(obj client.Object) bool { - if crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition); ok { - return crdIsEstablished(crd) - } - return false - }) -} +// Reconcile checks if the CRD exists and delegates to the CRDController to +// reconcile the update. +func (r *CRDMetaController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + crdName := req.Name + ctx = r.setLoggerValues(ctx, "crd", crdName) -// crdIsEstablished returns true if the given CRD is established on the cluster, -// which indicates if discovery knows about it yet. For more info see -// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#create-a-customresourcedefinition -func crdIsEstablished(crd *apiextensionsv1.CustomResourceDefinition) bool { - for _, condition := range crd.Status.Conditions { - if condition.Type == v1.Established && condition.Status == v1.ConditionTrue { - return true + // Established if CRD exists and .status.conditions[type="Established"].status = "True" + var kind schema.GroupKind + crdObj := &apiextensionsv1.CustomResourceDefinition{} + if err := r.cache.Get(ctx, req.NamespacedName, crdObj); err != nil { + switch { + // Should never run into NoMatchFound, since CRD is a built-in resource. + case apierrors.IsNotFound(err): + // Look up the last known GroupKind for the CRD name + resource := schema.ParseGroupResource(crdName) + var found bool + kind, found = r.observedResources[resource] + if !found { + // No retry possible + return reconcile.Result{}, reconcile.TerminalError( + fmt.Errorf("failed to handle CRD update for deleted resource: %w", + &meta.NoResourceMatchError{PartialResource: resource.WithVersion("")})) + } + crdObj = nil + default: + // Retry with backoff + return reconcile.Result{}, + fmt.Errorf("getting CRD from cache: %s: %w", crdName, err) + } + } else { + kind = schema.GroupKind{ + Group: crdObj.Spec.Group, + Kind: crdObj.Spec.Names.Kind, + } + resource := schema.GroupResource{ + Group: crdObj.Spec.Group, + Resource: crdObj.Spec.Names.Plural, } + // Cache the last known mapping from GroupResource to GroupKind. + // This lets us look up the GroupKind using the CRD name. + r.observedResources[resource] = kind } - return false + + r.logger(ctx).Info("CRDMetaController handling CRD status update", "name", crdName) + + if err := r.delegate.Reconcile(ctx, kind, crdObj); err != nil { + // Retry with backoff + return reconcile.Result{}, err + } + + r.logger(ctx).V(3).Info("CRDMetaController handled CRD status update", "name", crdName) + + return reconcile.Result{}, nil +} + +// Register the CRDMetaController with the ReconcilerManager. +func (r *CRDMetaController) Register(mgr controllerruntime.Manager) error { + return controllerruntime.NewControllerManagedBy(mgr). + For(&apiextensionsv1.CustomResourceDefinition{}).
+ Complete(r) } diff --git a/pkg/reconcilermanager/controllers/garbage_collector.go b/pkg/reconcilermanager/controllers/garbage_collector.go index bae1e0a0b0..8b6d5a43dc 100644 --- a/pkg/reconcilermanager/controllers/garbage_collector.go +++ b/pkg/reconcilermanager/controllers/garbage_collector.go @@ -237,9 +237,9 @@ func (r *reconcilerBase) deleteDeployment(ctx context.Context, reconcilerRef typ return r.cleanup(ctx, d) } -func (r *RootSyncReconciler) deleteSharedClusterRoleBinding(ctx context.Context, name string, reconcilerRef types.NamespacedName) error { +func (r *reconcilerBase) deleteSharedClusterRoleBinding(ctx context.Context, name string, reconcilerRef types.NamespacedName) error { crbKey := client.ObjectKey{Name: name} - // Update the CRB to delete the subject for the deleted RootSync's reconciler + // Update the CRB to delete the subject for the deleted reconciler crb := &rbacv1.ClusterRoleBinding{} if err := r.client.Get(ctx, crbKey, crb); err != nil { if apierrors.IsNotFound(err) { diff --git a/pkg/reconcilermanager/controllers/reconciler_base.go b/pkg/reconcilermanager/controllers/reconciler_base.go index 281dd1cce7..1cf040636d 100644 --- a/pkg/reconcilermanager/controllers/reconciler_base.go +++ b/pkg/reconcilermanager/controllers/reconciler_base.go @@ -700,6 +700,35 @@ func (r *reconcilerBase) updateRBACBinding(ctx context.Context, reconcilerRef, r return nil } +func (r *reconcilerBase) upsertSharedClusterRoleBinding(ctx context.Context, name, clusterRole string, reconcilerRef, rsRef types.NamespacedName) error { + crbRef := client.ObjectKey{Name: name} + childCRB := &rbacv1.ClusterRoleBinding{} + childCRB.Name = crbRef.Name + + labelMap := ManagedObjectLabelMap(r.syncKind, rsRef) + // Remove sync-name label since the ClusterRoleBinding may be shared + delete(labelMap, metadata.SyncNameLabel) + + op, err := CreateOrUpdate(ctx, r.client, childCRB, func() error { + core.AddLabels(childCRB, labelMap) + childCRB.RoleRef = rolereference(clusterRole, "ClusterRole") + childCRB.Subjects = addSubject(childCRB.Subjects, r.serviceAccountSubject(reconcilerRef)) + // Remove any existing OwnerReferences, now that we're using finalizers. + childCRB.OwnerReferences = nil + return nil + }) + if err != nil { + return err + } + if op != controllerutil.OperationResultNone { + r.logger(ctx).Info("Managed object upsert successful", + logFieldObjectRef, crbRef.String(), + logFieldObjectKind, "ClusterRoleBinding", + logFieldOperation, op) + } + return nil +} + func (r *reconcilerBase) isKnownHostsEnabled(auth configsync.AuthType) bool { if auth == configsync.AuthSSH && r.knownHostExist { return true diff --git a/pkg/reconcilermanager/controllers/reposync_controller.go b/pkg/reconcilermanager/controllers/reposync_controller.go index ee98dd0334..c77fc583c2 100644 --- a/pkg/reconcilermanager/controllers/reposync_controller.go +++ b/pkg/reconcilermanager/controllers/reposync_controller.go @@ -74,7 +74,7 @@ type RepoSyncReconciler struct { // configMapWatches stores which namespaces where we are currently watching ConfigMaps configMapWatches map[string]bool - controller *controller.Controller + controller controller.Controller cache cache.Cache } @@ -260,11 +260,16 @@ func (r *RepoSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile return fmt.Errorf("upserting service account: %w", err) } - // Overwrite reconciler rolebinding. 
+ // Namespace-scoped read/write permissions if _, err := r.upsertSharedRoleBinding(ctx, reconcilerRef, rsRef); err != nil { return fmt.Errorf("upserting role binding: %w", err) } + // Cluster-scoped read permissions + if err := r.upsertSharedClusterRoleBinding(ctx, RepoSyncClusterScopeClusterRoleBindingName, RepoSyncClusterScopeClusterRoleName, reconcilerRef, rsRef); err != nil { + return fmt.Errorf("upserting cluster role binding: %w", err) + } + if err := r.upsertHelmConfigMaps(ctx, rs, labelMap); err != nil { return fmt.Errorf("upserting helm config maps: %w", err) } @@ -459,6 +464,10 @@ func (r *RepoSyncReconciler) deleteManagedObjects(ctx context.Context, reconcile return fmt.Errorf("deleting role binding: %w", err) } + if err := r.deleteSharedClusterRoleBinding(ctx, RepoSyncClusterScopeClusterRoleBindingName, reconcilerRef); err != nil { + return fmt.Errorf("deleting cluster role binding: %w", err) + } + if err := r.deleteHelmConfigMapCopies(ctx, rsRef, nil); err != nil { return fmt.Errorf("deleting helm config maps: %w", err) } @@ -472,6 +481,14 @@ func (r *RepoSyncReconciler) deleteManagedObjects(ctx context.Context, reconcile // Register RepoSync controller with reconciler-manager. func (r *RepoSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetMembership bool) error { + r.lock.Lock() + defer r.lock.Unlock() + + // Avoid re-registering the controller + if r.controller != nil { + return nil + } + controllerBuilder := controllerruntime.NewControllerManagedBy(mgr). WithOptions(controller.Options{ MaxConcurrentReconciles: 1, @@ -504,7 +521,7 @@ } ctrlr, err := controllerBuilder.Build(r) - r.controller = &ctrlr + r.controller = ctrlr r.cache = mgr.GetCache() return err } @@ -524,7 +541,7 @@ func (r *RepoSyncReconciler) watchConfigMaps(rs *v1beta1.RepoSync) error { if _, ok := r.configMapWatches[rs.Namespace]; !ok { klog.Infoln("Adding watch for ConfigMaps in namespace ", rs.Namespace) - ctrlr := *r.controller + ctrlr := r.controller if err := ctrlr.Watch(source.Kind(r.cache, withNamespace(&corev1.ConfigMap{}, rs.Namespace), handler.EnqueueRequestsFromMapFunc(r.mapConfigMapToRepoSyncs), diff --git a/pkg/reconcilermanager/controllers/rootsync_controller.go b/pkg/reconcilermanager/controllers/rootsync_controller.go index 6a2c8f0094..cdb7830ec0 100644 --- a/pkg/reconcilermanager/controllers/rootsync_controller.go +++ b/pkg/reconcilermanager/controllers/rootsync_controller.go @@ -79,6 +79,8 @@ type RootSyncReconciler struct { reconcilerBase lock sync.Mutex + + controller controller.Controller } // NewRootSyncReconciler returns a new RootSyncReconciler. @@ -416,6 +418,14 @@ func (r *RootSyncReconciler) deleteManagedObjects(ctx context.Context, reconcile // Register RootSync controller with reconciler-manager. func (r *RootSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetMembership bool) error { + r.lock.Lock() + defer r.lock.Unlock() + + // Avoid re-registering the controller + if r.controller != nil { + return nil + } + controllerBuilder := controllerruntime.NewControllerManagedBy(mgr).
WithOptions(controller.Options{ MaxConcurrentReconciles: 1, @@ -451,7 +461,10 @@ func (r *RootSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetM handler.EnqueueRequestsFromMapFunc(r.mapMembershipToRootSyncs), builder.WithPredicates(predicate.GenerationChangedPredicate{})) } - return controllerBuilder.Complete(r) + + ctrlr, err := controllerBuilder.Build(r) + r.controller = ctrlr + return err } func withNamespace(obj client.Object, ns string) client.Object { @@ -1005,7 +1018,7 @@ func (r *RootSyncReconciler) manageRBACBindings(ctx context.Context, reconcilerR } return nil } - // Add the base ClusterRole for basic root reconciler functionality + // Minimum required cluster-scoped & all-namespace read/write permissions if err := r.upsertSharedClusterRoleBinding(ctx, RootSyncBaseClusterRoleBindingName, RootSyncBaseClusterRoleName, reconcilerRef, rsRef); err != nil { return err } @@ -1048,36 +1061,6 @@ func (r *RootSyncReconciler) manageRBACBindings(ctx context.Context, reconcilerR return nil } -func (r *RootSyncReconciler) upsertSharedClusterRoleBinding(ctx context.Context, name, clusterRole string, reconcilerRef, rsRef types.NamespacedName) error { - crbRef := client.ObjectKey{Name: name} - childCRB := &rbacv1.ClusterRoleBinding{} - childCRB.Name = crbRef.Name - - labelMap := ManagedObjectLabelMap(r.syncKind, rsRef) - // Remove sync-name label since the ClusterRoleBinding may be shared - delete(labelMap, metadata.SyncNameLabel) - - op, err := CreateOrUpdate(ctx, r.client, childCRB, func() error { - core.AddLabels(childCRB, labelMap) - childCRB.OwnerReferences = nil - childCRB.RoleRef = rolereference(clusterRole, "ClusterRole") - childCRB.Subjects = addSubject(childCRB.Subjects, r.serviceAccountSubject(reconcilerRef)) - // Remove existing OwnerReferences, now that we're using finalizers. 
- childCRB.OwnerReferences = nil - return nil - }) - if err != nil { - return err - } - if op != controllerutil.OperationResultNone { - r.logger(ctx).Info("Managed object upsert successful", - logFieldObjectRef, crbRef.String(), - logFieldObjectKind, "ClusterRoleBinding", - logFieldOperation, op) - } - return nil -} - func (r *RootSyncReconciler) createRBACBinding(ctx context.Context, reconcilerRef, rsRef types.NamespacedName, roleRef v1beta1.RootSyncRoleRef) (client.ObjectKey, error) { var binding client.Object if roleRef.Namespace == "" { diff --git a/pkg/remediator/remediator.go b/pkg/remediator/remediator.go index 4beb1af4cc..982f63a38c 100644 --- a/pkg/remediator/remediator.go +++ b/pkg/remediator/remediator.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/declared" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/queue" "kpt.dev/configsync/pkg/remediator/reconcile" @@ -101,6 +102,7 @@ func New( applier syncerreconcile.Applier, conflictHandler conflict.Handler, fightHandler fight.Handler, + crdController *controllers.CRDController, decls *declared.Resources, numWorkers int, ) (*Remediator, error) { @@ -117,7 +119,7 @@ func New( conflictHandler: conflictHandler, } - watchMgr, err := watch.NewManager(scope, syncName, cfg, q, decls, nil, conflictHandler) + watchMgr, err := watch.NewManager(scope, syncName, cfg, q, decls, nil, conflictHandler, crdController) if err != nil { return nil, fmt.Errorf("creating watch manager: %w", err) } diff --git a/pkg/remediator/watch/manager.go b/pkg/remediator/watch/manager.go index 6ee3629c8f..6c99a12d27 100644 --- a/pkg/remediator/watch/manager.go +++ b/pkg/remediator/watch/manager.go @@ -20,6 +20,7 @@ import ( "sync" "golang.org/x/exp/maps" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -29,10 +30,12 @@ import ( "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/queue" "kpt.dev/configsync/pkg/status" syncerclient "kpt.dev/configsync/pkg/syncer/client" + "kpt.dev/configsync/pkg/util/customresource" ) // Manager accepts new resource lists that are parsed from Git and then @@ -59,6 +62,10 @@ type Manager struct { // mapper is the RESTMapper used by the watcherFactory mapper meta.RESTMapper + conflictHandler conflict.Handler + + crdController *controllers.CRDController + // labelSelector filters watches labelSelector labels.Selector @@ -69,8 +76,7 @@ type Manager struct { // watcherMap maps GVKs to their associated watchers watcherMap map[schema.GroupVersionKind]Runnable // needsUpdate indicates if the Manager's watches need to be updated. - needsUpdate bool - conflictHandler conflict.Handler + needsUpdate bool } // Options contains options for creating a watch manager. 
@@ -95,7 +101,8 @@ func DefaultOptions(cfg *rest.Config) (*Options, error) { // NewManager starts a new watch manager func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, - q *queue.ObjectQueue, decls *declared.Resources, options *Options, ch conflict.Handler) (*Manager, error) { + q *queue.ObjectQueue, decls *declared.Resources, options *Options, + ch conflict.Handler, crdController *controllers.CRDController) (*Manager, error) { if options == nil { var err error options, err = DefaultOptions(cfg) @@ -120,6 +127,7 @@ func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, labelSelector: labelSelector, queue: q, conflictHandler: ch, + crdController: crdController, }, nil } @@ -155,20 +163,21 @@ func (m *Manager) AddWatches(ctx context.Context, gvkMap map[schema.GroupVersion // Start new watchers var errs status.MultiError for gvk := range gvkMap { + // Update the CRD controller + m.startWatchingCRD(gvk, commit) // Skip watchers that are already started if w, isWatched := m.watcherMap[gvk]; isWatched { w.SetLatestCommit(commit) continue } // Only start watcher if the resource exists. - // Watch will be started later by UpdateWatches, after the applier succeeds. - // TODO: register pending resources and start watching them when the CRD is established + // Pending watches will be started later by the CRD controller or UpdateWatches. if _, err := m.mapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { switch { case meta.IsNoMatchError(err): statusErr := syncerclient.ConflictWatchResourceDoesNotExist(err, gvk) klog.Infof("Remediator skipped starting resource watch: "+ - "%v. The remediator will start the resource watch after the sync has succeeded.", statusErr) + "%v. The remediator will start the resource watch after the CRD is established.", statusErr) // This is expected behavior before a sync attempt. // It likely means a CR and CRD are in the same ApplySet. // So don't record a resource conflict metric or return an error here. @@ -221,26 +230,30 @@ func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVers // Remove all conflict errors for objects with the same GK because // the objects are no longer managed by the reconciler. m.conflictHandler.ClearConflictErrorsWithKind(gvk.GroupKind()) + // Update the CRD controller + m.stopWatchingCRD(gvk) } } // Start new watchers var errs status.MultiError for gvk := range gvkMap { + // Update the CRD controller + m.startWatchingCRD(gvk, commit) // Skip watchers that are already started if w, isWatched := m.watcherMap[gvk]; isWatched { w.SetLatestCommit(commit) continue } // Only start watcher if the resource exists. - // All resources should exist and be established by now. If not, error. + // Pending watches will be started later by the CRD controller or UpdateWatches. if _, err := m.mapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { switch { case meta.IsNoMatchError(err): statusErr := syncerclient.ConflictWatchResourceDoesNotExist(err, gvk) klog.Warningf("Remediator encountered a resource conflict: "+ "%v. To resolve the conflict, the remediator will enqueue a resync "+ "and restart the resource watch after the CRD is established.", statusErr) // This is unexpected behavior after a successful sync. // It likely means that some other controller deleted managed objects shortly after they were applied. // So record a resource conflict metric and return an error.
@@ -327,3 +340,23 @@ func (m *Manager) stopWatcher(gvk schema.GroupVersionKind) { w.Stop() delete(m.watcherMap, gvk) } + +func (m *Manager) startWatchingCRD(gvk schema.GroupVersionKind, commit string) { + m.crdController.SetReconciler(gvk.GroupKind(), func(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition) error { + gvkMap := map[schema.GroupVersionKind]struct{}{ + gvk: {}, + } + if customresource.IsEstablished(crd) { + return m.AddWatches(ctx, gvkMap, commit) + } + // Don't stop watching until UpdateWatches is called by the Updater, + // confirming that the object has been removed from the ApplySet. + // Otherwise we might miss object deletion events due to the CRD having + // been deleted. + return nil + }) +} + +func (m *Manager) stopWatchingCRD(gvk schema.GroupVersionKind) { + m.crdController.DeleteReconciler(gvk.GroupKind()) +} diff --git a/pkg/remediator/watch/manager_test.go b/pkg/remediator/watch/manager_test.go index 7a5c54d121..053f209b82 100644 --- a/pkg/remediator/watch/manager_test.go +++ b/pkg/remediator/watch/manager_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/kinds" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/status" syncerclient "kpt.dev/configsync/pkg/syncer/client" "kpt.dev/configsync/pkg/syncer/syncertest/fake" @@ -241,7 +242,8 @@ func TestManager_AddWatches(t *testing.T) { watcherFactory: testRunnables(tc.failedWatchers, currentCommit), mapper: fakeMapper, } - m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, options, fake.NewConflictHandler()) + m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, + options, fake.NewConflictHandler(), &controllers.CRDController{}) if err != nil { t.Fatal(err) } @@ -418,7 +420,8 @@ func TestManager_UpdateWatches(t *testing.T) { watcherFactory: testRunnables(tc.failedWatchers, currentCommit), mapper: fakeMapper, } - m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, options, fake.NewConflictHandler()) + m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, + options, fake.NewConflictHandler(), &controllers.CRDController{}) if err != nil { t.Fatal(err) } diff --git a/pkg/util/customresource/customresource.go b/pkg/util/customresource/customresource.go new file mode 100644 index 0000000000..4c3e2b0cee --- /dev/null +++ b/pkg/util/customresource/customresource.go @@ -0,0 +1,32 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package customresource + +import apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + +// IsEstablished returns true if the given CRD is established on the cluster, +// which indicates if discovery knows about it yet. 
For more info see +// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#create-a-customresourcedefinition +func IsEstablished(crd *apiextensionsv1.CustomResourceDefinition) bool { + if crd == nil { + return false + } + for _, condition := range crd.Status.Conditions { + if condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue { + return true + } + } + return false +} diff --git a/pkg/util/customresource/customresource_test.go b/pkg/util/customresource/customresource_test.go new file mode 100644 index 0000000000..fefaa1a205 --- /dev/null +++ b/pkg/util/customresource/customresource_test.go @@ -0,0 +1,74 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package customresource + +import ( + "testing" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" +) + +func TestIsEstablished(t *testing.T) { + tests := []struct { + name string + crd *apiextensionsv1.CustomResourceDefinition + established bool + }{ + { + name: "not found", + crd: nil, + established: false, + }, + { + name: "established", + crd: &apiextensionsv1.CustomResourceDefinition{ + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + Conditions: []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.Established, + Status: apiextensionsv1.ConditionTrue, + }, + }, + }, + }, + established: true, + }, + { + name: "not established", + crd: &apiextensionsv1.CustomResourceDefinition{ + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + Conditions: []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.Established, + Status: apiextensionsv1.ConditionFalse, + }, + }, + }, + }, + established: false, + }, + { + name: "not observed", + crd: &apiextensionsv1.CustomResourceDefinition{}, + established: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := IsEstablished(tc.crd); got != tc.established { + t.Errorf("IsEstablished() = %v, want %v", got, tc.established) + } + }) + } +}
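Note for reviewers: below is a minimal, standalone sketch (not part of the patch) of how the CRDController API introduced above is meant to be used. It registers a CRDReconcileFunc for a hypothetical Anvil GroupKind and only reacts once the CRD is established; the acme.com group and the printed message are illustrative placeholders rather than code from this change.

package main

import (
	"context"
	"fmt"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"kpt.dev/configsync/pkg/reconcilermanager/controllers"
	"kpt.dev/configsync/pkg/util/customresource"
)

func main() {
	// Shared CRDController, constructed the same way as in
	// cmd/reconciler-manager/main.go and pkg/reconciler/reconciler.go.
	crdController := &controllers.CRDController{}

	// Hypothetical GroupKind, standing in for any watched custom resource.
	anvilGK := schema.GroupKind{Group: "acme.com", Kind: "Anvil"}

	// The CRDMetaController invokes this func on every CRD status update for
	// the matching GroupKind, retrying with backoff if it returns an error.
	crdController.SetReconciler(anvilGK, func(_ context.Context, crd *apiextensionsv1.CustomResourceDefinition) error {
		if !customresource.IsEstablished(crd) {
			// Unestablished or deleted (crd == nil): keep the registration
			// and any running watches, so deletion events are not missed.
			return nil
		}
		// Established: safe to start the resource watch here.
		fmt.Printf("CRD for %s established\n", anvilGK)
		return nil
	})

	// Once the resource is no longer managed, drop the registration,
	// as Manager.stopWatchingCRD does in pkg/remediator/watch/manager.go.
	crdController.DeleteReconciler(anvilGK)
}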