From a4e3db4469c7346a55a3c74dccc8386cfe5b2db0 Mon Sep 17 00:00:00 2001 From: Jeff Peeler Date: Mon, 24 Sep 2018 17:30:57 -0400 Subject: [PATCH] WIP: add operator group control loop So far the loop only looks at the namespace selector and writes the results to the status and an annotation on each deployment. --- pkg/controller/operators/olm/operator.go | 72 ++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index 7b2762f5946..750fe0f47cb 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -3,6 +3,7 @@ package olm import ( "errors" "fmt" + "strings" "time" log "github.com/sirupsen/logrus" @@ -15,6 +16,7 @@ import ( apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1" + "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha2" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/annotator" @@ -131,6 +133,28 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt for _, informer := range depQueueInformers { op.RegisterQueueInformer(informer) } + + // Create an informer for the operator group + operatorGroupInformers := []cache.SharedIndexInformer{} + for _, namespace := range namespaces { + informerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace)) + operatorGroupInformers = append(operatorGroupInformers, informerFactory.Operators().V1alpha2().OperatorGroups().Informer()) + } + + // Register OperatorGroup informers. + operatorGroupQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "operatorgroups") + operatorGroupQueueInformer := queueinformer.New( + operatorGroupQueue, + operatorGroupInformers, + op.syncOperatorGroups, + nil, + "operatorgroups", + metrics.NewMetricsNil(), + ) + for _, informer := range operatorGroupQueueInformer { + op.RegisterQueueInformer(informer) + } + return op, nil } @@ -158,6 +182,54 @@ func (a *Operator) syncDeployment(obj interface{}) (syncError error) { return nil } +func (a *Operator) syncOperatorGroups(obj interface{}) error { + op, ok := obj.(*v1alpha2.OperatorGroup) + if !ok { + log.Debugf("wrong type: %#v\n", obj) + return fmt.Errorf("casting OperatorGroup failed") + } + + // write namespace matches to status field + selector, err := metav1.LabelSelectorAsSelector(&op.Spec.Selector) + if err != nil { + return err + } + operatorGroupOpts := metav1.ListOptions{LabelSelector: selector.String()} + namespaceList, err := a.OpClient.KubernetesInterface().CoreV1().Namespaces().List(operatorGroupOpts) + if err != nil { + return err + } + op.Status.Namespaces = namespaceList.Items + + // make string list made up of comma separated namespaces + var b strings.Builder + nsCount := len(namespaceList.Items) + for i := 0; i < nsCount-1; i++ { + fmt.Fprintf(&b, "%v,", namespaceList.Items[i]) + } + fmt.Fprintf(&b, "%v", namespaceList.Items[nsCount-1]) + + // write namespaces to watch in every deployment + csvsInNamespace := a.csvsInNamespace(op.GetNamespace()) + for csvName, csv := range csvsInNamespace { + strategy, err := a.resolver.UnmarshalStrategy(csv.Spec.InstallStrategy) + if err != nil { + return fmt.Errorf("error unmarshaling strategy from ClusterServiceVersion '%s' with error: %s", csvName, err) + } + + strategyDetailsDeployment, ok := strategy.(*install.StrategyDetailsDeployment) + if !ok { + return fmt.Errorf("could not assert strategy implementation as deployment for CSV %s", csvName) + } + + for _, deploy := range strategyDetailsDeployment.DeploymentSpecs { + deploy.Spec.Template.Annotations["olm.targetNamespaces"] = b.String() + } + } + + return nil +} + // syncClusterServiceVersion is the method that gets called when we see a CSV event in the cluster func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error) { clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)