diff --git a/CHANGELOG.md b/CHANGELOG.md index 701d6717..403c126a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ NOTE: Breaking release in controllers. - Add multiretriever to retriever different resource types on the same controller. - Refactor metrics recorder implementation including the prometheus backend. - Refactor internal controller queue into a decorator implementation approach. +- Remove `Delete` method from `controller.Handler` and simplify to only `Handle` method ## [0.8.0] - 2019-12-11 diff --git a/controller/generic_test.go b/controller/generic_test.go index 1ec4700e..fa1b7123 100644 --- a/controller/generic_test.go +++ b/controller/generic_test.go @@ -104,7 +104,7 @@ func createNamespaceList(prefix string, q int) (*corev1.NamespaceList, []*corev1 return nsl, nss } -func TestGenericControllerHandleAdds(t *testing.T) { +func TestGenericControllerHandle(t *testing.T) { nsList, expNSAdds := createNamespaceList("testing", 10) tests := []struct { @@ -113,7 +113,7 @@ func TestGenericControllerHandleAdds(t *testing.T) { expNSAdds []*corev1.Namespace }{ { - name: "Listing multiple namespaces should call as add handlers for every namespace on list.", + name: "Listing multiple namespaces should execute the handling for every namespace on list.", nsList: nsList, expNSAdds: expNSAdds, }, @@ -134,7 +134,7 @@ func TestGenericControllerHandleAdds(t *testing.T) { callHandling := 0 // used to track the number of calls. mh := &mcontroller.Handler{} for _, ns := range test.expNSAdds { - mh.On("Add", mock.Anything, ns).Once().Return(nil).Run(func(args mock.Arguments) { + mh.On("Handle", mock.Anything, ns).Once().Return(nil).Run(func(args mock.Arguments) { callHandling++ // Check last call, if is the last call expected then stop the controller so // we can assert the expectations of the calls and finish the test. @@ -172,80 +172,6 @@ func TestGenericControllerHandleAdds(t *testing.T) { } } -func TestGenericControllerHandleDeletes(t *testing.T) { - - startNSList, expNSAdds := createNamespaceList("testing", 10) - nsDels := []*corev1.Namespace{expNSAdds[0], expNSAdds[4], expNSAdds[1]} - - tests := []struct { - name string - startNSList *corev1.NamespaceList - deleteNs []*corev1.Namespace - expDeleteNs []*corev1.Namespace - }{ - { - name: "Deleting multiple namespaces should call as delete handlers for every namespace on deleted.", - startNSList: startNSList, - deleteNs: nsDels, - expDeleteNs: nsDels, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - assert := assert.New(t) - require := require.New(t) - controllerStopperC := make(chan struct{}) - resultC := make(chan error) - - // Mocks kubernetes client. - mc := &fake.Clientset{} - // Populate cache so we ensure deletes are correctly delivered. - onKubeClientListNamespaceReturn(mc, test.startNSList) - onKubeClientWatchNamespaceReturn(mc, nil, nil, test.deleteNs) - - // Mock our handler and set expects. - callHandling := 0 // used to track the number of calls. - mh := &mcontroller.Handler{} - mh.On("Add", mock.Anything, mock.Anything).Return(nil) - for _, ns := range test.expDeleteNs { - mh.On("Delete", mock.Anything, ns.ObjectMeta.Name).Once().Return(nil).Run(func(args mock.Arguments) { - // Check last call, if is the last call expected then stop the controller so - // we can assert the expectations of the calls and finish the test. - callHandling++ - if callHandling == len(test.expDeleteNs) { - close(controllerStopperC) - } - }) - } - - c, err := controller.New(&controller.Config{ - Name: "test", - Handler: mh, - Retriever: newNamespaceRetriever(mc), - Logger: log.Dummy, - }) - require.NoError(err) - - // Run Controller in background. - go func() { - resultC <- c.Run(controllerStopperC) - }() - - // Wait for different results. If no result means error failure. - select { - case err := <-resultC: - if assert.NoError(err) { - // Check handles from the controller. - mh.AssertExpectations(t) - } - case <-time.After(1 * time.Second): - assert.Fail("timeout waiting for controller handling, this could mean the controller is not receiving resources") - } - }) - } -} - func TestGenericControllerErrorRetries(t *testing.T) { nsList, _ := createNamespaceList("testing", 11) @@ -281,7 +207,7 @@ func TestGenericControllerErrorRetries(t *testing.T) { // Expect all the retries for range test.nsList.Items { callsPerNS := test.retryNumber + 1 // initial call + retries. - mh.On("Add", mock.Anything, mock.Anything).Return(err).Times(callsPerNS).Run(func(args mock.Arguments) { + mh.On("Handle", mock.Anything, mock.Anything).Return(err).Times(callsPerNS).Run(func(args mock.Arguments) { totalCalls-- // Check last call, if is the last call expected then stop the controller so // we can assert the expectations of the calls and finish the test. @@ -350,7 +276,7 @@ func TestGenericControllerWithLeaderElection(t *testing.T) { // Expect the calls on the lead (mh1) and no calls on the other ones. totalCalls := len(test.nsList.Items) - mh1.On("Add", mock.Anything, mock.Anything).Return(nil).Times(totalCalls).Run(func(args mock.Arguments) { + mh1.On("Handle", mock.Anything, mock.Anything).Return(nil).Times(totalCalls).Run(func(args mock.Arguments) { totalCalls-- // Check last call, if is the last call expected then stop the controller so // we can assert the expectations of the calls and finish the test. diff --git a/controller/handler.go b/controller/handler.go index c6a53077..5cf7abe7 100644 --- a/controller/handler.go +++ b/controller/handler.go @@ -9,35 +9,16 @@ import ( // Handler knows how to handle the received resources from a kubernetes cluster. type Handler interface { - Add(context.Context, runtime.Object) error - Delete(context.Context, string) error + Handle(context.Context, runtime.Object) error } -// AddFunc knows how to handle resource adds. -type AddFunc func(context.Context, runtime.Object) error +// HandlerFunc knows how to handle resource adds. +type HandlerFunc func(context.Context, runtime.Object) error -// DeleteFunc knows how to handle resource deletes. -type DeleteFunc func(context.Context, string) error - -// HandlerFunc is a handler that is created from functions that the -// Handler interface requires. -type HandlerFunc struct { - AddFunc AddFunc - DeleteFunc DeleteFunc -} - -// Add satisfies Handler interface. -func (h *HandlerFunc) Add(ctx context.Context, obj runtime.Object) error { - if h.AddFunc == nil { - return fmt.Errorf("function can't be nil") - } - return h.AddFunc(ctx, obj) -} - -// Delete satisfies Handler interface. -func (h *HandlerFunc) Delete(ctx context.Context, s string) error { - if h.DeleteFunc == nil { - return fmt.Errorf("function can't be nil") +// Handle satisfies controller.Handler interface. +func (h HandlerFunc) Handle(ctx context.Context, obj runtime.Object) error { + if h == nil { + return fmt.Errorf("handle func is required") } - return h.DeleteFunc(ctx, s) + return h(ctx, obj) } diff --git a/controller/processor.go b/controller/processor.go index 615a1925..9910bf7f 100644 --- a/controller/processor.go +++ b/controller/processor.go @@ -32,12 +32,11 @@ func newIndexerProcessor(indexer cache.Indexer, handler Handler) processor { return err } - // Handle the object. if !exists { - return handler.Delete(ctx, key) + return nil } - return handler.Add(ctx, obj.(runtime.Object)) + return handler.Handle(ctx, obj.(runtime.Object)) }) } diff --git a/examples/config-custom-controller/main.go b/examples/config-custom-controller/main.go index 1e84353f..3d7b7723 100644 --- a/examples/config-custom-controller/main.go +++ b/examples/config-custom-controller/main.go @@ -55,17 +55,11 @@ func run() error { }) // Our domain logic that will print every add/sync/update and delete event we . - hand := &controller.HandlerFunc{ - AddFunc: func(_ context.Context, obj runtime.Object) error { - pod := obj.(*corev1.Pod) - logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) - return nil - }, - DeleteFunc: func(_ context.Context, s string) error { - logger.Infof("Pod deleted: %s", s) - return nil - }, - } + hand := controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error { + pod := obj.(*corev1.Pod) + logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) + return nil + }) // Create the controller with custom configuration. cfg := &controller.Config{ diff --git a/examples/controller-concurrency-handling/main.go b/examples/controller-concurrency-handling/main.go index 27e88ec4..4abbdaeb 100644 --- a/examples/controller-concurrency-handling/main.go +++ b/examples/controller-concurrency-handling/main.go @@ -102,19 +102,12 @@ func run() error { }) // Our domain logic that will print every add/sync/update and delete event we. - hand := &controller.HandlerFunc{ - AddFunc: func(_ context.Context, obj runtime.Object) error { - pod := obj.(*corev1.Pod) - sleep() - logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) - return nil - }, - DeleteFunc: func(_ context.Context, s string) error { - sleep() - logger.Infof("Pod deleted: %s", s) - return nil - }, - } + hand := controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error { + pod := obj.(*corev1.Pod) + sleep() + logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) + return nil + }) // Create the controller. cfg := &controller.Config{ diff --git a/examples/leader-election-controller/main.go b/examples/leader-election-controller/main.go index 01c9873f..71a207e3 100644 --- a/examples/leader-election-controller/main.go +++ b/examples/leader-election-controller/main.go @@ -87,17 +87,11 @@ func run() error { }) // Our domain logic that will print every add/sync/update and delete event we . - hand := &controller.HandlerFunc{ - AddFunc: func(_ context.Context, obj runtime.Object) error { - pod := obj.(*corev1.Pod) - logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) - return nil - }, - DeleteFunc: func(_ context.Context, s string) error { - logger.Infof("Pod deleted: %s", s) - return nil - }, - } + hand := controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error { + pod := obj.(*corev1.Pod) + logger.Infof("Pod added: %s/%s", pod.Namespace, pod.Name) + return nil + }) // Leader election service. lesvc, err := leaderelection.NewDefault(leaderElectionKey, fl.Namespace, k8scli, logger) diff --git a/examples/metrics-controller/main.go b/examples/metrics-controller/main.go index 256315ad..68b391ee 100644 --- a/examples/metrics-controller/main.go +++ b/examples/metrics-controller/main.go @@ -114,16 +114,10 @@ func run() error { }) // Our domain logic that will print every add/sync/update and delete event we . - hand := &controller.HandlerFunc{ - AddFunc: func(_ context.Context, obj runtime.Object) error { - sleepRandomly() - return errRandomly() - }, - DeleteFunc: func(_ context.Context, s string) error { - sleepRandomly() - return errRandomly() - }, - } + hand := controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error { + sleepRandomly() + return errRandomly() + }) // Create the controller that will refresh every 30 seconds. cfg := &controller.Config{ diff --git a/examples/multi-resource-controller/main.go b/examples/multi-resource-controller/main.go index 28d1dde3..ff9d72c8 100644 --- a/examples/multi-resource-controller/main.go +++ b/examples/multi-resource-controller/main.go @@ -73,26 +73,21 @@ func run() error { } // Our domain logic that will print every add/sync/update and delete event we . - hand := &controller.HandlerFunc{ - AddFunc: func(_ context.Context, obj runtime.Object) error { - dep, ok := obj.(*appsv1.Deployment) - if ok { - logger.Infof("Deployment added: %s/%s", dep.Namespace, dep.Name) - return nil - } - - st, ok := obj.(*appsv1.StatefulSet) - if ok { - logger.Infof("Statefulset added: %s/%s", st.Namespace, st.Name) - return nil - } - + hand := controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error { + dep, ok := obj.(*appsv1.Deployment) + if ok { + logger.Infof("Deployment added: %s/%s", dep.Namespace, dep.Name) return nil - }, - DeleteFunc: func(_ context.Context, s string) error { + } + + st, ok := obj.(*appsv1.StatefulSet) + if ok { + logger.Infof("Statefulset added: %s/%s", st.Namespace, st.Name) return nil - }, - } + } + + return nil + }) // Create the controller with custom configuration. cfg := &controller.Config{ diff --git a/examples/pod-terminator-operator/apis/chaos/v1alpha1/types.go b/examples/pod-terminator-operator/apis/chaos/v1alpha1/types.go index 54fba233..5c3fed5f 100644 --- a/examples/pod-terminator-operator/apis/chaos/v1alpha1/types.go +++ b/examples/pod-terminator-operator/apis/chaos/v1alpha1/types.go @@ -4,11 +4,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// PodTerminator represents a pod terminator. +// // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object - -// PodTerminator represents a pod terminator. +// +kubebuilder:resource:singular=podterminator,path=podterminators,shortName=pt,scope=Cluster,categories=terminators;killers;gc type PodTerminator struct { metav1.TypeMeta `json:",inline"` // Standard object's metadata. diff --git a/examples/pod-terminator-operator/manifest-examples/pause.yaml b/examples/pod-terminator-operator/manifest-examples/pause.yaml index 63e5d8f5..3ca0001a 100644 --- a/examples/pod-terminator-operator/manifest-examples/pause.yaml +++ b/examples/pod-terminator-operator/manifest-examples/pause.yaml @@ -1,7 +1,8 @@ -apiVersion: apps/v1beta2 +apiVersion: apps/v1 kind: Deployment metadata: name: pause-pods + namespace: kooper-test labels: application: pause spec: diff --git a/examples/pod-terminator-operator/manifests/chaos.spotahome.com_podterminators.yaml b/examples/pod-terminator-operator/manifests/chaos.spotahome.com_podterminators.yaml index b7db9ddd..bf4f7308 100644 --- a/examples/pod-terminator-operator/manifests/chaos.spotahome.com_podterminators.yaml +++ b/examples/pod-terminator-operator/manifests/chaos.spotahome.com_podterminators.yaml @@ -10,11 +10,17 @@ metadata: spec: group: chaos.spotahome.com names: + categories: + - terminators + - killers + - gc kind: PodTerminator listKind: PodTerminatorList plural: podterminators + shortNames: + - pt singular: podterminator - scope: Namespaced + scope: Cluster validation: openAPIV3Schema: description: PodTerminator represents a pod terminator. diff --git a/examples/pod-terminator-operator/operator/operator.go b/examples/pod-terminator-operator/operator/operator.go index 3d591517..88e53e92 100644 --- a/examples/pod-terminator-operator/operator/operator.go +++ b/examples/pod-terminator-operator/operator/operator.go @@ -27,7 +27,8 @@ type Config struct { // New returns pod terminator operator. func New(cfg Config, podTermCli podtermk8scli.Interface, kubeCli kubernetes.Interface, logger log.Logger) (controller.Controller, error) { return controller.New(&controller.Config{ - Handler: newHandler(kubeCli, logger), + Name: "pod-terminator", + Handler: newHandler(kubeCli, podTermCli, logger), Retriever: newRetriever(podTermCli), Logger: logger, @@ -46,27 +47,66 @@ func newRetriever(cli podtermk8scli.Interface) controller.Retriever { }) } -type handler struct { - chaosService chaos.Syncer - logger log.Logger -} +func newHandler(k8sCli kubernetes.Interface, ptCli podtermk8scli.Interface, logger log.Logger) controller.Handler { + const finalizer = "finalizer.chaos.spotahome.com/podKiller" + chaossvc := chaos.NewChaos(k8sCli, logger) -func newHandler(k8sCli kubernetes.Interface, logger log.Logger) *handler { - return &handler{ - chaosService: chaos.NewChaos(k8sCli, logger), - logger: logger, - } + return controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error { + pt, ok := obj.(*chaosv1alpha1.PodTerminator) + if !ok { + return fmt.Errorf("%v is not a pod terminator object", obj.GetObjectKind()) + } + + switch { + // Handle deletion and remove finalizer. + case !pt.DeletionTimestamp.IsZero() && stringPresentInSlice(pt.Finalizers, finalizer): + logger.Infof("handling pod termination deletion...") + err := chaossvc.DeletePodTerminator(pt.ObjectMeta.Name) + if err != nil { + return fmt.Errorf("could not handle PodTerminator deletion: %w", err) + } + + pt.Finalizers = removeStringFromSlice(pt.Finalizers, finalizer) + _, err = ptCli.ChaosV1alpha1().PodTerminators().Update(pt) + if err != nil { + return fmt.Errorf("could not update pod terminator: %w", err) + } + + return nil + + // Deletion already handled, don't do anything. + case !pt.DeletionTimestamp.IsZero() && !stringPresentInSlice(pt.Finalizers, finalizer): + logger.Infof("handling pod termination deletion already handled, skipping...") + return nil + + // Add finalizer to the object. + case pt.DeletionTimestamp.IsZero() && !stringPresentInSlice(pt.Finalizers, finalizer): + pt.Finalizers = append(pt.Finalizers, finalizer) + _, err := ptCli.ChaosV1alpha1().PodTerminators().Update(pt) + if err != nil { + return fmt.Errorf("could not update pod termiantor: %w", err) + } + } + + // Handle. + return chaossvc.EnsurePodTerminator(pt) + }) } -func (h handler) Add(_ context.Context, obj runtime.Object) error { - pt, ok := obj.(*chaosv1alpha1.PodTerminator) - if !ok { - return fmt.Errorf("%v is not a pod terminator object", obj.GetObjectKind()) +func stringPresentInSlice(ss []string, s string) bool { + for _, f := range ss { + if f == s { + return true + } } - - return h.chaosService.EnsurePodTerminator(pt) + return false } -func (h handler) Delete(_ context.Context, name string) error { - return h.chaosService.DeletePodTerminator(name) +func removeStringFromSlice(ss []string, s string) []string { + for i, f := range ss { + if f == s { + return append(ss[:i], ss[i+1:]...) + } + } + return ss } diff --git a/examples/pod-terminator-operator/service/chaos/chaos.go b/examples/pod-terminator-operator/service/chaos/chaos.go index e6f32465..02971c8f 100644 --- a/examples/pod-terminator-operator/service/chaos/chaos.go +++ b/examples/pod-terminator-operator/service/chaos/chaos.go @@ -59,7 +59,6 @@ func (c *Chaos) EnsurePodTerminator(pt *chaosv1alpha1.PodTerminator) error { pk = NewPodKiller(ptCopy, c.k8sCli, c.logger) c.reg.Store(pt.Name, pk) return pk.Start() - // TODO: garbage collection. } // DeletePodTerminator satisfies ChaosSyncer interface. diff --git a/mocks/controller/Handler.go b/mocks/controller/Handler.go index ae12ac23..ebe15dfc 100644 --- a/mocks/controller/Handler.go +++ b/mocks/controller/Handler.go @@ -15,8 +15,8 @@ type Handler struct { mock.Mock } -// Add provides a mock function with given fields: _a0, _a1 -func (_m *Handler) Add(_a0 context.Context, _a1 runtime.Object) error { +// Handle provides a mock function with given fields: _a0, _a1 +func (_m *Handler) Handle(_a0 context.Context, _a1 runtime.Object) error { ret := _m.Called(_a0, _a1) var r0 error @@ -28,17 +28,3 @@ func (_m *Handler) Add(_a0 context.Context, _a1 runtime.Object) error { return r0 } - -// Delete provides a mock function with given fields: _a0, _a1 -func (_m *Handler) Delete(_a0 context.Context, _a1 string) error { - ret := _m.Called(_a0, _a1) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/test/integration/controller/controller_test.go b/test/integration/controller/controller_test.go index b7bc9766..8164875e 100644 --- a/test/integration/controller/controller_test.go +++ b/test/integration/controller/controller_test.go @@ -4,7 +4,6 @@ package controller_test import ( "context" - "strings" "sync" "testing" "time" @@ -27,12 +26,10 @@ import ( // events are received and handled correctly. func TestControllerHandleEvents(t *testing.T) { tests := []struct { - name string - addServices []*corev1.Service - updateServices []string - delServices []string - expAddedServices []string - expDeletedServices []string + name string + addServices []*corev1.Service + updateServices []string + expAddedServices []string }{ { name: "If a controller is watching services it should react to the service change events.", @@ -56,10 +53,8 @@ func TestControllerHandleEvents(t *testing.T) { }, }, }, - updateServices: []string{"svc1"}, - delServices: []string{"svc1", "svc2"}, - expAddedServices: []string{"svc1", "svc2", "svc1"}, - expDeletedServices: []string{"svc1", "svc2"}, + updateServices: []string{"svc1"}, + expAddedServices: []string{"svc1", "svc2", "svc1"}, }, } @@ -70,7 +65,6 @@ func TestControllerHandleEvents(t *testing.T) { resync := 30 * time.Second stopC := make(chan struct{}) var gotAddedServices []string - var gotDeletedServices []string // Create the kubernetes client. k8scli, err := cli.GetK8sClient("") @@ -86,38 +80,23 @@ func TestControllerHandleEvents(t *testing.T) { rt := controller.MustRetrieverFromListerWatcher(cache.NewListWatchFromClient(k8scli.CoreV1().RESTClient(), "services", prep.Namespace().Name, fields.Everything())) // Call times are the number of times the handler should be called before sending the termination signal. - stopCallTimes := len(test.addServices) + len(test.updateServices) + len(test.delServices) + stopCallTimes := len(test.addServices) + len(test.updateServices) calledTimes := 0 var mx sync.Mutex // Create the handler. - hl := &controller.HandlerFunc{ - AddFunc: func(_ context.Context, obj runtime.Object) error { - mx.Lock() - calledTimes++ - mx.Unlock() - - svc := obj.(*corev1.Service) - gotAddedServices = append(gotAddedServices, svc.Name) - if calledTimes >= stopCallTimes { - close(stopC) - } - return nil - }, - DeleteFunc: func(_ context.Context, id string) error { - mx.Lock() - calledTimes++ - mx.Unlock() - - // Ignore namespace. - id = strings.Split(id, "/")[1] - gotDeletedServices = append(gotDeletedServices, id) - if calledTimes >= stopCallTimes { - close(stopC) - } - return nil - }, - } + hl := controller.HandlerFunc(func(_ context.Context, obj runtime.Object) error { + mx.Lock() + calledTimes++ + mx.Unlock() + + svc := obj.(*corev1.Service) + gotAddedServices = append(gotAddedServices, svc.Name) + if calledTimes >= stopCallTimes { + close(stopC) + } + return nil + }) // Create a Pod controller. cfg := &controller.Config{ @@ -149,13 +128,6 @@ func TestControllerHandleEvents(t *testing.T) { } } - // Delete the required services. - for _, svc := range test.delServices { - err := k8scli.CoreV1().Services(prep.Namespace().Name).Delete(svc, &metav1.DeleteOptions{}) - assert.NoError(err) - time.Sleep(1 * time.Second) - } - // Wait until we have finished. select { // Timeout. @@ -166,7 +138,6 @@ func TestControllerHandleEvents(t *testing.T) { // Check. assert.Equal(test.expAddedServices, gotAddedServices) - assert.Equal(test.expDeletedServices, gotDeletedServices) }) } } diff --git a/test/integration/controller/generic_test.go b/test/integration/controller/generic_test.go index bf84858f..0f9c1be7 100644 --- a/test/integration/controller/generic_test.go +++ b/test/integration/controller/generic_test.go @@ -64,17 +64,11 @@ func runTimedController(sleepDuration time.Duration, concurrencyLevel int, numbe var wg sync.WaitGroup wg.Add(numberOfEvents) - h := &controller.HandlerFunc{ - AddFunc: func(_ context.Context, _ runtime.Object) error { - time.Sleep(sleepDuration) - wg.Done() - return nil - }, - DeleteFunc: func(_ context.Context, _ string) error { - assert.Fail("delete events should not be used on this test") - return nil - }, - } + h := controller.HandlerFunc(func(_ context.Context, _ runtime.Object) error { + time.Sleep(sleepDuration) + wg.Done() + return nil + }) // Create the controller type depending on the concurrency level. cfg := &controller.Config{