[chore] Use informer to track collector Pods in target allocator (#2528)
* Use informer to track collector Pods in target allocator

* Rename CollectorWatcher to Watcher
swiatekm authored May 1, 2024
1 parent dc5dda9 commit fbbca3d
Showing 3 changed files with 129 additions and 181 deletions.
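Before the diff itself, here is a minimal, self-contained sketch of the client-go informer pattern this commit adopts: a shared informer mirrors label-selected Pods into a local cache and fires callbacks on changes. The namespace and selector values below are placeholders, not taken from the operator.

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	factory := informers.NewSharedInformerFactoryWithOptions(
		clientset,
		30*time.Second, // resync period
		informers.WithNamespace("default"), // placeholder namespace
		informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			o.LabelSelector = "app=my-collector" // placeholder selector
		}),
	)
	informer := factory.Core().V1().Pods().Informer()

	_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("pod added") },
		DeleteFunc: func(obj interface{}) { fmt.Println("pod deleted") },
	})
	if err != nil {
		panic(err)
	}

	stop := make(chan struct{})
	informer.Run(stop) // blocks; re-listing and re-watching happen internally
}

Compared to a hand-rolled List+Watch loop, the informer handles re-listing and reconnects itself, which is what lets this commit delete restartWatch and runWatch below.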
144 changes: 68 additions & 76 deletions cmd/otel-allocator/collector/collector.go
@@ -15,7 +15,6 @@
 package collector
 
 import (
-	"context"
 	"os"
 	"time"
 
@@ -24,15 +23,16 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 
 	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
 )
 
 const (
-	watcherTimeout = 15 * time.Minute
+	defaultMinUpdateInterval = time.Second * 5
 )
 
 var (
@@ -43,110 +43,102 @@ var (
 	})
 )
 
-type Client struct {
-	log       logr.Logger
-	k8sClient kubernetes.Interface
-	close     chan struct{}
+type Watcher struct {
+	log               logr.Logger
+	k8sClient         kubernetes.Interface
+	close             chan struct{}
+	minUpdateInterval time.Duration
 }
 
-func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
+func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config) (*Watcher, error) {
 	clientset, err := kubernetes.NewForConfig(kubeConfig)
 	if err != nil {
-		return &Client{}, err
+		return &Watcher{}, err
 	}
 
-	return &Client{
-		log:       logger.WithValues("component", "opentelemetry-targetallocator"),
-		k8sClient: clientset,
-		close:     make(chan struct{}),
+	return &Watcher{
+		log:               logger.WithValues("component", "opentelemetry-targetallocator"),
+		k8sClient:         clientset,
+		close:             make(chan struct{}),
+		minUpdateInterval: defaultMinUpdateInterval,
 	}, nil
 }
 
-func (k *Client) Watch(ctx context.Context, labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector)) error {
-	collectorMap := map[string]*allocation.Collector{}
-
+func (k *Watcher) Watch(labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector)) error {
 	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
 	if err != nil {
 		return err
 	}
-	opts := metav1.ListOptions{
-		LabelSelector: selector.String(),
-	}
-	pods, err := k.k8sClient.CoreV1().Pods(ns).List(ctx, opts)
-	if err != nil {
-		k.log.Error(err, "Pod failure")
-		os.Exit(1)
-	}
-	for i := range pods.Items {
-		pod := pods.Items[i]
-		if pod.GetObjectMeta().GetDeletionTimestamp() == nil {
-			collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName)
-		}
-	}
-
-	fn(collectorMap)
-
-	for {
-		if !k.restartWatch(ctx, opts, collectorMap, fn) {
-			return nil
-		}
-	}
-}
-
-func (k *Client) restartWatch(ctx context.Context, opts metav1.ListOptions, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) bool {
-	// add timeout to the context before calling Watch
-	ctx, cancel := context.WithTimeout(ctx, watcherTimeout)
-	defer cancel()
-	watcher, err := k.k8sClient.CoreV1().Pods(ns).Watch(ctx, opts)
-	if err != nil {
-		k.log.Error(err, "unable to create collector pod watcher")
-		return false
-	}
-	k.log.Info("Successfully started a collector pod watcher")
-	if msg := runWatch(ctx, k, watcher.ResultChan(), collectorMap, fn); msg != "" {
-		k.log.Info("Collector pod watch event stopped " + msg)
-		return false
-	}
-
-	return true
-}
+
+	listOptionsFunc := func(listOptions *metav1.ListOptions) {
+		listOptions.LabelSelector = selector.String()
+	}
+	informerFactory := informers.NewSharedInformerFactoryWithOptions(
+		k.k8sClient,
+		time.Second*30,
+		informers.WithNamespace(ns),
+		informers.WithTweakListOptions(listOptionsFunc))
+	informer := informerFactory.Core().V1().Pods().Informer()
+
+	notify := make(chan struct{}, 1)
+	go k.rateLimitedCollectorHandler(notify, informer.GetStore(), fn)
+
+	notifyFunc := func(_ interface{}) {
+		select {
+		case notify <- struct{}{}:
+		default:
+		}
+	}
+	_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: notifyFunc,
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			notifyFunc(newObj)
+		},
+		DeleteFunc: notifyFunc,
+	})
+	if err != nil {
+		return err
+	}
+
+	informer.Run(k.close)
+	return nil
+}
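The notify channel above is the key coalescing device: it has capacity 1 and is written with a non-blocking send, so any burst of informer events collapses into at most one pending notification. A standalone sketch of the trick, in plain Go with no Kubernetes dependencies:

package main

import "fmt"

func main() {
	// Capacity-1 channel plus a non-blocking send: a burst of N signals
	// collapses into a single pending notification.
	notify := make(chan struct{}, 1)
	signal := func() {
		select {
		case notify <- struct{}{}: // becomes the one pending notification
		default: // one is already pending; coalesce by dropping this one
		}
	}

	for i := 0; i < 100; i++ {
		signal() // e.g. a flood of informer Add/Update/Delete events
	}

	// The consumer drains at most one notification for the whole burst.
	pending := 0
	for {
		select {
		case <-notify:
			pending++
		default:
			fmt.Println("pending notifications:", pending) // prints 1
			return
		}
	}
}

Because at most one token can ever be pending, the consumer never accumulates a backlog of redundant refreshes.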

-func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) string {
+// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel,
+// but not more frequently than once per k.minUpdateInterval.
+func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) {
+	ticker := time.NewTicker(k.minUpdateInterval)
+	defer ticker.Stop()
+
 	for {
-		collectorsDiscovered.Set(float64(len(collectorMap)))
 		select {
 		case <-k.close:
-			return "kubernetes client closed"
-		case <-ctx.Done():
-			return ""
-		case event, ok := <-c:
-			if !ok {
-				k.log.Info("No event found. Restarting watch routine")
-				return ""
-			}
-
-			pod, ok := event.Object.(*v1.Pod)
-			if !ok {
-				k.log.Info("No pod found in event Object. Restarting watch routine")
-				return ""
-			}
-
-			if pod.Spec.NodeName == "" {
-				k.log.Info("Node name is missing from the spec. Restarting watch routine")
-				return ""
-			}
-
-			switch event.Type { //nolint:exhaustive
-			case watch.Added:
-				collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName)
-			case watch.Deleted:
-				delete(collectorMap, pod.Name)
-			}
-			fn(collectorMap)
+			return
+		case <-ticker.C: // throttle events to avoid excessive updates
+			select {
+			case <-notify:
+				k.runOnCollectors(store, fn)
+			default:
+			}
 		}
 	}
 }
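The handler pairs the coalescing channel with a ticker: work happens only on a tick, and only if a notification arrived since the last run, so fn fires at most once per minUpdateInterval. A self-contained sketch of the same throttling loop, with made-up intervals:

package main

import (
	"fmt"
	"time"
)

// Standalone illustration of the throttling loop above: the update runs at
// most once per tick interval, and only if a notification arrived since the
// last tick.
func main() {
	notify := make(chan struct{}, 1)
	done := make(chan struct{})

	go func() { // producer: a burst of notifications
		for i := 0; i < 3; i++ {
			select {
			case notify <- struct{}{}:
			default:
			}
			time.Sleep(30 * time.Millisecond)
		}
		time.Sleep(200 * time.Millisecond)
		close(done)
	}()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			select {
			case <-notify:
				fmt.Println("running update") // at most once per 100ms
			default: // nothing new since the last tick; skip
			}
		}
	}
}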
+// runOnCollectors runs the provided function on the set of collectors from the Store.
+func (k *Watcher) runOnCollectors(store cache.Store, fn func(collectors map[string]*allocation.Collector)) {
+	collectorMap := map[string]*allocation.Collector{}
+	objects := store.List()
+	for _, obj := range objects {
+		pod := obj.(*v1.Pod)
+		if pod.Spec.NodeName == "" {
+			continue
+		}
+		collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName)
+	}
+	collectorsDiscovered.Set(float64(len(collectorMap)))
+	fn(collectorMap)
+}
+
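Note that runOnCollectors never talks to the API server: store.List() reads the informer's local cache. A small helper in the same spirit (the podsOnNodes name and example package are hypothetical, for illustration only):

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// podsOnNodes mirrors runOnCollectors: it reads the informer's local cache
// (no API-server round trip) and keeps only Pods that have already been
// scheduled onto a node.
func podsOnNodes(store cache.Store) map[string]string {
	podToNode := map[string]string{}
	for _, obj := range store.List() {
		pod := obj.(*v1.Pod) // a Pod informer's store holds only *v1.Pod
		if pod.Spec.NodeName == "" {
			continue
		}
		podToNode[pod.Name] = pod.Spec.NodeName
	}
	return podToNode
}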

-func (k *Client) Close() {
+func (k *Watcher) Close() {
 	close(k.close)
 }
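For completeness, a hedged sketch of how a caller might wire the new API together. The operator's actual wiring in cmd/otel-allocator/main.go differs; the selector value and logging setup here are placeholders:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/go-logr/stdr"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/rest"

	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector"
)

func main() {
	logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags))

	cfg, err := rest.InClusterConfig()
	if err != nil {
		logger.Error(err, "building kube config")
		os.Exit(1)
	}

	watcher, err := collector.NewCollectorWatcher(logger, cfg)
	if err != nil {
		logger.Error(err, "creating collector watcher")
		os.Exit(1)
	}

	// Close() stops the informer, which in turn makes Watch return; there is
	// no context parameter anymore, so shutdown is driven entirely by Close.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigCh
		watcher.Close()
	}()

	// Placeholder selector; the operator derives the real one elsewhere.
	selector := &metav1.LabelSelector{
		MatchLabels: map[string]string{"app.kubernetes.io/component": "opentelemetry-collector"},
	}

	// Watch blocks while the informer runs, invoking the callback
	// (rate-limited) whenever the set of collector Pods changes.
	err = watcher.Watch(selector, func(collectors map[string]*allocation.Collector) {
		logger.Info("collector set changed", "count", len(collectors))
	})
	if err != nil {
		logger.Error(err, "watching collector pods")
		os.Exit(1)
	}
}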