Skip to content

Commit

Permalink
[WIP] Additional k8s 1.27 upgrade changes (#465)
Browse files Browse the repository at this point in the history
* enqueue_static_handler: function signature change to include context

* predicate_request_tracking: function signature change to include context

* multi_handler: function signature change to include context

* register_queue_handler: function signature change to include context

* cache: function signature change to include context

* support source.kind constructor

* undo unnecessary context adjustment to function signature of event handler
  • Loading branch information
sam-heilbron authored Jul 18, 2023
1 parent 4377fca commit eb4765b
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 24 deletions.
9 changes: 5 additions & 4 deletions pkg/events/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"context"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -94,19 +95,19 @@ func (c *cache) handleEvent(evt eventType, q workqueue.RateLimitingInterface) {
}})
}

func (c *cache) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(createEvent(evt), q)
}

func (c *cache) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(updateEvent(evt), q)
}

func (c *cache) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(deleteEvent(evt), q)
}

func (c *cache) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(genericEvent(evt), q)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/events/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (w *watcher) Watch(ctx context.Context, eventHandler EventHandler, predicat
}

// create a source for the resource type
src := &source.Kind{Type: w.resource}
src := source.Kind(w.mgr.GetCache(), w.resource)

// send watch events to the Cache
if err := ctl.Watch(src, reconciler.events, predicates...); err != nil {
Expand Down
13 changes: 9 additions & 4 deletions pkg/handler/enqueue_static_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"context"

"github.com/solo-io/skv2/pkg/request"
skqueue "github.com/solo-io/skv2/pkg/workqueue"
"k8s.io/client-go/util/workqueue"
Expand All @@ -27,8 +29,11 @@ type BroadcastRequests struct {
WorkQueues *skqueue.MultiClusterQueues
}

// TODO: (sam-heilbron)
// How should the context be propagated?

// Create implements EventHandler
func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueMultiClusterLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
Expand All @@ -37,7 +42,7 @@ func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimiti
}

// Update implements EventHandler
func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.ObjectOld != nil {
e.enqueueRequestsAllClusters()
} else {
Expand All @@ -52,7 +57,7 @@ func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimiti
}

// Delete implements EventHandler
func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueMultiClusterLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
Expand All @@ -61,7 +66,7 @@ func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimiti
}

// Generic implements EventHandler
func (e *BroadcastRequests) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueMultiClusterLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return
Expand Down
18 changes: 10 additions & 8 deletions pkg/handler/multi_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"context"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -11,26 +13,26 @@ type MultiHandler struct {
Handlers []handler.EventHandler
}

func (h *MultiHandler) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Create(evt, queue)
hl.Create(ctx, evt, queue)
}
}

func (h *MultiHandler) Update(evt event.UpdateEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Update(ctx context.Context, evt event.UpdateEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Update(evt, queue)
hl.Update(ctx, evt, queue)
}
}

func (h *MultiHandler) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Delete(evt, queue)
hl.Delete(ctx, evt, queue)
}
}

func (h *MultiHandler) Generic(evt event.GenericEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Generic(ctx context.Context, evt event.GenericEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Generic(evt, queue)
hl.Generic(ctx, evt, queue)
}
}
12 changes: 8 additions & 4 deletions pkg/handler/predicate_request_tracking.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"context"

"github.com/solo-io/skv2/pkg/request"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -20,17 +22,19 @@ type MultiClusterRequestTracker struct {
Requests *request.MultiClusterRequests
}

func (h *MultiClusterRequestTracker) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiClusterRequestTracker) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
h.Requests.Append(h.Cluster, RequestForObject(evt.Object))
}

func (h *MultiClusterRequestTracker) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiClusterRequestTracker) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
h.Requests.Remove(h.Cluster, RequestForObject(evt.Object))
}

func (h *MultiClusterRequestTracker) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {}
func (h *MultiClusterRequestTracker) Update(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
}

func (h *MultiClusterRequestTracker) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {}
func (h *MultiClusterRequestTracker) Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
}

func RequestForObject(meta v1.Object) reconcile.Request {
return reconcile.Request{NamespacedName: types.NamespacedName{
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/register_queue_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"context"
"sync"

apqueue "github.com/solo-io/skv2/pkg/workqueue"
Expand All @@ -14,7 +15,7 @@ import (
func QueueRegisteringHandler(cluster string, queues *apqueue.MultiClusterQueues) handler.EventHandler {
do := &sync.Once{}
return &handler.Funcs{
CreateFunc: func(_ event.CreateEvent, queue workqueue.RateLimitingInterface) {
CreateFunc: func(ctx context.Context, _ event.CreateEvent, queue workqueue.RateLimitingInterface) {
do.Do(func() {
queues.Set(cluster, queue)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconcile/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *runner) RunReconciler(ctx context.Context, reconciler Reconciler, predi
}

// send us watch events
if err := ctl.Watch(&source.Kind{Type: r.resource}, &handler.EnqueueRequestForObject{}, predicates...); err != nil {
if err := ctl.Watch(source.Kind(r.mgr.GetCache(), r.resource), &handler.EnqueueRequestForObject{}, predicates...); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/reconcile/v2/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *runner[T]) RunReconciler(
}

// send us watch events
if err := ctl.Watch(&source.Kind{Type: obj}, &handler.EnqueueRequestForObject{}, predicates...); err != nil {
if err := ctl.Watch(source.Kind(r.mgr.GetCache(), obj), &handler.EnqueueRequestForObject{}, predicates...); err != nil {
return err
}

Expand Down

0 comments on commit eb4765b

Please sign in to comment.