From 686803737693d3cbac939d282f628054e5f4d9c0 Mon Sep 17 00:00:00 2001 From: Evan Cordell Date: Thu, 24 Jan 2019 11:02:52 -0500 Subject: [PATCH] feat(catalogsource): allow grpc source types that don't require an image instead, they provide an address directly. OLM will not have visibility into the resources required for running the grpc catalog for this case, but will still connect and health check. --- cmd/catalog/main.go | 1 - .../operators/v1alpha1/catalogsource_types.go | 9 + pkg/controller/operators/catalog/operator.go | 20 +- .../operators/catalog/subscriptions_test.go | 2 +- .../registry/reconciler/grpc_address.go | 19 ++ .../registry/reconciler/grpc_test.go | 94 +++++--- .../registry/reconciler/reconciler.go | 16 +- pkg/fakes/fake_reconciler_reconciler.go | 84 +++---- pkg/package-server/provider/registry.go | 2 +- test/e2e/catalog_e2e_test.go | 209 +++++++++++++++++- test/e2e/util_test.go | 67 +++--- 11 files changed, 405 insertions(+), 118 deletions(-) create mode 100644 pkg/controller/registry/reconciler/grpc_address.go diff --git a/cmd/catalog/main.go b/cmd/catalog/main.go index 542d7a81374..6657af26811 100644 --- a/cmd/catalog/main.go +++ b/cmd/catalog/main.go @@ -99,4 +99,3 @@ func main() { _, done := catalogOperator.Run(stopCh) <-done } - diff --git a/pkg/api/apis/operators/v1alpha1/catalogsource_types.go b/pkg/api/apis/operators/v1alpha1/catalogsource_types.go index 6b316c6a35b..fa242de36dc 100644 --- a/pkg/api/apis/operators/v1alpha1/catalogsource_types.go +++ b/pkg/api/apis/operators/v1alpha1/catalogsource_types.go @@ -27,6 +27,7 @@ type CatalogSourceSpec struct { SourceType SourceType `json:"sourceType"` ConfigMap string `json:"configMap,omitempty"` Image string `json:"image,omitempty"` + Address string `json:"address,omitempty"` Secrets []string `json:"secrets,omitempty"` // Metadata @@ -53,6 +54,7 @@ type CatalogSourceStatus struct { RegistryServiceStatus *RegistryServiceStatus `json:"registryService,omitempty"` LastSync metav1.Time `json:"lastSync,omitempty"` } + type ConfigMapResourceReference struct { Name string `json:"name"` Namespace string `json:"namespace"` @@ -71,6 +73,13 @@ type CatalogSource struct { Status CatalogSourceStatus `json:"status"` } +func (c *CatalogSource) Address() string { + if c.Spec.Address != "" { + return c.Spec.Address + } + return c.Status.RegistryServiceStatus.Address() +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type CatalogSourceList struct { metav1.TypeMeta `json:",inline"` diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 5af8344e694..fb82124010f 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -77,7 +77,7 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti watchedNamespaces = []string{metav1.NamespaceAll} } - // Create a new client for ALM types (CRs) + // Create a new client for OLM types (CRs) crClient, err := client.NewClient(kubeconfigPath) if err != nil { return nil, err @@ -381,24 +381,26 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { return nil } + + logger.Debug("catsrc configmap state good, checking registry pod") } - reconciler := o.reconciler.ReconcilerForSourceType(catsrc.Spec.SourceType) + reconciler := o.reconciler.ReconcilerForSource(catsrc) if reconciler == nil { + // TODO: Add failure status on catalogsource and remove from sources return fmt.Errorf("no reconciler for source type %s", catsrc.Spec.SourceType) } - logger.Debug("catsrc configmap state good, checking registry pod") // if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) { - logger.Debug("registry pod scheduled for recheck") + logger.Debug("registry server scheduled recheck") if err := reconciler.EnsureRegistryServer(out); err != nil { logger.WithError(err).Warn("couldn't ensure registry server") return err } - logger.Debug("ensured registry pod") + logger.Debug("ensured registry server") out.Status.RegistryServiceStatus.CreatedAt = timeNow() out.Status.LastSync = timeNow() @@ -410,18 +412,18 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { } o.sourcesLastUpdate = timeNow() - logger.Debug("registry pod recreated") + logger.Debug("registry server recreated") return nil } - logger.Debug("registry pod state good") + logger.Debug("registry state good") // update operator's view of sources sourcesUpdated := false func() { o.sourcesLock.Lock() defer o.sourcesLock.Unlock() - address := catsrc.Status.RegistryServiceStatus.Address() + address := catsrc.Address() currentSource, ok := o.sources[sourceKey] logger = logger.WithField("currentSource", sourceKey) if !ok || currentSource.Address != address || catsrc.Status.LastSync.After(currentSource.LastConnect.Time) { @@ -664,7 +666,7 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string) logger.WithField("clientState", client.Conn.GetState()).Debug("source") if client.Conn.GetState() == connectivity.TransientFailure { logger.WithField("clientState", client.Conn.GetState()).Debug("waiting for connection") - ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second) + ctx, _ := context.WithTimeout(context.TODO(), 2*time.Second) changed := client.Conn.WaitForStateChange(ctx, connectivity.TransientFailure) if !changed { logger.WithField("clientState", client.Conn.GetState()).Debug("source in transient failure and didn't recover") diff --git a/pkg/controller/operators/catalog/subscriptions_test.go b/pkg/controller/operators/catalog/subscriptions_test.go index 1c343a66495..88b03fc7767 100644 --- a/pkg/controller/operators/catalog/subscriptions_test.go +++ b/pkg/controller/operators/catalog/subscriptions_test.go @@ -499,7 +499,7 @@ func TestSyncSubscriptions(t *testing.T) { require.NoError(t, err) o.reconciler = &fakes.FakeReconcilerFactory{ - ReconcilerForSourceTypeStub: func(sourceType v1alpha1.SourceType) reconciler.RegistryReconciler { + ReconcilerForSourceStub: func(source *v1alpha1.CatalogSource) reconciler.RegistryReconciler { return &fakes.FakeRegistryReconciler{ EnsureRegistryServerStub: func(source *v1alpha1.CatalogSource) error { return nil diff --git a/pkg/controller/registry/reconciler/grpc_address.go b/pkg/controller/registry/reconciler/grpc_address.go new file mode 100644 index 00000000000..88548769c94 --- /dev/null +++ b/pkg/controller/registry/reconciler/grpc_address.go @@ -0,0 +1,19 @@ +package reconciler + +import ( + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" +) + +type GrpcAddressRegistryReconciler struct{} + +var _ RegistryReconciler = &GrpcAddressRegistryReconciler{} + +func (g *GrpcAddressRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error { + + catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{ + CreatedAt: timeNow(), + Protocol: "grpc", + } + + return nil +} diff --git a/pkg/controller/registry/reconciler/grpc_test.go b/pkg/controller/registry/reconciler/grpc_test.go index b6ea2f47adb..918833c1b95 100644 --- a/pkg/controller/registry/reconciler/grpc_test.go +++ b/pkg/controller/registry/reconciler/grpc_test.go @@ -11,13 +11,14 @@ import ( "k8s.io/client-go/informers" k8sfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister" ) -func grpcReconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{}) (*GrpcRegistryReconciler, operatorclient.ClientInterface) { +func grpcReconcilerFactory(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{}) (ReconcilerFactory, operatorclient.ClientInterface) { opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), nil, nil) // Creates registry pods in response to configmaps @@ -46,7 +47,7 @@ func grpcReconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{ lister.CoreV1().RegisterPodLister(testNamespace, podInformer.Lister()) lister.CoreV1().RegisterConfigMapLister(testNamespace, configMapInformer.Lister()) - rec := &GrpcRegistryReconciler{ + rec := &RegistryReconcilerFactory{ OpClient: opClientFake, Lister: lister, } @@ -62,7 +63,7 @@ func grpcReconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{ return rec, opClientFake } -func validGrpcCatalogSource(image string) *v1alpha1.CatalogSource { +func validGrpcCatalogSource(image, address string) *v1alpha1.CatalogSource { return &v1alpha1.CatalogSource{ ObjectMeta: metav1.ObjectMeta{ Name: "img-catalog", @@ -71,6 +72,7 @@ func validGrpcCatalogSource(image string) *v1alpha1.CatalogSource { }, Spec: v1alpha1.CatalogSourceSpec{ Image: image, + Address: address, SourceType: v1alpha1.SourceTypeGrpc, }, } @@ -103,7 +105,36 @@ func TestGrpcRegistryReconciler(t *testing.T) { { testName: "Grpc/NoExistingRegistry/CreateSuccessful", in: in{ - catsrc: validGrpcCatalogSource("test-img"), + catsrc: validGrpcCatalogSource("test-img", ""), + }, + out: out{ + status: &v1alpha1.RegistryServiceStatus{ + CreatedAt: timeNow(), + Protocol: "grpc", + ServiceName: "img-catalog", + ServiceNamespace: testNamespace, + Port: "50051", + }, + }, + }, + { + testName: "Grpc/Address/CreateSuccessful", + in: in{ + cluster: cluster{}, + catsrc: validGrpcCatalogSource("", "catalog.svc.cluster.local:50001"), + }, + out: out{ + status: &v1alpha1.RegistryServiceStatus{ + CreatedAt: timeNow(), + Protocol: "grpc", + }, + }, + }, + { + testName: "Grpc/AddressAndImage/CreateSuccessful", + in: in{ + cluster: cluster{}, + catsrc: validGrpcCatalogSource("img-catalog", "catalog.svc.cluster.local:50001"), }, out: out{ status: &v1alpha1.RegistryServiceStatus{ @@ -119,9 +150,9 @@ func TestGrpcRegistryReconciler(t *testing.T) { testName: "Grpc/ExistingRegistry/BadServiceAccount", in: in{ cluster: cluster{ - k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "ServiceAccount", "badName"), + k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "ServiceAccount", "badName"), }, - catsrc: validGrpcCatalogSource("test-img"), + catsrc: validGrpcCatalogSource("test-img", ""), }, out: out{ status: &v1alpha1.RegistryServiceStatus{ @@ -137,9 +168,9 @@ func TestGrpcRegistryReconciler(t *testing.T) { testName: "Grpc/ExistingRegistry/BadService", in: in{ cluster: cluster{ - k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "Service", "badName"), + k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "Service", "badName"), }, - catsrc: validGrpcCatalogSource("test-img"), + catsrc: validGrpcCatalogSource("test-img", ""), }, out: out{ status: &v1alpha1.RegistryServiceStatus{ @@ -155,9 +186,9 @@ func TestGrpcRegistryReconciler(t *testing.T) { testName: "Grpc/ExistingRegistry/BadPod", in: in{ cluster: cluster{ - k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "Pod", "badName"), + k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "Pod", "badName"), }, - catsrc: validGrpcCatalogSource("test-img"), + catsrc: validGrpcCatalogSource("test-img", ""), }, out: out{ status: &v1alpha1.RegistryServiceStatus{ @@ -173,9 +204,9 @@ func TestGrpcRegistryReconciler(t *testing.T) { testName: "Grpc/ExistingRegistry/BadRole", in: in{ cluster: cluster{ - k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "Role", "badName"), + k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "Role", "badName"), }, - catsrc: validGrpcCatalogSource("test-img"), + catsrc: validGrpcCatalogSource("test-img", ""), }, out: out{ status: &v1alpha1.RegistryServiceStatus{ @@ -191,9 +222,9 @@ func TestGrpcRegistryReconciler(t *testing.T) { testName: "Grpc/ExistingRegistry/BadRoleBinding", in: in{ cluster: cluster{ - k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "RoleBinding", "badName"), + k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "RoleBinding", "badName"), }, - catsrc: validGrpcCatalogSource("test-img"), + catsrc: validGrpcCatalogSource("test-img", ""), }, out: out{ status: &v1alpha1.RegistryServiceStatus{ @@ -209,9 +240,9 @@ func TestGrpcRegistryReconciler(t *testing.T) { testName: "Grpc/ExistingRegistry/OldPod", in: in{ cluster: cluster{ - k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("old-img")), + k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("old-img", "")), }, - catsrc: validGrpcCatalogSource("new-img"), + catsrc: validGrpcCatalogSource("new-img", ""), }, out: out{ status: &v1alpha1.RegistryServiceStatus{ @@ -229,7 +260,8 @@ func TestGrpcRegistryReconciler(t *testing.T) { stopc := make(chan struct{}) defer close(stopc) - rec, client := grpcReconciler(t, tt.in.cluster.k8sObjs, stopc) + factory, client := grpcReconcilerFactory(t, tt.in.cluster.k8sObjs, stopc) + rec := factory.ReconcilerForSource(tt.in.catsrc) err := rec.EnsureRegistryServer(tt.in.catsrc) @@ -240,18 +272,28 @@ func TestGrpcRegistryReconciler(t *testing.T) { return } - // if no error, the reconciler should create the same set of kube objects every time + // Check for resource existence decorated := grpcCatalogSourceDecorator{tt.in.catsrc} - pod := decorated.Pod() - outPod, err := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Get(pod.GetName(), metav1.GetOptions{}) - require.NoError(t, err) - require.Equal(t, pod, outPod) - service := decorated.Service() - outService, err := client.KubernetesInterface().CoreV1().Services(service.GetNamespace()).Get(service.GetName(), metav1.GetOptions{}) - require.NoError(t, err) - require.Equal(t, service, outService) + outPod, podErr := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Get(pod.GetName(), metav1.GetOptions{}) + outService, serviceErr := client.KubernetesInterface().CoreV1().Services(service.GetNamespace()).Get(service.GetName(), metav1.GetOptions{}) + + switch rec.(type){ + case *GrpcRegistryReconciler: + // Should be created by a GrpcRegistryReconciler + require.NoError(t, podErr) + require.Equal(t, pod, outPod) + require.NoError(t, serviceErr) + require.Equal(t, service, outService) + case *GrpcAddressRegistryReconciler: + // Should not be created by a GrpcAddressRegistryReconciler + require.Error(t, podErr) + require.True(t, k8serrors.IsNotFound(podErr)) + require.NoError(t, err) + require.True(t, k8serrors.IsNotFound(serviceErr)) + } + }) } } diff --git a/pkg/controller/registry/reconciler/reconciler.go b/pkg/controller/registry/reconciler/reconciler.go index 95ebf88dad6..0808e0711f8 100644 --- a/pkg/controller/registry/reconciler/reconciler.go +++ b/pkg/controller/registry/reconciler/reconciler.go @@ -12,7 +12,7 @@ type RegistryReconciler interface { } type ReconcilerFactory interface { - ReconcilerForSourceType(sourceType v1alpha1.SourceType) RegistryReconciler + ReconcilerForSource(source *v1alpha1.CatalogSource) RegistryReconciler } type RegistryReconcilerFactory struct { @@ -21,8 +21,8 @@ type RegistryReconcilerFactory struct { ConfigMapServerImage string } -func (r *RegistryReconcilerFactory) ReconcilerForSourceType(sourceType v1alpha1.SourceType) RegistryReconciler { - switch sourceType { +func (r *RegistryReconcilerFactory) ReconcilerForSource(source *v1alpha1.CatalogSource) RegistryReconciler { + switch source.Spec.SourceType { case v1alpha1.SourceTypeInternal, v1alpha1.SourceTypeConfigmap: return &ConfigMapRegistryReconciler{ Lister: r.Lister, @@ -30,9 +30,13 @@ func (r *RegistryReconcilerFactory) ReconcilerForSourceType(sourceType v1alpha1. Image: r.ConfigMapServerImage, } case v1alpha1.SourceTypeGrpc: - return &GrpcRegistryReconciler{ - Lister: r.Lister, - OpClient: r.OpClient, + if source.Spec.Image != "" { + return &GrpcRegistryReconciler{ + Lister: r.Lister, + OpClient: r.OpClient, + } + } else if source.Spec.Address != "" { + return &GrpcAddressRegistryReconciler{} } } return nil diff --git a/pkg/fakes/fake_reconciler_reconciler.go b/pkg/fakes/fake_reconciler_reconciler.go index 8118aa50498..3069786dd36 100644 --- a/pkg/fakes/fake_reconciler_reconciler.go +++ b/pkg/fakes/fake_reconciler_reconciler.go @@ -9,77 +9,77 @@ import ( ) type FakeReconcilerFactory struct { - ReconcilerForSourceTypeStub func(v1alpha1.SourceType) reconciler.RegistryReconciler - reconcilerForSourceTypeMutex sync.RWMutex - reconcilerForSourceTypeArgsForCall []struct { - arg1 v1alpha1.SourceType + ReconcilerForSourceStub func(*v1alpha1.CatalogSource) reconciler.RegistryReconciler + reconcilerForSourceMutex sync.RWMutex + reconcilerForSourceArgsForCall []struct { + arg1 *v1alpha1.CatalogSource } - reconcilerForSourceTypeReturns struct { + reconcilerForSourceReturns struct { result1 reconciler.RegistryReconciler } - reconcilerForSourceTypeReturnsOnCall map[int]struct { + reconcilerForSourceReturnsOnCall map[int]struct { result1 reconciler.RegistryReconciler } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *FakeReconcilerFactory) ReconcilerForSourceType(arg1 v1alpha1.SourceType) reconciler.RegistryReconciler { - fake.reconcilerForSourceTypeMutex.Lock() - ret, specificReturn := fake.reconcilerForSourceTypeReturnsOnCall[len(fake.reconcilerForSourceTypeArgsForCall)] - fake.reconcilerForSourceTypeArgsForCall = append(fake.reconcilerForSourceTypeArgsForCall, struct { - arg1 v1alpha1.SourceType +func (fake *FakeReconcilerFactory) ReconcilerForSource(arg1 *v1alpha1.CatalogSource) reconciler.RegistryReconciler { + fake.reconcilerForSourceMutex.Lock() + ret, specificReturn := fake.reconcilerForSourceReturnsOnCall[len(fake.reconcilerForSourceArgsForCall)] + fake.reconcilerForSourceArgsForCall = append(fake.reconcilerForSourceArgsForCall, struct { + arg1 *v1alpha1.CatalogSource }{arg1}) - fake.recordInvocation("ReconcilerForSourceType", []interface{}{arg1}) - fake.reconcilerForSourceTypeMutex.Unlock() - if fake.ReconcilerForSourceTypeStub != nil { - return fake.ReconcilerForSourceTypeStub(arg1) + fake.recordInvocation("ReconcilerForSource", []interface{}{arg1}) + fake.reconcilerForSourceMutex.Unlock() + if fake.ReconcilerForSourceStub != nil { + return fake.ReconcilerForSourceStub(arg1) } if specificReturn { return ret.result1 } - fakeReturns := fake.reconcilerForSourceTypeReturns + fakeReturns := fake.reconcilerForSourceReturns return fakeReturns.result1 } -func (fake *FakeReconcilerFactory) ReconcilerForSourceTypeCallCount() int { - fake.reconcilerForSourceTypeMutex.RLock() - defer fake.reconcilerForSourceTypeMutex.RUnlock() - return len(fake.reconcilerForSourceTypeArgsForCall) +func (fake *FakeReconcilerFactory) ReconcilerForSourceCallCount() int { + fake.reconcilerForSourceMutex.RLock() + defer fake.reconcilerForSourceMutex.RUnlock() + return len(fake.reconcilerForSourceArgsForCall) } -func (fake *FakeReconcilerFactory) ReconcilerForSourceTypeCalls(stub func(v1alpha1.SourceType) reconciler.RegistryReconciler) { - fake.reconcilerForSourceTypeMutex.Lock() - defer fake.reconcilerForSourceTypeMutex.Unlock() - fake.ReconcilerForSourceTypeStub = stub +func (fake *FakeReconcilerFactory) ReconcilerForSourceCalls(stub func(*v1alpha1.CatalogSource) reconciler.RegistryReconciler) { + fake.reconcilerForSourceMutex.Lock() + defer fake.reconcilerForSourceMutex.Unlock() + fake.ReconcilerForSourceStub = stub } -func (fake *FakeReconcilerFactory) ReconcilerForSourceTypeArgsForCall(i int) v1alpha1.SourceType { - fake.reconcilerForSourceTypeMutex.RLock() - defer fake.reconcilerForSourceTypeMutex.RUnlock() - argsForCall := fake.reconcilerForSourceTypeArgsForCall[i] +func (fake *FakeReconcilerFactory) ReconcilerForSourceArgsForCall(i int) *v1alpha1.CatalogSource { + fake.reconcilerForSourceMutex.RLock() + defer fake.reconcilerForSourceMutex.RUnlock() + argsForCall := fake.reconcilerForSourceArgsForCall[i] return argsForCall.arg1 } -func (fake *FakeReconcilerFactory) ReconcilerForSourceTypeReturns(result1 reconciler.RegistryReconciler) { - fake.reconcilerForSourceTypeMutex.Lock() - defer fake.reconcilerForSourceTypeMutex.Unlock() - fake.ReconcilerForSourceTypeStub = nil - fake.reconcilerForSourceTypeReturns = struct { +func (fake *FakeReconcilerFactory) ReconcilerForSourceReturns(result1 reconciler.RegistryReconciler) { + fake.reconcilerForSourceMutex.Lock() + defer fake.reconcilerForSourceMutex.Unlock() + fake.ReconcilerForSourceStub = nil + fake.reconcilerForSourceReturns = struct { result1 reconciler.RegistryReconciler }{result1} } -func (fake *FakeReconcilerFactory) ReconcilerForSourceTypeReturnsOnCall(i int, result1 reconciler.RegistryReconciler) { - fake.reconcilerForSourceTypeMutex.Lock() - defer fake.reconcilerForSourceTypeMutex.Unlock() - fake.ReconcilerForSourceTypeStub = nil - if fake.reconcilerForSourceTypeReturnsOnCall == nil { - fake.reconcilerForSourceTypeReturnsOnCall = make(map[int]struct { +func (fake *FakeReconcilerFactory) ReconcilerForSourceReturnsOnCall(i int, result1 reconciler.RegistryReconciler) { + fake.reconcilerForSourceMutex.Lock() + defer fake.reconcilerForSourceMutex.Unlock() + fake.ReconcilerForSourceStub = nil + if fake.reconcilerForSourceReturnsOnCall == nil { + fake.reconcilerForSourceReturnsOnCall = make(map[int]struct { result1 reconciler.RegistryReconciler }) } - fake.reconcilerForSourceTypeReturnsOnCall[i] = struct { + fake.reconcilerForSourceReturnsOnCall[i] = struct { result1 reconciler.RegistryReconciler }{result1} } @@ -87,8 +87,8 @@ func (fake *FakeReconcilerFactory) ReconcilerForSourceTypeReturnsOnCall(i int, r func (fake *FakeReconcilerFactory) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.reconcilerForSourceTypeMutex.RLock() - defer fake.reconcilerForSourceTypeMutex.RUnlock() + fake.reconcilerForSourceMutex.RLock() + defer fake.reconcilerForSourceMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/package-server/provider/registry.go b/pkg/package-server/provider/registry.go index 5afff2a9f5a..c9fd2a5a049 100644 --- a/pkg/package-server/provider/registry.go +++ b/pkg/package-server/provider/registry.go @@ -152,7 +152,7 @@ func (p *RegistryProvider) syncCatalogSource(obj interface{}) (syncError error) } logger.Info("attempting to add a new grpc connection") - conn, err := grpc.Dial(source.Status.RegistryServiceStatus.Address(), grpc.WithInsecure()) + conn, err := grpc.Dial(source.Address(), grpc.WithInsecure()) if err != nil { logger.WithField("err", err.Error()).Errorf("could not connect to registry service") syncError = err diff --git a/test/e2e/catalog_e2e_test.go b/test/e2e/catalog_e2e_test.go index 6d35b49d1c1..2fe1f0145c2 100644 --- a/test/e2e/catalog_e2e_test.go +++ b/test/e2e/catalog_e2e_test.go @@ -12,12 +12,14 @@ import ( "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/wait" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" @@ -187,7 +189,7 @@ func TestConfigMapUpdateTriggersRegistryPodRollout(t *testing.T) { require.NoError(t, err) fetchedUpdatedCatalog, err := fetchCatalogSource(t, crc, mainCatalogName, testNamespace, func(catalog *v1alpha1.CatalogSource) bool { - if catalog.Status.LastSync != fetchedInitialCatalog.Status.LastSync { + if catalog.Status.LastSync != fetchedInitialCatalog.Status.LastSync && catalog.Status.ConfigMapResource.ResourceVersion != fetchedInitialCatalog.Status.ConfigMapResource.ResourceVersion { fmt.Println("catalog updated") return true } @@ -240,6 +242,7 @@ func TestConfigMapReplaceTriggersRegistryPodRollout(t *testing.T) { dependentPackageName := genName("nginxdep-") mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName) + dependentPackageStable := fmt.Sprintf("%s-stable", dependentPackageName) stableChannel := "stable" @@ -312,6 +315,146 @@ func TestConfigMapReplaceTriggersRegistryPodRollout(t *testing.T) { } +func TestGrpcAddressCatalogSource(t *testing.T) { + // Create an internal (configmap) CatalogSource with stable and dependency csv + // Create an internal (configmap) replacement CatalogSource with a stable, stable-replacement, and dependency csv + // Strip all OwnerReferences to both CatalogSources from generated resources (pods, services, rbac, etc...) + // Delete both CatalogSources, leaving their registry pods behind (no OwnerReferences = no GC) + // Create an "address" CatalogSource with a Spec.Address field set to the stable CatalogSource's service + // Create a Subscription to the stable package + // Wait for the stable Subscription to be Successful + // Wait for the stable CSV to be Successful + // Update the "address" CatalogSources's Spec.Address field with the PodIP of the replacement CatalogSource's registry pod + // Wait for the replacement CSV to be Successful + + defer cleaner.NotifyTestComplete(t, true) + + mainPackageName := genName("nginx-") + dependentPackageName := genName("nginxdep-") + + mainPackageStable := fmt.Sprintf("%s-stable", mainPackageName) + mainPackageReplacement := fmt.Sprintf("%s-replacement", mainPackageStable) + dependentPackageStable := fmt.Sprintf("%s-stable", dependentPackageName) + + stableChannel := "stable" + + mainNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil) + dependentNamedStrategy := newNginxInstallStrategy(genName("dep-"), nil, nil) + + crdPlural := genName("ins-") + + dependentCRD := newCRD(crdPlural) + mainCSV := newCSV(mainPackageStable, testNamespace, "", *semver.New("0.1.0"), nil, []apiextensions.CustomResourceDefinition{dependentCRD}, mainNamedStrategy) + replacementCSV := newCSV(mainPackageReplacement, testNamespace, mainPackageStable, *semver.New("0.2.0"), nil, []apiextensions.CustomResourceDefinition{dependentCRD}, mainNamedStrategy) + dependentCSV := newCSV(dependentPackageStable, testNamespace, "", *semver.New("0.1.0"), []apiextensions.CustomResourceDefinition{dependentCRD}, nil, dependentNamedStrategy) + + c := newKubeClient(t) + crc := newCRClient(t) + + mainCatalogName := genName("mock-ocs-main-") + replacementCatalogName := genName("mock-ocs-main-with-replacement-") + + // Create separate manifests for each CatalogSource + mainManifests := []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: mainPackageStable}, + }, + DefaultChannelName: stableChannel, + }, + } + + replacementManifests := []registry.PackageManifest{ + { + PackageName: mainPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: mainPackageReplacement}, + }, + DefaultChannelName: stableChannel, + }, + } + + dependentManifests := []registry.PackageManifest{ + { + PackageName: dependentPackageName, + Channels: []registry.PackageChannel{ + {Name: stableChannel, CurrentCSVName: dependentPackageStable}, + }, + DefaultChannelName: stableChannel, + }, + } + + // Create configmap catalogsources + createInternalCatalogSource(t, c, crc, mainCatalogName, testNamespace, append(mainManifests, dependentManifests...), []apiextensions.CustomResourceDefinition{dependentCRD}, []v1alpha1.ClusterServiceVersion{mainCSV, dependentCSV}) + createInternalCatalogSource(t, c, crc, replacementCatalogName, testNamespace, append(replacementManifests, dependentManifests...), []apiextensions.CustomResourceDefinition{dependentCRD}, []v1alpha1.ClusterServiceVersion{replacementCSV, mainCSV, dependentCSV}) + + // Strip all OwnerReferences from generated catalog resources + cleanup := stripConfigMapCatalogResourceOwnerReferences(t, c, crc, mainCatalogName, testNamespace) + defer cleanup() + cleanupReplacement := stripConfigMapCatalogResourceOwnerReferences(t, c, crc, replacementCatalogName, testNamespace) + defer cleanupReplacement() + + // Delete CatalogSources - will leave running grpc pods behind + err := crc.OperatorsV1alpha1().CatalogSources(testNamespace).Delete(mainCatalogName, metav1.NewDeleteOptions(0)) + require.NoError(t, err) + err = crc.OperatorsV1alpha1().CatalogSources(testNamespace).Delete(replacementCatalogName, metav1.NewDeleteOptions(0)) + require.NoError(t, err) + + mainCatalogAddress := fmt.Sprintf("%s.%s.svc.cluster.local:%d", mainCatalogName, testNamespace, 50051) + addressCatalogName := genName("address-catalog-") + + // Create a CatalogSource pointing to the grpc pod + addressSource := &v1alpha1.CatalogSource{ + TypeMeta: metav1.TypeMeta{ + Kind: v1alpha1.CatalogSourceKind, + APIVersion: v1alpha1.CatalogSourceCRDAPIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: addressCatalogName, + Namespace: testNamespace, + }, + Spec: v1alpha1.CatalogSourceSpec{ + SourceType: v1alpha1.SourceTypeGrpc, + Address: mainCatalogAddress, + }, + } + + addressSource, err = crc.OperatorsV1alpha1().CatalogSources(testNamespace).Create(addressSource) + if err != nil && !errors.IsAlreadyExists(err) { + require.NoError(t, err) + } + defer func(){ + err := crc.OperatorsV1alpha1().CatalogSources(testNamespace).Delete(addressSource.GetName(), &metav1.DeleteOptions{}) + require.NoError(t, err) + }() + + // Create Subscription + subscriptionName := genName("sub-") + createSubscriptionForCatalog(t, crc, testNamespace, subscriptionName, addressSource.GetName(), mainPackageName, stableChannel, "", v1alpha1.ApprovalAutomatic) + + subscription, err := fetchSubscription(t, crc, testNamespace, subscriptionName, subscriptionStateAtLatestChecker) + require.NoError(t, err) + require.NotNil(t, subscription) + _, err = fetchCSV(t, crc, subscription.Status.CurrentCSV, testNamespace, csvSucceededChecker) + require.NoError(t, err) + + // Update the catalog's address to point at the other registry pod's cluster ip + pods, err := c.KubernetesInterface().CoreV1().Pods(testNamespace).List(metav1.ListOptions{LabelSelector: "olm.catalogSource=" + replacementCatalogName}) + require.NoError(t, err) + require.Equal(t, 1, len(pods.Items)) + pod := pods.Items[0] + addressSource, err = crc.OperatorsV1alpha1().CatalogSources(testNamespace).Get(addressSource.GetName(), metav1.GetOptions{}) + require.NoError(t, err) + addressSource.Spec.Address = fmt.Sprintf("%s:%s", pod.Status.PodIP, "50051") + _, err = crc.OperatorsV1alpha1().CatalogSources(testNamespace).Update(addressSource) + require.NoError(t, err) + + // Wait for the replacement CSV to be installed + _, err = awaitCSV(t, crc, testNamespace, replacementCSV.GetName(), csvSucceededChecker) + require.NoError(t, err) +} + func getOperatorDeployment(c operatorclient.ClientInterface, namespace string, operatorLabels labels.Set) (*appsv1.Deployment, error) { deployments, err := c.ListDeploymentsWithLabels(namespace, operatorLabels) if err != nil || deployments == nil || len(deployments.Items) != 1 { @@ -360,3 +503,67 @@ func rescaleDeployment(c operatorclient.ClientInterface, deployment *appsv1.Depl return err } + +func stripConfigMapCatalogResourceOwnerReferences(t *testing.T, c operatorclient.ClientInterface, crc versioned.Interface, catalogName, catalogNamespace string) (cleanup func()) { + // Attempt to get the catalog source before creating install plan + fetchedInitialCatalog, err := fetchCatalogSource(t, crc, catalogName, catalogNamespace, catalogSourceRegistryPodSynced) + require.NoError(t, err) + // Get initial configmap + configMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(catalogNamespace).Get(fetchedInitialCatalog.Spec.ConfigMap, metav1.GetOptions{}) + require.NoError(t, err) + + // Check pod created + initialPods, err := c.KubernetesInterface().CoreV1().Pods(catalogNamespace).List(metav1.ListOptions{LabelSelector: "olm.configMapResourceVersion=" + configMap.ResourceVersion}) + require.NoError(t, err) + require.Equal(t, 1, len(initialPods.Items)) + + // remove ownerreferences from created resources + pod := initialPods.Items[0] + pod.SetOwnerReferences(nil) + _, err = c.KubernetesInterface().CoreV1().Pods(catalogNamespace).Update(&pod) + require.NoError(t, err) + + configMap.SetOwnerReferences(nil) + _, err = c.KubernetesInterface().CoreV1().ConfigMaps(catalogNamespace).Update(configMap) + require.NoError(t, err) + + service, err := c.KubernetesInterface().CoreV1().Services(catalogNamespace).Get(catalogName, metav1.GetOptions{}) + require.NoError(t, err) + service.SetOwnerReferences(nil) + _, err = c.KubernetesInterface().CoreV1().Services(catalogNamespace).Update(service) + require.NoError(t, err) + + serviceAccount, err := c.KubernetesInterface().CoreV1().ServiceAccounts(catalogNamespace).Get(catalogName+"-configmap-server", metav1.GetOptions{}) + require.NoError(t, err) + serviceAccount.SetOwnerReferences(nil) + _, err = c.KubernetesInterface().CoreV1().ServiceAccounts(catalogNamespace).Update(serviceAccount) + require.NoError(t, err) + + r, err := c.KubernetesInterface().RbacV1().Roles(catalogNamespace).Get(catalogName+"-configmap-reader", metav1.GetOptions{}) + require.NoError(t, err) + r.SetOwnerReferences(nil) + _, err = c.KubernetesInterface().RbacV1().Roles(catalogNamespace).Update(r) + require.NoError(t, err) + + rb, err := c.KubernetesInterface().RbacV1().RoleBindings(catalogNamespace).Get(catalogName+"-server-configmap-reader", metav1.GetOptions{}) + require.NoError(t, err) + rb.SetOwnerReferences(nil) + _, err = c.KubernetesInterface().RbacV1().RoleBindings(catalogNamespace).Update(rb) + require.NoError(t, err) + + return func(){ + options := &metav1.DeleteOptions{} + err := c.KubernetesInterface().CoreV1().ConfigMaps(catalogNamespace).Delete(configMap.GetName(), options) + require.NoError(t, err) + err = c.KubernetesInterface().CoreV1().Pods(catalogNamespace).Delete(pod.GetName(), options) + require.NoError(t, err) + err = c.KubernetesInterface().CoreV1().Services(catalogNamespace).Delete(service.GetName(), options) + require.NoError(t, err) + c.KubernetesInterface().CoreV1().ServiceAccounts(catalogNamespace).Delete(serviceAccount.GetName(), options) + require.NoError(t, err) + err = c.KubernetesInterface().RbacV1().Roles(catalogNamespace).Delete(r.GetName(), options) + require.NoError(t, err) + c.KubernetesInterface().RbacV1().RoleBindings(catalogNamespace).Delete(rb.GetName(), options) + require.NoError(t, err) + } +} \ No newline at end of file diff --git a/test/e2e/util_test.go b/test/e2e/util_test.go index 082550a0294..30f027ab91a 100644 --- a/test/e2e/util_test.go +++ b/test/e2e/util_test.go @@ -344,6 +344,40 @@ func buildServiceAccountCleanupFunc(t *testing.T, c operatorclient.ClientInterfa } func createInternalCatalogSource(t *testing.T, c operatorclient.ClientInterface, crc versioned.Interface, name, namespace string, manifests []registry.PackageManifest, crds []apiextensions.CustomResourceDefinition, csvs []v1alpha1.ClusterServiceVersion) (*v1alpha1.CatalogSource, cleanupFunc) { + configMap, configMapCleanup := createConfigMapForCatalogData(t, c, name, namespace, manifests, crds, csvs) + + // Create an internal CatalogSource custom resource pointing to the ConfigMap + catalogSource := &v1alpha1.CatalogSource{ + TypeMeta: metav1.TypeMeta{ + Kind: v1alpha1.CatalogSourceKind, + APIVersion: v1alpha1.CatalogSourceCRDAPIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.CatalogSourceSpec{ + SourceType: "internal", + ConfigMap: configMap.GetName(), + }, + } + catalogSource.SetNamespace(namespace) + + t.Logf("Creating catalog source %s in namespace %s...", name, namespace) + catalogSource, err := crc.OperatorsV1alpha1().CatalogSources(namespace).Create(catalogSource) + if err != nil && !errors.IsAlreadyExists(err) { + require.NoError(t, err) + } + t.Logf("Catalog source %s created", name) + + cleanupInternalCatalogSource := func() { + configMapCleanup() + buildCatalogSourceCleanupFunc(t, crc, namespace, catalogSource)() + } + return catalogSource, cleanupInternalCatalogSource +} + +func createConfigMapForCatalogData(t *testing.T, c operatorclient.ClientInterface, name, namespace string, manifests []registry.PackageManifest, crds []apiextensions.CustomResourceDefinition, csvs []v1alpha1.ClusterServiceVersion) (*corev1.ConfigMap, cleanupFunc) { // Create a config map containing the PackageManifests and CSVs configMapName := fmt.Sprintf("%s-configmap", name) catalogConfigMap := &corev1.ConfigMap{ @@ -382,40 +416,11 @@ func createInternalCatalogSource(t *testing.T, c operatorclient.ClientInterface, catalogConfigMap.Data[registry.ConfigMapCSVName] = string(csvsRaw) } - _, err := c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Create(catalogConfigMap) - if err != nil && !errors.IsAlreadyExists(err) { - require.NoError(t, err) - } - - // Create an internal CatalogSource custom resource pointing to the ConfigMap - catalogSource := &v1alpha1.CatalogSource{ - TypeMeta: metav1.TypeMeta{ - Kind: v1alpha1.CatalogSourceKind, - APIVersion: v1alpha1.CatalogSourceCRDAPIVersion, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Spec: v1alpha1.CatalogSourceSpec{ - SourceType: "internal", - ConfigMap: configMapName, - }, - } - catalogSource.SetNamespace(namespace) - - t.Logf("Creating catalog source %s in namespace %s...", name, namespace) - catalogSource, err = crc.OperatorsV1alpha1().CatalogSources(namespace).Create(catalogSource) + createdConfigMap, err := c.KubernetesInterface().CoreV1().ConfigMaps(namespace).Create(catalogConfigMap) if err != nil && !errors.IsAlreadyExists(err) { require.NoError(t, err) } - t.Logf("Catalog source %s created", name) - - cleanupInternalCatalogSource := func() { - buildConfigMapCleanupFunc(t, c, namespace, catalogConfigMap)() - buildCatalogSourceCleanupFunc(t, crc, namespace, catalogSource)() - } - return catalogSource, cleanupInternalCatalogSource + return createdConfigMap, buildConfigMapCleanupFunc(t, c, namespace, createdConfigMap) } func serializeCRD(t *testing.T, crd apiextensions.CustomResourceDefinition) string {