From 8cec21ab59c6152c1aab8e445836292a80d45c7a Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Fri, 31 May 2019 09:39:40 -0400 Subject: [PATCH 1/4] Enable OLM to update CRD when there is only one owner of that CRD Signed-off-by: Vu Dinh --- pkg/controller/operators/catalog/operator.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index d0f48402cf..8dbbcd7c27 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "sync" "time" @@ -1034,6 +1035,19 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error { // Attempt to create the CRD. _, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Create(&crd) if k8serrors.IsAlreadyExists(err) { + currentCRD, _ := o.OpClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Get(crd.GetName(), metav1.GetOptions{}) + // Compare 2 CRDs to see if it needs to be updatetd + if !reflect.DeepEqual(crd, *currentCRD) { + // Verify CRD ownership, only attempt to update if + // CRD has only one owner + if len(existingCRDOwners[currentCRD.GetName()]) == 1 { + // Attempt to update CRD + _, err = o.OpClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd) + if err != nil { + return errorwrap.Wrapf(err, "error update CRD: %s", step.Resource.Name) + } + } + } // If it already existed, mark the step as Present. plan.Status.Plan[i].Status = v1alpha1.StepStatusPresent continue From 924c565dca77b6b0efebfcdae27c9d1208d772a7 Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Tue, 4 Jun 2019 17:12:05 -0400 Subject: [PATCH 2/4] Add IndexFunc using ProvidedAPIs as key to look up CSVs Signed-off-by: Vu Dinh --- pkg/controller/operators/catalog/operator.go | 44 ++++++++----- pkg/lib/index/api.go | 66 ++++++++++++++++++++ 2 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 pkg/lib/index/api.go diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 8dbbcd7c27..99c04ad8e7 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -33,6 +33,7 @@ import ( "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver" + index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" @@ -57,21 +58,22 @@ const ( type Operator struct { queueinformer.Operator - logger *logrus.Logger - clock utilclock.Clock - opClient operatorclient.ClientInterface - client versioned.Interface - lister operatorlister.OperatorLister - catsrcQueueSet *queueinformer.ResourceQueueSet - subQueueSet *queueinformer.ResourceQueueSet - ipQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.RateLimitingInterface - namespace string - sources map[resolver.CatalogKey]resolver.SourceRef - sourcesLock sync.RWMutex - sourcesLastUpdate metav1.Time - resolver resolver.Resolver - reconciler reconciler.RegistryReconcilerFactory + logger *logrus.Logger + clock utilclock.Clock + opClient operatorclient.ClientInterface + client versioned.Interface + lister operatorlister.OperatorLister + catsrcQueueSet *queueinformer.ResourceQueueSet + subQueueSet *queueinformer.ResourceQueueSet + ipQueueSet *queueinformer.ResourceQueueSet + nsResolveQueue workqueue.RateLimitingInterface + namespace string + sources map[resolver.CatalogKey]resolver.SourceRef + sourcesLock sync.RWMutex + sourcesLastUpdate metav1.Time + resolver resolver.Resolver + reconciler reconciler.RegistryReconcilerFactory + csvProvidedAPIsIndexer map[string]cache.Indexer } // NewOperator creates a new Catalog Operator. @@ -123,6 +125,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister()) op.RegisterInformer(csvInformer.Informer()) + csvInformer.Informer().AddIndexers(cache.Indexers{index.ProvidedAPIsIndexFuncKey: index.ProvidedAPIsIndexFunc}) + csvIndexer := csvInformer.Informer().GetIndexer() + op.csvProvidedAPIsIndexer[namespace] = csvIndexer + // TODO: Add namespace resolve sync // Wire InstallPlans @@ -1040,8 +1046,14 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error { if !reflect.DeepEqual(crd, *currentCRD) { // Verify CRD ownership, only attempt to update if // CRD has only one owner - if len(existingCRDOwners[currentCRD.GetName()]) == 1 { + // Example: provided=database.coreos.com/v1alpha1/EtcdCluster + matchedCSV, err := index.APIsIndexValues(o.csvProvidedAPIsIndexer, crd) + if err != nil { + return errorwrap.Wrapf(err, "error find matched CSV: %s", step.Resource.Name) + } + if len(matchedCSV) == 1 { // Attempt to update CRD + crd.SetResourceVersion(currentCRD.GetResourceVersion()) _, err = o.OpClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd) if err != nil { return errorwrap.Wrapf(err, "error update CRD: %s", step.Resource.Name) diff --git a/pkg/lib/index/api.go b/pkg/lib/index/api.go new file mode 100644 index 0000000000..16f4085c07 --- /dev/null +++ b/pkg/lib/index/api.go @@ -0,0 +1,66 @@ +package indexer + +import ( + "fmt" + "strings" + + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/client-go/tools/cache" +) + +const ( + // ProvidedAPIsIndexFuncKey is the recommended key to use for registering the index func with an indexer. + ProvidedAPIsIndexFuncKey string = "providedAPIs" +) + +func ProvidedAPIsIndexFunc(obj interface{}) ([]string, error) { + indicies := []string{} + + csv, ok := obj.(*v1alpha1.ClusterServiceVersion) + if !ok { + return indicies, fmt.Errorf("invalid object of type: %T", obj) + } + + for _, crd := range csv.Spec.CustomResourceDefinitions.Owned { + parts := strings.SplitN(crd.Name, ".", 2) + if len(parts) < 2 { + return indicies, fmt.Errorf("couldn't parse plural.group from crd name: %s", crd.Name) + } + indicies = append(indicies, fmt.Sprintf("provided=%s/%s/%s", parts[1], crd.Version, crd.Kind)) + } + for _, api := range csv.Spec.APIServiceDefinitions.Owned { + indicies = append(indicies, fmt.Sprintf("provided=%s/%s/%s", api.Group, api.Version, api.Kind)) + } + + return indicies, nil +} + +// APIsIndexValues returns the names of CSVs that own the given CRD +func APIsIndexValues(indexers map[string]cache.Indexer, crd v1beta1ext.CustomResourceDefinition) (map[string]struct{}, error) { + csvSet := map[string]struct{}{} + crdSpec := map[string]struct{}{} + for _, v := range crd.Spec.Versions { + crdSpec[fmt.Sprintf("provided=%s/%s/%s", crd.Spec.Group, v.Name, crd.Spec.Names.Kind)] = struct{}{} + } + if crd.Spec.Version != "" { + crdSpec[fmt.Sprintf("provided=%s/%s/%s", crd.Spec.Group, crd.Spec.Version, crd.Spec.Names.Kind)] = struct{}{} + } + for _, indexer := range indexers { + for key, _ := range crdSpec { + csvs, err := indexer.ByIndex(ProvidedAPIsIndexFuncKey, key) + if err != nil { + return nil, err + } + for _, csv := range csvs { + csv, ok := csv.(*v1alpha1.ClusterServiceVersion) + if !ok { + continue + } + // Add to set + csvSet[csv.GetName()] = struct{}{} + } + } + } + return csvSet, nil +} From 24c59dcd6cbed96d555f84191ce4a13218093706 Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Fri, 7 Jun 2019 01:55:24 -0400 Subject: [PATCH 3/4] Add e2e test case for single owner CRD update Signed-off-by: Vu Dinh --- pkg/controller/operators/catalog/operator.go | 20 ++- pkg/lib/index/api.go | 13 +- test/e2e/installplan_e2e_test.go | 179 +++++++++++++++++++ 3 files changed, 204 insertions(+), 8 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 99c04ad8e7..6923ad9a90 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -15,6 +15,7 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -251,6 +252,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo } + // Register CustomResourceDefinition QueueInformer + customResourceDefinitionInformer := extinf.NewSharedInformerFactory(op.OpClient.ApiextensionsV1beta1Interface(), wakeupInterval).Apiextensions().V1beta1().CustomResourceDefinitions() + op.RegisterQueueInformer(queueinformer.NewInformer( + workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "customresourcedefinitions"), + customResourceDefinitionInformer.Informer(), + op.syncObject, + &cache.ResourceEventHandlerFuncs{ + DeleteFunc: op.handleDeletion, + }, + "customresourcedefinitions", + metrics.NewMetricsNil(), + logger, + )) + op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(customResourceDefinitionInformer.Lister()) + // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod).Core().V1().Namespaces() op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) @@ -1041,13 +1057,13 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error { // Attempt to create the CRD. _, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Create(&crd) if k8serrors.IsAlreadyExists(err) { - currentCRD, _ := o.OpClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Get(crd.GetName(), metav1.GetOptions{}) + currentCRD, _ := o.lister.APIExtensionsV1beta1().CustomResourceDefinitionLister().Get(crd.GetName()) // Compare 2 CRDs to see if it needs to be updatetd if !reflect.DeepEqual(crd, *currentCRD) { // Verify CRD ownership, only attempt to update if // CRD has only one owner // Example: provided=database.coreos.com/v1alpha1/EtcdCluster - matchedCSV, err := index.APIsIndexValues(o.csvProvidedAPIsIndexer, crd) + matchedCSV, err := index.CRDProviderNames(o.csvProvidedAPIsIndexer, crd) if err != nil { return errorwrap.Wrapf(err, "error find matched CSV: %s", step.Resource.Name) } diff --git a/pkg/lib/index/api.go b/pkg/lib/index/api.go index 16f4085c07..42632a407a 100644 --- a/pkg/lib/index/api.go +++ b/pkg/lib/index/api.go @@ -14,6 +14,7 @@ const ( ProvidedAPIsIndexFuncKey string = "providedAPIs" ) +// ProvidedAPIsIndexFunc returns indicies from the owned CRDs and APIs of the given object (CSV) func ProvidedAPIsIndexFunc(obj interface{}) ([]string, error) { indicies := []string{} @@ -27,24 +28,24 @@ func ProvidedAPIsIndexFunc(obj interface{}) ([]string, error) { if len(parts) < 2 { return indicies, fmt.Errorf("couldn't parse plural.group from crd name: %s", crd.Name) } - indicies = append(indicies, fmt.Sprintf("provided=%s/%s/%s", parts[1], crd.Version, crd.Kind)) + indicies = append(indicies, fmt.Sprintf("%s/%s/%s", parts[1], crd.Version, crd.Kind)) } for _, api := range csv.Spec.APIServiceDefinitions.Owned { - indicies = append(indicies, fmt.Sprintf("provided=%s/%s/%s", api.Group, api.Version, api.Kind)) + indicies = append(indicies, fmt.Sprintf("%s/%s/%s", api.Group, api.Version, api.Kind)) } return indicies, nil } -// APIsIndexValues returns the names of CSVs that own the given CRD -func APIsIndexValues(indexers map[string]cache.Indexer, crd v1beta1ext.CustomResourceDefinition) (map[string]struct{}, error) { +// CRDProviderNames returns the names of CSVs that own the given CRD +func CRDProviderNames(indexers map[string]cache.Indexer, crd v1beta1ext.CustomResourceDefinition) (map[string]struct{}, error) { csvSet := map[string]struct{}{} crdSpec := map[string]struct{}{} for _, v := range crd.Spec.Versions { - crdSpec[fmt.Sprintf("provided=%s/%s/%s", crd.Spec.Group, v.Name, crd.Spec.Names.Kind)] = struct{}{} + crdSpec[fmt.Sprintf("%s/%s/%s", crd.Spec.Group, v.Name, crd.Spec.Names.Kind)] = struct{}{} } if crd.Spec.Version != "" { - crdSpec[fmt.Sprintf("provided=%s/%s/%s", crd.Spec.Group, crd.Spec.Version, crd.Spec.Names.Kind)] = struct{}{} + crdSpec[fmt.Sprintf("%s/%s/%s", crd.Spec.Group, crd.Spec.Version, crd.Spec.Names.Kind)] = struct{}{} } for _, indexer := range indexers { for key, _ := range crdSpec { diff --git a/test/e2e/installplan_e2e_test.go b/test/e2e/installplan_e2e_test.go index 2878346edf..a50b58d6b2 100644 --- a/test/e2e/installplan_e2e_test.go +++ b/test/e2e/installplan_e2e_test.go @@ -13,6 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -641,6 +642,184 @@ func TestCreateInstallPlanWithPreExistingCRDOwners(t *testing.T) { }) } +func TestUpdateInstallPlan(t *testing.T) { + defer cleaner.NotifyTestComplete(t, true) + t.Run("UpdateSingleExistingCRDOwner", func(t *testing.T) { + defer cleaner.NotifyTestComplete(t, true) + + mainPackageName := genName("nginx-") + + mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName) + + stableChannel := "stable" + + mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil) + + crdPlural := genName("ins-") + crdName := crdPlural + ".cluster.com" + mainCRD := apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdName, + }, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "cluster.com", + Versions: []apiextensions.CustomResourceDefinitionVersion{ + { + Name: "v1alpha1", + Served: true, + Storage: true, + }, + }, + Names: apiextensions.CustomResourceDefinitionNames{ + Plural: crdPlural, + Singular: crdPlural, + Kind: crdPlural, + ListKind: "list" + crdPlural, + }, + Scope: "Namespaced", + }, + } + + updatedCRD := apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdName, + }, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "cluster.com", + Versions: []apiextensions.CustomResourceDefinitionVersion{ + { + Name: "v1alpha1", + Served: true, + Storage: true, + }, + { + Name: "v1alpha2", + Served: true, + Storage: false, + }, + }, + Names: apiextensions.CustomResourceDefinitionNames{ + Plural: crdPlural, + Singular: crdPlural, + Kind: crdPlural, + ListKind: "list" + crdPlural, + }, + Scope: "Namespaced", + }, + } + + expectedCRDVersions := map[v1beta1.CustomResourceDefinitionVersion]struct{}{} + for _, version := range updatedCRD.Spec.Versions { + key := v1beta1.CustomResourceDefinitionVersion{ + Name: version.Name, + Served: version.Served, + Storage: version.Storage, + } + expectedCRDVersions[key] = struct{}{} + } + + mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) + + c := newKubeClient(t) + crc := newCRClient(t) + defer func() { + require.NoError(t, crc.OperatorsV1alpha1().Subscriptions(testNamespace).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})) + }() + + mainCatalogName := genName("mock-ocs-main-") + + // Create separate manifests for each CatalogSource + mainManifests := []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: mainPackageStable}, + }, + DefaultChannelName: stableChannel, + }, + } + + // Create the catalog sources + _, cleanupMainCatalogSource := createInternalCatalogSource(t, c, crc, mainCatalogName, testNamespace, mainManifests, []apiextensions.CustomResourceDefinition{mainCRD}, []v1alpha1.ClusterServiceVersion{mainCSV}) + defer cleanupMainCatalogSource() + // Attempt to get the catalog source before creating install plan + _, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, catalogSourceRegistryPodSynced) + require.NoError(t, err) + + subscriptionName := genName("sub-nginx-") + subscriptionCleanup := createSubscriptionForCatalog(t, crc, testNamespace, subscriptionName, mainCatalogName, mainPackageName, stableChannel, "", v1alpha1.ApprovalAutomatic) + defer subscriptionCleanup() + + subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker) + require.NoError(t, err) + require.NotNil(t, subscription) + require.NotNil(t, subscription.Status.InstallPlanRef) + require.Equal(t, mainCSV.GetName(), subscription.Status.CurrentCSV) + + installPlanName := subscription.Status.InstallPlanRef.Name + + // Wait for InstallPlan to be status: Complete before checking resource presence + fetchedInstallPlan, err := fetchInstallPlan(t, crc, installPlanName, buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseComplete)) + require.NoError(t, err) + + require.Equal(t, v1alpha1.InstallPlanPhaseComplete, fetchedInstallPlan.Status.Phase) + + // Fetch installplan again to check for unnecessary control loops + fetchedInstallPlan, err = fetchInstallPlan(t, crc, fetchedInstallPlan.GetName(), func(fip *v1alpha1.InstallPlan) bool { + compareResources(t, fetchedInstallPlan, fip) + return true + }) + require.NoError(t, err) + + // Verify CSV is created + _, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvAnyChecker) + require.NoError(t, err) + + // Create new CSV to replace the one CSV + updatedCSV := newCSV(mainPackageStable+"-v2", testNamespace, mainPackageStable, semver.MustParse("0.1.1"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) + + // Update manifest + updatedManifests := []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: updatedCSV.GetName()}, + }, + DefaultChannelName: stableChannel, + }, + } + + updateInternalCatalog(t, c, crc, mainCatalogName, testNamespace, []apiextensions.CustomResourceDefinition{updatedCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, updatedCSV}, updatedManifests) + + // Wait for subscription to update + updatedSubscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasCurrentCSV(updatedCSV.GetName())) + require.NoError(t, err) + + // Verify installplan created and installed + fetchedUpdatedInstallPlan, err := fetchInstallPlan(t, crc, updatedSubscription.Status.InstallPlanRef.Name, buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseComplete)) + require.NoError(t, err) + require.NotEqual(t, fetchedInstallPlan.GetName(), fetchedUpdatedInstallPlan.GetName()) + + // Wait for csv to update + _, err = awaitCSV(t, crc, testNamespace, updatedCSV.GetName(), csvAnyChecker) + require.NoError(t, err) + + // Get the CRD to see if it is updated + fetchedCRD, err := c.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Get(crdName, metav1.GetOptions{}) + require.NoError(t, err) + + for _, version := range fetchedCRD.Spec.Versions { + key := v1beta1.CustomResourceDefinitionVersion{ + Name: version.Name, + Served: version.Served, + Storage: version.Storage, + } + _, ok := expectedCRDVersions[key] + require.True(t, ok, "couldn't find %v in expected CRD versions: %#v", key, expectedCRDVersions) + } + }) +} + // TestCreateInstallPlanWithPermissions creates an InstallPlan with a CSV containing a set of permissions to be resolved. func TestCreateInstallPlanWithPermissions(t *testing.T) { defer cleaner.NotifyTestComplete(t, true) From 930163cccd8cd265993cd9576e311da8b7151865 Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Mon, 17 Jun 2019 06:24:06 -0400 Subject: [PATCH 4/4] Add a new test case to cover preexisting CRD case Signed-off-by: Vu Dinh --- pkg/controller/operators/catalog/operator.go | 50 ++--- pkg/lib/index/api.go | 6 +- test/e2e/installplan_e2e_test.go | 214 ++++++++++++++++--- 3 files changed, 213 insertions(+), 57 deletions(-) diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 6923ad9a90..06954584b2 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -102,17 +102,18 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Allocate the new instance of an Operator. op := &Operator{ - Operator: queueOperator, - logger: logger, - clock: clock, - opClient: opClient, - client: crClient, - lister: lister, - namespace: operatorNamespace, - sources: make(map[resolver.CatalogKey]resolver.SourceRef), - resolver: resolver.NewOperatorsV1alpha1Resolver(lister), - catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), - subQueueSet: queueinformer.NewEmptyResourceQueueSet(), + Operator: queueOperator, + logger: logger, + clock: clock, + opClient: opClient, + client: crClient, + lister: lister, + namespace: operatorNamespace, + sources: make(map[resolver.CatalogKey]resolver.SourceRef), + resolver: resolver.NewOperatorsV1alpha1Resolver(lister), + catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(), + subQueueSet: queueinformer.NewEmptyResourceQueueSet(), + csvProvidedAPIsIndexer: map[string]cache.Indexer{}, } op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now) @@ -253,19 +254,18 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo } // Register CustomResourceDefinition QueueInformer - customResourceDefinitionInformer := extinf.NewSharedInformerFactory(op.OpClient.ApiextensionsV1beta1Interface(), wakeupInterval).Apiextensions().V1beta1().CustomResourceDefinitions() - op.RegisterQueueInformer(queueinformer.NewInformer( - workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "customresourcedefinitions"), - customResourceDefinitionInformer.Informer(), - op.syncObject, - &cache.ResourceEventHandlerFuncs{ - DeleteFunc: op.handleDeletion, - }, - "customresourcedefinitions", - metrics.NewMetricsNil(), - logger, - )) - op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(customResourceDefinitionInformer.Lister()) + crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), resyncPeriod).Apiextensions().V1beta1().CustomResourceDefinitions() + op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(crdInformer.Lister()) + crdQueueInformer, err := queueinformer.NewQueueInformer( + ctx, + queueinformer.WithLogger(op.logger), + queueinformer.WithInformer(crdInformer.Informer()), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)), + ) + if err != nil { + return nil, err + } + op.RegisterQueueInformer(crdQueueInformer) // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod).Core().V1().Namespaces() @@ -1070,7 +1070,7 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error { if len(matchedCSV) == 1 { // Attempt to update CRD crd.SetResourceVersion(currentCRD.GetResourceVersion()) - _, err = o.OpClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd) + _, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd) if err != nil { return errorwrap.Wrapf(err, "error update CRD: %s", step.Resource.Name) } diff --git a/pkg/lib/index/api.go b/pkg/lib/index/api.go index 42632a407a..1213766f17 100644 --- a/pkg/lib/index/api.go +++ b/pkg/lib/index/api.go @@ -48,13 +48,13 @@ func CRDProviderNames(indexers map[string]cache.Indexer, crd v1beta1ext.CustomRe crdSpec[fmt.Sprintf("%s/%s/%s", crd.Spec.Group, crd.Spec.Version, crd.Spec.Names.Kind)] = struct{}{} } for _, indexer := range indexers { - for key, _ := range crdSpec { + for key := range crdSpec { csvs, err := indexer.ByIndex(ProvidedAPIsIndexFuncKey, key) if err != nil { return nil, err } - for _, csv := range csvs { - csv, ok := csv.(*v1alpha1.ClusterServiceVersion) + for _, item := range csvs { + csv, ok := item.(*v1alpha1.ClusterServiceVersion) if !ok { continue } diff --git a/test/e2e/installplan_e2e_test.go b/test/e2e/installplan_e2e_test.go index a50b58d6b2..016860b3f4 100644 --- a/test/e2e/installplan_e2e_test.go +++ b/test/e2e/installplan_e2e_test.go @@ -647,7 +647,7 @@ func TestUpdateInstallPlan(t *testing.T) { t.Run("UpdateSingleExistingCRDOwner", func(t *testing.T) { defer cleaner.NotifyTestComplete(t, true) - mainPackageName := genName("nginx-") + mainPackageName := genName("nginx-update-") mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName) @@ -655,7 +655,7 @@ func TestUpdateInstallPlan(t *testing.T) { mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil) - crdPlural := genName("ins-") + crdPlural := genName("ins-update-") crdName := crdPlural + ".cluster.com" mainCRD := apiextensions.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{ @@ -708,16 +708,6 @@ func TestUpdateInstallPlan(t *testing.T) { }, } - expectedCRDVersions := map[v1beta1.CustomResourceDefinitionVersion]struct{}{} - for _, version := range updatedCRD.Spec.Versions { - key := v1beta1.CustomResourceDefinitionVersion{ - Name: version.Name, - Served: version.Served, - Storage: version.Storage, - } - expectedCRDVersions[key] = struct{}{} - } - mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) c := newKubeClient(t) @@ -726,7 +716,7 @@ func TestUpdateInstallPlan(t *testing.T) { require.NoError(t, crc.OperatorsV1alpha1().Subscriptions(testNamespace).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})) }() - mainCatalogName := genName("mock-ocs-main-") + mainCatalogName := genName("mock-ocs-main-update-") // Create separate manifests for each CatalogSource mainManifests := []registry.PackageManifest{ @@ -746,7 +736,7 @@ func TestUpdateInstallPlan(t *testing.T) { _, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, catalogSourceRegistryPodSynced) require.NoError(t, err) - subscriptionName := genName("sub-nginx-") + subscriptionName := genName("sub-nginx-update-") subscriptionCleanup := createSubscriptionForCatalog(t, crc, testNamespace, subscriptionName, mainCatalogName, mainPackageName, stableChannel, "", v1alpha1.ApprovalAutomatic) defer subscriptionCleanup() @@ -775,47 +765,213 @@ func TestUpdateInstallPlan(t *testing.T) { _, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvAnyChecker) require.NoError(t, err) - // Create new CSV to replace the one CSV - updatedCSV := newCSV(mainPackageStable+"-v2", testNamespace, mainPackageStable, semver.MustParse("0.1.1"), []apiextensions.CustomResourceDefinition{mainCRD}, nil, mainNamedStrategy) + updateInternalCatalog(t, c, crc, mainCatalogName, testNamespace, []apiextensions.CustomResourceDefinition{updatedCRD}, []v1alpha1.ClusterServiceVersion{mainCSV}, mainManifests) + + // Update the subscription resource + err = crc.OperatorsV1alpha1().Subscriptions(testNamespace).DeleteCollection(metav1.NewDeleteOptions(0), metav1.ListOptions{}) + require.NoError(t, err) + + // existing cleanup should remove this + createSubscriptionForCatalog(t, crc, testNamespace, subscriptionName, mainCatalogName, mainPackageName, stableChannel, "", v1alpha1.ApprovalAutomatic) + + // Wait for subscription to update + updatedSubscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker) + require.NoError(t, err) + + // Verify installplan created and installed + fetchedUpdatedInstallPlan, err := fetchInstallPlan(t, crc, updatedSubscription.Status.InstallPlanRef.Name, buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseComplete)) + require.NoError(t, err) + require.NotEqual(t, fetchedInstallPlan.GetName(), fetchedUpdatedInstallPlan.GetName()) + + // Wait for csv to update + _, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvAnyChecker) + require.NoError(t, err) + + // Get the CRD to see if it is updated + fetchedCRD, err := c.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Get(crdName, metav1.GetOptions{}) + require.NoError(t, err) + require.Equal(t, len(fetchedCRD.Spec.Versions), len(updatedCRD.Spec.Versions), "The CRD versions counts don't match") + + fetchedCRDVersions := map[v1beta1.CustomResourceDefinitionVersion]struct{}{} + for _, version := range fetchedCRD.Spec.Versions { + key := v1beta1.CustomResourceDefinitionVersion{ + Name: version.Name, + Served: version.Served, + Storage: version.Storage, + } + fetchedCRDVersions[key] = struct{}{} + } + + for _, version := range updatedCRD.Spec.Versions { + key := v1beta1.CustomResourceDefinitionVersion{ + Name: version.Name, + Served: version.Served, + Storage: version.Storage, + } + _, ok := fetchedCRDVersions[key] + require.True(t, ok, "couldn't find %v in fetched CRD versions: %#v", key, fetchedCRDVersions) + } + }) + + t.Run("UpdatePreexistingCRDFailed", func(t *testing.T) { + defer cleaner.NotifyTestComplete(t, true) + + c := newKubeClient(t) + crc := newCRClient(t) + defer func() { + require.NoError(t, crc.OperatorsV1alpha1().Subscriptions(testNamespace).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})) + }() + + mainPackageName := genName("nginx-update2-") + + mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName) + + stableChannel := "stable" + + mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil) + + crdPlural := genName("ins-update2-") + crdName := crdPlural + ".cluster.com" + mainCRD := apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdName, + }, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "cluster.com", + Versions: []apiextensions.CustomResourceDefinitionVersion{ + { + Name: "v1alpha1", + Served: true, + Storage: true, + }, + }, + Names: apiextensions.CustomResourceDefinitionNames{ + Plural: crdPlural, + Singular: crdPlural, + Kind: crdPlural, + ListKind: "list" + crdPlural, + }, + Scope: "Namespaced", + }, + } + + updatedCRD := apiextensions.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdName, + }, + Spec: apiextensions.CustomResourceDefinitionSpec{ + Group: "cluster.com", + Versions: []apiextensions.CustomResourceDefinitionVersion{ + { + Name: "v1alpha1", + Served: true, + Storage: true, + }, + { + Name: "v1alpha2", + Served: true, + Storage: false, + }, + }, + Names: apiextensions.CustomResourceDefinitionNames{ + Plural: crdPlural, + Singular: crdPlural, + Kind: crdPlural, + ListKind: "list" + crdPlural, + }, + Scope: "Namespaced", + }, + } + + expectedCRDVersions := map[v1beta1.CustomResourceDefinitionVersion]struct{}{} + for _, version := range mainCRD.Spec.Versions { + key := v1beta1.CustomResourceDefinitionVersion{ + Name: version.Name, + Served: version.Served, + Storage: version.Storage, + } + expectedCRDVersions[key] = struct{}{} + } + + // Create the initial CSV + cleanupCRD, err := createCRD(c, mainCRD) + require.NoError(t, err) + defer cleanupCRD() + + mainCSV := newCSV(mainPackageStable, testNamespace, "", semver.MustParse("0.1.0"), nil, nil, mainNamedStrategy) + + mainCatalogName := genName("mock-ocs-main-update2-") - // Update manifest - updatedManifests := []registry.PackageManifest{ + // Create separate manifests for each CatalogSource + mainManifests := []registry.PackageManifest{ { PackageName: mainPackageName, Channels: []registry.PackageChannel{ - {Name: stableChannel, CurrentCSVName: updatedCSV.GetName()}, + {Name: stableChannel, CurrentCSVName: mainPackageStable}, }, DefaultChannelName: stableChannel, }, } - updateInternalCatalog(t, c, crc, mainCatalogName, testNamespace, []apiextensions.CustomResourceDefinition{updatedCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, updatedCSV}, updatedManifests) + // Create the catalog sources + _, cleanupMainCatalogSource := createInternalCatalogSource(t, c, crc, mainCatalogName, testNamespace, mainManifests, []apiextensions.CustomResourceDefinition{updatedCRD}, []v1alpha1.ClusterServiceVersion{mainCSV}) + defer cleanupMainCatalogSource() + // Attempt to get the catalog source before creating install plan + _, err = fetchCatalogSource(t, crc, mainCatalogName, testNamespace, catalogSourceRegistryPodSynced) + require.NoError(t, err) - // Wait for subscription to update - updatedSubscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasCurrentCSV(updatedCSV.GetName())) + subscriptionName := genName("sub-nginx-update2-") + subscriptionCleanup := createSubscriptionForCatalog(t, crc, testNamespace, subscriptionName, mainCatalogName, mainPackageName, stableChannel, "", v1alpha1.ApprovalAutomatic) + defer subscriptionCleanup() + + subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionHasInstallPlanChecker) require.NoError(t, err) + require.NotNil(t, subscription) + require.NotNil(t, subscription.Status.InstallPlanRef) + require.Equal(t, mainCSV.GetName(), subscription.Status.CurrentCSV) - // Verify installplan created and installed - fetchedUpdatedInstallPlan, err := fetchInstallPlan(t, crc, updatedSubscription.Status.InstallPlanRef.Name, buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseComplete)) + installPlanName := subscription.Status.InstallPlanRef.Name + + // Wait for InstallPlan to be status: Complete before checking resource presence + fetchedInstallPlan, err := fetchInstallPlan(t, crc, installPlanName, buildInstallPlanPhaseCheckFunc(v1alpha1.InstallPlanPhaseComplete)) require.NoError(t, err) - require.NotEqual(t, fetchedInstallPlan.GetName(), fetchedUpdatedInstallPlan.GetName()) - // Wait for csv to update - _, err = awaitCSV(t, crc, testNamespace, updatedCSV.GetName(), csvAnyChecker) + require.Equal(t, v1alpha1.InstallPlanPhaseComplete, fetchedInstallPlan.Status.Phase) + + // Fetch installplan again to check for unnecessary control loops + fetchedInstallPlan, err = fetchInstallPlan(t, crc, fetchedInstallPlan.GetName(), func(fip *v1alpha1.InstallPlan) bool { + compareResources(t, fetchedInstallPlan, fip) + return true + }) + require.NoError(t, err) + + // Verify CSV is created + _, err = awaitCSV(t, crc, testNamespace, mainCSV.GetName(), csvAnyChecker) require.NoError(t, err) // Get the CRD to see if it is updated fetchedCRD, err := c.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Get(crdName, metav1.GetOptions{}) require.NoError(t, err) + require.Equal(t, len(fetchedCRD.Spec.Versions), len(mainCRD.Spec.Versions), "The CRD versions counts don't match") + fetchedCRDVersions := map[v1beta1.CustomResourceDefinitionVersion]struct{}{} for _, version := range fetchedCRD.Spec.Versions { key := v1beta1.CustomResourceDefinitionVersion{ Name: version.Name, Served: version.Served, Storage: version.Storage, } - _, ok := expectedCRDVersions[key] - require.True(t, ok, "couldn't find %v in expected CRD versions: %#v", key, expectedCRDVersions) + fetchedCRDVersions[key] = struct{}{} + } + + for _, version := range mainCRD.Spec.Versions { + key := v1beta1.CustomResourceDefinitionVersion{ + Name: version.Name, + Served: version.Served, + Storage: version.Storage, + } + _, ok := fetchedCRDVersions[key] + require.True(t, ok, "couldn't find %v in fetched CRD versions: %#v", key, fetchedCRDVersions) } }) }