diff --git a/pkg/events/cache.go b/pkg/events/cache.go index 006d031c1..1f493e362 100644 --- a/pkg/events/cache.go +++ b/pkg/events/cache.go @@ -1,6 +1,7 @@ package events import ( + "context" "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -94,19 +95,19 @@ func (c *cache) handleEvent(evt eventType, q workqueue.RateLimitingInterface) { }}) } -func (c *cache) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { c.handleEvent(createEvent(evt), q) } -func (c *cache) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { c.handleEvent(updateEvent(evt), q) } -func (c *cache) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { c.handleEvent(deleteEvent(evt), q) } -func (c *cache) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { c.handleEvent(genericEvent(evt), q) } diff --git a/pkg/events/watcher.go b/pkg/events/watcher.go index 4fdb51fdb..067ef41f7 100644 --- a/pkg/events/watcher.go +++ b/pkg/events/watcher.go @@ -58,7 +58,7 @@ func (w *watcher) Watch(ctx context.Context, eventHandler EventHandler, predicat } // create a source for the resource type - src := &source.Kind{Type: w.resource} + src := source.Kind(w.mgr.GetCache(), w.resource) // send watch events to the Cache if err := ctl.Watch(src, reconciler.events, predicates...); err != nil { diff --git a/pkg/handler/enqueue_static_handler.go b/pkg/handler/enqueue_static_handler.go index 1154ffd1f..e22fa2a77 100644 --- a/pkg/handler/enqueue_static_handler.go +++ b/pkg/handler/enqueue_static_handler.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/solo-io/skv2/pkg/request" skqueue "github.com/solo-io/skv2/pkg/workqueue" "k8s.io/client-go/util/workqueue" @@ -27,8 +29,11 @@ type BroadcastRequests struct { WorkQueues *skqueue.MultiClusterQueues } +// TODO: (sam-heilbron) +// How should the context be propagated? + // Create implements EventHandler -func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueMultiClusterLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return @@ -37,7 +42,7 @@ func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimiti } // Update implements EventHandler -func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { if evt.ObjectOld != nil { e.enqueueRequestsAllClusters() } else { @@ -52,7 +57,7 @@ func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimiti } // Delete implements EventHandler -func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueMultiClusterLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return @@ -61,7 +66,7 @@ func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimiti } // Generic implements EventHandler -func (e *BroadcastRequests) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueMultiClusterLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return diff --git a/pkg/handler/multi_handler.go b/pkg/handler/multi_handler.go index dbf2e8016..335eceee3 100644 --- a/pkg/handler/multi_handler.go +++ b/pkg/handler/multi_handler.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -11,26 +13,26 @@ type MultiHandler struct { Handlers []handler.EventHandler } -func (h *MultiHandler) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Create(evt, queue) + hl.Create(ctx, evt, queue) } } -func (h *MultiHandler) Update(evt event.UpdateEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Update(ctx context.Context, evt event.UpdateEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Update(evt, queue) + hl.Update(ctx, evt, queue) } } -func (h *MultiHandler) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Delete(evt, queue) + hl.Delete(ctx, evt, queue) } } -func (h *MultiHandler) Generic(evt event.GenericEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Generic(ctx context.Context, evt event.GenericEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Generic(evt, queue) + hl.Generic(ctx, evt, queue) } } diff --git a/pkg/handler/predicate_request_tracking.go b/pkg/handler/predicate_request_tracking.go index 2a9f4723f..382484926 100644 --- a/pkg/handler/predicate_request_tracking.go +++ b/pkg/handler/predicate_request_tracking.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/solo-io/skv2/pkg/request" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -20,17 +22,19 @@ type MultiClusterRequestTracker struct { Requests *request.MultiClusterRequests } -func (h *MultiClusterRequestTracker) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiClusterRequestTracker) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) { h.Requests.Append(h.Cluster, RequestForObject(evt.Object)) } -func (h *MultiClusterRequestTracker) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiClusterRequestTracker) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { h.Requests.Remove(h.Cluster, RequestForObject(evt.Object)) } -func (h *MultiClusterRequestTracker) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {} +func (h *MultiClusterRequestTracker) Update(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { +} -func (h *MultiClusterRequestTracker) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {} +func (h *MultiClusterRequestTracker) Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { +} func RequestForObject(meta v1.Object) reconcile.Request { return reconcile.Request{NamespacedName: types.NamespacedName{ diff --git a/pkg/handler/register_queue_handler.go b/pkg/handler/register_queue_handler.go index c272f56b8..5017fe90b 100644 --- a/pkg/handler/register_queue_handler.go +++ b/pkg/handler/register_queue_handler.go @@ -1,6 +1,7 @@ package handler import ( + "context" "sync" apqueue "github.com/solo-io/skv2/pkg/workqueue" @@ -14,7 +15,7 @@ import ( func QueueRegisteringHandler(cluster string, queues *apqueue.MultiClusterQueues) handler.EventHandler { do := &sync.Once{} return &handler.Funcs{ - CreateFunc: func(_ event.CreateEvent, queue workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, _ event.CreateEvent, queue workqueue.RateLimitingInterface) { do.Do(func() { queues.Set(cluster, queue) }) diff --git a/pkg/reconcile/runner.go b/pkg/reconcile/runner.go index cd3acbe25..0117311b0 100644 --- a/pkg/reconcile/runner.go +++ b/pkg/reconcile/runner.go @@ -123,7 +123,7 @@ func (r *runner) RunReconciler(ctx context.Context, reconciler Reconciler, predi } // send us watch events - if err := ctl.Watch(&source.Kind{Type: r.resource}, &handler.EnqueueRequestForObject{}, predicates...); err != nil { + if err := ctl.Watch(source.Kind(r.mgr.GetCache(), r.resource), &handler.EnqueueRequestForObject{}, predicates...); err != nil { return err } diff --git a/pkg/reconcile/v2/runner.go b/pkg/reconcile/v2/runner.go index 3434b0e85..4ac5cb2f2 100644 --- a/pkg/reconcile/v2/runner.go +++ b/pkg/reconcile/v2/runner.go @@ -146,7 +146,7 @@ func (r *runner[T]) RunReconciler( } // send us watch events - if err := ctl.Watch(&source.Kind{Type: obj}, &handler.EnqueueRequestForObject{}, predicates...); err != nil { + if err := ctl.Watch(source.Kind(r.mgr.GetCache(), obj), &handler.EnqueueRequestForObject{}, predicates...); err != nil { return err }