Skip to content

Commit

Permalink
WIP: add operator group control loop
Browse files Browse the repository at this point in the history
So far the loop only looks at the namespace selector and writes the
results to the status and an annotation on each deployment.
  • Loading branch information
Jeff Peeler committed Sep 24, 2018
1 parent f876b79 commit a4e3db4
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package olm
import (
"errors"
"fmt"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -15,6 +16,7 @@ import (
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha2"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/annotator"
Expand Down Expand Up @@ -131,6 +133,28 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt
for _, informer := range depQueueInformers {
op.RegisterQueueInformer(informer)
}

// Create an informer for the operator group
operatorGroupInformers := []cache.SharedIndexInformer{}
for _, namespace := range namespaces {
informerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
operatorGroupInformers = append(operatorGroupInformers, informerFactory.Operators().V1alpha2().OperatorGroups().Informer())
}

// Register OperatorGroup informers.
operatorGroupQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "operatorgroups")
operatorGroupQueueInformer := queueinformer.New(
operatorGroupQueue,
operatorGroupInformers,
op.syncOperatorGroups,
nil,
"operatorgroups",
metrics.NewMetricsNil(),
)
for _, informer := range operatorGroupQueueInformer {
op.RegisterQueueInformer(informer)
}

return op, nil
}

Expand Down Expand Up @@ -158,6 +182,54 @@ func (a *Operator) syncDeployment(obj interface{}) (syncError error) {
return nil
}

func (a *Operator) syncOperatorGroups(obj interface{}) error {
op, ok := obj.(*v1alpha2.OperatorGroup)
if !ok {
log.Debugf("wrong type: %#v\n", obj)
return fmt.Errorf("casting OperatorGroup failed")
}

// write namespace matches to status field
selector, err := metav1.LabelSelectorAsSelector(&op.Spec.Selector)
if err != nil {
return err
}
operatorGroupOpts := metav1.ListOptions{LabelSelector: selector.String()}
namespaceList, err := a.OpClient.KubernetesInterface().CoreV1().Namespaces().List(operatorGroupOpts)
if err != nil {
return err
}
op.Status.Namespaces = namespaceList.Items

// make string list made up of comma separated namespaces
var b strings.Builder
nsCount := len(namespaceList.Items)
for i := 0; i < nsCount-1; i++ {
fmt.Fprintf(&b, "%v,", namespaceList.Items[i])
}
fmt.Fprintf(&b, "%v", namespaceList.Items[nsCount-1])

// write namespaces to watch in every deployment
csvsInNamespace := a.csvsInNamespace(op.GetNamespace())
for csvName, csv := range csvsInNamespace {
strategy, err := a.resolver.UnmarshalStrategy(csv.Spec.InstallStrategy)
if err != nil {
return fmt.Errorf("error unmarshaling strategy from ClusterServiceVersion '%s' with error: %s", csvName, err)
}

strategyDetailsDeployment, ok := strategy.(*install.StrategyDetailsDeployment)
if !ok {
return fmt.Errorf("could not assert strategy implementation as deployment for CSV %s", csvName)
}

for _, deploy := range strategyDetailsDeployment.DeploymentSpecs {
deploy.Spec.Template.Annotations["olm.targetNamespaces"] = b.String()
}
}

return nil
}

// syncClusterServiceVersion is the method that gets called when we see a CSV event in the cluster
func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error) {
clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)
Expand Down

0 comments on commit a4e3db4

Please sign in to comment.