Skip to content

Commit

Permalink
feat(catalogsource): allow grpc source types that don't require an image
Browse files Browse the repository at this point in the history
instead, they provide an address directly. OLM will not have visibility
into the resources required for running the grpc catalog for this case,
but will still connect and health check.
  • Loading branch information
ecordell authored and njhale committed Feb 14, 2019
1 parent 96456a3 commit 0580e82
Show file tree
Hide file tree
Showing 11 changed files with 405 additions and 118 deletions.
1 change: 0 additions & 1 deletion cmd/catalog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,3 @@ func main() {
_, done := catalogOperator.Run(stopCh)
<-done
}

9 changes: 9 additions & 0 deletions pkg/api/apis/operators/v1alpha1/catalogsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type CatalogSourceSpec struct {
SourceType SourceType `json:"sourceType"`
ConfigMap string `json:"configMap,omitempty"`
Image string `json:"image,omitempty"`
Address string `json:"address,omitempty"`
Secrets []string `json:"secrets,omitempty"`

// Metadata
Expand All @@ -53,6 +54,7 @@ type CatalogSourceStatus struct {
RegistryServiceStatus *RegistryServiceStatus `json:"registryService,omitempty"`
LastSync metav1.Time `json:"lastSync,omitempty"`
}

type ConfigMapResourceReference struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Expand All @@ -71,6 +73,13 @@ type CatalogSource struct {
Status CatalogSourceStatus `json:"status"`
}

func (c *CatalogSource) Address() string {
if c.Spec.Address != "" {
return c.Spec.Address
}
return c.Status.RegistryServiceStatus.Address()
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type CatalogSourceList struct {
metav1.TypeMeta `json:",inline"`
Expand Down
20 changes: 11 additions & 9 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
watchedNamespaces = []string{metav1.NamespaceAll}
}

// Create a new client for ALM types (CRs)
// Create a new client for OLM types (CRs)
crClient, err := client.NewClient(kubeconfigPath)
if err != nil {
return nil, err
Expand Down Expand Up @@ -381,24 +381,26 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {

return nil
}

logger.Debug("catsrc configmap state good, checking registry pod")
}

reconciler := o.reconciler.ReconcilerForSourceType(catsrc.Spec.SourceType)
reconciler := o.reconciler.ReconcilerForSource(catsrc)
if reconciler == nil {
// TODO: Add failure status on catalogsource and remove from sources
return fmt.Errorf("no reconciler for source type %s", catsrc.Spec.SourceType)
}

logger.Debug("catsrc configmap state good, checking registry pod")

// if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
if catsrc.Status.RegistryServiceStatus == nil || catsrc.Status.RegistryServiceStatus.CreatedAt.Before(&catsrc.Status.LastSync) {
logger.Debug("registry pod scheduled for recheck")
logger.Debug("registry server scheduled recheck")

if err := reconciler.EnsureRegistryServer(out); err != nil {
logger.WithError(err).Warn("couldn't ensure registry server")
return err
}
logger.Debug("ensured registry pod")
logger.Debug("ensured registry server")

out.Status.RegistryServiceStatus.CreatedAt = timeNow()
out.Status.LastSync = timeNow()
Expand All @@ -410,18 +412,18 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
}

o.sourcesLastUpdate = timeNow()
logger.Debug("registry pod recreated")
logger.Debug("registry server recreated")

return nil
}
logger.Debug("registry pod state good")
logger.Debug("registry state good")

// update operator's view of sources
sourcesUpdated := false
func() {
o.sourcesLock.Lock()
defer o.sourcesLock.Unlock()
address := catsrc.Status.RegistryServiceStatus.Address()
address := catsrc.Address()
currentSource, ok := o.sources[sourceKey]
logger = logger.WithField("currentSource", sourceKey)
if !ok || currentSource.Address != address || catsrc.Status.LastSync.After(currentSource.LastConnect.Time) {
Expand Down Expand Up @@ -664,7 +666,7 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string)
logger.WithField("clientState", client.Conn.GetState()).Debug("source")
if client.Conn.GetState() == connectivity.TransientFailure {
logger.WithField("clientState", client.Conn.GetState()).Debug("waiting for connection")
ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
ctx, _ := context.WithTimeout(context.TODO(), 2*time.Second)
changed := client.Conn.WaitForStateChange(ctx, connectivity.TransientFailure)
if !changed {
logger.WithField("clientState", client.Conn.GetState()).Debug("source in transient failure and didn't recover")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func TestSyncSubscriptions(t *testing.T) {
require.NoError(t, err)

o.reconciler = &fakes.FakeReconcilerFactory{
ReconcilerForSourceTypeStub: func(sourceType v1alpha1.SourceType) reconciler.RegistryReconciler {
ReconcilerForSourceStub: func(source *v1alpha1.CatalogSource) reconciler.RegistryReconciler {
return &fakes.FakeRegistryReconciler{
EnsureRegistryServerStub: func(source *v1alpha1.CatalogSource) error {
return nil
Expand Down
19 changes: 19 additions & 0 deletions pkg/controller/registry/reconciler/grpc_address.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package reconciler

import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
)

type GrpcAddressRegistryReconciler struct{}

var _ RegistryReconciler = &GrpcAddressRegistryReconciler{}

func (g *GrpcAddressRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.CatalogSource) error {

catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{
CreatedAt: timeNow(),
Protocol: "grpc",
}

return nil
}
94 changes: 68 additions & 26 deletions pkg/controller/registry/reconciler/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"k8s.io/client-go/informers"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
k8serrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
)

func grpcReconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{}) (*GrpcRegistryReconciler, operatorclient.ClientInterface) {
func grpcReconcilerFactory(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{}) (ReconcilerFactory, operatorclient.ClientInterface) {
opClientFake := operatorclient.NewClient(k8sfake.NewSimpleClientset(k8sObjs...), nil, nil)

// Creates registry pods in response to configmaps
Expand Down Expand Up @@ -46,7 +47,7 @@ func grpcReconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{
lister.CoreV1().RegisterPodLister(testNamespace, podInformer.Lister())
lister.CoreV1().RegisterConfigMapLister(testNamespace, configMapInformer.Lister())

rec := &GrpcRegistryReconciler{
rec := &RegistryReconcilerFactory{
OpClient: opClientFake,
Lister: lister,
}
Expand All @@ -62,7 +63,7 @@ func grpcReconciler(t *testing.T, k8sObjs []runtime.Object, stopc <-chan struct{
return rec, opClientFake
}

func validGrpcCatalogSource(image string) *v1alpha1.CatalogSource {
func validGrpcCatalogSource(image, address string) *v1alpha1.CatalogSource {
return &v1alpha1.CatalogSource{
ObjectMeta: metav1.ObjectMeta{
Name: "img-catalog",
Expand All @@ -71,6 +72,7 @@ func validGrpcCatalogSource(image string) *v1alpha1.CatalogSource {
},
Spec: v1alpha1.CatalogSourceSpec{
Image: image,
Address: address,
SourceType: v1alpha1.SourceTypeGrpc,
},
}
Expand Down Expand Up @@ -103,7 +105,36 @@ func TestGrpcRegistryReconciler(t *testing.T) {
{
testName: "Grpc/NoExistingRegistry/CreateSuccessful",
in: in{
catsrc: validGrpcCatalogSource("test-img"),
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
CreatedAt: timeNow(),
Protocol: "grpc",
ServiceName: "img-catalog",
ServiceNamespace: testNamespace,
Port: "50051",
},
},
},
{
testName: "Grpc/Address/CreateSuccessful",
in: in{
cluster: cluster{},
catsrc: validGrpcCatalogSource("", "catalog.svc.cluster.local:50001"),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
CreatedAt: timeNow(),
Protocol: "grpc",
},
},
},
{
testName: "Grpc/AddressAndImage/CreateSuccessful",
in: in{
cluster: cluster{},
catsrc: validGrpcCatalogSource("img-catalog", "catalog.svc.cluster.local:50001"),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
Expand All @@ -119,9 +150,9 @@ func TestGrpcRegistryReconciler(t *testing.T) {
testName: "Grpc/ExistingRegistry/BadServiceAccount",
in: in{
cluster: cluster{
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "ServiceAccount", "badName"),
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "ServiceAccount", "badName"),
},
catsrc: validGrpcCatalogSource("test-img"),
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
Expand All @@ -137,9 +168,9 @@ func TestGrpcRegistryReconciler(t *testing.T) {
testName: "Grpc/ExistingRegistry/BadService",
in: in{
cluster: cluster{
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "Service", "badName"),
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "Service", "badName"),
},
catsrc: validGrpcCatalogSource("test-img"),
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
Expand All @@ -155,9 +186,9 @@ func TestGrpcRegistryReconciler(t *testing.T) {
testName: "Grpc/ExistingRegistry/BadPod",
in: in{
cluster: cluster{
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "Pod", "badName"),
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "Pod", "badName"),
},
catsrc: validGrpcCatalogSource("test-img"),
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
Expand All @@ -173,9 +204,9 @@ func TestGrpcRegistryReconciler(t *testing.T) {
testName: "Grpc/ExistingRegistry/BadRole",
in: in{
cluster: cluster{
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "Role", "badName"),
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "Role", "badName"),
},
catsrc: validGrpcCatalogSource("test-img"),
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
Expand All @@ -191,9 +222,9 @@ func TestGrpcRegistryReconciler(t *testing.T) {
testName: "Grpc/ExistingRegistry/BadRoleBinding",
in: in{
cluster: cluster{
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img")), "RoleBinding", "badName"),
k8sObjs: modifyObjName(objectsForCatalogSource(validGrpcCatalogSource("test-img", "")), "RoleBinding", "badName"),
},
catsrc: validGrpcCatalogSource("test-img"),
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
Expand All @@ -209,9 +240,9 @@ func TestGrpcRegistryReconciler(t *testing.T) {
testName: "Grpc/ExistingRegistry/OldPod",
in: in{
cluster: cluster{
k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("old-img")),
k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("old-img", "")),
},
catsrc: validGrpcCatalogSource("new-img"),
catsrc: validGrpcCatalogSource("new-img", ""),
},
out: out{
status: &v1alpha1.RegistryServiceStatus{
Expand All @@ -229,7 +260,8 @@ func TestGrpcRegistryReconciler(t *testing.T) {
stopc := make(chan struct{})
defer close(stopc)

rec, client := grpcReconciler(t, tt.in.cluster.k8sObjs, stopc)
factory, client := grpcReconcilerFactory(t, tt.in.cluster.k8sObjs, stopc)
rec := factory.ReconcilerForSource(tt.in.catsrc)

err := rec.EnsureRegistryServer(tt.in.catsrc)

Expand All @@ -240,18 +272,28 @@ func TestGrpcRegistryReconciler(t *testing.T) {
return
}

// if no error, the reconciler should create the same set of kube objects every time
// Check for resource existence
decorated := grpcCatalogSourceDecorator{tt.in.catsrc}

pod := decorated.Pod()
outPod, err := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Get(pod.GetName(), metav1.GetOptions{})
require.NoError(t, err)
require.Equal(t, pod, outPod)

service := decorated.Service()
outService, err := client.KubernetesInterface().CoreV1().Services(service.GetNamespace()).Get(service.GetName(), metav1.GetOptions{})
require.NoError(t, err)
require.Equal(t, service, outService)
outPod, podErr := client.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Get(pod.GetName(), metav1.GetOptions{})
outService, serviceErr := client.KubernetesInterface().CoreV1().Services(service.GetNamespace()).Get(service.GetName(), metav1.GetOptions{})

switch rec.(type){
case *GrpcRegistryReconciler:
// Should be created by a GrpcRegistryReconciler
require.NoError(t, podErr)
require.Equal(t, pod, outPod)
require.NoError(t, serviceErr)
require.Equal(t, service, outService)
case *GrpcAddressRegistryReconciler:
// Should not be created by a GrpcAddressRegistryReconciler
require.Error(t, podErr)
require.True(t, k8serrors.IsNotFound(podErr))
require.NoError(t, err)
require.True(t, k8serrors.IsNotFound(serviceErr))
}

})
}
}
16 changes: 10 additions & 6 deletions pkg/controller/registry/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type RegistryReconciler interface {
}

type ReconcilerFactory interface {
ReconcilerForSourceType(sourceType v1alpha1.SourceType) RegistryReconciler
ReconcilerForSource(source *v1alpha1.CatalogSource) RegistryReconciler
}

type RegistryReconcilerFactory struct {
Expand All @@ -21,18 +21,22 @@ type RegistryReconcilerFactory struct {
ConfigMapServerImage string
}

func (r *RegistryReconcilerFactory) ReconcilerForSourceType(sourceType v1alpha1.SourceType) RegistryReconciler {
switch sourceType {
func (r *RegistryReconcilerFactory) ReconcilerForSource(source *v1alpha1.CatalogSource) RegistryReconciler {
switch source.Spec.SourceType {
case v1alpha1.SourceTypeInternal, v1alpha1.SourceTypeConfigmap:
return &ConfigMapRegistryReconciler{
Lister: r.Lister,
OpClient: r.OpClient,
Image: r.ConfigMapServerImage,
}
case v1alpha1.SourceTypeGrpc:
return &GrpcRegistryReconciler{
Lister: r.Lister,
OpClient: r.OpClient,
if source.Spec.Image != "" {
return &GrpcRegistryReconciler{
Lister: r.Lister,
OpClient: r.OpClient,
}
} else if source.Spec.Address != "" {
return &GrpcAddressRegistryReconciler{}
}
}
return nil
Expand Down
Loading

0 comments on commit 0580e82

Please sign in to comment.