Skip to content

Commit

Permalink
modify method to get service in endpoint related event
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiiyvwu committed Aug 29, 2024
1 parent ef16b87 commit cc9b919
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 39 deletions.
33 changes: 17 additions & 16 deletions pkg/controller/service/clbv1/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/cloud-provider-alibaba-cloud/pkg/util"
"k8s.io/klog/v2"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -154,23 +155,23 @@ func needAdd(newService *v1.Service) bool {
}

// NewEnqueueRequestForEndpointEvent, event handler for endpoint events
func NewEnqueueRequestForEndpointEvent(client client.Client, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
func NewEnqueueRequestForEndpointEvent(cache cache.Cache, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
return &enqueueRequestForEndpointEvent{
client: client,
cache: cache,
eventRecorder: eventRecorder,
}
}

type enqueueRequestForEndpointEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointEvent)(nil)

func (h *enqueueRequestForEndpointEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.ServiceLog.Info("controller: endpoint create event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -180,7 +181,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat
ep1, ok1 := e.ObjectOld.(*v1.Endpoints)
ep2, ok2 := e.ObjectNew.(*v1.Endpoints)

if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.client) &&
if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.cache) &&
!reflect.DeepEqual(ep1.Subsets, ep2.Subsets) {
util.ServiceLog.Info("controller: endpoint update event", "endpoint", util.Key(ep1))
util.ServiceLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -191,7 +192,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat

func (h *enqueueRequestForEndpointEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.ServiceLog.Info("controller: endpoint delete event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -211,7 +212,7 @@ func (h *enqueueRequestForEndpointEvent) enqueueManagedEndpoint(queue workqueue.
util.ServiceLog.Info("enqueue", "endpoint", util.Key(endpoint), "queueLen", queue.Len())
}

func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
func isEndpointProcessNeeded(ep *v1.Endpoints, cache cache.Cache) bool {
if ep == nil {
return false
}
Expand All @@ -224,7 +225,7 @@ func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: ep.GetNamespace(),
Name: ep.GetName(),
Expand Down Expand Up @@ -353,23 +354,23 @@ func (h *enqueueRequestForNodeEvent) checkServiceAffected(node *v1.Node, svc *v1
}

// NewEnqueueRequestForEndpointSliceEvent, event handler for endpointslice event
func NewEnqueueRequestForEndpointSliceEvent(client client.Client, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
func NewEnqueueRequestForEndpointSliceEvent(cache cache.Cache, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
return &enqueueRequestForEndpointSliceEvent{
client: client,
cache: cache,
eventRecorder: record,
}
}

type enqueueRequestForEndpointSliceEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointSliceEvent)(nil)

func (h *enqueueRequestForEndpointSliceEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.ServiceLog.Info("controller: endpointslice create event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -379,7 +380,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.
es1, ok1 := e.ObjectOld.(*discovery.EndpointSlice)
es2, ok2 := e.ObjectNew.(*discovery.EndpointSlice)

if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.client) &&
if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.cache) &&
isEndpointSliceUpdateNeeded(es1, es2) {
util.ServiceLog.Info("controller: endpointslice update event", "endpointslice", util.Key(es1))
util.ServiceLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -390,7 +391,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.

func (h *enqueueRequestForEndpointSliceEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.ServiceLog.Info("controller: endpointslice delete event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -416,7 +417,7 @@ func (h *enqueueRequestForEndpointSliceEvent) enqueueManagedEndpointSlice(queue
util.ServiceLog.Info("enqueue", "endpointslice", util.Key(endpointSlice), "queueLen", queue.Len())
}

func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Client) bool {
func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, cache cache.Cache) bool {
if es == nil {
return false
}
Expand All @@ -427,7 +428,7 @@ func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Cli
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: es.Namespace,
Name: serviceName,
Expand Down
9 changes: 4 additions & 5 deletions pkg/controller/service/clbv1/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
prvd "k8s.io/cloud-provider-alibaba-cloud/pkg/provider"
"k8s.io/klog/v2"
"os"
"strings"
"time"
Expand All @@ -13,8 +15,6 @@ import (
discovery "k8s.io/api/discovery/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"k8s.io/cloud-provider-alibaba-cloud/pkg/controller/service/reconcile/annotation"
svcCtx "k8s.io/cloud-provider-alibaba-cloud/pkg/controller/service/reconcile/context"
"k8s.io/cloud-provider-alibaba-cloud/pkg/util/metric"
Expand All @@ -36,7 +36,6 @@ import (
"k8s.io/cloud-provider-alibaba-cloud/pkg/context/shared"
"k8s.io/cloud-provider-alibaba-cloud/pkg/controller/helper"
"k8s.io/cloud-provider-alibaba-cloud/pkg/model"
"k8s.io/cloud-provider-alibaba-cloud/pkg/provider"
"k8s.io/cloud-provider-alibaba-cloud/pkg/provider/alibaba/vpc"
"k8s.io/cloud-provider-alibaba-cloud/pkg/provider/dryrun"
"k8s.io/cloud-provider-alibaba-cloud/pkg/util"
Expand Down Expand Up @@ -108,13 +107,13 @@ func add(mgr manager.Manager, r *ReconcileService) error {
if utilfeature.DefaultFeatureGate.Enabled(ctrlCfg.EndpointSlice) {
// watch endpointslice
if err := c.Watch(source.Kind(mgr.GetCache(), &discovery.EndpointSlice{}),
NewEnqueueRequestForEndpointSliceEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointSliceEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpointslice error: %s", err.Error())
}
} else {
// watch endpoints
if err := c.Watch(source.Kind(mgr.GetCache(), &v1.Endpoints{}),
NewEnqueueRequestForEndpointEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpoint error: %s", err.Error())
}
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/controller/service/nlbv2/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/cloud-provider-alibaba-cloud/pkg/util"
"k8s.io/klog/v2"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -149,23 +150,23 @@ func needAdd(newService *v1.Service) bool {
}

// NewEnqueueRequestForEndpointEvent, event handler for endpoint events
func NewEnqueueRequestForEndpointEvent(client client.Client, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
func NewEnqueueRequestForEndpointEvent(cache cache.Cache, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
return &enqueueRequestForEndpointEvent{
client: client,
cache: cache,
eventRecorder: eventRecorder,
}
}

type enqueueRequestForEndpointEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointEvent)(nil)

func (h *enqueueRequestForEndpointEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.NLBLog.Info("controller: endpoint create event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -175,7 +176,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat
ep1, ok1 := e.ObjectOld.(*v1.Endpoints)
ep2, ok2 := e.ObjectNew.(*v1.Endpoints)

if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.client) &&
if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.cache) &&
!reflect.DeepEqual(ep1.Subsets, ep2.Subsets) {
util.NLBLog.Info("controller: endpoint update event", "endpoint", util.Key(ep1))
util.NLBLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -186,7 +187,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat

func (h *enqueueRequestForEndpointEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.NLBLog.Info("controller: endpoint delete event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -206,7 +207,7 @@ func (h *enqueueRequestForEndpointEvent) enqueueManagedEndpoint(queue workqueue.
util.NLBLog.Info("enqueue", "endpoint", util.Key(endpoint), "queueLen", queue.Len())
}

func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
func isEndpointProcessNeeded(ep *v1.Endpoints, cache cache.Cache) bool {
if ep == nil {
return false
}
Expand All @@ -219,7 +220,7 @@ func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: ep.GetNamespace(),
Name: ep.GetName(),
Expand Down Expand Up @@ -350,23 +351,23 @@ func (h *enqueueRequestForNodeEvent) checkServiceAffected(node *v1.Node, svc *v1
}

// NewEnqueueRequestForEndpointSliceEvent, event handler for endpointslice event
func NewEnqueueRequestForEndpointSliceEvent(client client.Client, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
func NewEnqueueRequestForEndpointSliceEvent(cache cache.Cache, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
return &enqueueRequestForEndpointSliceEvent{
client: client,
cache: cache,
eventRecorder: record,
}
}

type enqueueRequestForEndpointSliceEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointSliceEvent)(nil)

func (h *enqueueRequestForEndpointSliceEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.NLBLog.Info("controller: endpointslice create event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -376,7 +377,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.
es1, ok1 := e.ObjectOld.(*discovery.EndpointSlice)
es2, ok2 := e.ObjectNew.(*discovery.EndpointSlice)

if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.client) &&
if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.cache) &&
isEndpointSliceUpdateNeeded(es1, es2) {
util.NLBLog.Info("controller: endpointslice update event", "endpointslice", util.Key(es1))
util.NLBLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -387,7 +388,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.

func (h *enqueueRequestForEndpointSliceEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.NLBLog.Info("controller: endpointslice delete event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -413,7 +414,7 @@ func (h *enqueueRequestForEndpointSliceEvent) enqueueManagedEndpointSlice(queue
util.NLBLog.Info("enqueue", "endpointslice", util.Key(endpointSlice), "queueLen", queue.Len())
}

func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Client) bool {
func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, cache cache.Cache) bool {
if es == nil {
return false
}
Expand All @@ -424,7 +425,7 @@ func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Cli
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: es.Namespace,
Name: serviceName,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/service/nlbv2/nlb_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ func add(mgr manager.Manager, r *ReconcileNLB) error {
if utilfeature.DefaultFeatureGate.Enabled(ctrlCfg.EndpointSlice) {
// watch endpointslice
if err := c.Watch(source.Kind(mgr.GetCache(), &discovery.EndpointSlice{}),
NewEnqueueRequestForEndpointSliceEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointSliceEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpointslice error: %s", err.Error())
}
} else {
// watch endpoints
if err := c.Watch(source.Kind(mgr.GetCache(), &v1.Endpoints{}),
NewEnqueueRequestForEndpointEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpoint error: %s", err.Error())
}
}
Expand Down

0 comments on commit cc9b919

Please sign in to comment.