From fbbca3d5222f680336a52c21d1442eb87dc1b586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Wed, 1 May 2024 13:59:53 +0000 Subject: [PATCH] [chore] Use informer to track collector Pods in target allocator (#2528) * Use informer to track collector Pods in target allocator * Rename CollectorWatcher to Watcher --- cmd/otel-allocator/collector/collector.go | 144 ++++++++-------- .../collector/collector_test.go | 160 +++++++----------- cmd/otel-allocator/main.go | 6 +- 3 files changed, 129 insertions(+), 181 deletions(-) diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index 68d8db6038..8814e38797 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -15,7 +15,6 @@ package collector import ( - "context" "os" "time" @@ -24,15 +23,16 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) const ( - watcherTimeout = 15 * time.Minute + defaultMinUpdateInterval = time.Second * 5 ) var ( @@ -43,110 +43,102 @@ var ( }) ) -type Client struct { - log logr.Logger - k8sClient kubernetes.Interface - close chan struct{} +type Watcher struct { + log logr.Logger + k8sClient kubernetes.Interface + close chan struct{} + minUpdateInterval time.Duration } -func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) { +func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config) (*Watcher, error) { clientset, err := kubernetes.NewForConfig(kubeConfig) if err != nil { - return &Client{}, err + return &Watcher{}, err } - return &Client{ - log: logger.WithValues("component", "opentelemetry-targetallocator"), - k8sClient: clientset, - close: make(chan struct{}), + return &Watcher{ + log: logger.WithValues("component", "opentelemetry-targetallocator"), + k8sClient: clientset, + close: make(chan struct{}), + minUpdateInterval: defaultMinUpdateInterval, }, nil } -func (k *Client) Watch(ctx context.Context, labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector)) error { - collectorMap := map[string]*allocation.Collector{} - +func (k *Watcher) Watch(labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector)) error { selector, err := metav1.LabelSelectorAsSelector(labelSelector) if err != nil { return err } - opts := metav1.ListOptions{ - LabelSelector: selector.String(), - } - pods, err := k.k8sClient.CoreV1().Pods(ns).List(ctx, opts) - if err != nil { - k.log.Error(err, "Pod failure") - os.Exit(1) - } - for i := range pods.Items { - pod := pods.Items[i] - if pod.GetObjectMeta().GetDeletionTimestamp() == nil { - collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) - } + + listOptionsFunc := func(listOptions *metav1.ListOptions) { + listOptions.LabelSelector = selector.String() } + informerFactory := informers.NewSharedInformerFactoryWithOptions( + k.k8sClient, + time.Second*30, + informers.WithNamespace(ns), + informers.WithTweakListOptions(listOptionsFunc)) + informer := informerFactory.Core().V1().Pods().Informer() - fn(collectorMap) + notify := make(chan struct{}, 1) + go k.rateLimitedCollectorHandler(notify, informer.GetStore(), fn) - for { - if 
!k.restartWatch(ctx, opts, collectorMap, fn) { - return nil + notifyFunc := func(_ interface{}) { + select { + case notify <- struct{}{}: + default: } } -} - -func (k *Client) restartWatch(ctx context.Context, opts metav1.ListOptions, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) bool { - // add timeout to the context before calling Watch - ctx, cancel := context.WithTimeout(ctx, watcherTimeout) - defer cancel() - watcher, err := k.k8sClient.CoreV1().Pods(ns).Watch(ctx, opts) + _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: notifyFunc, + UpdateFunc: func(oldObj, newObj interface{}) { + notifyFunc(newObj) + }, + DeleteFunc: notifyFunc, + }) if err != nil { - k.log.Error(err, "unable to create collector pod watcher") - return false - } - k.log.Info("Successfully started a collector pod watcher") - if msg := runWatch(ctx, k, watcher.ResultChan(), collectorMap, fn); msg != "" { - k.log.Info("Collector pod watch event stopped " + msg) - return false + return err } - return true + informer.Run(k.close) + return nil } -func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) string { +// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel, +// but not more frequently than once per k.eventPeriod. +func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) { + ticker := time.NewTicker(k.minUpdateInterval) + defer ticker.Stop() + for { - collectorsDiscovered.Set(float64(len(collectorMap))) select { case <-k.close: - return "kubernetes client closed" - case <-ctx.Done(): - return "" - case event, ok := <-c: - if !ok { - k.log.Info("No event found. Restarting watch routine") - return "" - } - - pod, ok := event.Object.(*v1.Pod) - if !ok { - k.log.Info("No pod found in event Object. Restarting watch routine") - return "" - } - - if pod.Spec.NodeName == "" { - k.log.Info("Node name is missing from the spec. Restarting watch routine") - return "" + return + case <-ticker.C: // throttle events to avoid excessive updates + select { + case <-notify: + k.runOnCollectors(store, fn) + default: } + } + } +} - switch event.Type { //nolint:exhaustive - case watch.Added: - collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) - case watch.Deleted: - delete(collectorMap, pod.Name) - } - fn(collectorMap) +// runOnCollectors runs the provided function on the set of collectors from the Store. 
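+// Pods that don't have a Node assigned yet (empty pod.Spec.NodeName) are skipped.
+// It also updates the collectorsDiscovered gauge with the number of collectors found.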
+func (k *Watcher) runOnCollectors(store cache.Store, fn func(collectors map[string]*allocation.Collector)) { + collectorMap := map[string]*allocation.Collector{} + objects := store.List() + for _, obj := range objects { + pod := obj.(*v1.Pod) + if pod.Spec.NodeName == "" { + continue } + collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) } + collectorsDiscovered.Set(float64(len(collectorMap))) + fn(collectorMap) } -func (k *Client) Close() { +func (k *Watcher) Close() { close(k.close) } diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/collector_test.go index 6ad3318944..6bffd83212 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/collector_test.go @@ -16,16 +16,15 @@ package collector import ( "context" - "fmt" - "os" "sync" "testing" "time" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/fake" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -33,42 +32,30 @@ import ( ) var logger = logf.Log.WithName("collector-unit-tests") +var labelMap = map[string]string{ + "app.kubernetes.io/instance": "default.test", + "app.kubernetes.io/managed-by": "opentelemetry-operator", +} +var labelSelector = metav1.LabelSelector{ + MatchLabels: labelMap, +} -func getTestClient() (Client, watch.Interface) { - kubeClient := Client{ - k8sClient: fake.NewSimpleClientset(), - close: make(chan struct{}), - log: logger, - } - labelSelector := metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app.kubernetes.io/instance": "default.test", - "app.kubernetes.io/managed-by": "opentelemetry-operator", - }, - } - selector, _ := metav1.LabelSelectorAsSelector(&labelSelector) - - opts := metav1.ListOptions{ - LabelSelector: selector.String(), +func getTestPodWatcher() Watcher { + podWatcher := Watcher{ + k8sClient: fake.NewSimpleClientset(), + close: make(chan struct{}), + log: logger, + minUpdateInterval: time.Millisecond, } - watcher, err := kubeClient.k8sClient.CoreV1().Pods("test-ns").Watch(context.Background(), opts) - if err != nil { - fmt.Printf("failed to setup a Collector Pod watcher: %v", err) - os.Exit(1) - } - return kubeClient, watcher + return podWatcher } func pod(name string) *v1.Pod { - labelSet := make(map[string]string) - labelSet["app.kubernetes.io/instance"] = "default.test" - labelSet["app.kubernetes.io/managed-by"] = "opentelemetry-operator" - return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: "test-ns", - Labels: labelSet, + Labels: labelMap, }, Spec: v1.PodSpec{ NodeName: "test-node", @@ -78,7 +65,7 @@ func pod(name string) *v1.Pod { func Test_runWatch(t *testing.T) { type args struct { - kubeFn func(t *testing.T, client Client, group *sync.WaitGroup) + kubeFn func(t *testing.T, podWatcher Watcher) collectorMap map[string]*allocation.Collector } tests := []struct { @@ -89,11 +76,10 @@ func Test_runWatch(t *testing.T) { { name: "pod add", args: args{ - kubeFn: func(t *testing.T, client Client, group *sync.WaitGroup) { + kubeFn: func(t *testing.T, podWatcher Watcher) { for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} { p := pod(k) - group.Add(1) - _, err := client.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) + _, err := 
podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) assert.NoError(t, err) } }, @@ -117,10 +103,9 @@ func Test_runWatch(t *testing.T) { { name: "pod delete", args: args{ - kubeFn: func(t *testing.T, client Client, group *sync.WaitGroup) { + kubeFn: func(t *testing.T, podWatcher Watcher) { for _, k := range []string{"test-pod2", "test-pod3"} { - group.Add(1) - err := client.k8sClient.CoreV1().Pods("test-ns").Delete(context.Background(), k, metav1.DeleteOptions{}) + err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Delete(context.Background(), k, metav1.DeleteOptions{}) assert.NoError(t, err) } }, @@ -149,83 +134,54 @@ func Test_runWatch(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - kubeClient, watcher := getTestClient() + podWatcher := getTestPodWatcher() defer func() { - close(kubeClient.close) - watcher.Stop() + close(podWatcher.close) }() - var wg sync.WaitGroup - actual := make(map[string]*allocation.Collector) + var actual map[string]*allocation.Collector + mapMutex := sync.Mutex{} for _, k := range tt.args.collectorMap { p := pod(k.Name) - _, err := kubeClient.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) - wg.Add(1) + _, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) assert.NoError(t, err) } - go runWatch(context.Background(), &kubeClient, watcher.ResultChan(), map[string]*allocation.Collector{}, func(colMap map[string]*allocation.Collector) { - actual = colMap - wg.Done() - }) - - tt.args.kubeFn(t, kubeClient, &wg) - wg.Wait() - - assert.Len(t, actual, len(tt.want)) - assert.Equal(t, actual, tt.want) + go func(podWatcher Watcher) { + err := podWatcher.Watch(&labelSelector, func(colMap map[string]*allocation.Collector) { + mapMutex.Lock() + defer mapMutex.Unlock() + actual = colMap + }) + require.NoError(t, err) + }(podWatcher) + + tt.args.kubeFn(t, podWatcher) + + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + mapMutex.Lock() + assert.Len(collect, actual, len(tt.want)) + assert.Equal(collect, actual, tt.want) + defer mapMutex.Unlock() + }, time.Second, time.Millisecond) + + // check if the metrics were emitted correctly + assert.Equal(t, testutil.ToFloat64(collectorsDiscovered), float64(len(actual))) }) } } -// this tests runWatch in the case of watcher channel closing and watcher timing out. +// this tests runWatch in the case of watcher channel closing. func Test_closeChannel(t *testing.T) { - tests := []struct { - description string - isCloseChannel bool - timeoutSeconds time.Duration - }{ - { - // event is triggered by channel closing. - description: "close_channel", - isCloseChannel: true, - // channel should be closed before this timeout occurs - timeoutSeconds: 10 * time.Second, - }, - { - // event triggered by timeout. 
- description: "watcher_timeout", - isCloseChannel: false, - timeoutSeconds: 0 * time.Second, - }, - } + podWatcher := getTestPodWatcher() - for _, tc := range tests { - t.Run(tc.description, func(t *testing.T) { - kubeClient, watcher := getTestClient() + var wg sync.WaitGroup + wg.Add(1) - defer func() { - close(kubeClient.close) - watcher.Stop() - }() - var wg sync.WaitGroup - wg.Add(1) - terminated := false - - go func(watcher watch.Interface) { - defer wg.Done() - ctx, cancel := context.WithTimeout(context.Background(), tc.timeoutSeconds) - defer cancel() - if msg := runWatch(ctx, &kubeClient, watcher.ResultChan(), map[string]*allocation.Collector{}, func(colMap map[string]*allocation.Collector) {}); msg != "" { - terminated = true - return - } - }(watcher) + go func(podWatcher Watcher) { + defer wg.Done() + err := podWatcher.Watch(&labelSelector, func(colMap map[string]*allocation.Collector) {}) + require.NoError(t, err) + }(podWatcher) - if tc.isCloseChannel { - // stop pod watcher to trigger event. - watcher.Stop() - } - wg.Wait() - assert.False(t, terminated) - }) - } + podWatcher.Close() + wg.Wait() } diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index a8e57da102..efb43e541c 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -53,7 +53,7 @@ func main() { allocatorPrehook prehook.Hook allocator allocation.Allocator discoveryManager *discovery.Manager - collectorWatcher *collector.Client + collectorWatcher *collector.Watcher promWatcher allocatorWatcher.Watcher targetDiscoverer *target.Discoverer @@ -97,7 +97,7 @@ func main() { discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics) targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv) - collectorWatcher, collectorWatcherErr := collector.NewClient(log, cfg.ClusterConfig) + collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig) if collectorWatcherErr != nil { setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher") os.Exit(1) @@ -169,7 +169,7 @@ func main() { }) runGroup.Add( func() error { - err := collectorWatcher.Watch(ctx, cfg.CollectorSelector, allocator.SetCollectors) + err := collectorWatcher.Watch(cfg.CollectorSelector, allocator.SetCollectors) setupLog.Info("Collector watcher exited") return err },