Skip to content

Commit

Permalink
returning only old cached object
Browse files Browse the repository at this point in the history
  • Loading branch information
gizas committed Jan 9, 2024
1 parent e265b8e commit 6cb89b9
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 45 deletions.
18 changes: 7 additions & 11 deletions kubernetes/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package kubernetes
import (
"reflect"
"sync"

"k8s.io/apimachinery/pkg/runtime"
)

// ResourceEventHandler can handle notifications for events that happen to a
Expand Down Expand Up @@ -139,7 +141,7 @@ type podUpdaterStore interface {
// UpdateWatcher is the interface that an object needs to implement to be
// able to use DeltaObject cache event function.
type UpdateWatcher interface {
Deltaobjects() Delta
Oldobject() runtime.Object
}

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
Expand Down Expand Up @@ -176,11 +178,8 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}

// deltaobjects includes the old and new version of caching object that changes in the current update event.
// We compare the cached old version of object with the new one that triggers the update
deltaobjects := n.namespacewatcher.Deltaobjects()
cachednamespaceold, ok := deltaobjects.old.(*Namespace)
oldobject := n.namespacewatcher.Oldobject()
cachednamespaceold, ok := oldobject.(*Namespace)

if ok && ns.Name == cachednamespaceold.Name {
labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels)
Expand Down Expand Up @@ -240,11 +239,8 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) {
n.locker.Lock()
defer n.locker.Unlock()
}

// deltaobjects includes the old and new version of caching object that changes in the current update event.
// We compare the cached old version of object with the new one that triggers the update
deltaobjects := n.nodewatcher.Deltaobjects()
cachednodeold, ok := deltaobjects.old.(*Node)
oldobject := n.nodewatcher.Oldobject()
cachednodeold, ok := oldobject.(*Node)

if ok && node.Name == cachednodeold.Name {
labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels)
Expand Down
62 changes: 28 additions & 34 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ type Watcher interface {
// Client returns the kubernetes client object used by the watcher
Client() kubernetes.Interface

// Deltaobjects returns the objects struct that change during the last updated event
Deltaobjects() Delta
// Oldobject returns the old object before change during the last updated event
Oldobject() runtime.Object
}

// WatchOptions controls watch behaviors
Expand All @@ -85,21 +85,16 @@ type item struct {
state string
}

type Delta struct {
old runtime.Object
new runtime.Object
}

type watcher struct {
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
delta Delta
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
oldobject runtime.Object
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -115,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) {
var store cache.Store
var queue workqueue.Interface
var delta Delta
var oldobject runtime.Object
informer, _, err := NewInformer(client, resource, opts, indexers)
if err != nil {
return nil, err
Expand All @@ -136,15 +131,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource

ctx, cancel := context.WithCancel(context.TODO())
w := &watcher{
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
delta: delta,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
client: client,
informer: informer,
store: store,
queue: queue,
ctx: ctx,
oldobject: oldobject,
stop: cancel,
logger: logp.NewLogger("kubernetes"),
handler: NoOpEventHandlerFuncs{},
}

w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -167,7 +162,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource
// state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update.
w.enqueue(n, add)
}
w.deltaobjects(o, n)
w.oldobjectreturn(o)

},
})
Expand All @@ -190,9 +185,9 @@ func (w *watcher) Client() kubernetes.Interface {
return w.client
}

// Deltaobjects returns the objects struct that change during the last updated event
func (w *watcher) Deltaobjects() Delta {
return w.delta
// Oldbject returns the old object in cache during the last updated event
func (w *watcher) Oldobject() runtime.Object {
return w.oldobject
}

// Start watching pods
Expand Down Expand Up @@ -234,10 +229,9 @@ func (w *watcher) enqueue(obj interface{}, state string) {
w.queue.Add(&item{key, obj, state})
}

// deltaobjects updates the delta struct with the old and the new version of cache objects that are ready to change on update events
func (w *watcher) deltaobjects(o interface{}, n interface{}) {
w.delta.old = o.(runtime.Object)
w.delta.new = n.(runtime.Object)
// oldobjectreturn returns the old version of cache objects before change on update events
func (w *watcher) oldobjectreturn(o interface{}) {
w.oldobject = o.(runtime.Object)

Check failure on line 234 in kubernetes/watcher.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
}

// process gets the top of the work queue and processes the object that is received.
Expand Down

0 comments on commit 6cb89b9

Please sign in to comment.