Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Iter
Browse files Browse the repository at this point in the history
ajanikow committed Oct 15, 2024

Unverified

This user has not yet uploaded their public signing key.
1 parent c3c90fe commit 9376d21
Showing 5 changed files with 55 additions and 60 deletions.
8 changes: 5 additions & 3 deletions pkg/operatorV2/operator.go
Original file line number Diff line number Diff line change
@@ -63,7 +63,9 @@ func NewOperator(name, namespace, image string) Operator {
name: name,
namespace: namespace,
image: image,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[operation.Item](), workqueue.TypedRateLimitingQueueConfig[operation.Item]{
Name: name,
}),
}

// Declaration of prometheus interface
@@ -85,7 +87,7 @@ type operator struct {
starters []Starter
handlers []Handler

workqueue workqueue.RateLimitingInterface
workqueue workqueue.TypedRateLimitingInterface[operation.Item]

// Implement prometheus collector
*prometheusMetrics
@@ -155,7 +157,7 @@ func (o *operator) RegisterStarter(starter Starter) error {
}

func (o *operator) EnqueueItem(item operation.Item) {
o.workqueue.Add(item.String())
o.workqueue.Add(item)
}

func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string, filters ...InformerFilter) error {
27 changes: 7 additions & 20 deletions pkg/operatorV2/operator_worker.go
Original file line number Diff line number Diff line change
@@ -78,27 +78,14 @@ func (o *operator) processNextItem() bool {
return true
}

func (o *operator) processObject(obj interface{}) error {
defer o.workqueue.Done(obj)
var item operation.Item
var key string
var ok bool
func (o *operator) processObject(item operation.Item) error {
defer o.workqueue.Done(item)
var err error

if key, ok = obj.(string); !ok {
o.workqueue.Forget(obj)
return nil
}

if item, err = operation.NewItemFromString(key); err != nil {
o.workqueue.Forget(obj)
return nil
}

if item.Operation != operation.Update {
o.workqueue.Forget(item)
item.Operation = operation.Update
o.workqueue.Forget(obj)
o.workqueue.Add(item.String())
o.workqueue.Add(item)
return nil
}

@@ -113,10 +100,10 @@ func (o *operator) processObject(obj interface{}) error {
item.Name)

if err = o.processItem(item); err != nil {
o.workqueue.AddRateLimited(key)
o.workqueue.AddRateLimited(item)

if !IsReconcile(err) {
message := fmt.Sprintf("error syncing '%s': %s, re-queuing", key, err.Error())
message := fmt.Sprintf("error syncing '%s': %s, re-queuing", item.String(), err.Error())
loggerWorker.Debug(message)
return errors.Errorf(message)
}
@@ -132,7 +119,7 @@ func (o *operator) processObject(obj interface{}) error {
item.Namespace,
item.Name)

o.workqueue.Forget(obj)
o.workqueue.Forget(item)
return nil
}

14 changes: 7 additions & 7 deletions pkg/util/grpc.go
Original file line number Diff line number Diff line change
@@ -39,7 +39,7 @@ import (
const AuthorizationGRPCHeader = "adb-authorization"

func NewGRPCClient[T any](ctx context.Context, in func(cc grpc.ClientConnInterface) T, addr string, opts ...grpc.DialOption) (T, io.Closer, error) {
con, err := NewGRPCConn(ctx, addr, opts...)
con, err := NewGRPCConn(addr, opts...)
if err != nil {
return Default[T](), nil, err
}
@@ -65,7 +65,7 @@ func NewOptionalTLSGRPCConn(ctx context.Context, addr string, tls *tls.Config, o
copy(newOpts[len(opts):], tlsOpts)

// Create conn
conn, err := newGRPCConn(ctx, addr, tlsOpts...)
conn, err := newGRPCConn(addr, tlsOpts...)
if err != nil {
return nil, err
}
@@ -91,26 +91,26 @@ func NewOptionalTLSGRPCConn(ctx context.Context, addr string, tls *tls.Config, o
}
}

return newGRPCConn(ctx, addr, opts...)
return newGRPCConn(addr, opts...)
}

func newGRPCConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
func newGRPCConn(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
var z []grpc.DialOption

z = append(z, grpc.WithTransportCredentials(insecure.NewCredentials()))

z = append(z, opts...)

conn, err := grpc.DialContext(ctx, addr, z...)
conn, err := grpc.NewClient(addr, z...)
if err != nil {
return nil, err
}

return conn, nil
}

func NewGRPCConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return newGRPCConn(ctx, addr, opts...)
func NewGRPCConn(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return newGRPCConn(addr, opts...)
}

func ClientTLS(config *tls.Config) []grpc.DialOption {
62 changes: 34 additions & 28 deletions pkg/util/k8sutil/informer.go
Original file line number Diff line number Diff line change
@@ -50,38 +50,44 @@ func NewResourceWatcher(getter cache.Getter, resource, namespace string,
namespace,
fields.Everything())

_, informer := cache.NewIndexerInformer(source, objType, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
defer func() {
if err := recover(); err != nil {
informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack()))
_, informer := cache.NewInformerWithOptions(cache.InformerOptions{
ListerWatcher: source,
ObjectType: objType,
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
defer func() {
if err := recover(); err != nil {
informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack()))
}
}()
if h.AddFunc != nil {
h.AddFunc(obj)
}
}()
if h.AddFunc != nil {
h.AddFunc(obj)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
defer func() {
if err := recover(); err != nil {
informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack()))
},
UpdateFunc: func(oldObj, newObj interface{}) {
defer func() {
if err := recover(); err != nil {
informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack()))
}
}()
if h.UpdateFunc != nil {
h.UpdateFunc(oldObj, newObj)
}
}()
if h.UpdateFunc != nil {
h.UpdateFunc(oldObj, newObj)
}
},
DeleteFunc: func(obj interface{}) {
defer func() {
if err := recover(); err != nil {
informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack()))
},
DeleteFunc: func(obj interface{}) {
defer func() {
if err := recover(); err != nil {
informerLogger.Interface("error", err).Error("Recovered from panic. Stack trace:", string(debug.Stack()))
}
}()
if h.DeleteFunc != nil {
h.DeleteFunc(obj)
}
}()
if h.DeleteFunc != nil {
h.DeleteFunc(obj)
}
},
},
}, cache.Indexers{})
ResyncPeriod: 0,
Indexers: cache.Indexers{},
})

return &ResourceWatcher{
informer: informer,
4 changes: 2 additions & 2 deletions pkg/util/svc/service_test.go
Original file line number Diff line number Diff line change
@@ -46,12 +46,12 @@ func Test_Service(t *testing.T) {

othStart := other.StartWithHealth(ctx, h)

healthConn, err := grpc.DialContext(ctx, st.Address(), grpc.WithTransportCredentials(insecure.NewCredentials()))
healthConn, err := grpc.NewClient(st.Address(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

defer healthConn.Close()

otherConn, err := grpc.DialContext(ctx, othStart.Address(), grpc.WithTransportCredentials(insecure.NewCredentials()))
otherConn, err := grpc.NewClient(othStart.Address(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

defer otherConn.Close()

0 comments on commit 9376d21

Please sign in to comment.