diff --git a/pkg/operatorV2/operator.go b/pkg/operatorV2/operator.go index a192a69f9..6d3ce483c 100644 --- a/pkg/operatorV2/operator.go +++ b/pkg/operatorV2/operator.go @@ -63,7 +63,9 @@ func NewOperator(name, namespace, image string) Operator { name: name, namespace: namespace, image: image, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[operation.Item](), workqueue.TypedRateLimitingQueueConfig[operation.Item]{ + Name: name, + }), } // Declaration of prometheus interface @@ -85,7 +87,7 @@ type operator struct { starters []Starter handlers []Handler - workqueue workqueue.RateLimitingInterface + workqueue workqueue.TypedRateLimitingInterface[operation.Item] // Implement prometheus collector *prometheusMetrics @@ -155,7 +157,7 @@ func (o *operator) RegisterStarter(starter Starter) error { } func (o *operator) EnqueueItem(item operation.Item) { - o.workqueue.Add(item.String()) + o.workqueue.Add(item) } func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string, filters ...InformerFilter) error { diff --git a/pkg/operatorV2/operator_worker.go b/pkg/operatorV2/operator_worker.go index 620004bb7..b568ab881 100644 --- a/pkg/operatorV2/operator_worker.go +++ b/pkg/operatorV2/operator_worker.go @@ -78,27 +78,14 @@ func (o *operator) processNextItem() bool { return true } -func (o *operator) processObject(obj interface{}) error { - defer o.workqueue.Done(obj) - var item operation.Item - var key string - var ok bool +func (o *operator) processObject(item operation.Item) error { + defer o.workqueue.Done(item) var err error - if key, ok = obj.(string); !ok { - o.workqueue.Forget(obj) - return nil - } - - if item, err = operation.NewItemFromString(key); err != nil { - o.workqueue.Forget(obj) - return nil - } - if item.Operation != operation.Update { + o.workqueue.Forget(item) item.Operation = operation.Update - o.workqueue.Forget(obj) - o.workqueue.Add(item.String()) + o.workqueue.Add(item) return nil } @@ -113,10 +100,10 @@ func (o *operator) processObject(obj interface{}) error { item.Name) if err = o.processItem(item); err != nil { - o.workqueue.AddRateLimited(key) + o.workqueue.AddRateLimited(item) if !IsReconcile(err) { - message := fmt.Sprintf("error syncing '%s': %s, re-queuing", key, err.Error()) + message := fmt.Sprintf("error syncing '%s': %s, re-queuing", item.String(), err.Error()) loggerWorker.Debug(message) return errors.Errorf(message) } @@ -132,7 +119,7 @@ func (o *operator) processObject(obj interface{}) error { item.Namespace, item.Name) - o.workqueue.Forget(obj) + o.workqueue.Forget(item) return nil } diff --git a/pkg/util/grpc.go b/pkg/util/grpc.go index 2ed559baf..254887b16 100644 --- a/pkg/util/grpc.go +++ b/pkg/util/grpc.go @@ -39,7 +39,7 @@ import ( const AuthorizationGRPCHeader = "adb-authorization" func NewGRPCClient[T any](ctx context.Context, in func(cc grpc.ClientConnInterface) T, addr string, opts ...grpc.DialOption) (T, io.Closer, error) { - con, err := NewGRPCConn(ctx, addr, opts...) + con, err := NewGRPCConn(addr, opts...) if err != nil { return Default[T](), nil, err } @@ -65,7 +65,7 @@ func NewOptionalTLSGRPCConn(ctx context.Context, addr string, tls *tls.Config, o copy(newOpts[len(opts):], tlsOpts) // Create conn - conn, err := newGRPCConn(ctx, addr, tlsOpts...) + conn, err := newGRPCConn(addr, tlsOpts...) if err != nil { return nil, err } @@ -91,17 +91,17 @@ func NewOptionalTLSGRPCConn(ctx context.Context, addr string, tls *tls.Config, o } } - return newGRPCConn(ctx, addr, opts...) + return newGRPCConn(addr, opts...) } -func newGRPCConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func newGRPCConn(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { var z []grpc.DialOption z = append(z, grpc.WithTransportCredentials(insecure.NewCredentials())) z = append(z, opts...) - conn, err := grpc.DialContext(ctx, addr, z...) + conn, err := grpc.NewClient(addr, z...) if err != nil { return nil, err } @@ -109,8 +109,8 @@ func newGRPCConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*gr return conn, nil } -func NewGRPCConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - return newGRPCConn(ctx, addr, opts...) +func NewGRPCConn(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return newGRPCConn(addr, opts...) } func ClientTLS(config *tls.Config) []grpc.DialOption { diff --git a/pkg/util/k8sutil/informer.go b/pkg/util/k8sutil/informer.go index d73291cf4..65956ee3d 100644 --- a/pkg/util/k8sutil/informer.go +++ b/pkg/util/k8sutil/informer.go @@ -50,38 +50,44 @@ func NewResourceWatcher(getter cache.Getter, resource, namespace string, namespace, fields.Everything()) - _, informer := cache.NewIndexerInformer(source, objType, 0, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - defer func() { - if err := recover(); err != nil { - informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack())) + _, informer := cache.NewInformerWithOptions(cache.InformerOptions{ + ListerWatcher: source, + ObjectType: objType, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + defer func() { + if err := recover(); err != nil { + informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack())) + } + }() + if h.AddFunc != nil { + h.AddFunc(obj) } - }() - if h.AddFunc != nil { - h.AddFunc(obj) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - defer func() { - if err := recover(); err != nil { - informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack())) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + defer func() { + if err := recover(); err != nil { + informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack())) + } + }() + if h.UpdateFunc != nil { + h.UpdateFunc(oldObj, newObj) } - }() - if h.UpdateFunc != nil { - h.UpdateFunc(oldObj, newObj) - } - }, - DeleteFunc: func(obj interface{}) { - defer func() { - if err := recover(); err != nil { - informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack())) + }, + DeleteFunc: func(obj interface{}) { + defer func() { + if err := recover(); err != nil { + informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack())) + } + }() + if h.DeleteFunc != nil { + h.DeleteFunc(obj) } - }() - if h.DeleteFunc != nil { - h.DeleteFunc(obj) - } + }, }, - }, cache.Indexers{}) + ResyncPeriod: 0, + Indexers: cache.Indexers{}, + }) return &ResourceWatcher{ informer: informer, diff --git a/pkg/util/svc/service_test.go b/pkg/util/svc/service_test.go index dea43dbe0..9ec97a12b 100644 --- a/pkg/util/svc/service_test.go +++ b/pkg/util/svc/service_test.go @@ -46,12 +46,12 @@ func Test_Service(t *testing.T) { othStart := other.StartWithHealth(ctx, h) - healthConn, err := grpc.DialContext(ctx, st.Address(), grpc.WithTransportCredentials(insecure.NewCredentials())) + healthConn, err := grpc.NewClient(st.Address(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer healthConn.Close() - otherConn, err := grpc.DialContext(ctx, othStart.Address(), grpc.WithTransportCredentials(insecure.NewCredentials())) + otherConn, err := grpc.NewClient(othStart.Address(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer otherConn.Close()