From 3af517d6248098def580e2fef5348378cbbcee75 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 12 Sep 2022 12:21:10 -0400 Subject: [PATCH] Introduce ability to specify strategies for target allocation (#1079) * Cribbed from another branch * Refactor part 2 * Fix tests * Remove deadlocking problem * Update based on comments * Unexported some fields, added blocker and comments about invariants * Comments and imports fixed --- apis/v1alpha1/opentelemetrycollector_types.go | 2 + apis/v1alpha1/zz_generated.deepcopy.go | 2 +- ...ntelemetry.io_opentelemetrycollectors.yaml | 4 + cmd/otel-allocator/allocation/allocator.go | 237 ---------------- .../allocation/allocator_test.go | 267 ------------------ cmd/otel-allocator/allocation/http.go | 23 +- cmd/otel-allocator/allocation/http_test.go | 45 +-- .../allocation/least_weighted.go | 206 ++++++++++++++ .../allocation/least_weighted_test.go | 264 +++++++++++++++++ cmd/otel-allocator/allocation/strategy.go | 89 ++++++ cmd/otel-allocator/collector/collector.go | 41 +-- .../collector/collector_test.go | 31 +- cmd/otel-allocator/config/config.go | 12 +- cmd/otel-allocator/diff/diff.go | 40 +++ cmd/otel-allocator/diff/diff_test.go | 63 +++++ cmd/otel-allocator/discovery/discovery.go | 14 +- .../discovery/discovery_test.go | 9 +- cmd/otel-allocator/main.go | 47 +-- cmd/otel-allocator/watcher/file.go | 1 + cmd/otel-allocator/watcher/main.go | 5 +- ...ntelemetry.io_opentelemetrycollectors.yaml | 4 + docs/api.md | 7 + pkg/collector/reconcile/configmap.go | 5 + pkg/collector/reconcile/configmap_test.go | 4 +- 24 files changed, 800 insertions(+), 622 deletions(-) delete mode 100644 cmd/otel-allocator/allocation/allocator.go delete mode 100644 cmd/otel-allocator/allocation/allocator_test.go create mode 100644 cmd/otel-allocator/allocation/least_weighted.go create mode 100644 cmd/otel-allocator/allocation/least_weighted_test.go create mode 100644 cmd/otel-allocator/allocation/strategy.go create mode 100644 
cmd/otel-allocator/diff/diff.go create mode 100644 cmd/otel-allocator/diff/diff_test.go diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 747736361f..ac79522da0 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -108,6 +108,8 @@ type OpenTelemetryCollectorSpec struct { // OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator. type OpenTelemetryTargetAllocator struct { + // AllocationStrategy determines which strategy the target allocator should use for allocation + AllocationStrategy string `json:"allocationStrategy,omitempty"` // ServiceAccount indicates the name of an existing service account to use with this instance. // +optional ServiceAccount string `json:"serviceAccount,omitempty"` diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index cba3b24b99..776bebf9fd 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,7 @@ package v1alpha1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index da91c7ce09..01dc8f2e2b 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -683,6 +683,10 @@ spec: description: TargetAllocator indicates a value which determines whether to spawn a target allocation resource or not. properties: + allocationStrategy: + description: AllocationStrategy determines which strategy the + target allocator should use for allocation + type: string enabled: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. 
diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go deleted file mode 100644 index be4d82d3eb..0000000000 --- a/cmd/otel-allocator/allocation/allocator.go +++ /dev/null @@ -1,237 +0,0 @@ -package allocation - -import ( - "fmt" - "net/url" - "sync" - - "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/common/model" -) - -var ( - collectorsAllocatable = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_collectors_allocatable", - Help: "Number of collectors the allocator is able to allocate to.", - }) - targetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_targets_per_collector", - Help: "The number of targets for each collector.", - }, []string{"collector_name"}) - timeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "opentelemetry_allocator_time_to_allocate", - Help: "The time it takes to allocate", - }, []string{"method"}) -) - -/* - Load balancer will serve on an HTTP server exposing /jobs//targets - The targets are allocated using the least connection method - Load balancer will need information about the collectors in order to set the URLs - Keep a Map of what each collector currently holds and update it based on new scrape target updates -*/ - -type TargetItem struct { - JobName string - Link LinkJSON - TargetURL string - Label model.LabelSet - Collector *collector -} - -func (t TargetItem) hash() string { - return t.JobName + t.TargetURL + t.Label.Fingerprint().String() -} - -// Create a struct that holds collector - and jobs for that collector -// This struct will be parsed into endpoint with collector and jobs info - -type collector struct { - Name string - NumTargets int -} - -// Allocator makes decisions to distribute work among -// a number of OpenTelemetry collectors based on the number of targets. 
-// Users need to call SetTargets when they have new targets in their -// clusters and call SetCollectors when the collectors have changed. -type Allocator struct { - // m protects collectors and targetItems for concurrent use. - m sync.RWMutex - collectors map[string]*collector // all current collectors - targetItems map[string]*TargetItem - - log logr.Logger -} - -// TargetItems returns a shallow copy of the targetItems map. -func (allocator *Allocator) TargetItems() map[string]*TargetItem { - allocator.m.RLock() - defer allocator.m.RUnlock() - targetItemsCopy := make(map[string]*TargetItem) - for k, v := range allocator.targetItems { - targetItemsCopy[k] = v - } - return targetItemsCopy -} - -// Collectors returns a shallow copy of the collectors map. -func (allocator *Allocator) Collectors() map[string]*collector { - allocator.m.RLock() - defer allocator.m.RUnlock() - collectorsCopy := make(map[string]*collector) - for k, v := range allocator.collectors { - collectorsCopy[k] = v - } - return collectorsCopy -} - -// findNextCollector finds the next collector with fewer number of targets. -// This method is called from within SetTargets and SetCollectors, whose caller -// acquires the needed lock. -func (allocator *Allocator) findNextCollector() *collector { - var col *collector - for _, v := range allocator.collectors { - // If the initial collector is empty, set the initial collector to the first element of map - if col == nil { - col = v - } else { - if v.NumTargets < col.NumTargets { - col = v - } - } - } - return col -} - -// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems -// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. 
-// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap -func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) { - chosenCollector := allocator.findNextCollector() - targetItem := TargetItem{ - JobName: target.JobName, - Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, - TargetURL: target.TargetURL, - Label: target.Label, - Collector: chosenCollector, - } - allocator.targetItems[targetItem.hash()] = &targetItem - chosenCollector.NumTargets++ - targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) -} - -// getCollectorChanges returns the new and removed collectors respectively. -// This method is called from within SetCollectors, which acquires the needed lock. -func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) { - var newCollectors []string - var removedCollectors []string - // Used as a set to check for removed collectors - tempCollectorMap := map[string]bool{} - for _, s := range collectors { - if _, found := allocator.collectors[s]; !found { - newCollectors = append(newCollectors, s) - } - tempCollectorMap[s] = true - } - for k := range allocator.collectors { - if _, found := tempCollectorMap[k]; !found { - removedCollectors = append(removedCollectors, k) - } - } - return newCollectors, removedCollectors -} - -// SetTargets accepts a list of targets that will be used to make -// load balancing decisions. This method should be called when there are -// new targets discovered or existing targets are shutdown. 
-func (allocator *Allocator) SetTargets(targets []TargetItem) { - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) - defer timer.ObserveDuration() - - allocator.m.Lock() - defer allocator.m.Unlock() - - // Make the temp map for access - tempTargetMap := make(map[string]TargetItem, len(targets)) - for _, target := range targets { - tempTargetMap[target.hash()] = target - } - - // Check for removals - for k, target := range allocator.targetItems { - // if the old target is no longer in the new list, remove it - if _, ok := tempTargetMap[k]; !ok { - allocator.collectors[target.Collector.Name].NumTargets-- - delete(allocator.targetItems, k) - targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets)) - } - } - - // Check for additions - for k, target := range tempTargetMap { - // Do nothing if the item is already there - if _, ok := allocator.targetItems[k]; ok { - continue - } else { - // Assign new set of collectors with the one different name - allocator.addTargetToTargetItems(&target) - } - } -} - -// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. -// This method is called when Collectors are added or removed. 
-func (allocator *Allocator) SetCollectors(collectors []string) { - log := allocator.log.WithValues("component", "opentelemetry-targetallocator") - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) - defer timer.ObserveDuration() - - collectorsAllocatable.Set(float64(len(collectors))) - if len(collectors) == 0 { - log.Info("No collector instances present") - return - } - - allocator.m.Lock() - defer allocator.m.Unlock() - newCollectors, removedCollectors := allocator.getCollectorChanges(collectors) - if len(newCollectors) == 0 && len(removedCollectors) == 0 { - log.Info("No changes to the collectors found") - return - } - - // Clear existing collectors - for _, k := range removedCollectors { - delete(allocator.collectors, k) - targetsPerCollector.WithLabelValues(k).Set(0) - } - // Insert the new collectors - for _, i := range newCollectors { - allocator.collectors[i] = &collector{Name: i, NumTargets: 0} - } - - // find targets which need to be redistributed - var redistribute []*TargetItem - for _, item := range allocator.targetItems { - for _, s := range removedCollectors { - if item.Collector.Name == s { - redistribute = append(redistribute, item) - } - } - } - // Re-Allocate the existing targets - for _, item := range redistribute { - allocator.addTargetToTargetItems(item) - } -} - -func NewAllocator(log logr.Logger) *Allocator { - return &Allocator{ - log: log, - collectors: make(map[string]*collector), - targetItems: make(map[string]*TargetItem), - } -} diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/allocator_test.go deleted file mode 100644 index 0b754cb47a..0000000000 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ /dev/null @@ -1,267 +0,0 @@ -package allocation - -import ( - "math" - "testing" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - logf "sigs.k8s.io/controller-runtime/pkg/log" -) - -var logger = logf.Log.WithName("unit-tests") - -// 
Tests the least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload -func TestFindNextCollector(t *testing.T) { - s := NewAllocator(logger) - - defaultCol := collector{Name: "default-col", NumTargets: 1} - maxCol := collector{Name: "max-col", NumTargets: 2} - leastCol := collector{Name: "least-col", NumTargets: 0} - s.collectors[maxCol.Name] = &maxCol - s.collectors[leastCol.Name] = &leastCol - s.collectors[defaultCol.Name] = &defaultCol - - assert.Equal(t, "least-col", s.findNextCollector().Name) -} - -func TestSetCollectors(t *testing.T) { - s := NewAllocator(logger) - - cols := []string{"col-1", "col-2", "col-3"} - s.SetCollectors(cols) - - expectedColLen := len(cols) - collectors := s.Collectors() - assert.Len(t, collectors, expectedColLen) - - for _, i := range cols { - assert.NotNil(t, collectors[i]) - } -} - -func TestAddingAndRemovingTargets(t *testing.T) { - // prepare allocator with initial targets and collectors - s := NewAllocator(logger) - - cols := []string{"col-1", "col-2", "col-3"} - s.SetCollectors(cols) - labels := model.LabelSet{} - - initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem - for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) - } - - // test that targets and collectors are added properly - s.SetTargets(targetList) - - // verify - expectedTargetLen := len(initTargets) - assert.Len(t, s.TargetItems(), expectedTargetLen) - - // prepare second round of targets - tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"} - var newTargetList []TargetItem - for _, i := range tar { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) - } - - // test that fewer targets are found - removed - 
s.SetTargets(newTargetList) - - // verify - targetItems := s.TargetItems() - expectedNewTargetLen := len(tar) - assert.Len(t, targetItems, expectedNewTargetLen) - - // verify results map - for _, i := range tar { - _, ok := targetItems["sample-name"+i+labels.Fingerprint().String()] - assert.True(t, ok) - } -} - -// Tests that two targets with the same target url and job name but different label set are both added -func TestAllocationCollision(t *testing.T) { - // prepare allocator with initial targets and collectors - s := NewAllocator(logger) - - cols := []string{"col-1", "col-2", "col-3"} - s.SetCollectors(cols) - firstLabels := model.LabelSet{ - "test": "test1", - } - secondLabels := model.LabelSet{ - "test": "test2", - } - - targetList := []TargetItem{ - {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, - {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, - } - - // test that targets and collectors are added properly - s.SetTargets(targetList) - - // verify - targetItems := s.TargetItems() - expectedTargetLen := len(targetList) - assert.Len(t, targetItems, expectedTargetLen) - - // verify results map - for _, i := range targetList { - _, ok := targetItems[i.hash()] - assert.True(t, ok) - } -} - -func TestNoCollectorReassignment(t *testing.T) { - s := NewAllocator(logger) - - cols := []string{"col-1", "col-2", "col-3"} - s.SetCollectors(cols) - labels := model.LabelSet{} - - expectedColLen := len(cols) - assert.Len(t, s.collectors, expectedColLen) - - for _, i := range cols { - assert.NotNil(t, s.collectors[i]) - } - initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem - for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) - } - // test that targets and collectors are added properly - s.SetTargets(targetList) - - // verify - 
expectedTargetLen := len(initTargets) - targetItems := s.TargetItems() - assert.Len(t, targetItems, expectedTargetLen) - - // assign new set of collectors with the same names - newCols := []string{"col-1", "col-2", "col-3"} - s.SetCollectors(newCols) - - newTargetItems := s.TargetItems() - assert.Equal(t, targetItems, newTargetItems) - -} - -func TestSmartCollectorReassignment(t *testing.T) { - s := NewAllocator(logger) - - cols := []string{"col-1", "col-2", "col-3"} - s.SetCollectors(cols) - labels := model.LabelSet{} - - expectedColLen := len(cols) - assert.Len(t, s.collectors, expectedColLen) - - for _, i := range cols { - assert.NotNil(t, s.collectors[i]) - } - initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem - for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) - } - // test that targets and collectors are added properly - s.SetTargets(targetList) - - // verify - expectedTargetLen := len(initTargets) - targetItems := s.TargetItems() - assert.Len(t, targetItems, expectedTargetLen) - - // assign new set of collectors with the same names - newCols := []string{"col-1", "col-2", "col-4"} - s.SetCollectors(newCols) - - newTargetItems := s.TargetItems() - assert.Equal(t, len(targetItems), len(newTargetItems)) - for key, targetItem := range targetItems { - item, ok := newTargetItems[key] - assert.True(t, ok, "all target items should be found in new target item list") - if targetItem.Collector.Name != "col-3" { - assert.Equal(t, targetItem.Collector.Name, item.Collector.Name) - } else { - assert.Equal(t, "col-4", item.Collector.Name) - } - } -} - -// Tests that the delta in number of targets per collector is less than 15% of an even distribution -func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { - - // prepare allocator with 3 collectors and 'random' 
amount of targets - s := NewAllocator(logger) - - cols := []string{"col-1", "col-2", "col-3"} - s.SetCollectors(cols) - - targets := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005", "prometheus:1006", - "prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1015", "prometheus:1016", - "prometheus:1021", "prometheus:1022", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} - var newTargetList []TargetItem - for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) - } - s.SetTargets(newTargetList) - - // Divisor needed to get 15% - divisor := 6.7 - - targetItemLen := len(s.TargetItems()) - collectors := s.Collectors() - count := targetItemLen / len(collectors) - percent := float64(targetItemLen) / divisor - - // test - for _, i := range collectors { - assert.InDelta(t, i.NumTargets, count, percent) - } - - // removing targets at 'random' - targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", - "prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1016", - "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} - newTargetList = []TargetItem{} - for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) - } - s.SetTargets(newTargetList) - - targetItemLen = len(s.TargetItems()) - collectors = s.Collectors() - count = targetItemLen / len(collectors) - percent = float64(targetItemLen) / divisor - - // test - for _, i := range collectors { - assert.InDelta(t, i.NumTargets, count, math.Round(percent)) - } - // adding targets at 'random' - targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", - "prometheus:1011", "prometheus:1012", "prometheus:1001", 
"prometheus:1014", "prometheus:1016", - "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1126", "prometheus:1227"} - newTargetList = []TargetItem{} - for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) - } - s.SetTargets(newTargetList) - - targetItemLen = len(s.TargetItems()) - collectors = s.Collectors() - count = targetItemLen / len(collectors) - percent = float64(targetItemLen) / divisor - - // test - for _, i := range collectors { - assert.InDelta(t, i.NumTargets, count, math.Round(percent)) - } -} diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index ba2602fff9..88622aef6e 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -21,12 +21,12 @@ type targetGroupJSON struct { Labels model.LabelSet `json:"labels"` } -func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *Allocator) map[string]collectorJSON { +func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allocator) map[string]collectorJSON { displayData := make(map[string]collectorJSON) for _, j := range allocator.TargetItems() { if j.JobName == job { var targetList []TargetItem - targetList = append(targetList, cMap[j.Collector.Name+j.JobName]...) + targetList = append(targetList, cMap[j.CollectorName+j.JobName]...) 
var targetGroupList []targetGroupJSON @@ -37,30 +37,27 @@ func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *All }) } - displayData[j.Collector.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.Collector.Name), Jobs: targetGroupList} + displayData[j.CollectorName] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} } } return displayData } -func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator *Allocator) []targetGroupJSON { +func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator Allocator) []targetGroupJSON { var tgs []targetGroupJSON group := make(map[string]string) labelSet := make(map[string]model.LabelSet) - for _, col := range allocator.Collectors() { - if col.Name == collector { - for _, targetItemArr := range cMap { - for _, targetItem := range targetItemArr { - if targetItem.Collector.Name == collector && targetItem.JobName == job { - group[targetItem.Label.String()] = targetItem.TargetURL - labelSet[targetItem.TargetURL] = targetItem.Label - } + if _, ok := allocator.Collectors()[collector]; ok { + for _, targetItemArr := range cMap { + for _, targetItem := range targetItemArr { + if targetItem.CollectorName == collector && targetItem.JobName == job { + group[targetItem.Label.String()] = targetItem.TargetURL + labelSet[targetItem.TargetURL] = targetItem.Label } } } } - for _, v := range group { tgs = append(tgs, targetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) } diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index ed62c1113f..1b8cf97260 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -9,15 +9,15 @@ import ( ) func TestGetAllTargetsByCollectorAndJob(t *testing.T) { - 
baseAllocator := NewAllocator(logger) - baseAllocator.SetCollectors([]string{"test-collector"}) - statefulAllocator := NewAllocator(logger) - statefulAllocator.SetCollectors([]string{"test-collector-0"}) + baseAllocator, _ := New("least-weighted", logger) + baseAllocator.SetCollectors(map[string]*Collector{"test-collector": {Name: "test-collector"}}) + statefulAllocator, _ := New("least-weighted", logger) + statefulAllocator.SetCollectors(map[string]*Collector{"test-collector-0": {Name: "test-collector-0"}}) type args struct { collector string job string cMap map[string][]TargetItem - allocator *Allocator + allocator Allocator } var tests = []struct { name string @@ -46,11 +46,8 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, }, @@ -77,11 +74,8 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, "test-collectortest-job2": { @@ -90,11 +84,8 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, }, @@ -122,11 +113,8 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { "test-label": "test-value", "foo": "bar", }, - TargetURL: "test-url1", - Collector: &collector{ - Name: "test-collector", - NumTargets: 2, - }, + TargetURL: "test-url1", + CollectorName: "test-collector", }, }, "test-collectortest-job2": { @@ -135,11 +123,8 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { Label: 
model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url2", - Collector: &collector{ - Name: "test-collector", - NumTargets: 2, - }, + TargetURL: "test-url2", + CollectorName: "test-collector", }, }, }, diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go new file mode 100644 index 0000000000..9e037567d6 --- /dev/null +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -0,0 +1,206 @@ +package allocation + +import ( + "fmt" + "net/url" + "sync" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" + + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" +) + +var _ Allocator = &leastWeightedAllocator{} + +const strategyName = "least-weighted" + +/* + Load balancer will serve on an HTTP server exposing /jobs/{job_id}/targets + The targets are allocated using the least connection method + Load balancer will need information about the collectors in order to set the URLs + Keep a Map of what each collector currently holds and update it based on new scrape target updates +*/ + +// leastWeightedAllocator makes decisions to distribute work among +// a number of OpenTelemetry collectors based on the number of targets. +// Users need to call SetTargets when they have new targets in their +// clusters and call SetCollectors when the collectors have changed. +type leastWeightedAllocator struct { + // m protects collectors and targetItems for concurrent use. + m sync.RWMutex + // collectors is a map from a Collector's name to a Collector instance + collectors map[string]*Collector + // targetItems is a map from a target item's hash to the target items allocated state + targetItems map[string]*TargetItem + + log logr.Logger +} + +// TargetItems returns a shallow copy of the targetItems map. 
+func (allocator *leastWeightedAllocator) TargetItems() map[string]*TargetItem { + allocator.m.RLock() + defer allocator.m.RUnlock() + targetItemsCopy := make(map[string]*TargetItem) + for k, v := range allocator.targetItems { + targetItemsCopy[k] = v + } + return targetItemsCopy +} + +// Collectors returns a shallow copy of the collectors map. +func (allocator *leastWeightedAllocator) Collectors() map[string]*Collector { + allocator.m.RLock() + defer allocator.m.RUnlock() + collectorsCopy := make(map[string]*Collector) + for k, v := range allocator.collectors { + collectorsCopy[k] = v + } + return collectorsCopy +} + +// findNextCollector finds the next collector with fewer number of targets. +// This method is called from within SetTargets and SetCollectors, whose caller +// acquires the needed lock. This method assumes there is at least 1 collector set +// INVARIANT: allocator.collectors must have at least 1 collector set +func (allocator *leastWeightedAllocator) findNextCollector() *Collector { + var col *Collector + for _, v := range allocator.collectors { + // If the initial collector is empty, set the initial collector to the first element of map + if col == nil { + col = v + } else if v.NumTargets < col.NumTargets { + col = v + } + } + return col +} + +// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, which acquire the needed lock. 
+// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap +// INVARIANT: allocator.collectors must have at least 1 collector set +func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetItem) { + chosenCollector := allocator.findNextCollector() + targetItem := &TargetItem{ + JobName: target.JobName, + Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, + TargetURL: target.TargetURL, + Label: target.Label, + CollectorName: chosenCollector.Name, + } + allocator.targetItems[targetItem.Hash()] = targetItem + chosenCollector.NumTargets++ + TargetsPerCollector.WithLabelValues(chosenCollector.Name, strategyName).Set(float64(chosenCollector.NumTargets)) +} + +// handleTargets receives the new and removed targets and reconciles the current state. +// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector +// Any net-new additions are assigned to the next available collector +func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*TargetItem]) { + // Check for removals + for k, target := range allocator.targetItems { + // if the current target is in the removals list + if _, ok := diff.Removals()[k]; ok { + c := allocator.collectors[target.CollectorName] + c.NumTargets-- + delete(allocator.targetItems, k) + TargetsPerCollector.WithLabelValues(target.CollectorName, strategyName).Set(float64(c.NumTargets)) + } + } + + // Check for additions + for k, target := range diff.Additions() { + // Do nothing if the item is already there + if _, ok := allocator.targetItems[k]; ok { + continue + } else { + // Add target to target pool and assign a collector + allocator.addTargetToTargetItems(target) + } + } +} + +// handleCollectors receives the new and removed collectors and reconciles the current state. +// Any removals are removed from the allocator's collectors. 
New collectors are added to the allocator's collector map +// Finally, any targets of removed collectors are reallocated to the next available collector. +func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Collector]) { + // Clear removed collectors + for _, k := range diff.Removals() { + delete(allocator.collectors, k.Name) + TargetsPerCollector.WithLabelValues(k.Name, strategyName).Set(0) + } + // Insert the new collectors + for _, i := range diff.Additions() { + allocator.collectors[i.Name] = NewCollector(i.Name) + } + + // Re-Allocate targets of the removed collectors + for _, item := range allocator.targetItems { + if _, ok := diff.Removals()[item.CollectorName]; ok { + allocator.addTargetToTargetItems(item) + } + } +} + +// SetTargets accepts a list of targets that will be used to make +// load balancing decisions. This method should be called when there are +// new targets discovered or existing targets are shutdown. +func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetItem) { + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", strategyName)) + defer timer.ObserveDuration() + + allocator.m.Lock() + defer allocator.m.Unlock() + + if len(allocator.collectors) == 0 { + allocator.log.Info("No collector instances present, cannot set targets") + return + } + // Check for target changes + targetsDiff := diff.Maps(allocator.targetItems, targets) + // If there are any additions or removals + if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { + allocator.handleTargets(targetsDiff) + } + return +} + +// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. +// This method is called when Collectors are added or removed. 
+func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Collector) {
+	timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", strategyName))
+	defer timer.ObserveDuration()
+
+	CollectorsAllocatable.WithLabelValues(strategyName).Set(float64(len(collectors)))
+	if len(collectors) == 0 {
+		// An empty update is ignored: previously known collectors and their targets stay as they are.
+		allocator.log.WithValues("component", "opentelemetry-targetallocator").Info("No collector instances present")
+		return
+	}
+
+	allocator.m.Lock()
+	defer allocator.m.Unlock()
+
+	// Reconcile only when the set of collector keys changed.
+	collectorsDiff := diff.Maps(allocator.collectors, collectors)
+	if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 {
+		allocator.handleCollectors(collectorsDiff)
+	}
+}
+
+// NewAllocator returns a least-weighted Allocator that starts with no collectors and no targets.
+func NewAllocator(log logr.Logger) Allocator {
+	return &leastWeightedAllocator{
+		log:         log,
+		collectors:  make(map[string]*Collector),
+		targetItems: make(map[string]*TargetItem),
+	}
+}
+
+// init registers this strategy under strategyName; a failed registration panics at start-up.
+func init() {
+	if err := Register(strategyName, NewAllocator); err != nil {
+		panic(err)
+	}
+}
diff --git a/cmd/otel-allocator/allocation/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go
new file mode 100644
index 0000000000..c15c30a301
--- /dev/null
+++ b/cmd/otel-allocator/allocation/least_weighted_test.go
@@ -0,0 +1,264 @@
+package allocation
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+	"testing"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+var logger = logf.Log.WithName("unit-tests")
+
+func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*TargetItem {
+	toReturn := map[string]*TargetItem{}
+	for i := startingIndex; i < n+startingIndex; i++ {
+		collector := fmt.Sprintf("collector-%d", i%numCollectors)
+		newTarget := NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", nil, collector)
+		toReturn[newTarget.Hash()] = newTarget
+	}
+	return toReturn
+}
+
+func makeNCollectors(n int, targetsForEach int) map[string]*Collector {
+	toReturn := map[string]*Collector{}
+	for i := 0; i < n; i++ {
+		collector := fmt.Sprintf("collector-%d", i)
+		toReturn[collector] = &Collector{
+			Name:       collector,
+			NumTargets: targetsForEach,
+		}
+	}
+	return toReturn
+}
+
+func TestSetCollectors(t *testing.T) {
+	s, _ := New("least-weighted", logger)
+
+	cols := makeNCollectors(3, 0)
+	s.SetCollectors(cols)
+
+	expectedColLen := len(cols)
+	collectors := s.Collectors()
+	assert.Len(t, collectors, expectedColLen)
+
+	for _, i := range cols {
+		assert.NotNil(t, collectors[i.Name])
+	}
+}
+
+func TestAddingAndRemovingTargets(t *testing.T) {
+	// prepare allocator with initial targets and collectors
+	s, _ := New("least-weighted", logger)
+
+	cols := makeNCollectors(3, 0)
+	s.SetCollectors(cols)
+
+	initTargets := makeNNewTargets(6, 3, 0)
+
+	// test that targets and collectors are added properly
+	s.SetTargets(initTargets)
+
+	// verify
+	expectedTargetLen := len(initTargets)
+	assert.Len(t, s.TargetItems(), expectedTargetLen)
+
+	// prepare second round of targets
+	tar := makeNNewTargets(4, 3, 0)
+
+	// test that fewer targets are found - removed
+	s.SetTargets(tar)
+
+	// verify
+	targetItems := s.TargetItems()
+	expectedNewTargetLen := len(tar)
+	assert.Len(t, targetItems, expectedNewTargetLen)
+
+	// verify results map
+	for _, i := range tar {
+		_, ok := targetItems[i.Hash()]
+		assert.True(t, ok)
+	}
+}
+
+// Tests that two targets with the same target url and job name but different label set are both added
+func TestAllocationCollision(t *testing.T) {
+	// prepare allocator with initial targets and collectors
+	s, _ := New("least-weighted", logger)
+
+	cols := makeNCollectors(3, 0)
+	s.SetCollectors(cols)
+	firstLabels := model.LabelSet{
+		"test": "test1",
+	}
+	secondLabels := model.LabelSet{
+		"test": "test2",
+	}
+	firstTarget := NewTargetItem("sample-name", "0.0.0.0:8000", firstLabels, "")
+	secondTarget := NewTargetItem("sample-name", "0.0.0.0:8000", secondLabels, "")
+
+	targetList := map[string]*TargetItem{
+		firstTarget.Hash():  firstTarget,
+		secondTarget.Hash(): secondTarget,
+	}
+
+	// test that targets and collectors are added properly
+	s.SetTargets(targetList)
+
+	// verify
+	targetItems := s.TargetItems()
+	expectedTargetLen := len(targetList)
+	assert.Len(t, targetItems, expectedTargetLen)
+
+	// verify results map
+	for _, i := range targetList {
+		_, ok := targetItems[i.Hash()]
+		assert.True(t, ok)
+	}
+}
+
+func TestNoCollectorReassignment(t *testing.T) {
+	s, _ := New("least-weighted", logger)
+
+	cols := makeNCollectors(3, 0)
+	s.SetCollectors(cols)
+
+	expectedColLen := len(cols)
+	assert.Len(t, s.Collectors(), expectedColLen)
+
+	for _, i := range cols {
+		assert.NotNil(t, s.Collectors()[i.Name])
+	}
+	initTargets := makeNNewTargets(6, 3, 0)
+
+	// test that targets and collectors are added properly
+	s.SetTargets(initTargets)
+
+	// verify
+	expectedTargetLen := len(initTargets)
+	targetItems := s.TargetItems()
+	assert.Len(t, targetItems, expectedTargetLen)
+
+	// assign new set of collectors with the same names
+	newCols := makeNCollectors(3, 0)
+	s.SetCollectors(newCols)
+
+	newTargetItems := s.TargetItems()
+	assert.Equal(t, targetItems, newTargetItems)
+
+}
+
+func TestSmartCollectorReassignment(t *testing.T) {
+	s, _ := New("least-weighted", logger)
+
+	cols := makeNCollectors(4, 0)
+	s.SetCollectors(cols)
+
+	expectedColLen := len(cols)
+	assert.Len(t, s.Collectors(), expectedColLen)
+
+	for _, i := range cols {
+		assert.NotNil(t, s.Collectors()[i.Name])
+	}
+	initTargets := makeNNewTargets(6, 4, 0)
+	// test that targets and collectors are added properly
+	s.SetTargets(initTargets)
+
+	// verify
+	expectedTargetLen := len(initTargets)
+	targetItems := s.TargetItems()
+	assert.Len(t, targetItems, expectedTargetLen)
+
+	// assign a new set of collectors in which collector-3 is replaced by collector-4
+	newCols := map[string]*Collector{
+		"collector-0": {
+			Name: "collector-0",
+		}, "collector-1": {
+			Name: "collector-1",
+		}, "collector-2": {
+			Name: "collector-2",
+		}, "collector-4": {
+			Name: "collector-4",
+		},
+	}
+	s.SetCollectors(newCols)
+
+	newTargetItems := s.TargetItems()
+	assert.Equal(t, len(targetItems), len(newTargetItems))
+	for key, targetItem := range targetItems {
+		item, ok := newTargetItems[key]
+		assert.True(t, ok, "all target items should be found in new target item list")
+		if targetItem.CollectorName != "collector-3" {
+			assert.Equal(t, targetItem.CollectorName, item.CollectorName)
+		} else {
+			assert.Equal(t, "collector-4", item.CollectorName)
+		}
+	}
+}
+
+// Tests that the delta in number of targets per collector is less than 15% of an even distribution
+func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
+
+	// prepare allocator with 3 collectors and 'random' amount of targets
+	s, _ := New("least-weighted", logger)
+
+	cols := makeNCollectors(3, 0)
+	s.SetCollectors(cols)
+
+	targets := makeNNewTargets(27, 3, 0)
+	s.SetTargets(targets)
+
+	// Divisor needed to get 15%
+	divisor := 6.7
+
+	targetItemLen := len(s.TargetItems())
+	collectors := s.Collectors()
+	count := targetItemLen / len(collectors)
+	percent := float64(targetItemLen) / divisor
+
+	// test
+	for _, i := range collectors {
+		assert.InDelta(t, i.NumTargets, count, percent)
+	}
+
+	// removing targets at 'random'
+	// Remove half of targets randomly
+	toDelete := len(targets) / 2
+	counter := 0
+	for index := range targets {
+		shouldDelete := rand.Intn(toDelete)
+		if counter < shouldDelete {
+			delete(targets, index)
+		}
+		counter++
+	}
+	s.SetTargets(targets)
+
+	targetItemLen = len(s.TargetItems())
+	collectors = s.Collectors()
+	count = targetItemLen / len(collectors)
+	percent = float64(targetItemLen) / divisor
+
+	// test
+	for _, i := range collectors {
+		assert.InDelta(t, i.NumTargets, count, math.Round(percent))
+	}
+	// adding targets at 'random'
+	for _, item := range makeNNewTargets(13, 3, 100) {
+		targets[item.Hash()] = item
+	}
+	s.SetTargets(targets)
+
+	targetItemLen = len(s.TargetItems())
+	collectors = s.Collectors()
+	count = targetItemLen / len(collectors)
+	percent = float64(targetItemLen) / divisor
+
+	// test
+	for _, i := range collectors {
+		assert.InDelta(t, i.NumTargets, count, math.Round(percent))
+	}
+}
diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go
new file mode 100644
index 0000000000..42166caf78
--- /dev/null
+++ b/cmd/otel-allocator/allocation/strategy.go
@@ -0,0 +1,107 @@
+package allocation
+
+import (
+	"errors"
+	"fmt"
+	"net/url"
+
+	"github.com/go-logr/logr"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/common/model"
+)
+
+// AllocatorProvider builds an Allocator that logs through the supplied logger.
+type AllocatorProvider func(log logr.Logger) Allocator
+
+var (
+	// registry maps a strategy name to the provider that builds it.
+	registry = map[string]AllocatorProvider{}
+
+	// TargetsPerCollector records how many targets have been assigned to each collector
+	// It is currently the responsibility of the strategy to track this information.
+	TargetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "opentelemetry_allocator_targets_per_collector",
+		Help: "The number of targets for each collector.",
+	}, []string{"collector_name", "strategy"})
+	// CollectorsAllocatable records how many collectors a strategy was last handed, labelled by strategy.
+	CollectorsAllocatable = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "opentelemetry_allocator_collectors_allocatable",
+		Help: "Number of collectors the allocator is able to allocate to.",
+	}, []string{"strategy"})
+	// TimeToAssign observes how long an allocation call takes, labelled by method and strategy.
+	TimeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name: "opentelemetry_allocator_time_to_allocate",
+		Help: "The time it takes to allocate",
+	}, []string{"method", "strategy"})
+)
+
+// New builds an Allocator using the provider registered under name.
+// It returns an error when no strategy with that name has been registered.
+func New(name string, log logr.Logger) (Allocator, error) {
+	if p, ok := registry[name]; ok {
+		return p(log), nil
+	}
+	return nil, fmt.Errorf("unregistered strategy: %s", name)
+}
+
+// Register makes provider available to New under name and fails if the name is already taken.
+// The registry map is not guarded by a lock; the strategy in this package registers from init.
+func Register(name string, provider AllocatorProvider) error {
+	if _, ok := registry[name]; ok {
+		return errors.New("already registered")
+	}
+	registry[name] = provider
+	return nil
+}
+
+// Allocator is the contract every allocation strategy implements.
+type Allocator interface {
+	// SetCollectors supplies the current set of collectors, keyed by collector name.
+	SetCollectors(collectors map[string]*Collector)
+	// SetTargets supplies the current set of discovered targets, keyed by TargetItem.Hash().
+	SetTargets(targets map[string]*TargetItem)
+	// TargetItems returns the targets held by the allocator, keyed by hash.
+	TargetItems() map[string]*TargetItem
+	// Collectors returns the collectors held by the allocator, keyed by name.
+	Collectors() map[string]*Collector
+}
+
+// TargetItem describes one scrape target together with the name of the collector it is assigned to.
+type TargetItem struct {
+	JobName       string
+	Link          LinkJSON
+	TargetURL     string
+	Label         model.LabelSet
+	CollectorName string
+}
+
+// NewTargetItem builds a TargetItem whose Link points at the job's targets endpoint (job name query-escaped).
+func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *TargetItem {
+	return &TargetItem{
+		JobName:       jobName,
+		Link:          LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))},
+		TargetURL:     targetURL,
+		Label:         label,
+		CollectorName: collectorName,
+	}
+}
+
+// Hash returns the map key for the target: job name, target URL and label-set fingerprint concatenated.
+// CollectorName is not part of the key, so reassigning a target keeps its hash.
+func (t TargetItem) Hash() string {
+	return t.JobName + t.TargetURL + t.Label.Fingerprint().String()
+}
+
+// Collector Creates a struct that holds Collector information
+// This struct will be parsed into endpoint with Collector and jobs info
+// This struct can be extended with information like annotations and labels in the future
+type Collector struct {
+	Name       string
+	NumTargets int
+}
+
+// NewCollector returns a Collector with the given name and no targets assigned.
+func NewCollector(name string) *Collector {
+	return &Collector{Name: name}
+}
diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go
index 7732d42f08..e1cf66ec1f 100644
--- a/cmd/otel-allocator/collector/collector.go
+++ b/cmd/otel-allocator/collector/collector.go
@@ -15,6 +15,8 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+
+	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
 )
 
 const (
@@ -30,10 +32,9 @@ var (
 )
 
 type Client struct {
-	log           logr.Logger
-	k8sClient     kubernetes.Interface
-	collectorChan chan []string
-	close         chan struct{}
+	log       logr.Logger
+	k8sClient kubernetes.Interface
+	close     chan struct{}
 }
 
 func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
@@ -49,8 +50,8 @@ func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) {
 	}, nil
 }
 
