Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Additional k8s 1.27 upgrade changes #465

Merged
merged 7 commits into from
Jul 18, 2023
9 changes: 5 additions & 4 deletions pkg/events/cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"context"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -94,19 +95,19 @@ func (c *cache) handleEvent(evt eventType, q workqueue.RateLimitingInterface) {
}})
}

func (c *cache) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(createEvent(evt), q)
}

func (c *cache) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(updateEvent(evt), q)
}

func (c *cache) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(deleteEvent(evt), q)
}

func (c *cache) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
func (c *cache) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
c.handleEvent(genericEvent(evt), q)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/events/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (w *watcher) Watch(ctx context.Context, eventHandler EventHandler, predicat
}

// create a source for the resource type
src := &source.Kind{Type: w.resource}
src := source.Kind(w.mgr.GetCache(), w.resource)

// send watch events to the Cache
if err := ctl.Watch(src, reconciler.events, predicates...); err != nil {
Expand Down
13 changes: 9 additions & 4 deletions pkg/handler/enqueue_static_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"context"

"github.com/solo-io/skv2/pkg/request"
skqueue "github.com/solo-io/skv2/pkg/workqueue"
"k8s.io/client-go/util/workqueue"
Expand All @@ -27,8 +29,11 @@ type BroadcastRequests struct {
WorkQueues *skqueue.MultiClusterQueues
}

// TODO: (sam-heilbron)
// How should the context be propagated?

// Create implements EventHandler
func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueMultiClusterLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
Expand All @@ -37,7 +42,7 @@ func (e *BroadcastRequests) Create(evt event.CreateEvent, q workqueue.RateLimiti
}

// Update implements EventHandler
func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.ObjectOld != nil {
e.enqueueRequestsAllClusters()
} else {
Expand All @@ -52,7 +57,7 @@ func (e *BroadcastRequests) Update(evt event.UpdateEvent, q workqueue.RateLimiti
}

// Delete implements EventHandler
func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueMultiClusterLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
Expand All @@ -61,7 +66,7 @@ func (e *BroadcastRequests) Delete(evt event.DeleteEvent, q workqueue.RateLimiti
}

// Generic implements EventHandler
func (e *BroadcastRequests) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
func (e *BroadcastRequests) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueMultiClusterLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return
Expand Down
18 changes: 10 additions & 8 deletions pkg/handler/multi_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"context"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -11,26 +13,26 @@ type MultiHandler struct {
Handlers []handler.EventHandler
}

func (h *MultiHandler) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Create(evt, queue)
hl.Create(ctx, evt, queue)
}
}

func (h *MultiHandler) Update(evt event.UpdateEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Update(ctx context.Context, evt event.UpdateEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Update(evt, queue)
hl.Update(ctx, evt, queue)
}
}

func (h *MultiHandler) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Delete(evt, queue)
hl.Delete(ctx, evt, queue)
}
}

func (h *MultiHandler) Generic(evt event.GenericEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiHandler) Generic(ctx context.Context, evt event.GenericEvent, queue workqueue.RateLimitingInterface) {
for _, hl := range h.Handlers {
hl.Generic(evt, queue)
hl.Generic(ctx, evt, queue)
}
}
12 changes: 8 additions & 4 deletions pkg/handler/predicate_request_tracking.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package handler

import (
"context"

"github.com/solo-io/skv2/pkg/request"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -20,17 +22,19 @@ type MultiClusterRequestTracker struct {
Requests *request.MultiClusterRequests
}

func (h *MultiClusterRequestTracker) Create(evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiClusterRequestTracker) Create(ctx context.Context, evt event.CreateEvent, queue workqueue.RateLimitingInterface) {
h.Requests.Append(h.Cluster, RequestForObject(evt.Object))
}

func (h *MultiClusterRequestTracker) Delete(evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
func (h *MultiClusterRequestTracker) Delete(ctx context.Context, evt event.DeleteEvent, queue workqueue.RateLimitingInterface) {
h.Requests.Remove(h.Cluster, RequestForObject(evt.Object))
}

func (h *MultiClusterRequestTracker) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {}
func (h *MultiClusterRequestTracker) Update(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
}

func (h *MultiClusterRequestTracker) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {}
func (h *MultiClusterRequestTracker) Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
}

func RequestForObject(meta v1.Object) reconcile.Request {
return reconcile.Request{NamespacedName: types.NamespacedName{
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/register_queue_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"context"
"sync"

apqueue "github.com/solo-io/skv2/pkg/workqueue"
Expand All @@ -14,7 +15,7 @@ import (
func QueueRegisteringHandler(cluster string, queues *apqueue.MultiClusterQueues) handler.EventHandler {
do := &sync.Once{}
return &handler.Funcs{
CreateFunc: func(_ event.CreateEvent, queue workqueue.RateLimitingInterface) {
CreateFunc: func(ctx context.Context, _ event.CreateEvent, queue workqueue.RateLimitingInterface) {
do.Do(func() {
queues.Set(cluster, queue)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconcile/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *runner) RunReconciler(ctx context.Context, reconciler Reconciler, predi
}

// send us watch events
if err := ctl.Watch(&source.Kind{Type: r.resource}, &handler.EnqueueRequestForObject{}, predicates...); err != nil {
if err := ctl.Watch(source.Kind(r.mgr.GetCache(), r.resource), &handler.EnqueueRequestForObject{}, predicates...); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/reconcile/v2/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *runner[T]) RunReconciler(
}

// send us watch events
if err := ctl.Watch(&source.Kind{Type: obj}, &handler.EnqueueRequestForObject{}, predicates...); err != nil {
if err := ctl.Watch(source.Kind(r.mgr.GetCache(), obj), &handler.EnqueueRequestForObject{}, predicates...); err != nil {
return err
}

Expand Down