diff --git a/kubernetes/eventhandler.go b/kubernetes/eventhandler.go index 70abfefb5..bb5f448c5 100644 --- a/kubernetes/eventhandler.go +++ b/kubernetes/eventhandler.go @@ -20,6 +20,8 @@ package kubernetes import ( "reflect" "sync" + + "k8s.io/apimachinery/pkg/runtime" ) // ResourceEventHandler can handle notifications for events that happen to a @@ -139,7 +141,7 @@ type podUpdaterStore interface { // UpdateWatcher is the interface that an object needs to implement to be // able to use DeltaObject cache event function. type UpdateWatcher interface { - Deltaobjects() Delta + Oldobject() runtime.Object } // namespacePodUpdater notifies updates on pods when their namespaces are updated. @@ -176,11 +178,8 @@ func (n *namespacePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - - // deltaobjects includes the old and new version of caching object that changes in the current update event. - // We compare the cached old version of object with the new one that triggers the update - deltaobjects := n.namespacewatcher.Deltaobjects() - cachednamespaceold, ok := deltaobjects.old.(*Namespace) + oldobject := n.namespacewatcher.Oldobject() + cachednamespaceold, ok := oldobject.(*Namespace) if ok && ns.Name == cachednamespaceold.Name { labelscheck := isEqualMetadata(ns.ObjectMeta.Labels, cachednamespaceold.ObjectMeta.Labels) @@ -240,11 +239,8 @@ func (n *nodePodUpdater) OnUpdate(obj interface{}) { n.locker.Lock() defer n.locker.Unlock() } - - // deltaobjects includes the old and new version of caching object that changes in the current update event. - // We compare the cached old version of object with the new one that triggers the update - deltaobjects := n.nodewatcher.Deltaobjects() - cachednodeold, ok := deltaobjects.old.(*Node) + oldobject := n.nodewatcher.Oldobject() + cachednodeold, ok := oldobject.(*Node) if ok && node.Name == cachednodeold.Name { labelscheck := isEqualMetadata(node.ObjectMeta.Labels, cachednodeold.ObjectMeta.Labels) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index 986a87c63..5ae6be82a 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -60,8 +60,8 @@ type Watcher interface { // Client returns the kubernetes client object used by the watcher Client() kubernetes.Interface - // Deltaobjects returns the objects struct that change during the last updated event - Deltaobjects() Delta + // Oldobject returns the old object before change during the last updated event + Oldobject() runtime.Object } // WatchOptions controls watch behaviors @@ -85,21 +85,16 @@ type item struct { state string } -type Delta struct { - old runtime.Object - new runtime.Object -} - type watcher struct { - client kubernetes.Interface - informer cache.SharedInformer - store cache.Store - queue workqueue.Interface - ctx context.Context - stop context.CancelFunc - handler ResourceEventHandler - logger *logp.Logger - delta Delta + client kubernetes.Interface + informer cache.SharedInformer + store cache.Store + queue workqueue.Interface + ctx context.Context + stop context.CancelFunc + handler ResourceEventHandler + logger *logp.Logger + oldobject runtime.Object } // NewWatcher initializes the watcher client to provide a events handler for @@ -115,7 +110,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store var queue workqueue.Interface - var delta Delta + var oldobject runtime.Object informer, _, err := NewInformer(client, resource, opts, indexers) if err != nil { return nil, err @@ -136,15 +131,15 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource ctx, cancel := context.WithCancel(context.TODO()) w := &watcher{ - client: client, - informer: informer, - store: store, - queue: queue, - ctx: ctx, - delta: delta, - stop: cancel, - logger: logp.NewLogger("kubernetes"), - handler: NoOpEventHandlerFuncs{}, + client: client, + informer: informer, + store: store, + queue: queue, + ctx: ctx, + oldobject: oldobject, + stop: cancel, + logger: logp.NewLogger("kubernetes"), + handler: NoOpEventHandlerFuncs{}, } w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -167,7 +162,7 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. w.enqueue(n, add) } - w.deltaobjects(o, n) + w.oldobjectreturn(o) }, }) @@ -190,9 +185,9 @@ func (w *watcher) Client() kubernetes.Interface { return w.client } -// Deltaobjects returns the objects struct that change during the last updated event -func (w *watcher) Deltaobjects() Delta { - return w.delta +// Oldbject returns the old object in cache during the last updated event +func (w *watcher) Oldobject() runtime.Object { + return w.oldobject } // Start watching pods @@ -234,10 +229,9 @@ func (w *watcher) enqueue(obj interface{}, state string) { w.queue.Add(&item{key, obj, state}) } -// deltaobjects updates the delta struct with the old and the new version of cache objects that are ready to change on update events -func (w *watcher) deltaobjects(o interface{}, n interface{}) { - w.delta.old = o.(runtime.Object) - w.delta.new = n.(runtime.Object) +// oldobjectreturn returns the old version of cache objects before change on update events +func (w *watcher) oldobjectreturn(o interface{}) { + w.oldobject = o.(runtime.Object) } // process gets the top of the work queue and processes the object that is received.