Skip to content

Commit

Permalink
Add multiple handlers functionality in watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelKatsoulis committed Feb 1, 2024
1 parent b42be0d commit 8556023
Showing 1 changed file with 52 additions and 11 deletions.
63 changes: 52 additions & 11 deletions kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type Watcher interface {
// AddEventHandler add event handlers for corresponding event type watched
AddEventHandler(ResourceEventHandler)

GetEventHandlers() map[string]ResourceEventHandler

AppendToEventHandlers(string, ResourceEventHandler)

// Store returns the store object for the watcher
Store() cache.Store

Expand Down Expand Up @@ -86,15 +90,16 @@ type item struct {
}

type watcher struct {
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
logger *logp.Logger
cachedObject runtime.Object
client kubernetes.Interface
informer cache.SharedInformer
store cache.Store
queue workqueue.Interface
ctx context.Context
stop context.CancelFunc
handler ResourceEventHandler
multiplehandlers map[string]ResourceEventHandler
logger *logp.Logger
cachedObject runtime.Object
}

// NewWatcher initializes the watcher client to provide a events handler for
Expand All @@ -118,7 +123,8 @@ func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource

store = informer.GetStore()
queue = workqueue.NewNamed(name)

logger := logp.NewLogger("test")
logger.Infof("New Watcher created by %v", resource)
if opts.IsUpdated == nil {
opts.IsUpdated = func(o, n interface{}) bool {
old, _ := accessor.ResourceVersion(o.(runtime.Object))
Expand Down Expand Up @@ -181,6 +187,21 @@ func (w *watcher) AddEventHandler(h ResourceEventHandler) {
w.handler = h
}

// AppendToEventHandlers adds a resource handler to process each request that is coming into the watcher
func (w *watcher) AppendToEventHandlers(m string, h ResourceEventHandler) {
if w.multiplehandlers == nil {
w.multiplehandlers = map[string]ResourceEventHandler{}
}
logger := logp.NewLogger("test")
logger.Infof("AppendToEventHandlers BY %s and func %+v", m, h)
w.multiplehandlers[m] = h
}

// AddEventHandler adds a resource handler to process each request that is coming into the watcher
func (w *watcher) GetEventHandlers() map[string]ResourceEventHandler {
return w.multiplehandlers
}

// Store returns the store object for the resource that is being watched
func (w *watcher) Store() cache.Store {
return w.store
Expand All @@ -199,7 +220,8 @@ func (w *watcher) CachedObject() runtime.Object {
// Start watching pods
func (w *watcher) Start() error {
go w.informer.Run(w.ctx.Done())

logger := logp.NewLogger("test")
logger.Infof("Watcher starting")
if !cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced) {
return fmt.Errorf("kubernetes informer unable to sync cache")
}
Expand Down Expand Up @@ -274,6 +296,11 @@ func (w *watcher) process(_ context.Context) bool {
w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key)
// delete anyway in order to clean states
w.handler.OnDelete(entry.objectRaw)
if w.multiplehandlers != nil {
for _, handler := range w.multiplehandlers {
handler.OnDelete(entry.objectRaw)
}
}
}
return true
}
Expand All @@ -286,6 +313,20 @@ func (w *watcher) process(_ context.Context) bool {
case delete:
w.handler.OnDelete(o)
}
w.logger.Infof("handler is %+v and multiplehandlers is %+v", w.handler, w.multiplehandlers)
if w.multiplehandlers != nil {
for m, handler := range w.multiplehandlers {
switch entry.state {
case add:
w.logger.Infof("ONADD OF %s", m)
handler.OnAdd(o)
case update:
handler.OnUpdate(o)
case delete:
handler.OnDelete(o)
}
}
}

return true
}

0 comments on commit 8556023

Please sign in to comment.