-func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors []string)) {
-	collectorMap := map[string]bool{}
+func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors map[string]*allocation.Collector)) {
+	collectorMap := map[string]*allocation.Collector{}
 	log := k.log.WithValues("component", "opentelemetry-targetallocator")
 
 	opts := metav1.ListOptions{
@@ -64,17 +65,11 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(
 	for i := range pods.Items {
 		pod := pods.Items[i]
 		if pod.GetObjectMeta().GetDeletionTimestamp() == nil {
-			collectorMap[pod.Name] = true
+			collectorMap[pod.Name] = allocation.NewCollector(pod.Name)
 		}
 	}
 
-	collectorKeys := make([]string, len(collectorMap))
-	i := 0
-	for keys := range collectorMap {
-		collectorKeys[i] = keys
-		i++
-	}
-	fn(collectorKeys)
+	fn(collectorMap)
 
 	go func() {
 		for {
@@ -92,7 +87,7 @@ func (k *Client) Watch(ctx context.Context, labelMap
map[string]string, fn func( }() } -func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]bool, fn func(collectors []string)) string { +func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) string { log := k.log.WithValues("component", "opentelemetry-targetallocator") for { collectorsDiscovered.Set(float64(len(collectorMap))) @@ -115,23 +110,11 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap switch event.Type { case watch.Added: - collectorMap[pod.Name] = true + collectorMap[pod.Name] = allocation.NewCollector(pod.Name) case watch.Deleted: delete(collectorMap, pod.Name) } - - collectorKeys := make([]string, len(collectorMap)) - i := 0 - for keys := range collectorMap { - collectorKeys[i] = keys - i++ - } - fn(collectorKeys) - select { - case k.collectorChan <- collectorKeys: - default: - fn(collectorKeys) - } + fn(collectorMap) case <-time.After(watcherTimeout): log.Info("Restarting watch routine") return "" diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/collector_test.go index 5ba5b0fb1c..059e616702 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/collector_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "sort" "testing" "github.com/stretchr/testify/assert" @@ -12,16 +11,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/fake" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) var client Client -var collectors = []string{} +var collectors = map[string]*allocation.Collector{} func TestMain(m *testing.M) { client = Client{ - k8sClient: fake.NewSimpleClientset(), - collectorChan: make(chan []string, 3), - close: make(chan struct{}), + k8sClient: fake.NewSimpleClientset(), + close: 
make(chan struct{}), } labelMap := map[string]string{ @@ -39,7 +39,7 @@ func TestMain(m *testing.M) { os.Exit(1) } - go runWatch(context.Background(), &client, watcher.ResultChan(), map[string]bool{}, func(collectorList []string) { getCollectors(collectorList) }) + go runWatch(context.Background(), &client, watcher.ResultChan(), map[string]*allocation.Collector{}, func(colMap map[string]*allocation.Collector) { getCollectors(colMap) }) code := m.Run() @@ -49,18 +49,25 @@ func TestMain(m *testing.M) { } func TestWatchPodAddition(t *testing.T) { - expected := []string{"test-pod1", "test-pod2", "test-pod3"} + expected := map[string]*allocation.Collector{ + "test-pod1": { + Name: "test-pod1", + }, + "test-pod2": { + Name: "test-pod2", + }, + "test-pod3": { + Name: "test-pod3", + }, + } for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} { expected := pod(k) _, err := client.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), expected, metav1.CreateOptions{}) assert.NoError(t, err) - collectors = <-client.collectorChan } assert.Len(t, collectors, 3) - - sort.Strings(collectors) assert.Equal(t, collectors, expected) } @@ -70,16 +77,14 @@ func TestWatchPodDeletion(t *testing.T) { for _, k := range []string{"test-pod2", "test-pod3"} { err := client.k8sClient.CoreV1().Pods("test-ns").Delete(context.Background(), k, metav1.DeleteOptions{}) assert.NoError(t, err) - collectors = <-client.collectorChan } assert.Len(t, collectors, 1) - sort.Strings(collectors) assert.Equal(t, collectors, expected) } -func getCollectors(c []string) { +func getCollectors(c map[string]*allocation.Collector) { collectors = c } diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 4be3bdf3c2..05d0e38405 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -31,8 +31,16 @@ const DefaultResyncTime = 5 * time.Minute const DefaultConfigFilePath string = "/conf/targetallocator.yaml" type Config struct 
{
-	LabelSelector map[string]string  `yaml:"label_selector,omitempty"`
-	Config        *promconfig.Config `yaml:"config"`
+	LabelSelector      map[string]string  `yaml:"label_selector,omitempty"`
+	Config             *promconfig.Config `yaml:"config"`
+	AllocationStrategy *string            `yaml:"allocation_strategy,omitempty"`
+}
+
+func (c Config) GetAllocationStrategy() string {
+	if c.AllocationStrategy != nil {
+		return *c.AllocationStrategy
+	}
+	return "least-weighted"
 }
 
 type PrometheusCRWatcherConfig struct {
diff --git a/cmd/otel-allocator/diff/diff.go b/cmd/otel-allocator/diff/diff.go
new file mode 100644
index 0000000000..44ec55e040
--- /dev/null
+++ b/cmd/otel-allocator/diff/diff.go
@@ -0,0 +1,42 @@
+package diff
+
+// Changes is the result of the difference between two maps – items that are added and items that are removed
+// This struct is used to reconcile state differences.
+type Changes[T any] struct {
+	additions map[string]T
+	removals  map[string]T
+}
+
+// Additions returns the entries whose keys appear only in the new map.
+func (c Changes[T]) Additions() map[string]T {
+	return c.additions
+}
+
+// Removals returns the entries whose keys appear only in the current map.
+func (c Changes[T]) Removals() map[string]T {
+	return c.removals
+}
+
+// Maps generates Changes for two maps with the same type signature: keys found only in new are additions and
+// keys found only in current are removals. Values are never compared, and both result maps are always non-nil.
+func Maps[T any](current, new map[string]T) Changes[T] {
+	additions := map[string]T{}
+	removals := map[string]T{}
+	// Keys present only in the new map are additions.
+	for key, value := range new {
+		if _, found := current[key]; !found {
+			additions[key] = value
+		}
+	}
+	// Keys present only in the current map are removals. The new map itself answers the
+	// membership question, so no auxiliary set is needed.
+	for key, value := range current {
+		if _, found := new[key]; !found {
+			removals[key] = value
+		}
+	}
+	return Changes[T]{
+		additions: additions,
+		removals:  removals,
+	}
+}
diff --git a/cmd/otel-allocator/diff/diff_test.go b/cmd/otel-allocator/diff/diff_test.go
new file mode 100644
index 0000000000..bcae530d55
--- /dev/null
+++ b/cmd/otel-allocator/diff/diff_test.go
@@ -0,0 +1,97 @@
+package diff
+
+import (
+	"reflect"
+	"testing"
+)
+
+// TestDiffMaps checks Maps for replacement, addition-only, removal-only and unchanged inputs.
+func TestDiffMaps(t *testing.T) {
+	type args struct {
+		current map[string]string
+		new     map[string]string
+	}
+	tests := []struct {
+		name string
+		args args
+		want Changes[string]
+	}{
+		{
+			name: "basic replacement",
+			args: args{
+				current: map[string]string{
+					"current": "one",
+				},
+				new: map[string]string{
+					"new": "another",
+				},
+			},
+			want: Changes[string]{
+				additions: map[string]string{
+					"new": "another",
+				},
+				removals: map[string]string{
+					"current": "one",
+				},
+			},
+		},
+		{
+			name: "single addition",
+			args: args{
+				current: map[string]string{
+					"current": "one",
+				},
+				new: map[string]string{
+					"current": "one",
+					"new":     "another",
+				},
+			},
+			want: Changes[string]{
+				additions: map[string]string{
+					"new": "another",
+				},
+				removals: map[string]string{},
+			},
+		},
+		{
+			name: "single removal",
+			args: args{
+				current: map[string]string{
+					"current": "one",
+					"old":     "another",
+				},
+				new: map[string]string{
+					"current": "one",
+				},
+			},
+			want: Changes[string]{
+				additions: map[string]string{},
+				removals: map[string]string{
+					"old": "another",
+				},
+			},
+		},
+		{
+			name: "no changes",
+			args: args{
+				current: map[string]string{
+					"current": "one",
+				},
+				new: map[string]string{
+					"current": "one",
+				},
+			},
+			want: Changes[string]{
+				additions: map[string]string{},
+				removals:  map[string]string{},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := Maps(tt.args.current, tt.args.new); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("Maps() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go
index 1714f92657..2c0393f619 100644
--- a/cmd/otel-allocator/discovery/discovery.go
+++
b/cmd/otel-allocator/discovery/discovery.go @@ -5,13 +5,14 @@ import ( "github.com/go-kit/log" "github.com/go-logr/logr" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) var ( @@ -59,7 +60,7 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C return m.manager.ApplyConfig(discoveryCfg) } -func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) { +func (m *Manager) Watch(fn func(targets map[string]*allocation.TargetItem)) { log := m.log.WithValues("component", "opentelemetry-targetallocator") go func() { @@ -69,18 +70,19 @@ func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) { log.Info("Service Discovery watch event stopped: discovery manager closed") return case tsets := <-m.manager.SyncCh(): - targets := []allocation.TargetItem{} + targets := map[string]*allocation.TargetItem{} for jobName, tgs := range tsets { var count float64 = 0 for _, tg := range tgs { for _, t := range tg.Targets { count++ - targets = append(targets, allocation.TargetItem{ + item := &allocation.TargetItem{ JobName: jobName, TargetURL: string(t[model.AddressLabel]), Label: t.Merge(tg.Labels), - }) + } + targets[item.Hash()] = item } } targetsDiscovered.WithLabelValues(jobName).Set(count) diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index 11e601f364..7edacde407 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ 
b/cmd/otel-allocator/discovery/discovery_test.go @@ -8,14 +8,15 @@ import ( "testing" gokitlog "github.com/go-kit/log" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" - allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" "github.com/prometheus/common/model" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/stretchr/testify/assert" ctrl "sigs.k8s.io/controller-runtime" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" + allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) var cfg config.Config @@ -32,7 +33,7 @@ func TestMain(m *testing.M) { manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger()) results = make(chan []string) - manager.Watch(func(targets []allocation.TargetItem) { + manager.Watch(func(targets map[string]*allocation.TargetItem) { var result []string for _, t := range targets { result = append(result, t.TargetURL) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index bd2bd0a469..80d8ec4743 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -12,15 +12,17 @@ import ( gokitlog "github.com/go-kit/log" "github.com/go-logr/logr" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + ctrl "sigs.k8s.io/controller-runtime" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" 
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" - ctrl "sigs.k8s.io/controller-runtime" ) var ( @@ -41,13 +43,22 @@ func main() { setupLog.Error(err, "Failed to parse parameters") os.Exit(1) } + cfg, err := config.Load(*cliConf.ConfigFilePath) + if err != nil { + setupLog.Error(err, "Unable to load configuration") + } cliConf.RootLogger.Info("Starting the Target Allocator") ctx := context.Background() log := ctrl.Log.WithName("allocator") - allocator := allocation.NewAllocator(log) + + allocator, err := allocation.New(cfg.GetAllocationStrategy(), log) + if err != nil { + setupLog.Error(err, "Unable to initialize allocation strategy") + os.Exit(1) + } watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator) if err != nil { setupLog.Error(err, "Can't start the watchers") @@ -60,7 +71,13 @@ func main() { defer discoveryManager.Close() discoveryManager.Watch(allocator.SetTargets) - srv, err := newServer(log, allocator, discoveryManager, cliConf) + k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf) + if err != nil { + setupLog.Error(err, "Can't start the k8s client") + os.Exit(1) + } + + srv, err := newServer(log, allocator, discoveryManager, k8sclient, cliConf.ListenAddr) if err != nil { setupLog.Error(err, "Can't start the server") } @@ -91,7 +108,7 @@ func main() { if err := srv.Shutdown(ctx); err != nil { setupLog.Error(err, "Cannot shutdown the server") } - srv, err = newServer(log, allocator, discoveryManager, cliConf) + srv, err = newServer(log, allocator, discoveryManager, k8sclient, cliConf.ListenAddr) if err 
!= nil { setupLog.Error(err, "Error restarting the server with new config") } @@ -120,17 +137,13 @@ func main() { type server struct { logger logr.Logger - allocator *allocation.Allocator + allocator allocation.Allocator discoveryManager *lbdiscovery.Manager k8sClient *collector.Client server *http.Server } -func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManager *lbdiscovery.Manager, cliConf config.CLIConfig) (*server, error) { - k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf) - if err != nil { - return nil, err - } +func newServer(log logr.Logger, allocator allocation.Allocator, discoveryManager *lbdiscovery.Manager, k8sclient *collector.Client, listenAddr *string) (*server, error) { s := &server{ logger: log, allocator: allocator, @@ -142,11 +155,11 @@ func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManage router.HandleFunc("/jobs", s.JobHandler).Methods("GET") router.HandleFunc("/jobs/{job_id}/targets", s.TargetsHandler).Methods("GET") router.Path("/metrics").Handler(promhttp.Handler()) - s.server = &http.Server{Addr: *cliConf.ListenAddr, Handler: router} + s.server = &http.Server{Addr: *listenAddr, Handler: router} return s, nil } -func configureFileDiscovery(log logr.Logger, allocator *allocation.Allocator, discoveryManager *lbdiscovery.Manager, ctx context.Context, cliConfig config.CLIConfig) (*collector.Client, error) { +func configureFileDiscovery(log logr.Logger, allocator allocation.Allocator, discoveryManager *lbdiscovery.Manager, ctx context.Context, cliConfig config.CLIConfig) (*collector.Client, error) { cfg, err := config.Load(*cliConfig.ConfigFilePath) if err != nil { return nil, err @@ -180,7 +193,7 @@ func (s *server) Shutdown(ctx context.Context) error { func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { displayData := make(map[string]allocation.LinkJSON) for _, v := range s.allocator.TargetItems() { - 
displayData[v.JobName] = allocation.LinkJSON{v.Link.Link} + displayData[v.JobName] = allocation.LinkJSON{Link: v.Link.Link} } jsonHandler(w, r, displayData) } @@ -201,7 +214,7 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem for _, v := range s.allocator.TargetItems() { - compareMap[v.Collector.Name+v.JobName] = append(compareMap[v.Collector.Name+v.JobName], *v) + compareMap[v.CollectorName+v.JobName] = append(compareMap[v.CollectorName+v.JobName], *v) } params := mux.Vars(r) jobId, err := url.QueryUnescape(params["job_id"]) diff --git a/cmd/otel-allocator/watcher/file.go b/cmd/otel-allocator/watcher/file.go index 262bb71bfb..6adc3c33e9 100644 --- a/cmd/otel-allocator/watcher/file.go +++ b/cmd/otel-allocator/watcher/file.go @@ -5,6 +5,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" ) diff --git a/cmd/otel-allocator/watcher/main.go b/cmd/otel-allocator/watcher/main.go index dd09ca1b09..dac48d5fb2 100644 --- a/cmd/otel-allocator/watcher/main.go +++ b/cmd/otel-allocator/watcher/main.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" ) @@ -11,7 +12,7 @@ import ( type Manager struct { Events chan Event Errors chan error - allocator *allocation.Allocator + allocator allocation.Allocator watchers []Watcher } @@ -44,7 +45,7 @@ func (e EventSource) String() string { return eventSourceToString[e] } -func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator *allocation.Allocator) (*Manager, error) { +func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator allocation.Allocator) (*Manager, error) { watcher := Manager{ allocator: allocator, Events: make(chan 
Event), diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index e5a0084416..b2f0bf3d0c 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -681,6 +681,10 @@ spec: description: TargetAllocator indicates a value which determines whether to spawn a target allocation resource or not. properties: + allocationStrategy: + description: AllocationStrategy determines which strategy the + target allocator should use for allocation + type: string enabled: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. diff --git a/docs/api.md b/docs/api.md index 57b8d78a9a..6dcb8b56ec 100644 --- a/docs/api.md +++ b/docs/api.md @@ -2869,6 +2869,13 @@ TargetAllocator indicates a value which determines whether to spawn a target all + allocationStrategy + string + + AllocationStrategy determines which strategy the target allocator should use for allocation
+ + false + enabled boolean diff --git a/pkg/collector/reconcile/configmap.go b/pkg/collector/reconcile/configmap.go index f24b626852..4c15e4296c 100644 --- a/pkg/collector/reconcile/configmap.go +++ b/pkg/collector/reconcile/configmap.go @@ -114,6 +114,11 @@ func desiredTAConfigMap(params Params) (corev1.ConfigMap, error) { "app.kubernetes.io/component": "opentelemetry-collector", } taConfig["config"] = promConfig + if len(params.Instance.Spec.TargetAllocator.AllocationStrategy) > 0 { + taConfig["allocation_strategy"] = params.Instance.Spec.TargetAllocator.AllocationStrategy + } else { + taConfig["allocation_strategy"] = "least-weighted" + } taConfigYAML, err := yaml.Marshal(taConfig) if err != nil { return corev1.ConfigMap{}, err diff --git a/pkg/collector/reconcile/configmap_test.go b/pkg/collector/reconcile/configmap_test.go index 05df6d79a3..bdc13d79ca 100644 --- a/pkg/collector/reconcile/configmap_test.go +++ b/pkg/collector/reconcile/configmap_test.go @@ -185,7 +185,8 @@ service: expectedLables["app.kubernetes.io/name"] = "test-targetallocator" expectedData := map[string]string{ - "targetallocator.yaml": `config: + "targetallocator.yaml": `allocation_strategy: least-weighted +config: scrape_configs: - job_name: otel-collector scrape_interval: 10s @@ -324,6 +325,7 @@ func TestExpectedConfigMap(t *testing.T) { "app.kubernetes.io/component": "opentelemetry-collector", } taConfig["config"] = parmConfig + taConfig["allocation_strategy"] = "least-weighted" taConfigYAML, _ := yaml.Marshal(taConfig) assert.Equal(t, string(taConfigYAML), actual.Data["targetallocator.yaml"])