Skip to content

Commit

Permalink
operators/olm: record and expose infromers (#3005)
Browse files Browse the repository at this point in the history
Plugins should re-use informers that the host started, since there's no
need to double the number of watches and caches we're doing to reconcile
the same set of objects in one process.

Signed-off-by: Steve Kuznetsov <[email protected]>
  • Loading branch information
stevekuznetsov authored Aug 7, 2023
1 parent 2c3434a commit a7f102a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
32 changes: 29 additions & 3 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ type Operator struct {
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
clientFactory clients.Factory
plugins []plugins.OperatorPlugin
informersByNamespace map[string]*plugins.Informers
}

func (a *Operator) Informers() map[string]*plugins.Informers {
return a.informersByNamespace
}

func NewOperator(ctx context.Context, options ...OperatorOption) (*Operator, error) {
Expand Down Expand Up @@ -159,9 +164,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
protectedCopiedCSVNamespaces: config.protectedCopiedCSVNamespaces,
}

informersByNamespace := map[string]*plugins.Informers{}
// Set up syncing for namespace-scoped resources
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
for _, namespace := range config.watchedNamespaces {
informersByNamespace[namespace] = &plugins.Informers{}
// Wire CSVs
csvInformer := externalversions.NewSharedInformerFactoryWithOptions(
op.client,
Expand All @@ -171,6 +178,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
options.LabelSelector = fmt.Sprintf("!%s", v1alpha1.CopiedLabelKey)
}),
).Operators().V1alpha1().ClusterServiceVersions()
informersByNamespace[namespace].CSVInformer = csvInformer
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
csvQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv", namespace))
op.csvQueueSet.Set(namespace, csvQueue)
Expand Down Expand Up @@ -225,6 +233,8 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
},
).Informer()
op.copiedCSVLister = metadatalister.New(copiedCSVInformer.GetIndexer(), gvr)
informersByNamespace[namespace].CopiedCSVInformer = copiedCSVInformer
informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister

// Register separate queue for gcing copied csvs
copiedCSVGCQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv-gc", namespace))
Expand All @@ -247,6 +257,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
// Wire OperatorGroup reconciliation
extInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, config.resyncPeriod(), externalversions.WithNamespace(namespace))
operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups()
informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer
op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/og", namespace))
op.ogQueueSet.Set(namespace, ogQueue)
Expand All @@ -266,6 +277,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register OperatorCondition QueueInformer
opConditionInformer := extInformerFactory.Operators().V2().OperatorConditions()
informersByNamespace[namespace].OperatorConditionInformer = opConditionInformer
op.lister.OperatorsV2().RegisterOperatorConditionLister(namespace, opConditionInformer.Lister())
opConditionQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -281,6 +293,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

