From e026688babf37041c4b2b94d4ce9b366636ecc04 Mon Sep 17 00:00:00 2001 From: Sam Heilbron Date: Tue, 18 Jul 2023 09:21:33 -0600 Subject: [PATCH 1/7] enqueue_static_handler: function signature change to include context --- pkg/handler/enqueue_static_handler.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/handler/enqueue_static_handler.go b/pkg/handler/enqueue_static_handler.go index 1154ffd1f..e22fa2a77 100644 --- a/pkg/handler/enqueue_static_handler.go +++ b/pkg/handler/enqueue_static_handler.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/solo-io/skv2/pkg/request" skqueue "github.com/solo-io/skv2/pkg/workqueue" "k8s.io/client-go/util/workqueue" @@ -27,8 +29,11 @@ type BroadcastRequests struct { WorkQueues *skqueue.MultiClusterQueues } +// TODO: (sam-heilbron) +// How should the context be propagated? + // Create implements EventHandler -func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueMultiClusterLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return @@ -37,7 +42,7 @@ func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimiti } // Update implements EventHandler -func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { if evt.ObjectOld != nil { e.enqueueRequestsAllClusters() } else { @@ -52,7 +57,7 @@ func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimiti } // Delete implements EventHandler -func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueMultiClusterLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return @@ -61,7 +66,7 @@ func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimiti } // Generic implements EventHandler -func (e *BroadcastRequests) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (e *BroadcastRequests) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueMultiClusterLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return From a14c8cfcd65e6bf0e0ed50e9a8e5c2b802b094ce Mon Sep 17 00:00:00 2001 From: Sam Heilbron Date: Tue, 18 Jul 2023 09:24:49 -0600 Subject: [PATCH 2/7] predicate_request_tracking: function signature change to include context --- pkg/handler/predicate_request_tracking.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/handler/predicate_request_tracking.go b/pkg/handler/predicate_request_tracking.go index 2a9f4723f..382484926 100644 --- a/pkg/handler/predicate_request_tracking.go +++ b/pkg/handler/predicate_request_tracking.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/solo-io/skv2/pkg/request" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -20,17 +22,19 @@ type MultiClusterRequestTracker struct { Requests *request.MultiClusterRequests } -func (h *MultiClusterRequestTracker) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiClusterRequestTracker) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) { h.Requests.Append(h.Cluster, RequestForObject(evt.Object)) } -func (h *MultiClusterRequestTracker) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiClusterRequestTracker) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { h.Requests.Remove(h.Cluster, RequestForObject(evt.Object)) } -func (h *MultiClusterRequestTracker) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {} +func (h *MultiClusterRequestTracker) Update(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) { +} -func (h *MultiClusterRequestTracker) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {} +func (h *MultiClusterRequestTracker) Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) { +} func RequestForObject(meta v1.Object) reconcile.Request { return reconcile.Request{NamespacedName: types.NamespacedName{ From 9d4381a23a4664e1131c76df29bb6d25db0c1337 Mon Sep 17 00:00:00 2001 From: Sam Heilbron Date: Tue, 18 Jul 2023 09:25:53 -0600 Subject: [PATCH 3/7] multi_handler: function signature change to include context --- pkg/handler/multi_handler.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/handler/multi_handler.go b/pkg/handler/multi_handler.go index dbf2e8016..335eceee3 100644 --- a/pkg/handler/multi_handler.go +++ b/pkg/handler/multi_handler.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -11,26 +13,26 @@ type MultiHandler struct { Handlers []handler.EventHandler } -func (h *MultiHandler) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Create(evt, queue) + hl.Create(ctx, evt, queue) } } -func (h *MultiHandler) Update(evt event.UpdateEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Update(ctx context.Context, evt event.UpdateEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Update(evt, queue) + hl.Update(ctx, evt, queue) } } -func (h *MultiHandler) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Delete(evt, queue) + hl.Delete(ctx, evt, queue) } } -func (h *MultiHandler) Generic(evt event.GenericEvent, queue workqueue.RateLimitingInterface) { +func (h *MultiHandler) Generic(ctx context.Context, evt event.GenericEvent, queue workqueue.RateLimitingInterface) { for _, hl := range h.Handlers { - hl.Generic(evt, queue) + hl.Generic(ctx, evt, queue) } } From 0d1c41657f01e67087f1e1d76001f75bedc3cfbf Mon Sep 17 00:00:00 2001 From: Sam Heilbron Date: Tue, 18 Jul 2023 09:26:44 -0600 Subject: [PATCH 4/7] register_queue_handler: function signature change to include context --- pkg/handler/register_queue_handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/handler/register_queue_handler.go b/pkg/handler/register_queue_handler.go index c272f56b8..5017fe90b 100644 --- a/pkg/handler/register_queue_handler.go +++ b/pkg/handler/register_queue_handler.go @@ -1,6 +1,7 @@ package handler import ( + "context" "sync" apqueue "github.com/solo-io/skv2/pkg/workqueue" @@ -14,7 +15,7 @@ import ( func QueueRegisteringHandler(cluster string, queues *apqueue.MultiClusterQueues) handler.EventHandler { do := &sync.Once{} return &handler.Funcs{ - CreateFunc: func(_ event.CreateEvent, queue workqueue.RateLimitingInterface) { + CreateFunc: func(ctx context.Context, _ event.CreateEvent, queue workqueue.RateLimitingInterface) { do.Do(func() { queues.Set(cluster, queue) }) From eb69eaf3420e856bcde9eb70f68fef6c5b62b803 Mon Sep 17 00:00:00 2001 From: Sam Heilbron Date: Tue, 18 Jul 2023 09:48:41 -0600 Subject: [PATCH 5/7] cache: function signature change to include context --- pkg/events/cache.go | 9 +++++---- pkg/events/watcher.go | 14 +++++++------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/events/cache.go b/pkg/events/cache.go index 006d031c1..1f493e362 100644 --- a/pkg/events/cache.go +++ b/pkg/events/cache.go @@ -1,6 +1,7 @@ package events import ( + "context" "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -94,19 +95,19 @@ func (c *cache) handleEvent(evt eventType, q workqueue.RateLimitingInterface) { }}) } -func (c *cache) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { c.handleEvent(createEvent(evt), q) } -func (c *cache) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { c.handleEvent(updateEvent(evt), q) } -func (c *cache) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { c.handleEvent(deleteEvent(evt), q) } -func (c *cache) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (c *cache) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { c.handleEvent(genericEvent(evt), q) } diff --git a/pkg/events/watcher.go b/pkg/events/watcher.go index 4fdb51fdb..828debc79 100644 --- a/pkg/events/watcher.go +++ b/pkg/events/watcher.go @@ -17,11 +17,11 @@ import ( type EventHandler interface { Create(object client.Object) error - Delete(object client.Object) error + Delete(ctx context.Context, object client.Object) error - Update(old, new client.Object) error + Update(ctx context.Context, old, new client.Object) error - Generic(object client.Object) error + Generic(ctx context.Context, object client.Object) error } // an EventWatcher is a controller-runtime reconciler that @@ -90,19 +90,19 @@ func (w *eventWatcher) Reconcile(ctx context.Context, request reconcile.Request) switch evt := event.(type) { case createEvent: - if err := w.eventHandler.Create(evt.Object); err != nil { + if err := w.eventHandler.Create(ctx, evt.Object); err != nil { return reconcile.Result{}, err } case updateEvent: - if err := w.eventHandler.Update(evt.ObjectOld, evt.ObjectNew); err != nil { + if err := w.eventHandler.Update(ctx, evt.ObjectOld, evt.ObjectNew); err != nil { return reconcile.Result{}, err } case deleteEvent: - if err := w.eventHandler.Delete(evt.Object); err != nil { + if err := w.eventHandler.Delete(ctx, evt.Object); err != nil { return reconcile.Result{}, err } case genericEvent: - if err := w.eventHandler.Generic(evt.Object); err != nil { + if err := w.eventHandler.Generic(ctx, evt.Object); err != nil { return reconcile.Result{}, err } default: From 1321340da38c1b1a8a56fca5dbae13aa21477ce0 Mon Sep 17 00:00:00 2001 From: Sam Heilbron Date: Tue, 18 Jul 2023 09:53:51 -0600 Subject: [PATCH 6/7] support source.kind constructor --- pkg/events/watcher.go | 4 ++-- pkg/reconcile/runner.go | 2 +- pkg/reconcile/v2/runner.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/events/watcher.go b/pkg/events/watcher.go index 828debc79..48b52981b 100644 --- a/pkg/events/watcher.go +++ b/pkg/events/watcher.go @@ -15,7 +15,7 @@ import ( ) type EventHandler interface { - Create(object client.Object) error + Create(ctx context.Context, object client.Object) error Delete(ctx context.Context, object client.Object) error @@ -58,7 +58,7 @@ func (w *watcher) Watch(ctx context.Context, eventHandler EventHandler, predicat } // create a source for the resource type - src := &source.Kind{Type: w.resource} + src := source.Kind(w.mgr.GetCache(), w.resource) // send watch events to the Cache if err := ctl.Watch(src, reconciler.events, predicates...); err != nil { diff --git a/pkg/reconcile/runner.go b/pkg/reconcile/runner.go index cd3acbe25..0117311b0 100644 --- a/pkg/reconcile/runner.go +++ b/pkg/reconcile/runner.go @@ -123,7 +123,7 @@ func (r *runner) RunReconciler(ctx context.Context, reconciler Reconciler, predi } // send us watch events - if err := ctl.Watch(&source.Kind{Type: r.resource}, &handler.EnqueueRequestForObject{}, predicates...); err != nil { + if err := ctl.Watch(source.Kind(r.mgr.GetCache(), r.resource), &handler.EnqueueRequestForObject{}, predicates...); err != nil { return err } diff --git a/pkg/reconcile/v2/runner.go b/pkg/reconcile/v2/runner.go index 3434b0e85..4ac5cb2f2 100644 --- a/pkg/reconcile/v2/runner.go +++ b/pkg/reconcile/v2/runner.go @@ -146,7 +146,7 @@ func (r *runner[T]) RunReconciler( } // send us watch events - if err := ctl.Watch(&source.Kind{Type: obj}, &handler.EnqueueRequestForObject{}, predicates...); err != nil { + if err := ctl.Watch(source.Kind(r.mgr.GetCache(), obj), &handler.EnqueueRequestForObject{}, predicates...); err != nil { return err } From 263383029c9b0da5ebc3b7802f4cc174f7d13912 Mon Sep 17 00:00:00 2001 From: Sam Heilbron Date: Tue, 18 Jul 2023 09:59:24 -0600 Subject: [PATCH 7/7] undo unnecessary context adjustment to function signature of event handler --- pkg/events/watcher.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/events/watcher.go b/pkg/events/watcher.go index 48b52981b..067ef41f7 100644 --- a/pkg/events/watcher.go +++ b/pkg/events/watcher.go @@ -15,13 +15,13 @@ import ( ) type EventHandler interface { - Create(ctx context.Context, object client.Object) error + Create(object client.Object) error - Delete(ctx context.Context, object client.Object) error + Delete(object client.Object) error - Update(ctx context.Context, old, new client.Object) error + Update(old, new client.Object) error - Generic(ctx context.Context, object client.Object) error + Generic(object client.Object) error } // an EventWatcher is a controller-runtime reconciler that @@ -90,19 +90,19 @@ func (w *eventWatcher) Reconcile(ctx context.Context, request reconcile.Request) switch evt := event.(type) { case createEvent: - if err := w.eventHandler.Create(ctx, evt.Object); err != nil { + if err := w.eventHandler.Create(evt.Object); err != nil { return reconcile.Result{}, err } case updateEvent: - if err := w.eventHandler.Update(ctx, evt.ObjectOld, evt.ObjectNew); err != nil { + if err := w.eventHandler.Update(evt.ObjectOld, evt.ObjectNew); err != nil { return reconcile.Result{}, err } case deleteEvent: - if err := w.eventHandler.Delete(ctx, evt.Object); err != nil { + if err := w.eventHandler.Delete(evt.Object); err != nil { return reconcile.Result{}, err } case genericEvent: - if err := w.eventHandler.Generic(ctx, evt.Object); err != nil { + if err := w.eventHandler.Generic(evt.Object); err != nil { return reconcile.Result{}, err } default: