From 8556023d3e9b3a600ad9a1fa46d844dd80c0a41c Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Thu, 1 Feb 2024 14:21:39 +0200 Subject: [PATCH] Add multiple handlers functionality in watchers --- kubernetes/watcher.go | 63 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/kubernetes/watcher.go b/kubernetes/watcher.go index c18a6f468..ad8eb29bf 100644 --- a/kubernetes/watcher.go +++ b/kubernetes/watcher.go @@ -54,6 +54,10 @@ type Watcher interface { // AddEventHandler add event handlers for corresponding event type watched AddEventHandler(ResourceEventHandler) + GetEventHandlers() map[string]ResourceEventHandler + + AppendToEventHandlers(string, ResourceEventHandler) + // Store returns the store object for the watcher Store() cache.Store @@ -86,15 +90,16 @@ type item struct { } type watcher struct { - client kubernetes.Interface - informer cache.SharedInformer - store cache.Store - queue workqueue.Interface - ctx context.Context - stop context.CancelFunc - handler ResourceEventHandler - logger *logp.Logger - cachedObject runtime.Object + client kubernetes.Interface + informer cache.SharedInformer + store cache.Store + queue workqueue.Interface + ctx context.Context + stop context.CancelFunc + handler ResourceEventHandler + multiplehandlers map[string]ResourceEventHandler + logger *logp.Logger + cachedObject runtime.Object } // NewWatcher initializes the watcher client to provide a events handler for @@ -118,7 +123,8 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource store = informer.GetStore() queue = workqueue.NewNamed(name) - + logger := logp.NewLogger("test") + logger.Infof("New Watcher created by %v", resource) if opts.IsUpdated == nil { opts.IsUpdated = func(o, n interface{}) bool { old, _ := accessor.ResourceVersion(o.(runtime.Object)) @@ -181,6 +187,21 @@ func (w *watcher) AddEventHandler(h ResourceEventHandler) { w.handler = h } +// AppendToEventHandlers adds a resource handler to process each request that is coming into the watcher +func (w *watcher) AppendToEventHandlers(m string, h ResourceEventHandler) { + if w.multiplehandlers == nil { + w.multiplehandlers = map[string]ResourceEventHandler{} + } + logger := logp.NewLogger("test") + logger.Infof("AppendToEventHandlers BY %s and func %+v", m, h) + w.multiplehandlers[m] = h +} + +// AddEventHandler adds a resource handler to process each request that is coming into the watcher +func (w *watcher) GetEventHandlers() map[string]ResourceEventHandler { + return w.multiplehandlers +} + // Store returns the store object for the resource that is being watched func (w *watcher) Store() cache.Store { return w.store @@ -199,7 +220,8 @@ func (w *watcher) CachedObject() runtime.Object { // Start watching pods func (w *watcher) Start() error { go w.informer.Run(w.ctx.Done()) - + logger := logp.NewLogger("test") + logger.Infof("Watcher starting") if !cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced) { return fmt.Errorf("kubernetes informer unable to sync cache") } @@ -274,6 +296,11 @@ func (w *watcher) process(_ context.Context) bool { w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) // delete anyway in order to clean states w.handler.OnDelete(entry.objectRaw) + if w.multiplehandlers != nil { + for _, handler := range w.multiplehandlers { + handler.OnDelete(entry.objectRaw) + } + } } return true } @@ -286,6 +313,20 @@ func (w *watcher) process(_ context.Context) bool { case delete: w.handler.OnDelete(o) } + w.logger.Infof("handler is %+v and multiplehandlers is %+v", w.handler, w.multiplehandlers) + if w.multiplehandlers != nil { + for m, handler := range w.multiplehandlers { + switch entry.state { + case add: + w.logger.Infof("ONADD OF %s", m) + handler.OnAdd(o) + case update: + handler.OnUpdate(o) + case delete: + handler.OnDelete(o) + } + } + } return true }