subInformer := extInformerFactory.Operators().V1alpha1().Subscriptions()
informersByNamespace[namespace].SubscriptionInformer = subInformer
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
subQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -298,6 +311,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
// Wire Deployments
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace))
depInformer := k8sInformerFactory.Apps().V1().Deployments()
informersByNamespace[namespace].DeploymentInformer = depInformer
op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
depQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -314,6 +328,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Set up RBAC informers
roleInformer := k8sInformerFactory.Rbac().V1().Roles()
informersByNamespace[namespace].RoleInformer = roleInformer
op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister())
roleQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -329,6 +344,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
informersByNamespace[namespace].RoleBindingInformer = roleBindingInformer
op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister())
roleBindingQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -347,6 +363,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
secretInformer := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace), informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromValidatedSet(map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})).Core().V1().Secrets()
informersByNamespace[namespace].SecretInformer = secretInformer
op.lister.CoreV1().RegisterSecretLister(namespace, secretInformer.Lister())
secretQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -363,6 +380,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register Service QueueInformer
serviceInformer := k8sInformerFactory.Core().V1().Services()
informersByNamespace[namespace].ServiceInformer = serviceInformer
op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister())
serviceQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -379,6 +397,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register ServiceAccount QueueInformer
serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
informersByNamespace[namespace].ServiceAccountInformer = serviceAccountInformer
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
serviceAccountQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand Down Expand Up @@ -429,13 +448,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
olmConfigInformer := externalversions.NewSharedInformerFactoryWithOptions(
op.client,
config.resyncPeriod(),
).Operators().V1().OLMConfigs().Informer()
).Operators().V1().OLMConfigs()
informersByNamespace[metav1.NamespaceAll].OLMConfigInformer = olmConfigInformer
olmConfigQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithInformer(olmConfigInformer),
queueinformer.WithInformer(olmConfigInformer.Informer()),
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(op.olmConfigQueue),
queueinformer.WithIndexer(olmConfigInformer.GetIndexer()),
queueinformer.WithIndexer(olmConfigInformer.Informer().GetIndexer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOLMConfig).ToSyncer()),
)
if err != nil {
Expand All @@ -447,6 +467,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
informersByNamespace[metav1.NamespaceAll].ClusterRoleInformer = clusterRoleInformer
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
clusterRoleQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -462,6 +483,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

clusterRoleBindingInformer := k8sInformerFactory.Rbac().V1().ClusterRoleBindings()
informersByNamespace[metav1.NamespaceAll].ClusterRoleBindingInformer = clusterRoleBindingInformer
op.lister.RbacV1().RegisterClusterRoleBindingLister(clusterRoleBindingInformer.Lister())
clusterRoleBindingQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -478,6 +500,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// register namespace queueinformer
namespaceInformer := k8sInformerFactory.Core().V1().Namespaces()
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsQueueSet = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
namespaceInformer.Informer().AddEventHandler(
Expand All @@ -502,6 +525,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register APIService QueueInformer
apiServiceInformer := kagg.NewSharedInformerFactory(op.opClient.ApiregistrationV1Interface(), config.resyncPeriod()).Apiregistration().V1().APIServices()
informersByNamespace[metav1.NamespaceAll].APIServiceInformer = apiServiceInformer
op.lister.APIRegistrationV1().RegisterAPIServiceLister(apiServiceInformer.Lister())
apiServiceQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -519,6 +543,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), config.resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
informersByNamespace[metav1.NamespaceAll].CRDInformer = crdInformer
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand Down Expand Up @@ -572,6 +597,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
op.resolver = &install.StrategyResolver{
OverridesBuilderFunc: overridesBuilderFunc.GetDeploymentInitializer,
}
op.informersByNamespace = informersByNamespace

// initialize plugins
for _, makePlugIn := range operatorPlugInFactoryFuncs {
Expand Down
33 changes: 33 additions & 0 deletions pkg/controller/operators/olm/plugins/operator_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,48 @@ import (
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
operatorsv1informers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions/operators/v1"
operatorsv1alpha1informers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions/operators/v1alpha1"
operatorsv2informers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions/operators/v2"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/sirupsen/logrus"
extensionsv1informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
appsv1informers "k8s.io/client-go/informers/apps/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
rbacv1informers "k8s.io/client-go/informers/rbac/v1"
"k8s.io/client-go/metadata/metadatalister"
"k8s.io/client-go/tools/cache"
apiregistrationv1informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
)

// HostOperator is an extensible and observable operator that hosts the plug-in, i.e. which the plug-in is extending
type HostOperator interface {
queueinformer.ObservableOperator
queueinformer.ExtensibleOperator
Informers() map[string]*Informers
}

// Informers exposes informer caches that the host operator has already started, for re-use by plugins.
type Informers struct {
CSVInformer operatorsv1alpha1informers.ClusterServiceVersionInformer
CopiedCSVInformer cache.SharedIndexInformer
CopiedCSVLister metadatalister.Lister
OperatorGroupInformer operatorsv1informers.OperatorGroupInformer
OperatorConditionInformer operatorsv2informers.OperatorConditionInformer
SubscriptionInformer operatorsv1alpha1informers.SubscriptionInformer
DeploymentInformer appsv1informers.DeploymentInformer
RoleInformer rbacv1informers.RoleInformer
RoleBindingInformer rbacv1informers.RoleBindingInformer
SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer
ServiceAccountInformer corev1informers.ServiceAccountInformer
OLMConfigInformer operatorsv1informers.OLMConfigInformer
ClusterRoleInformer rbacv1informers.ClusterRoleInformer
ClusterRoleBindingInformer rbacv1informers.ClusterRoleBindingInformer
NamespaceInformer corev1informers.NamespaceInformer
APIServiceInformer apiregistrationv1informers.APIServiceInformer
CRDInformer extensionsv1informers.CustomResourceDefinitionInformer
}

// OperatorConfig gives access to required configuration from the host operator
Expand Down

0 comments on commit a7f102a

Please sign in to comment.