diff --git a/cmd/catalog/main.go b/cmd/catalog/main.go index 08fdde6a82..24c9e01a71 100644 --- a/cmd/catalog/main.go +++ b/cmd/catalog/main.go @@ -95,8 +95,7 @@ func main() { logger := log.New() if *debug { - // TODO: change back to debug level - logger.SetLevel(log.TraceLevel) + logger.SetLevel(log.DebugLevel) } logger.Infof("log level %s", logger.Level) diff --git a/cmd/olm/main.go b/cmd/olm/main.go index 6a1c93d794..9b22e1d5a1 100644 --- a/cmd/olm/main.go +++ b/cmd/olm/main.go @@ -91,8 +91,7 @@ func main() { // Set log level to debug if `debug` flag set logger := log.New() if *debug { - // TODO: Switch back to debug level - logger.SetLevel(log.TraceLevel) + logger.SetLevel(log.DebugLevel) } logger.Infof("log level %s", logger.Level) diff --git a/pkg/api/apis/operators/register.go b/pkg/api/apis/operators/register.go index 9f2130332e..784b040264 100644 --- a/pkg/api/apis/operators/register.go +++ b/pkg/api/apis/operators/register.go @@ -48,7 +48,3 @@ func addKnownTypes(scheme *runtime.Scheme) error { ) return nil } - -func init() { - -} diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 02d93c1c16..7afa9369b2 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -88,7 +88,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Create a new queueinformer-based operator. opClient := operatorclient.NewClientFromConfig(kubeconfigPath, logger) - queueOperator, err := queueinformer.NewOperatorFromClient(opClient.KubernetesInterface().Discovery(), logger) + queueOperator, err := queueinformer.NewOperator(opClient.KubernetesInterface().Discovery(), queueinformer.WithOperatorLogger(logger)) if err != nil { return nil, err } @@ -550,7 +550,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { } // Trigger a resolve, will pick up any subscriptions that depend on the catalog - // o.resolveNamespace(out.GetNamespace()) o.nsResolveQueue.Add(out.GetNamespace()) return nil @@ -915,18 +914,6 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) { logger.Info("syncing") - defer func() { - // make sure to notify subscription loop of installplan changes - if owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind); len(owners) > 0 { - for _, owner := range owners { - logger.WithField("owner", owner).Debug("requeueing installplan owner") - o.subQueueSet.Requeue(plan.GetNamespace(), owner.Name) - } - } else { - logger.Trace("no installplan owner subscriptions found to requeue") - } - }() - if len(plan.Status.Plan) == 0 { logger.Info("skip processing installplan without status - subscription sync responsible for initial status") return @@ -943,6 +930,18 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) { return } + defer func() { + // Notify subscription loop of installplan changes + if owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind); len(owners) > 0 { + for _, owner := range owners { + logger.WithField("owner", owner).Debug("requeueing installplan owner") + o.subQueueSet.Requeue(plan.GetNamespace(), owner.Name) + } + } else { + logger.Trace("no installplan owner subscriptions found to requeue") + } + }() + // Update InstallPlan with status of transition. Log errors if we can't write them to the status. if _, err := o.client.OperatorsV1alpha1().InstallPlans(plan.GetNamespace()).UpdateStatus(outInstallPlan); err != nil { logger = logger.WithField("updateError", err.Error()) diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index 74756a5263..1ca39b8207 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -664,7 +664,7 @@ func NewFakeOperator(ctx context.Context, namespace string, watchedNamespaces [] } // Create the new operator - queueOperator, err := queueinformer.NewOperatorFromClient(opClientFake.KubernetesInterface().Discovery(), logrus.New()) + queueOperator, err := queueinformer.NewOperator(opClientFake.KubernetesInterface().Discovery()) for _, informer := range sharedInformers { queueOperator.RegisterInformer(informer) } diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index 2219ed26b9..c22583e46a 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -83,18 +83,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat return nil, err } - // // Create a new client for OLM types (CRs) - // crClient, err := client.NewClient(kubeconfigPath) - // if err != nil { - // return nil, err - // } - - // internalClient, err := client.NewInternalClient(kubeconfigPath) - // if err != nil { - // return nil, err - // } - - queueOperator, err := queueinformer.NewOperatorFromClient(config.operatorClient.KubernetesInterface().Discovery(), config.logger) + queueOperator, err := queueinformer.NewOperator(config.operatorClient.KubernetesInterface().Discovery(), queueinformer.WithOperatorLogger(config.logger)) if err != nil { return nil, err } diff --git a/pkg/controller/operators/olm/requirements.go b/pkg/controller/operators/olm/requirements.go index abd96c2244..c0d2156f53 100644 --- a/pkg/controller/operators/olm/requirements.go +++ b/pkg/controller/operators/olm/requirements.go @@ -331,7 +331,7 @@ func (a *Operator) permissionStatus(strategyDetailsDeployment *install.StrategyD statuses := []v1alpha1.RequirementStatus{} for key, status := range statusesSet { - a.logger.WithField("key", key).WithField("status", status).Debugf("appending permission status") + a.logger.WithField("key", key).WithField("status", status).Tracef("appending permission status") statuses = append(statuses, status) } diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index a00aa894af..e766119155 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -3,6 +3,7 @@ package queueinformer import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -140,3 +141,76 @@ func WithSyncer(syncer kubestate.Syncer) Option { config.syncer = syncer } } + +type operatorConfig struct { + discovery discovery.DiscoveryInterface + queueInformers []*QueueInformer + informers []cache.SharedIndexInformer + logger *logrus.Logger + numWorkers int +} + +type OperatorOption func(*operatorConfig) + +// apply sequentially applies the given options to the config. +func (c *operatorConfig) apply(options []OperatorOption) { + for _, option := range options { + option(c) + } +} + +func newInvalidOperatorConfigError(msg string) error { + return errors.Errorf("invalid queue informer operator config: %s", msg) +} + +// WithOperatorLogger sets the logger used by an Operator. +func WithOperatorLogger(logger *logrus.Logger) OperatorOption { + return func(config *operatorConfig) { + config.logger = logger + } +} + +// WithQueueInformers registers a set of initial QueueInformers with an Operator. +// If the QueueInformer is configured with a SharedIndexInformer, that SharedIndexInformer +// is registered with the Operator automatically. +func WithQueueInformers(queueInformers ...*QueueInformer) OperatorOption { + return func(config *operatorConfig) { + config.queueInformers = queueInformers + } +} + +// WithQueueInformers registers a set of initial Informers with an Operator. +func WithInformers(informers ...cache.SharedIndexInformer) OperatorOption { + return func(config *operatorConfig) { + config.informers = informers + } +} + +// WithNumWorkers sets the number of workers an Operator uses to process each queue. +// It translates directly to the number of queue items processed in parallel for a given queue. +// Specifying zero or less workers is an invariant and will cause an error upon configuration. +// Specifying one worker indicates that each queue will only have one item processed at a time. +func WithNumWorkers(numWorkers int) OperatorOption { + return func(config *operatorConfig) { + config.numWorkers = numWorkers + } +} + +// validate returns an error if the config isn't valid. +func (c *operatorConfig) validate() (err error) { + switch config := c; { + case config.discovery == nil: + err = newInvalidOperatorConfigError("discovery client nil") + case config.numWorkers < 1: + err = newInvalidOperatorConfigError("must specify at least one worker per queue") + } + + return +} + +func defaultOperatorConfig() *operatorConfig { + return &operatorConfig{ + logger: logrus.New(), + numWorkers: 2, + } +} diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index 7dcfe2ae79..66c271d7ef 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -51,6 +51,7 @@ type operator struct { informers []cache.SharedIndexInformer hasSynced cache.InformerSynced mu sync.RWMutex + numWorkers int runInformersOnce sync.Once reconcileOnce sync.Once logger *logrus.Logger @@ -206,8 +207,9 @@ func (o *operator) run(ctx context.Context) { o.logger.Info("starting workers...") for _, queueInformer := range o.queueInformers { - go o.worker(ctx, queueInformer) - go o.worker(ctx, queueInformer) + for w := 0; w < o.numWorkers; w++ { + go o.worker(ctx, queueInformer) + } } close(o.ready) @@ -249,7 +251,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) resource, exists, err := loop.indexer.GetByKey(key) if err != nil { logger.WithError(err).Error("cache get failed") - // queue.Forget(item) + queue.Forget(item) return true } if !exists { @@ -283,18 +285,41 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) return true } -// NewOperatorFromClient returns a new Operator configured to manage the cluster with the given discovery client. -func NewOperatorFromClient(disc discovery.DiscoveryInterface, logger *logrus.Logger) (Operator, error) { +// NewOperator returns a new Operator configured to manage the cluster with the given discovery client. +func NewOperator(disc discovery.DiscoveryInterface, options ...OperatorOption) (Operator, error) { + config := defaultOperatorConfig() + config.discovery = disc + config.apply(options) + if err := config.validate(); err != nil { + return nil, err + } + + return newOperatorFromConfig(config) + +} + +func newOperatorFromConfig(config *operatorConfig) (Operator, error) { op := &operator{ - discovery: disc, - queueInformers: []*QueueInformer{}, - informers: []cache.SharedIndexInformer{}, - logger: logger, - ready: make(chan struct{}), - done: make(chan struct{}), - atLevel: make(chan error, 25), + discovery: config.discovery, + numWorkers: config.numWorkers, + logger: config.logger, + ready: make(chan struct{}), + done: make(chan struct{}), + atLevel: make(chan error, 25), } op.syncCh = op.atLevel + // Register QueueInformers and Informers + for _, queueInformer := range op.queueInformers { + if err := op.RegisterQueueInformer(queueInformer); err != nil { + return nil, err + } + } + for _, informer := range op.informers { + if err := op.RegisterInformer(informer); err != nil { + return nil, err + } + } + return op, nil } diff --git a/pkg/package-server/provider/registry_test.go b/pkg/package-server/provider/registry_test.go index 81946e2d80..c688c4c5e1 100644 --- a/pkg/package-server/provider/registry_test.go +++ b/pkg/package-server/provider/registry_test.go @@ -83,7 +83,7 @@ func NewFakeRegistryProvider(ctx context.Context, clientObjs []runtime.Object, k k8sClientFake := k8sfake.NewSimpleClientset(k8sObjs...) opClientFake := operatorclient.NewClient(k8sClientFake, nil, nil) - op, err := queueinformer.NewOperatorFromClient(opClientFake.KubernetesInterface().Discovery(), logrus.StandardLogger()) + op, err := queueinformer.NewOperator(opClientFake.KubernetesInterface().Discovery()) if err != nil { return nil, err } diff --git a/pkg/package-server/server/server.go b/pkg/package-server/server/server.go index c0876ae0c2..46e395ffd4 100644 --- a/pkg/package-server/server/server.go +++ b/pkg/package-server/server/server.go @@ -165,7 +165,7 @@ func (o *PackageServerOptions) Run(ctx context.Context) error { return err } - queueOperator, err := queueinformer.NewOperatorFromClient(crClient.Discovery(), log.New()) + queueOperator, err := queueinformer.NewOperator(crClient.Discovery()) if err != nil { return err }