Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modify method to get service in endpoint related event #413

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions pkg/controller/service/clbv1/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/cloud-provider-alibaba-cloud/pkg/util"
"k8s.io/klog/v2"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -154,23 +155,23 @@ func needAdd(newService *v1.Service) bool {
}

// NewEnqueueRequestForEndpointEvent, event handler for endpoint events
func NewEnqueueRequestForEndpointEvent(client client.Client, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
func NewEnqueueRequestForEndpointEvent(cache cache.Cache, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
return &enqueueRequestForEndpointEvent{
client: client,
cache: cache,
eventRecorder: eventRecorder,
}
}

type enqueueRequestForEndpointEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointEvent)(nil)

func (h *enqueueRequestForEndpointEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.ServiceLog.Info("controller: endpoint create event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -180,7 +181,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat
ep1, ok1 := e.ObjectOld.(*v1.Endpoints)
ep2, ok2 := e.ObjectNew.(*v1.Endpoints)

if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.client) &&
if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.cache) &&
!reflect.DeepEqual(ep1.Subsets, ep2.Subsets) {
util.ServiceLog.Info("controller: endpoint update event", "endpoint", util.Key(ep1))
util.ServiceLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -191,7 +192,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat

func (h *enqueueRequestForEndpointEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.ServiceLog.Info("controller: endpoint delete event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -211,7 +212,7 @@ func (h *enqueueRequestForEndpointEvent) enqueueManagedEndpoint(queue workqueue.
util.ServiceLog.Info("enqueue", "endpoint", util.Key(endpoint), "queueLen", queue.Len())
}

func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
func isEndpointProcessNeeded(ep *v1.Endpoints, cache cache.Cache) bool {
if ep == nil {
return false
}
Expand All @@ -224,7 +225,7 @@ func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: ep.GetNamespace(),
Name: ep.GetName(),
Expand Down Expand Up @@ -353,23 +354,23 @@ func (h *enqueueRequestForNodeEvent) checkServiceAffected(node *v1.Node, svc *v1
}

// NewEnqueueRequestForEndpointSliceEvent, event handler for endpointslice event
func NewEnqueueRequestForEndpointSliceEvent(client client.Client, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
func NewEnqueueRequestForEndpointSliceEvent(cache cache.Cache, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
return &enqueueRequestForEndpointSliceEvent{
client: client,
cache: cache,
eventRecorder: record,
}
}

type enqueueRequestForEndpointSliceEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointSliceEvent)(nil)

func (h *enqueueRequestForEndpointSliceEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.ServiceLog.Info("controller: endpointslice create event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -379,7 +380,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.
es1, ok1 := e.ObjectOld.(*discovery.EndpointSlice)
es2, ok2 := e.ObjectNew.(*discovery.EndpointSlice)

if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.client) &&
if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.cache) &&
isEndpointSliceUpdateNeeded(es1, es2) {
util.ServiceLog.Info("controller: endpointslice update event", "endpointslice", util.Key(es1))
util.ServiceLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -390,7 +391,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.

func (h *enqueueRequestForEndpointSliceEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.ServiceLog.Info("controller: endpointslice delete event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -416,7 +417,7 @@ func (h *enqueueRequestForEndpointSliceEvent) enqueueManagedEndpointSlice(queue
util.ServiceLog.Info("enqueue", "endpointslice", util.Key(endpointSlice), "queueLen", queue.Len())
}

func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Client) bool {
func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, cache cache.Cache) bool {
if es == nil {
return false
}
Expand All @@ -427,7 +428,7 @@ func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Cli
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: es.Namespace,
Name: serviceName,
Expand Down
9 changes: 4 additions & 5 deletions pkg/controller/service/clbv1/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
prvd "k8s.io/cloud-provider-alibaba-cloud/pkg/provider"
"k8s.io/klog/v2"
"os"
"strings"
"time"
Expand All @@ -13,8 +15,6 @@ import (
discovery "k8s.io/api/discovery/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"k8s.io/cloud-provider-alibaba-cloud/pkg/controller/service/reconcile/annotation"
svcCtx "k8s.io/cloud-provider-alibaba-cloud/pkg/controller/service/reconcile/context"
"k8s.io/cloud-provider-alibaba-cloud/pkg/util/metric"
Expand All @@ -36,7 +36,6 @@ import (
"k8s.io/cloud-provider-alibaba-cloud/pkg/context/shared"
"k8s.io/cloud-provider-alibaba-cloud/pkg/controller/helper"
"k8s.io/cloud-provider-alibaba-cloud/pkg/model"
"k8s.io/cloud-provider-alibaba-cloud/pkg/provider"
"k8s.io/cloud-provider-alibaba-cloud/pkg/provider/alibaba/vpc"
"k8s.io/cloud-provider-alibaba-cloud/pkg/provider/dryrun"
"k8s.io/cloud-provider-alibaba-cloud/pkg/util"
Expand Down Expand Up @@ -108,13 +107,13 @@ func add(mgr manager.Manager, r *ReconcileService) error {
if utilfeature.DefaultFeatureGate.Enabled(ctrlCfg.EndpointSlice) {
// watch endpointslice
if err := c.Watch(source.Kind(mgr.GetCache(), &discovery.EndpointSlice{}),
NewEnqueueRequestForEndpointSliceEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointSliceEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpointslice error: %s", err.Error())
}
} else {
// watch endpoints
if err := c.Watch(source.Kind(mgr.GetCache(), &v1.Endpoints{}),
NewEnqueueRequestForEndpointEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpoint error: %s", err.Error())
}
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/controller/service/nlbv2/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/cloud-provider-alibaba-cloud/pkg/util"
"k8s.io/klog/v2"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -149,23 +150,23 @@ func needAdd(newService *v1.Service) bool {
}

// NewEnqueueRequestForEndpointEvent, event handler for endpoint events
func NewEnqueueRequestForEndpointEvent(client client.Client, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
func NewEnqueueRequestForEndpointEvent(cache cache.Cache, eventRecorder record.EventRecorder) *enqueueRequestForEndpointEvent {
return &enqueueRequestForEndpointEvent{
client: client,
cache: cache,
eventRecorder: eventRecorder,
}
}

type enqueueRequestForEndpointEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointEvent)(nil)

func (h *enqueueRequestForEndpointEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.NLBLog.Info("controller: endpoint create event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -175,7 +176,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat
ep1, ok1 := e.ObjectOld.(*v1.Endpoints)
ep2, ok2 := e.ObjectNew.(*v1.Endpoints)

if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.client) &&
if ok1 && ok2 && isEndpointProcessNeeded(ep1, h.cache) &&
!reflect.DeepEqual(ep1.Subsets, ep2.Subsets) {
util.NLBLog.Info("controller: endpoint update event", "endpoint", util.Key(ep1))
util.NLBLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -186,7 +187,7 @@ func (h *enqueueRequestForEndpointEvent) Update(_ context.Context, e event.Updat

func (h *enqueueRequestForEndpointEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
ep, ok := e.Object.(*v1.Endpoints)
if ok && isEndpointProcessNeeded(ep, h.client) {
if ok && isEndpointProcessNeeded(ep, h.cache) {
util.NLBLog.Info("controller: endpoint delete event", "endpoint", util.Key(ep))
h.enqueueManagedEndpoint(queue, ep)
}
Expand All @@ -206,7 +207,7 @@ func (h *enqueueRequestForEndpointEvent) enqueueManagedEndpoint(queue workqueue.
util.NLBLog.Info("enqueue", "endpoint", util.Key(endpoint), "queueLen", queue.Len())
}

func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
func isEndpointProcessNeeded(ep *v1.Endpoints, cache cache.Cache) bool {
if ep == nil {
return false
}
Expand All @@ -219,7 +220,7 @@ func isEndpointProcessNeeded(ep *v1.Endpoints, client client.Client) bool {
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: ep.GetNamespace(),
Name: ep.GetName(),
Expand Down Expand Up @@ -350,23 +351,23 @@ func (h *enqueueRequestForNodeEvent) checkServiceAffected(node *v1.Node, svc *v1
}

// NewEnqueueRequestForEndpointSliceEvent, event handler for endpointslice event
func NewEnqueueRequestForEndpointSliceEvent(client client.Client, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
func NewEnqueueRequestForEndpointSliceEvent(cache cache.Cache, record record.EventRecorder) *enqueueRequestForEndpointSliceEvent {
return &enqueueRequestForEndpointSliceEvent{
client: client,
cache: cache,
eventRecorder: record,
}
}

type enqueueRequestForEndpointSliceEvent struct {
client client.Client
cache cache.Cache
eventRecorder record.EventRecorder
}

var _ handler.EventHandler = (*enqueueRequestForEndpointSliceEvent)(nil)

func (h *enqueueRequestForEndpointSliceEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.NLBLog.Info("controller: endpointslice create event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -376,7 +377,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.
es1, ok1 := e.ObjectOld.(*discovery.EndpointSlice)
es2, ok2 := e.ObjectNew.(*discovery.EndpointSlice)

if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.client) &&
if ok1 && ok2 && isEndpointSliceProcessNeeded(es1, h.cache) &&
isEndpointSliceUpdateNeeded(es1, es2) {
util.NLBLog.Info("controller: endpointslice update event", "endpointslice", util.Key(es1))
util.NLBLog.Info(fmt.Sprintf("endpoints before [%s], afeter [%s]",
Expand All @@ -387,7 +388,7 @@ func (h *enqueueRequestForEndpointSliceEvent) Update(_ context.Context, e event.

func (h *enqueueRequestForEndpointSliceEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
es, ok := e.Object.(*discovery.EndpointSlice)
if ok && isEndpointSliceProcessNeeded(es, h.client) {
if ok && isEndpointSliceProcessNeeded(es, h.cache) {
util.NLBLog.Info("controller: endpointslice delete event", "endpointslice", util.Key(es))
h.enqueueManagedEndpointSlice(queue, es)
}
Expand All @@ -413,7 +414,7 @@ func (h *enqueueRequestForEndpointSliceEvent) enqueueManagedEndpointSlice(queue
util.NLBLog.Info("enqueue", "endpointslice", util.Key(endpointSlice), "queueLen", queue.Len())
}

func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Client) bool {
func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, cache cache.Cache) bool {
if es == nil {
return false
}
Expand All @@ -424,7 +425,7 @@ func isEndpointSliceProcessNeeded(es *discovery.EndpointSlice, client client.Cli
}

svc := &v1.Service{}
err := client.Get(context.TODO(),
err := cache.Get(context.TODO(),
types.NamespacedName{
Namespace: es.Namespace,
Name: serviceName,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/service/nlbv2/nlb_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ func add(mgr manager.Manager, r *ReconcileNLB) error {
if utilfeature.DefaultFeatureGate.Enabled(ctrlCfg.EndpointSlice) {
// watch endpointslice
if err := c.Watch(source.Kind(mgr.GetCache(), &discovery.EndpointSlice{}),
NewEnqueueRequestForEndpointSliceEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointSliceEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpointslice error: %s", err.Error())
}
} else {
// watch endpoints
if err := c.Watch(source.Kind(mgr.GetCache(), &v1.Endpoints{}),
NewEnqueueRequestForEndpointEvent(mgr.GetClient(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
NewEnqueueRequestForEndpointEvent(mgr.GetCache(), mgr.GetEventRecorderFor("service-controller"))); err != nil {
return fmt.Errorf("watch resource endpoint error: %s", err.Error())
}
}
Expand Down