diff --git a/cmd/reconciler-manager/main.go b/cmd/reconciler-manager/main.go index 6f4a55bbd5..0f48e638b8 100644 --- a/cmd/reconciler-manager/main.go +++ b/cmd/reconciler-manager/main.go @@ -21,6 +21,7 @@ import ( "os" "github.com/go-logr/logr" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +34,7 @@ import ( "kpt.dev/configsync/pkg/profiler" "kpt.dev/configsync/pkg/reconcilermanager" "kpt.dev/configsync/pkg/reconcilermanager/controllers" + "kpt.dev/configsync/pkg/util/customresource" "kpt.dev/configsync/pkg/util/log" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -95,9 +97,10 @@ func main() { } watchFleetMembership := fleetMembershipCRDExists(dynamicClient, mgr.GetRESTMapper(), &setupLog) - crdController := controllers.NewCRDReconciler( + crdController := &controllers.CRDController{} + crdMetaController := controllers.NewCRDMetaController(crdController, mgr.GetCache(), textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD")) - if err := crdController.Register(mgr); err != nil { + if err := crdMetaController.Register(mgr); err != nil { setupLog.Error(err, "failed to register controller", "controller", "CRD") os.Exit(1) } @@ -108,11 +111,15 @@ func main() { mgr.GetClient(), watcher, dynamicClient, textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RepoSyncKind), mgr.GetScheme()) - crdController.SetCRDHandler(configsync.RepoSyncCRDName, func() error { - if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil { - return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err) + crdController.SetReconciler(kinds.RepoSyncV1Beta1().GroupKind(), func(_ context.Context, crd *apiextensionsv1.CustomResourceDefinition) error { + if customresource.IsEstablished(crd) { + if err := repoSyncController.Register(mgr, watchFleetMembership); err != nil { + return fmt.Errorf("registering %s controller: %w", configsync.RepoSyncKind, err) + } + setupLog.Info("RepoSync controller registration successful") } - setupLog.Info("RepoSync controller registration successful") + // Don't stop the RepoSync controller when its CRD is deleted, + // otherwise we may miss RepoSync object deletion events. return nil }) setupLog.Info("RepoSync controller registration scheduled") @@ -122,11 +129,15 @@ func main() { mgr.GetClient(), watcher, dynamicClient, textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName(configsync.RootSyncKind), mgr.GetScheme()) - crdController.SetCRDHandler(configsync.RootSyncCRDName, func() error { - if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil { - return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err) + crdController.SetReconciler(kinds.RootSyncV1Beta1().GroupKind(), func(_ context.Context, crd *apiextensionsv1.CustomResourceDefinition) error { + if customresource.IsEstablished(crd) { + if err := rootSyncController.Register(mgr, watchFleetMembership); err != nil { + return fmt.Errorf("registering %s controller: %w", configsync.RootSyncKind, err) + } + setupLog.Info("RootSync controller registration successful") } - setupLog.Info("RootSync controller registration successful") + // Don't stop the RootSync controller when its CRD is deleted, + // otherwise we may miss RootSync object deletion events. 
return nil }) setupLog.Info("RootSync controller registration scheduled") diff --git a/e2e/testcases/custom_resources_test.go b/e2e/testcases/custom_resources_test.go index bf25744770..3d13e42c1f 100644 --- a/e2e/testcases/custom_resources_test.go +++ b/e2e/testcases/custom_resources_test.go @@ -98,13 +98,16 @@ func TestCRDDeleteBeforeRemoveCustomResourceV1(t *testing.T) { nt.T.Fatal(err) } - // Resource Conflict errors from the remediator are not exposed as errors - // in the RootSync status. Instead, the error is recorded as a metric and - // logged as a warning. Then the object is refreshed from the server and - // re-enqueued for remediation. + // Resource conflict errors are recorded as status errors when the + // remediator watches are updated after an apply succeeds, but not when + // watches are updated before the apply attempt or from watch events handled + // by the remediator. So we don't expect to see a resource conflict error + // in the RootSync status until after the next apply attempt fails, which + // won't happen until the next automatic re-sync (1hr default). // - // Validate that deleting the CRD of a managed CR causes at least of of the - // following errors: + // However, we do expect the remediator to get a deletion event for each + // Anvil object after the Anvil CRD is deleted. This can be surfaced as one + // of the following errors: // - NoResourceMatchError // - NoKindMatchError // - ObjectNotFound @@ -117,7 +120,7 @@ func TestCRDDeleteBeforeRemoveCustomResourceV1(t *testing.T) { // TODO: distinguish between management conflict (unexpected manager annotation) and resource conflict (resource version change) nt.Must(nomostest.ValidateMetrics(nt, nomostest.ReconcilerErrorMetrics(nt, rootSyncLabels, firstCommitHash, metrics.ErrorSummary{ - Conflicts: 1, + Conflicts: 1, // at least 1 }))) // Reset discovery client to invalidate the cached Anvil CRD diff --git a/manifests/base/kustomization.yaml b/manifests/base/kustomization.yaml index 1897fe6574..d467cc33c8 100644 --- a/manifests/base/kustomization.yaml +++ b/manifests/base/kustomization.yaml @@ -19,6 +19,7 @@ resources: # Applying hierarchyconfig-crd.yaml allows client-side validation of the HierarchyConfig resources. - ../hierarchyconfig-crd.yaml - ../namespace-selector-crd.yaml +- ../ns-reconciler-cluster-scope-cluster-role.yaml - ../ns-reconciler-base-cluster-role.yaml - ../root-reconciler-base-cluster-role.yaml - ../otel-agent-cm.yaml diff --git a/manifests/ns-reconciler-cluster-scope-cluster-role.yaml b/manifests/ns-reconciler-cluster-scope-cluster-role.yaml new file mode 100644 index 0000000000..123340633e --- /dev/null +++ b/manifests/ns-reconciler-cluster-scope-cluster-role.yaml @@ -0,0 +1,27 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This ClusterRole is used by ns-reconcilers. +# It includes read access for cluster-scoped resources.
+apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: configsync.gke.io:ns-reconciler:cluster-scope + labels: + configmanagement.gke.io/system: "true" + configmanagement.gke.io/arch: "csmr" +rules: +- apiGroups: ["apiextensions.k8s.io"] + resources: ["customresourcedefinitions"] + verbs: ["get","list","watch"] diff --git a/manifests/root-reconciler-base-cluster-role.yaml b/manifests/root-reconciler-base-cluster-role.yaml index 2dd1cc9e95..cf78fec6db 100644 --- a/manifests/root-reconciler-base-cluster-role.yaml +++ b/manifests/root-reconciler-base-cluster-role.yaml @@ -32,3 +32,6 @@ rules: - apiGroups: ["kpt.dev"] resources: ["resourcegroups/status"] verbs: ["*"] +- apiGroups: ["apiextensions.k8s.io"] + resources: ["customresourcedefinitions"] + verbs: ["get","list","watch"] diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index e94083085c..78f2c08336 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -37,6 +37,7 @@ import ( "kpt.dev/configsync/pkg/parse/events" "kpt.dev/configsync/pkg/reconciler/finalizer" "kpt.dev/configsync/pkg/reconciler/namespacecontroller" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/remediator" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/watch" @@ -219,10 +220,11 @@ func Run(opts Options) { klog.Fatalf("Error creating rest config for the remediator: %v", err) } + crdController := &controllers.CRDController{} conflictHandler := conflict.NewHandler() fightHandler := fight.NewHandler() - rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, decls, opts.NumWorkers) + rem, err := remediator.New(opts.ReconcilerScope, opts.SyncName, cfgForWatch, baseApplier, conflictHandler, fightHandler, crdController, decls, opts.NumWorkers) if err != nil { klog.Fatalf("Instantiating Remediator: %v", err) } @@ -331,6 +333,12 @@ func Run(opts Options) { klog.Fatalf("Instantiating Controller Manager: %v", err) } + crdMetaController := controllers.NewCRDMetaController(crdController, mgr.GetCache(), + textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD")) + if err := crdMetaController.Register(mgr); err != nil { + klog.Fatalf("Instantiating CRD Controller: %v", err) + } + // This cancelFunc will be used by the Finalizer to stop all the other // controllers (Parser & Remediator). ctx, stopControllers := context.WithCancel(signalCtx) diff --git a/pkg/reconcilermanager/controllers/build_names.go b/pkg/reconcilermanager/controllers/build_names.go index 58025a1f83..005391c9e2 100644 --- a/pkg/reconcilermanager/controllers/build_names.go +++ b/pkg/reconcilermanager/controllers/build_names.go @@ -22,15 +22,25 @@ import ( ) const ( + // RepoSyncClusterScopeClusterRoleName is the name of the ClusterRole with + // cluster-scoped read permissions for the namespace reconciler. + // e.g. configsync.gke.io:ns-reconciler:cluster-scope + RepoSyncClusterScopeClusterRoleName = configsync.GroupName + ":" + core.NsReconcilerPrefix + ":cluster-scope" // RepoSyncBaseClusterRoleName is the namespace reconciler permissions name. // e.g. configsync.gke.io:ns-reconciler RepoSyncBaseClusterRoleName = configsync.GroupName + ":" + core.NsReconcilerPrefix // RootSyncBaseClusterRoleName is the root reconciler base ClusterRole name. // e.g. 
configsync.gke.io:root-reconciler RootSyncBaseClusterRoleName = configsync.GroupName + ":" + core.RootReconcilerPrefix + // RepoSyncClusterScopeClusterRoleBindingName is the name of the default + // ClusterRoleBinding created for RepoSync objects. This contains basic + // cluster-scoped permissions for RepoSync reconcilers + // (e.g. CustomResourceDefinition watch). + RepoSyncClusterScopeClusterRoleBindingName = RepoSyncClusterScopeClusterRoleName // RepoSyncBaseRoleBindingName is the name of the default RoleBinding created - // for RepoSync objects. This contains basic permissions for RepoSync reconcilers - //(e.g. RepoSync status update). + // for RepoSync objects. This contains basic namespace-scoped permissions + // for RepoSync reconcilers + // (e.g. RepoSync status update). RepoSyncBaseRoleBindingName = RepoSyncBaseClusterRoleName // RootSyncLegacyClusterRoleBindingName is the name of the legacy ClusterRoleBinding created // for RootSync objects. It is always bound to cluster-admin. diff --git a/pkg/reconcilermanager/controllers/crd_controller.go b/pkg/reconcilermanager/controllers/crd_controller.go index 46fdf157cb..2c54377537 100644 --- a/pkg/reconcilermanager/controllers/crd_controller.go +++ b/pkg/reconcilermanager/controllers/crd_controller.go @@ -21,125 +21,154 @@ import ( "github.com/go-logr/logr" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -// CRDHandler is called by the CRDReconciler to handle establishment of a CRD. -type CRDHandler func() error +// CRDReconcileFunc is called by the CRDMetaController to handle CRD updates. +type CRDReconcileFunc func(context.Context, *apiextensionsv1.CustomResourceDefinition) error -var _ reconcile.Reconciler = &CRDReconciler{} +// CRDController keeps track of CRDReconcileFuncs and calls them when the +// CRD changes. Only one reconciler is allowed per GroupKind. +type CRDController struct { + lock sync.RWMutex + reconcilers map[schema.GroupKind]CRDReconcileFunc +} -// CRDReconciler watches CRDs and calls handlers once they are established. -type CRDReconciler struct { - loggingController +// SetReconciler sets the reconciler for the specified CRD. +// The reconciler will be called whenever the CRD changes (crd is nil on delete). +// If the reconciler errors, it will be retried with backoff until success. +// A new reconciler will replace any old reconciler set with the same GroupKind. +func (s *CRDController) SetReconciler(gk schema.GroupKind, crdHandler CRDReconcileFunc) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.reconcilers == nil { + s.reconcilers = make(map[schema.GroupKind]CRDReconcileFunc) + } + s.reconcilers[gk] = crdHandler +} + +// DeleteReconciler removes the reconciler for the specified CRD. +func (s *CRDController) DeleteReconciler(gk schema.GroupKind) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.reconcilers != nil { + delete(s.reconcilers, gk) + } +} + +// Reconcile calls the CRDReconcileFunc registered for this CRD by GroupKind.
+func (s *CRDController) Reconcile(ctx context.Context, gk schema.GroupKind, crd *apiextensionsv1.CustomResourceDefinition) error { + // Let getReconciler handle locking to prevent deadlock in case the + // reconciler calls SetReconciler/DeleteReconciler. + reconciler := s.getReconciler(gk) + if reconciler == nil { + // No reconciler for this CRD + return nil + } + if err := reconciler(ctx, crd); err != nil { + return fmt.Errorf("reconciling CRD: %s: %w", gk, err) + } + return nil +} + +func (s *CRDController) getReconciler(gk schema.GroupKind) CRDReconcileFunc { + s.lock.RLock() + defer s.lock.RUnlock() + + reconciler, found := s.reconcilers[gk] + if !found { + return nil + } + return reconciler +} - registerLock sync.Mutex - handlers map[string]CRDHandler - handledCRDs map[string]struct{} +// CRDMetaController watches CRDs and delegates reconciliation to a CRDController. +type CRDMetaController struct { + loggingController + cache cache.Cache + delegate *CRDController + observedResources map[schema.GroupResource]schema.GroupKind } -// NewCRDReconciler constructs a new CRDReconciler. -func NewCRDReconciler(log logr.Logger) *CRDReconciler { - return &CRDReconciler{ +var _ reconcile.Reconciler = &CRDMetaController{} + +// NewCRDMetaController constructs a new CRDMetaController. +func NewCRDMetaController(delegate *CRDController, cache cache.Cache, log logr.Logger) *CRDMetaController { + return &CRDMetaController{ loggingController: loggingController{ log: log, }, - handlers: make(map[string]CRDHandler), - handledCRDs: make(map[string]struct{}), + cache: cache, + delegate: delegate, + observedResources: make(map[schema.GroupResource]schema.GroupKind), } } -// SetCRDHandler adds an handler for the specified CRD. -// The handler will be called when the CRD becomes established. -// If the handler errors, it will be retried with backoff until success. -// One the handler succeeds, it will not be called again, unless SetCRDHandler -// is called again. -func (r *CRDReconciler) SetCRDHandler(crdName string, crdHandler CRDHandler) { - r.registerLock.Lock() - defer r.registerLock.Unlock() - - r.handlers[crdName] = crdHandler - delete(r.handledCRDs, crdName) -} - -// Reconcile the otel ConfigMap and update the Deployment annotation. -func (r *CRDReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { +// Reconcile checks whether the CRD exists and delegates to the CRDController to +// reconcile the update. +func (r *CRDMetaController) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { crdName := req.Name ctx = r.setLoggerValues(ctx, "crd", crdName) - r.registerLock.Lock() - defer r.registerLock.Unlock() - - handler, found := r.handlers[crdName] - if !found { - // No handler for this CRD - return reconcile.Result{}, nil - } - if _, handled := r.handledCRDs[crdName]; handled { - // Already handled - return reconcile.Result{}, nil + // Established if CRD exists and .status.conditions[type="Established"].status = "True" + var kind schema.GroupKind + crdObj := &apiextensionsv1.CustomResourceDefinition{} + if err := r.cache.Get(ctx, req.NamespacedName, crdObj); err != nil { + switch { + // Should never run into NoMatchFound, since CRD is a built-in resource.
+ case apierrors.IsNotFound(err): + // Look up the last known GroupKind for the CRD name + resource := schema.ParseGroupResource(crdName) + var found bool + kind, found = r.observedResources[resource] + if !found { + // No retry possible + return reconcile.Result{}, reconcile.TerminalError( + fmt.Errorf("failed to handle CRD update for deleted resource: %w", + &meta.NoResourceMatchError{PartialResource: resource.WithVersion("")})) + } + crdObj = nil + default: + // Retry with backoff + return reconcile.Result{}, + fmt.Errorf("getting CRD from cache: %s: %w", crdName, err) + } + } else { + kind = schema.GroupKind{ + Group: crdObj.Spec.Group, + Kind: crdObj.Spec.Names.Kind, + } + resource := schema.GroupResource{ + Group: crdObj.Spec.Group, + Resource: crdObj.Spec.Names.Plural, + } + // Cache last known mapping from GroupResource to GroupKind. + // This lets us look up the GroupKind using the CRD name. + r.observedResources[resource] = kind } - r.logger(ctx).V(3).Info("reconciling CRD", "crd", crdName) + r.logger(ctx).Info("CRDMetaController handling CRD status update", "name", crdName) - if err := handler(); err != nil { + if err := r.delegate.Reconcile(ctx, kind, crdObj); err != nil { // Retry with backoff - return reconcile.Result{}, - fmt.Errorf("reconciling CRD %s: %w", crdName, err) + return reconcile.Result{}, err } - // Mark CRD as handled - r.handledCRDs[crdName] = struct{}{} - r.logger(ctx).V(3).Info("reconciling CRD successful", "crd", crdName) + r.logger(ctx).V(3).Info("CRDMetaController handled CRD status update", "name", crdName) - return controllerruntime.Result{}, nil + return reconcile.Result{}, nil } -// Register the CRD controller with reconciler-manager. -func (r *CRDReconciler) Register(mgr controllerruntime.Manager) error { +// Register the CRDMetaController with the controller manager. +func (r *CRDMetaController) Register(mgr controllerruntime.Manager) error { return controllerruntime.NewControllerManagedBy(mgr). - For(&apiextensionsv1.CustomResourceDefinition{}, - builder.WithPredicates( - ignoreDeletesPredicate(), - crdIsEstablishedPredicate())). + For(&apiextensionsv1.CustomResourceDefinition{}). Complete(r) } - -// ignoreDeletesPredicate returns a predicate that handles CREATE, UPDATE, and -// GENERIC events, but not DELETE events. -func ignoreDeletesPredicate() predicate.Predicate { - return predicate.Funcs{ - DeleteFunc: func(_ event.DeleteEvent) bool { - return false - }, - } -} - -// crdIsEstablishedPredicate returns a predicate that only processes events for -// established CRDs. -func crdIsEstablishedPredicate() predicate.Predicate { - return predicate.NewPredicateFuncs(func(obj client.Object) bool { - if crd, ok := obj.(*apiextensionsv1.CustomResourceDefinition); ok { - return crdIsEstablished(crd) - } - return false - }) -} - -// crdIsEstablished returns true if the given CRD is established on the cluster, -// which indicates if discovery knows about it yet.
For more info see -// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#create-a-customresourcedefinition -func crdIsEstablished(crd *apiextensionsv1.CustomResourceDefinition) bool { - for _, condition := range crd.Status.Conditions { - if condition.Type == v1.Established && condition.Status == v1.ConditionTrue { - return true - } - } - return false -} diff --git a/pkg/reconcilermanager/controllers/garbage_collector.go b/pkg/reconcilermanager/controllers/garbage_collector.go index bae1e0a0b0..8b6d5a43dc 100644 --- a/pkg/reconcilermanager/controllers/garbage_collector.go +++ b/pkg/reconcilermanager/controllers/garbage_collector.go @@ -237,9 +237,9 @@ func (r *reconcilerBase) deleteDeployment(ctx context.Context, reconcilerRef typ return r.cleanup(ctx, d) } -func (r *RootSyncReconciler) deleteSharedClusterRoleBinding(ctx context.Context, name string, reconcilerRef types.NamespacedName) error { +func (r *reconcilerBase) deleteSharedClusterRoleBinding(ctx context.Context, name string, reconcilerRef types.NamespacedName) error { crbKey := client.ObjectKey{Name: name} - // Update the CRB to delete the subject for the deleted RootSync's reconciler + // Update the CRB to delete the subject for the deleted reconciler crb := &rbacv1.ClusterRoleBinding{} if err := r.client.Get(ctx, crbKey, crb); err != nil { if apierrors.IsNotFound(err) { diff --git a/pkg/reconcilermanager/controllers/reconciler_base.go b/pkg/reconcilermanager/controllers/reconciler_base.go index 281dd1cce7..1cf040636d 100644 --- a/pkg/reconcilermanager/controllers/reconciler_base.go +++ b/pkg/reconcilermanager/controllers/reconciler_base.go @@ -700,6 +700,35 @@ func (r *reconcilerBase) updateRBACBinding(ctx context.Context, reconcilerRef, r return nil } +func (r *reconcilerBase) upsertSharedClusterRoleBinding(ctx context.Context, name, clusterRole string, reconcilerRef, rsRef types.NamespacedName) error { + crbRef := client.ObjectKey{Name: name} + childCRB := &rbacv1.ClusterRoleBinding{} + childCRB.Name = crbRef.Name + + labelMap := ManagedObjectLabelMap(r.syncKind, rsRef) + // Remove sync-name label since the ClusterRoleBinding may be shared + delete(labelMap, metadata.SyncNameLabel) + + op, err := CreateOrUpdate(ctx, r.client, childCRB, func() error { + core.AddLabels(childCRB, labelMap) + childCRB.RoleRef = rolereference(clusterRole, "ClusterRole") + childCRB.Subjects = addSubject(childCRB.Subjects, r.serviceAccountSubject(reconcilerRef)) + // Remove any existing OwnerReferences, now that we're using finalizers. 
+ childCRB.OwnerReferences = nil + return nil + }) + if err != nil { + return err + } + if op != controllerutil.OperationResultNone { + r.logger(ctx).Info("Managed object upsert successful", + logFieldObjectRef, crbRef.String(), + logFieldObjectKind, "ClusterRoleBinding", + logFieldOperation, op) + } + return nil +} + func (r *reconcilerBase) isKnownHostsEnabled(auth configsync.AuthType) bool { if auth == configsync.AuthSSH && r.knownHostExist { return true diff --git a/pkg/reconcilermanager/controllers/reposync_controller.go b/pkg/reconcilermanager/controllers/reposync_controller.go index ee98dd0334..c77fc583c2 100644 --- a/pkg/reconcilermanager/controllers/reposync_controller.go +++ b/pkg/reconcilermanager/controllers/reposync_controller.go @@ -74,7 +74,7 @@ type RepoSyncReconciler struct { // configMapWatches stores which namespaces where we are currently watching ConfigMaps configMapWatches map[string]bool - controller *controller.Controller + controller controller.Controller cache cache.Cache } @@ -260,11 +260,16 @@ func (r *RepoSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile return fmt.Errorf("upserting service account: %w", err) } - // Overwrite reconciler rolebinding. + // Namespace-scoped read/write permissions if _, err := r.upsertSharedRoleBinding(ctx, reconcilerRef, rsRef); err != nil { return fmt.Errorf("upserting role binding: %w", err) } + // Cluster-scoped read permissions + if err := r.upsertSharedClusterRoleBinding(ctx, RepoSyncClusterScopeClusterRoleBindingName, RepoSyncClusterScopeClusterRoleName, reconcilerRef, rsRef); err != nil { + return fmt.Errorf("upserting cluster role binding: %w", err) + } + if err := r.upsertHelmConfigMaps(ctx, rs, labelMap); err != nil { return fmt.Errorf("upserting helm config maps: %w", err) } @@ -459,6 +464,10 @@ func (r *RepoSyncReconciler) deleteManagedObjects(ctx context.Context, reconcile return fmt.Errorf("deleting role binding: %w", err) } + if err := r.deleteSharedClusterRoleBinding(ctx, RepoSyncClusterScopeClusterRoleBindingName, reconcilerRef); err != nil { + return fmt.Errorf("deleting cluster role binding: %w", err) + } + if err := r.deleteHelmConfigMapCopies(ctx, rsRef, nil); err != nil { return fmt.Errorf("deleting helm config maps: %w", err) } @@ -472,6 +481,14 @@ func (r *RepoSyncReconciler) deleteManagedObjects(ctx context.Context, reconcile // Register RepoSync controller with reconciler-manager. func (r *RepoSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetMembership bool) error { + r.lock.Lock() + defer r.lock.Unlock() + + // Avoid re-registering the controller + if r.controller != nil { + return nil + } + controllerBuilder := controllerruntime.NewControllerManagedBy(mgr).
WithOptions(controller.Options{ MaxConcurrentReconciles: 1, @@ -504,7 +521,7 @@ func (r *RepoSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetM } ctrlr, err := controllerBuilder.Build(r) - r.controller = &ctrlr + r.controller = ctrlr r.cache = mgr.GetCache() return err } @@ -524,7 +541,7 @@ func (r *RepoSyncReconciler) watchConfigMaps(rs *v1beta1.RepoSync) error { if _, ok := r.configMapWatches[rs.Namespace]; !ok { klog.Infoln("Adding watch for ConfigMaps in namespace ", rs.Namespace) - ctrlr := *r.controller + ctrlr := r.controller if err := ctrlr.Watch(source.Kind(r.cache, withNamespace(&corev1.ConfigMap{}, rs.Namespace), handler.EnqueueRequestsFromMapFunc(r.mapConfigMapToRepoSyncs), diff --git a/pkg/reconcilermanager/controllers/rootsync_controller.go b/pkg/reconcilermanager/controllers/rootsync_controller.go index 6a2c8f0094..cdb7830ec0 100644 --- a/pkg/reconcilermanager/controllers/rootsync_controller.go +++ b/pkg/reconcilermanager/controllers/rootsync_controller.go @@ -79,6 +79,8 @@ type RootSyncReconciler struct { reconcilerBase lock sync.Mutex + + controller controller.Controller } // NewRootSyncReconciler returns a new RootSyncReconciler. @@ -416,6 +418,14 @@ func (r *RootSyncReconciler) deleteManagedObjects(ctx context.Context, reconcile // Register RootSync controller with reconciler-manager. func (r *RootSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetMembership bool) error { + r.lock.Lock() + defer r.lock.Unlock() + + // Avoid re-registering the controller + if r.controller != nil { + return nil + } + controllerBuilder := controllerruntime.NewControllerManagedBy(mgr). WithOptions(controller.Options{ MaxConcurrentReconciles: 1, @@ -451,7 +461,10 @@ func (r *RootSyncReconciler) Register(mgr controllerruntime.Manager, watchFleetM handler.EnqueueRequestsFromMapFunc(r.mapMembershipToRootSyncs), builder.WithPredicates(predicate.GenerationChangedPredicate{})) } - return controllerBuilder.Complete(r) + + ctrlr, err := controllerBuilder.Build(r) + r.controller = ctrlr + return err } func withNamespace(obj client.Object, ns string) client.Object { @@ -1005,7 +1018,7 @@ func (r *RootSyncReconciler) manageRBACBindings(ctx context.Context, reconcilerR } return nil } - // Add the base ClusterRole for basic root reconciler functionality + // Minimum required cluster-scoped & all-namespace read/write permissions if err := r.upsertSharedClusterRoleBinding(ctx, RootSyncBaseClusterRoleBindingName, RootSyncBaseClusterRoleName, reconcilerRef, rsRef); err != nil { return err } @@ -1048,36 +1061,6 @@ func (r *RootSyncReconciler) manageRBACBindings(ctx context.Context, reconcilerR return nil } -func (r *RootSyncReconciler) upsertSharedClusterRoleBinding(ctx context.Context, name, clusterRole string, reconcilerRef, rsRef types.NamespacedName) error { - crbRef := client.ObjectKey{Name: name} - childCRB := &rbacv1.ClusterRoleBinding{} - childCRB.Name = crbRef.Name - - labelMap := ManagedObjectLabelMap(r.syncKind, rsRef) - // Remove sync-name label since the ClusterRoleBinding may be shared - delete(labelMap, metadata.SyncNameLabel) - - op, err := CreateOrUpdate(ctx, r.client, childCRB, func() error { - core.AddLabels(childCRB, labelMap) - childCRB.OwnerReferences = nil - childCRB.RoleRef = rolereference(clusterRole, "ClusterRole") - childCRB.Subjects = addSubject(childCRB.Subjects, r.serviceAccountSubject(reconcilerRef)) - // Remove existing OwnerReferences, now that we're using finalizers. 
- childCRB.OwnerReferences = nil - return nil - }) - if err != nil { - return err - } - if op != controllerutil.OperationResultNone { - r.logger(ctx).Info("Managed object upsert successful", - logFieldObjectRef, crbRef.String(), - logFieldObjectKind, "ClusterRoleBinding", - logFieldOperation, op) - } - return nil -} - func (r *RootSyncReconciler) createRBACBinding(ctx context.Context, reconcilerRef, rsRef types.NamespacedName, roleRef v1beta1.RootSyncRoleRef) (client.ObjectKey, error) { var binding client.Object if roleRef.Namespace == "" { diff --git a/pkg/remediator/remediator.go b/pkg/remediator/remediator.go index 4beb1af4cc..982f63a38c 100644 --- a/pkg/remediator/remediator.go +++ b/pkg/remediator/remediator.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/klog/v2" "kpt.dev/configsync/pkg/declared" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/queue" "kpt.dev/configsync/pkg/remediator/reconcile" @@ -101,6 +102,7 @@ func New( applier syncerreconcile.Applier, conflictHandler conflict.Handler, fightHandler fight.Handler, + crdController *controllers.CRDController, decls *declared.Resources, numWorkers int, ) (*Remediator, error) { @@ -117,7 +119,7 @@ func New( conflictHandler: conflictHandler, } - watchMgr, err := watch.NewManager(scope, syncName, cfg, q, decls, nil, conflictHandler) + watchMgr, err := watch.NewManager(scope, syncName, cfg, q, decls, nil, conflictHandler, crdController) if err != nil { return nil, fmt.Errorf("creating watch manager: %w", err) } diff --git a/pkg/remediator/watch/listwatch.go b/pkg/remediator/watch/listwatch.go index a78ba2bbee..765f6a6c9e 100644 --- a/pkg/remediator/watch/listwatch.go +++ b/pkg/remediator/watch/listwatch.go @@ -61,10 +61,10 @@ type ListerWatcherFactory func(gvk schema.GroupVersionKind, namespace string) Li // DynamicListerWatcherFactory provides a ListerWatcher method that implements // the ListerWatcherFactory interface. It uses the specified DynamicClient and -// RESTMapper. +// ResettableRESTMapper. type DynamicListerWatcherFactory struct { DynamicClient *dynamic.DynamicClient - Mapper meta.RESTMapper + Mapper ResettableRESTMapper } // ListerWatcher constructs a ListerWatcher for the specified GroupVersionKind @@ -77,19 +77,30 @@ func (dlwf *DynamicListerWatcherFactory) ListerWatcher(gvk schema.GroupVersionKi func DynamicListerWatcherFactoryFromConfig(cfg *rest.Config) (*DynamicListerWatcherFactory, error) { dynamicClient, err := dynamic.NewForConfig(cfg) if err != nil { - return nil, fmt.Errorf("failed to build dynamic client: %w", err) + return nil, fmt.Errorf("creating DynamicClient: %w", err) } httpClient, err := rest.HTTPClientFor(cfg) if err != nil { - return nil, fmt.Errorf("failed to create HTTPClient: %w", err) + return nil, fmt.Errorf("creating HTTPClient: %w", err) } mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient) if err != nil { - return nil, fmt.Errorf("failed to build mapper: %w", err) + return nil, fmt.Errorf("creating DynamicRESTMapper: %w", err) + } + // DynamicRESTMapper dynamically and transparently discovers new resources + // when a NoMatchFound error is encountered, but it doesn't automatically + // invalidate deleted resources. So use ReplaceOnResetRESTMapper to replace + // the whole DynamicRESTMapper when Reset is called.
+ newMapperFn := func() (meta.RESTMapper, error) { + m, err := apiutil.NewDynamicRESTMapper(cfg, httpClient) + if err != nil { + return nil, fmt.Errorf("creating DynamicRESTMapper: %w", err) + } + return m, nil } return &DynamicListerWatcherFactory{ DynamicClient: dynamicClient, - Mapper: mapper, + Mapper: NewReplaceOnResetRESTMapper(mapper, newMapperFn), }, nil } diff --git a/pkg/remediator/watch/manager.go b/pkg/remediator/watch/manager.go index 6ee3629c8f..21b65e77e6 100644 --- a/pkg/remediator/watch/manager.go +++ b/pkg/remediator/watch/manager.go @@ -17,9 +17,11 @@ package watch import ( "context" "errors" + "fmt" "sync" "golang.org/x/exp/maps" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -29,10 +31,12 @@ import ( "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/metadata" "kpt.dev/configsync/pkg/metrics" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/remediator/conflict" "kpt.dev/configsync/pkg/remediator/queue" "kpt.dev/configsync/pkg/status" syncerclient "kpt.dev/configsync/pkg/syncer/client" + "kpt.dev/configsync/pkg/util/customresource" ) // Manager accepts new resource lists that are parsed from Git and then @@ -57,7 +61,11 @@ type Manager struct { watcherFactory watcherFactory // mapper is the RESTMapper used by the watcherFactory - mapper meta.RESTMapper + mapper ResettableRESTMapper + + conflictHandler conflict.Handler + + crdController *controllers.CRDController // labelSelector filters watches labelSelector labels.Selector @@ -69,14 +77,13 @@ type Manager struct { // watcherMap maps GVKs to their associated watchers watcherMap map[schema.GroupVersionKind]Runnable // needsUpdate indicates if the Manager's watches need to be updated. - needsUpdate bool - conflictHandler conflict.Handler + needsUpdate bool } // Options contains options for creating a watch manager. type Options struct { watcherFactory watcherFactory - mapper meta.RESTMapper + mapper ResettableRESTMapper } // DefaultOptions return the default options with a ListerWatcherFactory built @@ -95,7 +102,8 @@ func DefaultOptions(cfg *rest.Config) (*Options, error) { // NewManager starts a new watch manager func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, - q *queue.ObjectQueue, decls *declared.Resources, options *Options, ch conflict.Handler) (*Manager, error) { + q *queue.ObjectQueue, decls *declared.Resources, options *Options, + ch conflict.Handler, crdController *controllers.CRDController) (*Manager, error) { if options == nil { var err error options, err = DefaultOptions(cfg) @@ -120,6 +128,7 @@ func NewManager(scope declared.Scope, syncName string, cfg *rest.Config, labelSelector: labelSelector, queue: q, conflictHandler: ch, + crdController: crdController, }, nil } @@ -155,20 +164,21 @@ func (m *Manager) AddWatches(ctx context.Context, gvkMap map[schema.GroupVersion // Start new watchers var errs status.MultiError for gvk := range gvkMap { + // Update the CRD Observer + m.startWatchingCRD(gvk, commit) // Skip watchers that are already started if w, isWatched := m.watcherMap[gvk]; isWatched { w.SetLatestCommit(commit) continue } // Only start watcher if the resource exists. - // Watch will be started later by UpdateWatches, after the applier succeeds. 
- // TODO: register pending resources and start watching them when the CRD is established + // Pending watches will be started later by the CRD Observer or UpdateWatches. if _, err := m.mapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { switch { case meta.IsNoMatchError(err): statusErr := syncerclient.ConflictWatchResourceDoesNotExist(err, gvk) klog.Infof("Remediator skipped starting resource watch: "+ - "%v. The remediator will start the resource watch after the sync has succeeded.", statusErr) + "%v. The remediator will start the resource watch after the CRD is established.", statusErr) // This is expected behavior before a sync attempt. // It likely means a CR and CRD are in the same ApplySet. // So don't record a resource conflict metric or return an error here. @@ -221,26 +231,30 @@ func (m *Manager) UpdateWatches(ctx context.Context, gvkMap map[schema.GroupVers // Remove all conflict errors for objects with the same GK because // the objects are no longer managed by the reconciler. m.conflictHandler.ClearConflictErrorsWithKind(gvk.GroupKind()) + // Update the CRD Observer + m.stopWatchingCRD(gvk) } } // Start new watchers var errs status.MultiError for gvk := range gvkMap { + // Update the CRD Observer + m.startWatchingCRD(gvk, commit) // Skip watchers that are already started if w, isWatched := m.watcherMap[gvk]; isWatched { w.SetLatestCommit(commit) continue } // Only start watcher if the resource exists. - // All resources should exist and be established by now. If not, error. + // Pending watches will be started later by the CRD Observer or UpdateWatches. if _, err := m.mapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { switch { case meta.IsNoMatchError(err): statusErr := syncerclient.ConflictWatchResourceDoesNotExist(err, gvk) klog.Warningf("Remediator encountered a resource conflict: "+ "%v. To resolve the conflict, the remediator will enqueue a resync "+ - "and restart the resource watch after the sync has succeeded.", statusErr) + "and restart the resource watch after the CRD is established.", statusErr) // This is unexpected behavior after a successful sync. // It likely means that some other controller deleted managed objects shortly after they were applied. // So record a resource conflict metric and return an error. @@ -327,3 +341,65 @@ func (m *Manager) stopWatcher(gvk schema.GroupVersionKind) { w.Stop() delete(m.watcherMap, gvk) } + +func (m *Manager) startWatchingCRD(gvk schema.GroupVersionKind, commit string) { + m.crdController.SetReconciler(gvk.GroupKind(), func(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition) error { + if customresource.IsEstablished(crd) { + if err := discoverResourceForKind(m.mapper, gvk); err != nil { + // Trigger retry by controller-manager + return err + } + // Start watching this resource. + gvkMap := map[schema.GroupVersionKind]struct{}{gvk: {}} + return m.AddWatches(ctx, gvkMap, commit) + } + // Else, if not established + if err := forgetResourceForKind(m.mapper, gvk); err != nil { + // Trigger retry by controller-manager + return err + } + // Don't stop watching until UpdateWatches is called by the Updater, + // confirming that the object has been removed from the ApplySet. + // Otherwise the remediator may miss object deletion events that + // come after the CRD deletion event. 
+ return nil + }) +} + +func (m *Manager) stopWatchingCRD(gvk schema.GroupVersionKind) { + m.crdController.DeleteReconciler(gvk.GroupKind()) +} + +// discoverResourceForKind resets the RESTMapper if needed, to discover the +// resource that maps to the specified kind. +func discoverResourceForKind(mapper ResettableRESTMapper, gvk schema.GroupVersionKind) error { + if _, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + if meta.IsNoMatchError(err) { + klog.Infof("Remediator resetting RESTMapper to discover resource: %v", gvk) + if err := mapper.Reset(); err != nil { + return fmt.Errorf("remediator failed to reset RESTMapper: %w", err) + } + } else { + return fmt.Errorf("remediator failed to map kind to resource: %w", err) + } + } + // Else, mapper already up to date + return nil +} + +// forgetResourceForKind resets the RESTMapper if needed, to forget the resource +// that maps to the specified kind. +func forgetResourceForKind(mapper ResettableRESTMapper, gvk schema.GroupVersionKind) error { + if _, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + if !meta.IsNoMatchError(err) { + return fmt.Errorf("remediator failed to map kind to resource: %w", err) + } + // Else, mapper already up to date + } else { + klog.Infof("Remediator resetting RESTMapper to forget resource: %v", gvk) + if err := mapper.Reset(); err != nil { + return fmt.Errorf("remediator failed to reset RESTMapper: %w", err) + } + } + return nil +} diff --git a/pkg/remediator/watch/manager_test.go b/pkg/remediator/watch/manager_test.go index 7a5c54d121..214fa4f226 100644 --- a/pkg/remediator/watch/manager_test.go +++ b/pkg/remediator/watch/manager_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/kinds" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/status" syncerclient "kpt.dev/configsync/pkg/syncer/client" "kpt.dev/configsync/pkg/syncer/syncertest/fake" @@ -237,11 +238,16 @@ func TestManager_AddWatches(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { fakeMapper := testutil.NewFakeRESTMapper(tc.mappedGVKs...) + newMapperFn := func() (meta.RESTMapper, error) { + // AddWatches doesn't call Reset. So we don't need to impl it. + return fakeMapper, nil + } options := &Options{ watcherFactory: testRunnables(tc.failedWatchers, currentCommit), - mapper: fakeMapper, + mapper: NewReplaceOnResetRESTMapper(fakeMapper, newMapperFn), } - m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, options, fake.NewConflictHandler()) + m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, + options, fake.NewConflictHandler(), &controllers.CRDController{}) if err != nil { t.Fatal(err) } @@ -414,11 +420,16 @@ func TestManager_UpdateWatches(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { fakeMapper := testutil.NewFakeRESTMapper(tc.mappedGVKs...) + newMapperFn := func() (meta.RESTMapper, error) { + // UpdateWatches doesn't call Reset. So we don't need to impl it. 
+ return fakeMapper, nil + } options := &Options{ watcherFactory: testRunnables(tc.failedWatchers, currentCommit), - mapper: fakeMapper, + mapper: NewReplaceOnResetRESTMapper(fakeMapper, newMapperFn), } - m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, options, fake.NewConflictHandler()) + m, err := NewManager(":test", "rs", nil, nil, &declared.Resources{}, + options, fake.NewConflictHandler(), &controllers.CRDController{}) if err != nil { t.Fatal(err) } diff --git a/pkg/remediator/watch/mapper_wrapper.go b/pkg/remediator/watch/mapper_wrapper.go new file mode 100644 index 0000000000..06466d53e8 --- /dev/null +++ b/pkg/remediator/watch/mapper_wrapper.go @@ -0,0 +1,108 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch + +import ( + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" ) + +// ResettableRESTMapper is a RESTMapper which is capable of resetting itself +// from discovery. +// +// This interface replaces meta.ResettableRESTMapper so that Reset can return an +// error. If Reset errors, it may be retried. +// This interface does NOT work with meta.MaybeResetRESTMapper. +// +// TODO: Only reset one resource at a time, for efficiency. +type ResettableRESTMapper interface { + meta.RESTMapper + Reset() error +} + +// MapperFunc returns a RESTMapper with an empty/reset cache or an error. +type MapperFunc func() (meta.RESTMapper, error) + +type mapper struct { + mux sync.RWMutex + mapper meta.RESTMapper + mapperFunc MapperFunc +} + +var _ ResettableRESTMapper = &mapper{} + +// NewReplaceOnResetRESTMapper wraps the provided RESTMapper and replaces it +// using the MapperFunc when the Reset method is called. +func NewReplaceOnResetRESTMapper(m meta.RESTMapper, newMapperFunc MapperFunc) ResettableRESTMapper { + return &mapper{ + mapper: m, + mapperFunc: newMapperFunc, + } +} + +// KindFor delegates to mapper.KindFor. +func (m *mapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) { + return m.getMapper().KindFor(resource) +} + +// KindsFor delegates to mapper.KindsFor. +func (m *mapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) { + return m.getMapper().KindsFor(resource) +} + +// RESTMapping delegates to mapper.RESTMapping. +func (m *mapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { + return m.getMapper().RESTMapping(gk, versions...) +} + +// RESTMappings delegates to mapper.RESTMappings. +func (m *mapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) { + return m.getMapper().RESTMappings(gk, versions...) +} + +// ResourceFor delegates to mapper.ResourceFor. +func (m *mapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) { + return m.getMapper().ResourceFor(input) +} + +// ResourcesFor delegates to mapper.ResourcesFor.
+func (m *mapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) { + return m.getMapper().ResourcesFor(input) +} + +// ResourceSingularizer delegates to mapper.ResourceSingularizer. +func (m *mapper) ResourceSingularizer(resource string) (singular string, err error) { + return m.getMapper().ResourceSingularizer(resource) +} + +// Reset replaces the old mapper with a new one. +func (m *mapper) Reset() error { + m.mux.Lock() + defer m.mux.Unlock() + mapper, err := m.mapperFunc() + if err != nil { + return err + } + m.mapper = mapper + return nil +} + +func (m *mapper) getMapper() meta.RESTMapper { + m.mux.RLock() + defer m.mux.RUnlock() + return m.mapper +} diff --git a/pkg/util/customresource/customresource.go b/pkg/util/customresource/customresource.go new file mode 100644 index 0000000000..4c3e2b0cee --- /dev/null +++ b/pkg/util/customresource/customresource.go @@ -0,0 +1,32 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package customresource + +import apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + +// IsEstablished returns true if the given CRD is established on the cluster, +// which indicates if discovery knows about it yet. For more info see +// https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#create-a-customresourcedefinition +func IsEstablished(crd *apiextensionsv1.CustomResourceDefinition) bool { + if crd == nil { + return false + } + for _, condition := range crd.Status.Conditions { + if condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue { + return true + } + } + return false +} diff --git a/pkg/util/customresource/customresource_test.go b/pkg/util/customresource/customresource_test.go new file mode 100644 index 0000000000..fefaa1a205 --- /dev/null +++ b/pkg/util/customresource/customresource_test.go @@ -0,0 +1,74 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package customresource + +import ( + "testing" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" +) + +func TestIsEstablished(t *testing.T) { + tests := []struct { + name string + crd *apiextensionsv1.CustomResourceDefinition + established bool + }{ + { + name: "not found", + crd: nil, + established: false, + }, + { + name: "established", + crd: &apiextensionsv1.CustomResourceDefinition{ + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + Conditions: []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.Established, + Status: apiextensionsv1.ConditionTrue, + }, + }, + }, + }, + established: true, + }, + { + name: "not established", + crd: &apiextensionsv1.CustomResourceDefinition{ + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + Conditions: []apiextensionsv1.CustomResourceDefinitionCondition{ + { + Type: apiextensionsv1.Established, + Status: apiextensionsv1.ConditionFalse, + }, + }, + }, + }, + established: false, + }, + { + name: "not observed", + crd: &apiextensionsv1.CustomResourceDefinition{}, + established: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := IsEstablished(tc.crd); got != tc.established { + t.Errorf("IsEstablished() = %v, want %v", got, tc.established) + } + }) + } +}
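
The examples below sketch how the new pieces fit together. First, handler registration with the shared CRDController: SetReconciler subscribes a CRDReconcileFunc for one GroupKind, the CRDMetaController invokes it on every change to the matching CRD, and the CRD argument is nil once the CRD has been deleted. The acme.com Anvil GroupKind and the handler body are hypothetical; only SetReconciler, CRDReconcileFunc, and customresource.IsEstablished come from this patch.

package example

import (
	"context"
	"fmt"

	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"

	"kpt.dev/configsync/pkg/reconcilermanager/controllers"
	"kpt.dev/configsync/pkg/util/customresource"
)

// registerAnvilHandler subscribes to CRD updates for a hypothetical Anvil CRD.
func registerAnvilHandler(crdController *controllers.CRDController) {
	gk := schema.GroupKind{Group: "acme.com", Kind: "Anvil"}
	crdController.SetReconciler(gk, func(_ context.Context, crd *apiextensionsv1.CustomResourceDefinition) error {
		// IsEstablished(nil) returns false, which also covers CRD deletion.
		if !customresource.IsEstablished(crd) {
			return nil
		}
		fmt.Printf("%s CRD established; safe to start watches\n", gk)
		// Returning an error makes the CRDMetaController retry with backoff.
		return nil
	})
}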
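
Second, the two-object wiring used by both cmd/reconciler-manager/main.go and pkg/reconciler/reconciler.go: a single CRDMetaController owns the CRD watch and delegates to one shared CRDController, so components can add or remove per-GroupKind handlers at runtime without registering new watches. A minimal sketch of that wiring, with error handling trimmed and an illustrative logger name:

package example

import (
	"k8s.io/klog/v2/textlogger"
	ctrl "sigs.k8s.io/controller-runtime"

	"kpt.dev/configsync/pkg/reconcilermanager/controllers"
)

// wireCRDControllers registers a single CRD watch with the manager and returns
// the delegate CRDController that components use to set/delete reconcilers.
func wireCRDControllers(mgr ctrl.Manager) (*controllers.CRDController, error) {
	crdController := &controllers.CRDController{}
	crdMetaController := controllers.NewCRDMetaController(crdController, mgr.GetCache(),
		textlogger.NewLogger(textlogger.NewConfig()).WithName("controllers").WithName("CRD"))
	if err := crdMetaController.Register(mgr); err != nil {
		return nil, err
	}
	return crdController, nil
}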
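
Third, the ReplaceOnResetRESTMapper contract: a DynamicRESTMapper discovers new resources on its own but never forgets deleted ones, so Reset swaps in a fresh mapper built by the MapperFunc. This sketch mirrors the construction in DynamicListerWatcherFactoryFromConfig; the helper name is illustrative.

package example

import (
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

	"kpt.dev/configsync/pkg/remediator/watch"
)

// newResettableMapper builds the initial mapper plus a factory for
// replacements. Reset() discards all cached discovery state by swapping in a
// brand-new DynamicRESTMapper.
func newResettableMapper(cfg *rest.Config) (watch.ResettableRESTMapper, error) {
	httpClient, err := rest.HTTPClientFor(cfg)
	if err != nil {
		return nil, err
	}
	newMapperFn := func() (meta.RESTMapper, error) {
		return apiutil.NewDynamicRESTMapper(cfg, httpClient)
	}
	initial, err := newMapperFn()
	if err != nil {
		return nil, err
	}
	return watch.NewReplaceOnResetRESTMapper(initial, newMapperFn), nil
}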