From 5e84673c1682089a6930fc95da96d5604381a9e8 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 7 Sep 2022 11:43:57 -0400 Subject: [PATCH 1/7] Cribbed from another branch --- apis/v1alpha1/opentelemetrycollector_types.go | 2 + apis/v1alpha1/zz_generated.deepcopy.go | 2 +- ...ntelemetry.io_opentelemetrycollectors.yaml | 4 ++ cmd/otel-allocator/config/config.go | 5 +- cmd/otel-allocator/utility/utility.go | 36 +++++++++++ cmd/otel-allocator/utility/utility_test.go | 63 +++++++++++++++++++ ...ntelemetry.io_opentelemetrycollectors.yaml | 4 ++ docs/api.md | 7 +++ pkg/collector/reconcile/configmap.go | 5 ++ pkg/collector/reconcile/configmap_test.go | 4 +- 10 files changed, 128 insertions(+), 4 deletions(-) create mode 100644 cmd/otel-allocator/utility/utility.go create mode 100644 cmd/otel-allocator/utility/utility_test.go diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 747736361f..ac79522da0 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -108,6 +108,8 @@ type OpenTelemetryCollectorSpec struct { // OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator. type OpenTelemetryTargetAllocator struct { + // AllocationStrategy determines which strategy the target allocator should use for allocation + AllocationStrategy string `json:"allocationStrategy,omitempty"` // ServiceAccount indicates the name of an existing service account to use with this instance. // +optional ServiceAccount string `json:"serviceAccount,omitempty"` diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index cba3b24b99..776bebf9fd 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,7 @@ package v1alpha1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index da91c7ce09..01dc8f2e2b 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -683,6 +683,10 @@ spec: description: TargetAllocator indicates a value which determines whether to spawn a target allocation resource or not. properties: + allocationStrategy: + description: AllocationStrategy determines which strategy the + target allocator should use for allocation + type: string enabled: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 4be3bdf3c2..97d729ac09 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -31,8 +31,9 @@ const DefaultResyncTime = 5 * time.Minute const DefaultConfigFilePath string = "/conf/targetallocator.yaml" type Config struct { - LabelSelector map[string]string `yaml:"label_selector,omitempty"` - Config *promconfig.Config `yaml:"config"` + LabelSelector map[string]string `yaml:"label_selector,omitempty"` + Config *promconfig.Config `yaml:"config"` + AllocationStrategy *string `yaml:"allocation_strategy,omitempty"` } type PrometheusCRWatcherConfig struct { diff --git a/cmd/otel-allocator/utility/utility.go b/cmd/otel-allocator/utility/utility.go new file mode 100644 index 0000000000..30d26d755c --- /dev/null +++ b/cmd/otel-allocator/utility/utility.go @@ -0,0 +1,36 @@ +package utility + +type Changes[T any] struct { + additions map[string]T + removals map[string]T +} + +func (c Changes[T]) Additions() map[string]T { + return c.additions +} + +func (c Changes[T]) Removals() map[string]T { + return c.removals +} + +func DiffMaps[T any](current, new map[string]T) Changes[T] { + additions := map[string]T{} + removals := map[string]T{} + // Used as a set to check for removed items + newMembership := map[string]bool{} + for key, value := range new { + if _, found := current[key]; !found { + additions[key] = value + } + newMembership[key] = true + } + for key, value := range current { + if _, found := newMembership[key]; !found { + removals[key] = value + } + } + return Changes[T]{ + additions: additions, + removals: removals, + } +} diff --git a/cmd/otel-allocator/utility/utility_test.go b/cmd/otel-allocator/utility/utility_test.go new file mode 100644 index 0000000000..47943b97eb --- /dev/null +++ b/cmd/otel-allocator/utility/utility_test.go @@ -0,0 +1,63 @@ +package utility + +import ( + "reflect" + "testing" +) + +func TestDiffMaps(t *testing.T) { + type args struct { + current map[string]string + new map[string]string + } + tests := []struct { + name string + args args + want Changes[string] + }{ + { + name: "basic replacement", + args: args{ + current: map[string]string{ + "current": "one", + }, + new: map[string]string{ + "new": "another", + }, + }, + want: Changes[string]{ + additions: map[string]string{ + "new": "another", + }, + removals: map[string]string{ + "current": "one", + }, + }, + }, + { + name: "single addition", + args: args{ + current: map[string]string{ + "current": "one", + }, + new: map[string]string{ + "current": "one", + "new": "another", + }, + }, + want: Changes[string]{ + additions: map[string]string{ + "new": "another", + }, + removals: map[string]string{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := DiffMaps(tt.args.current, tt.args.new); !reflect.DeepEqual(got, tt.want) { + t.Errorf("DiffMaps() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index e5a0084416..b2f0bf3d0c 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -681,6 +681,10 @@ spec: description: TargetAllocator indicates a value which determines whether to spawn a target allocation resource or not. properties: + allocationStrategy: + description: AllocationStrategy determines which strategy the + target allocator should use for allocation + type: string enabled: description: Enabled indicates whether to use a target allocation mechanism for Prometheus targets or not. diff --git a/docs/api.md b/docs/api.md index 57b8d78a9a..6dcb8b56ec 100644 --- a/docs/api.md +++ b/docs/api.md @@ -2869,6 +2869,13 @@ TargetAllocator indicates a value which determines whether to spawn a target all + allocationStrategy + string + + AllocationStrategy determines which strategy the target allocator should use for allocation
+ + false + enabled boolean diff --git a/pkg/collector/reconcile/configmap.go b/pkg/collector/reconcile/configmap.go index f24b626852..4c15e4296c 100644 --- a/pkg/collector/reconcile/configmap.go +++ b/pkg/collector/reconcile/configmap.go @@ -114,6 +114,11 @@ func desiredTAConfigMap(params Params) (corev1.ConfigMap, error) { "app.kubernetes.io/component": "opentelemetry-collector", } taConfig["config"] = promConfig + if len(params.Instance.Spec.TargetAllocator.AllocationStrategy) > 0 { + taConfig["allocation_strategy"] = params.Instance.Spec.TargetAllocator.AllocationStrategy + } else { + taConfig["allocation_strategy"] = "least-weighted" + } taConfigYAML, err := yaml.Marshal(taConfig) if err != nil { return corev1.ConfigMap{}, err diff --git a/pkg/collector/reconcile/configmap_test.go b/pkg/collector/reconcile/configmap_test.go index 05df6d79a3..bdc13d79ca 100644 --- a/pkg/collector/reconcile/configmap_test.go +++ b/pkg/collector/reconcile/configmap_test.go @@ -185,7 +185,8 @@ service: expectedLables["app.kubernetes.io/name"] = "test-targetallocator" expectedData := map[string]string{ - "targetallocator.yaml": `config: + "targetallocator.yaml": `allocation_strategy: least-weighted +config: scrape_configs: - job_name: otel-collector scrape_interval: 10s @@ -324,6 +325,7 @@ func TestExpectedConfigMap(t *testing.T) { "app.kubernetes.io/component": "opentelemetry-collector", } taConfig["config"] = parmConfig + taConfig["allocation_strategy"] = "least-weighted" taConfigYAML, _ := yaml.Marshal(taConfig) assert.Equal(t, string(taConfigYAML), actual.Data["targetallocator.yaml"]) From be870dbaa8020383ab1299dfc0cc0999e2f07f90 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 7 Sep 2022 13:36:32 -0400 Subject: [PATCH 2/7] Refactor part 2 --- apis/v1alpha1/zz_generated.deepcopy.go | 2 +- cmd/otel-allocator/allocation/allocator.go | 237 ------------------ cmd/otel-allocator/allocation/http.go | 42 ++-- cmd/otel-allocator/allocation/http_test.go | 79 +++--- .../allocation/least_weighted/allocator.go | 210 ++++++++++++++++ .../{ => least_weighted}/allocator_test.go | 2 +- .../allocation/strategy/state.go | 91 +++++++ .../allocation/strategy/strategy.go | 45 ++++ cmd/otel-allocator/collector/collector.go | 41 +-- .../collector/collector_test.go | 31 ++- cmd/otel-allocator/discovery/discovery.go | 14 +- .../discovery/discovery_test.go | 10 +- cmd/otel-allocator/main.go | 55 ++-- cmd/otel-allocator/watcher/file.go | 1 + cmd/otel-allocator/watcher/main.go | 7 +- 15 files changed, 482 insertions(+), 385 deletions(-) delete mode 100644 cmd/otel-allocator/allocation/allocator.go create mode 100644 cmd/otel-allocator/allocation/least_weighted/allocator.go rename cmd/otel-allocator/allocation/{ => least_weighted}/allocator_test.go (99%) create mode 100644 cmd/otel-allocator/allocation/strategy/state.go create mode 100644 cmd/otel-allocator/allocation/strategy/strategy.go diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 776bebf9fd..cba3b24b99 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,7 @@ package v1alpha1 import ( - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go deleted file mode 100644 index be4d82d3eb..0000000000 --- a/cmd/otel-allocator/allocation/allocator.go +++ /dev/null @@ -1,237 +0,0 @@ -package allocation - -import ( - "fmt" - "net/url" - "sync" - - "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/common/model" -) - -var ( - collectorsAllocatable = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_collectors_allocatable", - Help: "Number of collectors the allocator is able to allocate to.", - }) - targetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_targets_per_collector", - Help: "The number of targets for each collector.", - }, []string{"collector_name"}) - timeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "opentelemetry_allocator_time_to_allocate", - Help: "The time it takes to allocate", - }, []string{"method"}) -) - -/* - Load balancer will serve on an HTTP server exposing /jobs//targets - The targets are allocated using the least connection method - Load balancer will need information about the collectors in order to set the URLs - Keep a Map of what each collector currently holds and update it based on new scrape target updates -*/ - -type TargetItem struct { - JobName string - Link LinkJSON - TargetURL string - Label model.LabelSet - Collector *collector -} - -func (t TargetItem) hash() string { - return t.JobName + t.TargetURL + t.Label.Fingerprint().String() -} - -// Create a struct that holds collector - and jobs for that collector -// This struct will be parsed into endpoint with collector and jobs info - -type collector struct { - Name string - NumTargets int -} - -// Allocator makes decisions to distribute work among -// a number of OpenTelemetry collectors based on the number of targets. -// Users need to call SetTargets when they have new targets in their -// clusters and call SetCollectors when the collectors have changed. -type Allocator struct { - // m protects collectors and targetItems for concurrent use. - m sync.RWMutex - collectors map[string]*collector // all current collectors - targetItems map[string]*TargetItem - - log logr.Logger -} - -// TargetItems returns a shallow copy of the targetItems map. -func (allocator *Allocator) TargetItems() map[string]*TargetItem { - allocator.m.RLock() - defer allocator.m.RUnlock() - targetItemsCopy := make(map[string]*TargetItem) - for k, v := range allocator.targetItems { - targetItemsCopy[k] = v - } - return targetItemsCopy -} - -// Collectors returns a shallow copy of the collectors map. -func (allocator *Allocator) Collectors() map[string]*collector { - allocator.m.RLock() - defer allocator.m.RUnlock() - collectorsCopy := make(map[string]*collector) - for k, v := range allocator.collectors { - collectorsCopy[k] = v - } - return collectorsCopy -} - -// findNextCollector finds the next collector with fewer number of targets. -// This method is called from within SetTargets and SetCollectors, whose caller -// acquires the needed lock. -func (allocator *Allocator) findNextCollector() *collector { - var col *collector - for _, v := range allocator.collectors { - // If the initial collector is empty, set the initial collector to the first element of map - if col == nil { - col = v - } else { - if v.NumTargets < col.NumTargets { - col = v - } - } - } - return col -} - -// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems -// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. -// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap -func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) { - chosenCollector := allocator.findNextCollector() - targetItem := TargetItem{ - JobName: target.JobName, - Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, - TargetURL: target.TargetURL, - Label: target.Label, - Collector: chosenCollector, - } - allocator.targetItems[targetItem.hash()] = &targetItem - chosenCollector.NumTargets++ - targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) -} - -// getCollectorChanges returns the new and removed collectors respectively. -// This method is called from within SetCollectors, which acquires the needed lock. -func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) { - var newCollectors []string - var removedCollectors []string - // Used as a set to check for removed collectors - tempCollectorMap := map[string]bool{} - for _, s := range collectors { - if _, found := allocator.collectors[s]; !found { - newCollectors = append(newCollectors, s) - } - tempCollectorMap[s] = true - } - for k := range allocator.collectors { - if _, found := tempCollectorMap[k]; !found { - removedCollectors = append(removedCollectors, k) - } - } - return newCollectors, removedCollectors -} - -// SetTargets accepts a list of targets that will be used to make -// load balancing decisions. This method should be called when there are -// new targets discovered or existing targets are shutdown. -func (allocator *Allocator) SetTargets(targets []TargetItem) { - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) - defer timer.ObserveDuration() - - allocator.m.Lock() - defer allocator.m.Unlock() - - // Make the temp map for access - tempTargetMap := make(map[string]TargetItem, len(targets)) - for _, target := range targets { - tempTargetMap[target.hash()] = target - } - - // Check for removals - for k, target := range allocator.targetItems { - // if the old target is no longer in the new list, remove it - if _, ok := tempTargetMap[k]; !ok { - allocator.collectors[target.Collector.Name].NumTargets-- - delete(allocator.targetItems, k) - targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets)) - } - } - - // Check for additions - for k, target := range tempTargetMap { - // Do nothing if the item is already there - if _, ok := allocator.targetItems[k]; ok { - continue - } else { - // Assign new set of collectors with the one different name - allocator.addTargetToTargetItems(&target) - } - } -} - -// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. -// This method is called when Collectors are added or removed. -func (allocator *Allocator) SetCollectors(collectors []string) { - log := allocator.log.WithValues("component", "opentelemetry-targetallocator") - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) - defer timer.ObserveDuration() - - collectorsAllocatable.Set(float64(len(collectors))) - if len(collectors) == 0 { - log.Info("No collector instances present") - return - } - - allocator.m.Lock() - defer allocator.m.Unlock() - newCollectors, removedCollectors := allocator.getCollectorChanges(collectors) - if len(newCollectors) == 0 && len(removedCollectors) == 0 { - log.Info("No changes to the collectors found") - return - } - - // Clear existing collectors - for _, k := range removedCollectors { - delete(allocator.collectors, k) - targetsPerCollector.WithLabelValues(k).Set(0) - } - // Insert the new collectors - for _, i := range newCollectors { - allocator.collectors[i] = &collector{Name: i, NumTargets: 0} - } - - // find targets which need to be redistributed - var redistribute []*TargetItem - for _, item := range allocator.targetItems { - for _, s := range removedCollectors { - if item.Collector.Name == s { - redistribute = append(redistribute, item) - } - } - } - // Re-Allocate the existing targets - for _, item := range redistribute { - allocator.addTargetToTargetItems(item) - } -} - -func NewAllocator(log logr.Logger) *Allocator { - return &Allocator{ - log: log, - collectors: make(map[string]*collector), - targetItems: make(map[string]*TargetItem), - } -} diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index ba2602fff9..d59db9b451 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -4,55 +4,43 @@ import ( "fmt" "net/url" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/prometheus/common/model" ) -type LinkJSON struct { - Link string `json:"_link"` -} - -type collectorJSON struct { - Link string `json:"_link"` - Jobs []targetGroupJSON `json:"targets"` -} - -type targetGroupJSON struct { - Targets []string `json:"targets"` - Labels model.LabelSet `json:"labels"` -} - -func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *Allocator) map[string]collectorJSON { - displayData := make(map[string]collectorJSON) +func GetAllTargetsByJob(job string, cMap map[string][]strategy.TargetItem, allocator strategy.Allocator) map[string]strategy.CollectorJSON { + displayData := make(map[string]strategy.CollectorJSON) for _, j := range allocator.TargetItems() { if j.JobName == job { - var targetList []TargetItem - targetList = append(targetList, cMap[j.Collector.Name+j.JobName]...) + var targetList []strategy.TargetItem + targetList = append(targetList, cMap[j.CollectorName+j.JobName]...) - var targetGroupList []targetGroupJSON + var targetGroupList []strategy.TargetGroupJSON for _, t := range targetList { - targetGroupList = append(targetGroupList, targetGroupJSON{ + targetGroupList = append(targetGroupList, strategy.TargetGroupJSON{ Targets: []string{t.TargetURL}, Labels: t.Label, }) } - displayData[j.Collector.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.Collector.Name), Jobs: targetGroupList} + displayData[j.CollectorName] = strategy.CollectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} } } return displayData } -func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator *Allocator) []targetGroupJSON { - var tgs []targetGroupJSON +func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]strategy.TargetItem, allocator strategy.Allocator) []strategy.TargetGroupJSON { + var tgs []strategy.TargetGroupJSON group := make(map[string]string) labelSet := make(map[string]model.LabelSet) - for _, col := range allocator.Collectors() { - if col.Name == collector { + for colName, _ := range allocator.Collectors() { + if colName == collector { for _, targetItemArr := range cMap { for _, targetItem := range targetItemArr { - if targetItem.Collector.Name == collector && targetItem.JobName == job { + if targetItem.CollectorName == collector && targetItem.JobName == job { group[targetItem.Label.String()] = targetItem.TargetURL labelSet[targetItem.TargetURL] = targetItem.Label } @@ -62,7 +50,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin } for _, v := range group { - tgs = append(tgs, targetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) + tgs = append(tgs, strategy.TargetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) } return tgs diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index ed62c1113f..d1476a8e08 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -6,30 +6,36 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + _ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/least_weighted" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" ) +var logger = logf.Log.WithName("unit-tests") + func TestGetAllTargetsByCollectorAndJob(t *testing.T) { - baseAllocator := NewAllocator(logger) - baseAllocator.SetCollectors([]string{"test-collector"}) - statefulAllocator := NewAllocator(logger) - statefulAllocator.SetCollectors([]string{"test-collector-0"}) + baseAllocator, _ := strategy.New("least-weighted", logger) + baseAllocator.SetCollectors(map[string]*strategy.Collector{"test-collector": {Name: "test-collector"}}) + statefulAllocator, _ := strategy.New("least-weighted", logger) + statefulAllocator.SetCollectors(map[string]*strategy.Collector{"test-collector-0": {Name: "test-collector-0"}}) type args struct { collector string job string - cMap map[string][]TargetItem - allocator *Allocator + cMap map[string][]strategy.TargetItem + allocator strategy.Allocator } var tests = []struct { name string args args - want []targetGroupJSON + want []strategy.TargetGroupJSON }{ { name: "Empty target map", args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{}, + cMap: map[string][]strategy.TargetItem{}, allocator: baseAllocator, }, want: nil, @@ -39,24 +45,21 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]strategy.TargetItem{ "test-collectortest-job": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, }, allocator: baseAllocator, }, - want: []targetGroupJSON{ + want: []strategy.TargetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -70,37 +73,31 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]strategy.TargetItem{ "test-collectortest-job": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, "test-collectortest-job2": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job2", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", - Collector: &collector{ - Name: "test-collector", - NumTargets: 1, - }, + TargetURL: "test-url", + CollectorName: "test-collector", }, }, }, allocator: baseAllocator, }, - want: []targetGroupJSON{ + want: []strategy.TargetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -114,38 +111,32 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]TargetItem{ + cMap: map[string][]strategy.TargetItem{ "test-collectortest-job": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", "foo": "bar", }, - TargetURL: "test-url1", - Collector: &collector{ - Name: "test-collector", - NumTargets: 2, - }, + TargetURL: "test-url1", + CollectorName: "test-collector", }, }, "test-collectortest-job2": { - TargetItem{ + strategy.TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url2", - Collector: &collector{ - Name: "test-collector", - NumTargets: 2, - }, + TargetURL: "test-url2", + CollectorName: "test-collector", }, }, }, allocator: baseAllocator, }, - want: []targetGroupJSON{ + want: []strategy.TargetGroupJSON{ { Targets: []string{"test-url1"}, Labels: map[model.LabelName]model.LabelValue{ diff --git a/cmd/otel-allocator/allocation/least_weighted/allocator.go b/cmd/otel-allocator/allocation/least_weighted/allocator.go new file mode 100644 index 0000000000..e34be81e25 --- /dev/null +++ b/cmd/otel-allocator/allocation/least_weighted/allocator.go @@ -0,0 +1,210 @@ +package least_weighted + +import ( + "fmt" + "net/url" + "os" + "sync" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/utility" + + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + collectorsAllocatable = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_collectors_allocatable", + Help: "Number of collectors the allocator is able to allocate to.", + }) + timeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "opentelemetry_allocator_time_to_allocate", + Help: "The time it takes to allocate", + }, []string{"method"}) + _ strategy.Allocator = &LeastWeightedAllocator{} +) + +/* + Load balancer will serve on an HTTP server exposing /jobs//targets + The targets are allocated using the least connection method + Load balancer will need information about the collectors in order to set the URLs + Keep a Map of what each collector currently holds and update it based on new scrape target updates +*/ + +// LeastWeightedAllocator makes decisions to distribute work among +// a number of OpenTelemetry collectors based on the number of targets. +// Users need to call SetTargets when they have new targets in their +// clusters and call SetCollectors when the collectors have changed. +type LeastWeightedAllocator struct { + // m protects collectors and targetItems for concurrent use. + m sync.RWMutex + state strategy.State + + log logr.Logger +} + +// TargetItems returns a shallow copy of the targetItems map. +func (allocator *LeastWeightedAllocator) TargetItems() map[string]*strategy.TargetItem { + allocator.m.RLock() + defer allocator.m.RUnlock() + targetItemsCopy := make(map[string]*strategy.TargetItem) + for k, v := range allocator.state.TargetItems() { + targetItemsCopy[k] = v + } + return targetItemsCopy +} + +// Collectors returns a shallow copy of the collectors map. +func (allocator *LeastWeightedAllocator) Collectors() map[string]*strategy.Collector { + allocator.m.RLock() + defer allocator.m.RUnlock() + collectorsCopy := make(map[string]*strategy.Collector) + for k, v := range allocator.state.Collectors() { + collectorsCopy[k] = v + } + return collectorsCopy +} + +// findNextCollector finds the next collector with fewer number of targets. +// This method is called from within SetTargets and SetCollectors, whose caller +// acquires the needed lock. +func (allocator *LeastWeightedAllocator) findNextCollector() *strategy.Collector { + var col *strategy.Collector + for _, v := range allocator.state.Collectors() { + // If the initial collector is empty, set the initial collector to the first element of map + if col == nil { + col = v + } else { + if v.NumTargets < col.NumTargets { + col = v + } + } + } + return col +} + +// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. +// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap +func (allocator *LeastWeightedAllocator) addTargetToTargetItems(target *strategy.TargetItem) { + chosenCollector := allocator.findNextCollector() + targetItem := &strategy.TargetItem{ + JobName: target.JobName, + Link: strategy.LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, + TargetURL: target.TargetURL, + Label: target.Label, + CollectorName: chosenCollector.Name, + } + allocator.state.SetTargetItem(targetItem.Hash(), targetItem) + chosenCollector.NumTargets++ + strategy.TargetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) +} + +func (allocator *LeastWeightedAllocator) handleTargets(diff utility.Changes[*strategy.TargetItem]) { + // Check for removals + for k, target := range allocator.state.TargetItems() { + // if the current target is in the removals list + if _, ok := diff.Removals()[k]; ok { + c := allocator.state.Collectors()[target.CollectorName] + c.NumTargets-- + allocator.state.RemoveTargetItem(k) + strategy.TargetsPerCollector.WithLabelValues(target.CollectorName).Set(float64(c.NumTargets)) + } + } + + // Check for additions + for k, target := range diff.Additions() { + // Do nothing if the item is already there + if _, ok := allocator.TargetItems()[k]; ok { + continue + } else { + // Assign new set of collectors with the one different name + allocator.addTargetToTargetItems(target) + } + } +} + +func (allocator *LeastWeightedAllocator) handleCollectors(diff utility.Changes[*strategy.Collector]) { + // Clear existing collectors + for _, k := range diff.Removals() { + allocator.state.RemoveCollector(k.Name) + strategy.TargetsPerCollector.WithLabelValues(k.Name).Set(0) + } + // Insert the new collectors + for _, i := range diff.Additions() { + allocator.state.SetCollector(i.Name, &strategy.Collector{Name: i.Name, NumTargets: 0}) + } + + // find targets which need to be redistributed + var redistribute []*strategy.TargetItem + for _, item := range allocator.state.TargetItems() { + for _, s := range diff.Removals() { + if item.CollectorName == s.Name { + redistribute = append(redistribute, item) + } + } + } + // Re-Allocate the existing targets + for _, item := range redistribute { + allocator.addTargetToTargetItems(item) + } +} + +// SetTargets accepts a list of targets that will be used to make +// load balancing decisions. This method should be called when there are +// new targets discovered or existing targets are shutdown. +func (allocator *LeastWeightedAllocator) SetTargets(targets map[string]*strategy.TargetItem) { + timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) + defer timer.ObserveDuration() + + allocator.m.Lock() + defer allocator.m.Unlock() + + // Check for target changes + targetsDiff := utility.DiffMaps(allocator.TargetItems(), targets) + // If there are any additions or removals + if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { + allocator.handleTargets(targetsDiff) + } + return +} + +// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. +// This method is called when Collectors are added or removed. +func (allocator *LeastWeightedAllocator) SetCollectors(collectors map[string]*strategy.Collector) { + log := allocator.log.WithValues("component", "opentelemetry-targetallocator") + timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) + defer timer.ObserveDuration() + + collectorsAllocatable.Set(float64(len(collectors))) + if len(collectors) == 0 { + log.Info("No collector instances present") + return + } + + allocator.m.Lock() + defer allocator.m.Unlock() + + // Check for collector changes + collectorsDiff := utility.DiffMaps(allocator.Collectors(), collectors) + if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { + allocator.handleCollectors(collectorsDiff) + } + return +} + +func NewAllocator(log logr.Logger) strategy.Allocator { + return &LeastWeightedAllocator{ + log: log, + state: strategy.NewState(make(map[string]*strategy.Collector), make(map[string]*strategy.TargetItem)), + } +} + +func init() { + err := strategy.Register("least-weighted", NewAllocator) + if err != nil { + os.Exit(1) + } +} diff --git a/cmd/otel-allocator/allocation/allocator_test.go b/cmd/otel-allocator/allocation/least_weighted/allocator_test.go similarity index 99% rename from cmd/otel-allocator/allocation/allocator_test.go rename to cmd/otel-allocator/allocation/least_weighted/allocator_test.go index 0b754cb47a..c2a095adb3 100644 --- a/cmd/otel-allocator/allocation/allocator_test.go +++ b/cmd/otel-allocator/allocation/least_weighted/allocator_test.go @@ -1,4 +1,4 @@ -package allocation +package least_weighted import ( "math" diff --git a/cmd/otel-allocator/allocation/strategy/state.go b/cmd/otel-allocator/allocation/strategy/state.go new file mode 100644 index 0000000000..5177d8f52e --- /dev/null +++ b/cmd/otel-allocator/allocation/strategy/state.go @@ -0,0 +1,91 @@ +package strategy + +import ( + "fmt" + "net/url" + + "github.com/prometheus/common/model" +) + +type LinkJSON struct { + Link string `json:"_link"` +} + +type CollectorJSON struct { + Link string `json:"_link"` + Jobs []TargetGroupJSON `json:"targets"` +} + +type TargetGroupJSON struct { + Targets []string `json:"targets"` + Labels model.LabelSet `json:"labels"` +} + +type TargetItem struct { + JobName string + Link LinkJSON + TargetURL string + Label model.LabelSet + CollectorName string +} + +func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) TargetItem { + return TargetItem{ + JobName: jobName, + Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, + TargetURL: targetURL, + Label: label, + CollectorName: collectorName, + } +} + +func (t TargetItem) Hash() string { + return t.JobName + t.TargetURL + t.Label.Fingerprint().String() +} + +// Collector Creates a struct that holds Collector information +// This struct will be parsed into endpoint with Collector and jobs info +// This struct can be extended with information like annotations and labels in the future +type Collector struct { + Name string + NumTargets int +} + +func NewCollector(name string) *Collector { + return &Collector{Name: name} +} + +type State struct { + // collectors is a map from a Collector's name to a Collector instance + collectors map[string]*Collector + // targetItems is a map from a target item's hash to the target items allocated state + targetItems map[string]*TargetItem +} + +func (s State) Collectors() map[string]*Collector { + return s.collectors +} + +func (s State) TargetItems() map[string]*TargetItem { + return s.targetItems +} + +func (s State) SetTargetItem(key string, value *TargetItem) { + s.targetItems[key] = value +} + +func (s State) SetCollector(key string, value *Collector) { + s.collectors[key] = value +} + +func (s State) RemoveCollector(key string) { + delete(s.collectors, key) +} + +func (s State) RemoveTargetItem(key string) { + delete(s.targetItems, key) +} + +func NewState(collectors map[string]*Collector, targetItems map[string]*TargetItem) State { + return State{collectors: collectors, targetItems: targetItems} +} diff --git a/cmd/otel-allocator/allocation/strategy/strategy.go b/cmd/otel-allocator/allocation/strategy/strategy.go new file mode 100644 index 0000000000..11b6f256ef --- /dev/null +++ b/cmd/otel-allocator/allocation/strategy/strategy.go @@ -0,0 +1,45 @@ +package strategy + +import ( + "errors" + "fmt" + + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type AllocatorProvider func(log logr.Logger) Allocator + +var ( + registry = map[string]AllocatorProvider{} + + // TargetsPerCollector records how many targets have been assigned to each collector + // It is currently the responsibility of the strategy to track this information. + TargetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_targets_per_collector", + Help: "The number of targets for each collector.", + }, []string{"collector_name"}) +) + +func New(name string, log logr.Logger) (Allocator, error) { + if p, ok := registry[name]; ok { + return p(log), nil + } + return nil, errors.New(fmt.Sprintf("unregistered strategy: %s", name)) +} + +func Register(name string, provider AllocatorProvider) error { + if _, ok := registry[name]; ok { + return errors.New("already registered") + } + registry[name] = provider + return nil +} + +type Allocator interface { + SetCollectors(collectors map[string]*Collector) + SetTargets(targets map[string]*TargetItem) + TargetItems() map[string]*TargetItem + Collectors() map[string]*Collector +} diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index 7732d42f08..a4ebef8e6c 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -6,6 +6,8 @@ import ( "strconv" "time" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -30,10 +32,9 @@ var ( ) type Client struct { - log logr.Logger - k8sClient kubernetes.Interface - collectorChan chan []string - close chan struct{} + log logr.Logger + k8sClient kubernetes.Interface + close chan struct{} } func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) { @@ -49,8 +50,8 @@ func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) { }, nil } -func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors []string)) { - collectorMap := map[string]bool{} +func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors map[string]*strategy.Collector)) { + collectorMap := map[string]*strategy.Collector{} log := k.log.WithValues("component", "opentelemetry-targetallocator") opts := metav1.ListOptions{ @@ -64,17 +65,11 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func( for i := range pods.Items { pod := pods.Items[i] if pod.GetObjectMeta().GetDeletionTimestamp() == nil { - collectorMap[pod.Name] = true + collectorMap[pod.Name] = strategy.NewCollector(pod.Name) } } - collectorKeys := make([]string, len(collectorMap)) - i := 0 - for keys := range collectorMap { - collectorKeys[i] = keys - i++ - } - fn(collectorKeys) + fn(collectorMap) go func() { for { @@ -92,7 +87,7 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func( }() } -func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]bool, fn func(collectors []string)) string { +func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*strategy.Collector, fn func(collectors map[string]*strategy.Collector)) string { log := k.log.WithValues("component", "opentelemetry-targetallocator") for { collectorsDiscovered.Set(float64(len(collectorMap))) @@ -115,23 +110,11 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap switch event.Type { case watch.Added: - collectorMap[pod.Name] = true + collectorMap[pod.Name] = strategy.NewCollector(pod.Name) case watch.Deleted: delete(collectorMap, pod.Name) } - - collectorKeys := make([]string, len(collectorMap)) - i := 0 - for keys := range collectorMap { - collectorKeys[i] = keys - i++ - } - fn(collectorKeys) - select { - case k.collectorChan <- collectorKeys: - default: - fn(collectorKeys) - } + fn(collectorMap) case <-time.After(watcherTimeout): log.Info("Restarting watch routine") return "" diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/collector_test.go index 5ba5b0fb1c..ff13f003ca 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/collector_test.go @@ -4,9 +4,10 @@ import ( "context" "fmt" "os" - "sort" "testing" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,13 +16,12 @@ import ( ) var client Client -var collectors = []string{} +var collectors = map[string]*strategy.Collector{} func TestMain(m *testing.M) { client = Client{ - k8sClient: fake.NewSimpleClientset(), - collectorChan: make(chan []string, 3), - close: make(chan struct{}), + k8sClient: fake.NewSimpleClientset(), + close: make(chan struct{}), } labelMap := map[string]string{ @@ -39,7 +39,7 @@ func TestMain(m *testing.M) { os.Exit(1) } - go runWatch(context.Background(), &client, watcher.ResultChan(), map[string]bool{}, func(collectorList []string) { getCollectors(collectorList) }) + go runWatch(context.Background(), &client, watcher.ResultChan(), map[string]*strategy.Collector{}, func(colMap map[string]*strategy.Collector) { getCollectors(colMap) }) code := m.Run() @@ -49,18 +49,25 @@ func TestMain(m *testing.M) { } func TestWatchPodAddition(t *testing.T) { - expected := []string{"test-pod1", "test-pod2", "test-pod3"} + expected := map[string]*strategy.Collector{ + "test-pod1": { + Name: "test-pod1", + }, + "test-pod2": { + Name: "test-pod2", + }, + "test-pod3": { + Name: "test-pod3", + }, + } for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} { expected := pod(k) _, err := client.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), expected, metav1.CreateOptions{}) assert.NoError(t, err) - collectors = <-client.collectorChan } assert.Len(t, collectors, 3) - - sort.Strings(collectors) assert.Equal(t, collectors, expected) } @@ -70,16 +77,14 @@ func TestWatchPodDeletion(t *testing.T) { for _, k := range []string{"test-pod2", "test-pod3"} { err := client.k8sClient.CoreV1().Pods("test-ns").Delete(context.Background(), k, metav1.DeleteOptions{}) assert.NoError(t, err) - collectors = <-client.collectorChan } assert.Len(t, collectors, 1) - sort.Strings(collectors) assert.Equal(t, collectors, expected) } -func getCollectors(c []string) { +func getCollectors(c map[string]*strategy.Collector) { collectors = c } diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index 1714f92657..003c623404 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -5,13 +5,14 @@ import ( "github.com/go-kit/log" "github.com/go-logr/logr" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) var ( @@ -59,7 +60,7 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C return m.manager.ApplyConfig(discoveryCfg) } -func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) { +func (m *Manager) Watch(fn func(targets map[string]*strategy.TargetItem)) { log := m.log.WithValues("component", "opentelemetry-targetallocator") go func() { @@ -69,18 +70,19 @@ func (m *Manager) Watch(fn func(targets []allocation.TargetItem)) { log.Info("Service Discovery watch event stopped: discovery manager closed") return case tsets := <-m.manager.SyncCh(): - targets := []allocation.TargetItem{} + targets := map[string]*strategy.TargetItem{} for jobName, tgs := range tsets { var count float64 = 0 for _, tg := range tgs { for _, t := range tg.Targets { count++ - targets = append(targets, allocation.TargetItem{ + item := &strategy.TargetItem{ JobName: jobName, TargetURL: string(t[model.AddressLabel]), Label: t.Merge(tg.Labels), - }) + } + targets[item.Hash()] = item } } targetsDiscovered.WithLabelValues(jobName).Set(count) diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index 11e601f364..bcb71deeae 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -7,15 +7,17 @@ import ( "sort" "testing" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + gokitlog "github.com/go-kit/log" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" - allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" "github.com/prometheus/common/model" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/stretchr/testify/assert" ctrl "sigs.k8s.io/controller-runtime" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" + allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) var cfg config.Config @@ -32,7 +34,7 @@ func TestMain(m *testing.M) { manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger()) results = make(chan []string) - manager.Watch(func(targets []allocation.TargetItem) { + manager.Watch(func(targets map[string]*strategy.TargetItem) { var result []string for _, t := range targets { result = append(result, t.TargetURL) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index bd2bd0a469..86665d196d 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -9,18 +9,22 @@ import ( "os/signal" "syscall" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + gokitlog "github.com/go-kit/log" "github.com/go-logr/logr" "github.com/gorilla/mux" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" - lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" - allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ctrl "sigs.k8s.io/controller-runtime" + + _ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/least_weighted" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" + lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" + allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) var ( @@ -41,13 +45,22 @@ func main() { setupLog.Error(err, "Failed to parse parameters") os.Exit(1) } + cfg, err := config.Load(*cliConf.ConfigFilePath) + if err != nil { + setupLog.Error(err, "Unable to load configuration") + } cliConf.RootLogger.Info("Starting the Target Allocator") ctx := context.Background() log := ctrl.Log.WithName("allocator") - allocator := allocation.NewAllocator(log) + + allocator, err := strategy.New(*cfg.AllocationStrategy, log) + if err != nil { + setupLog.Error(err, "Unable to initialize allocation strategy") + os.Exit(1) + } watcher, err := allocatorWatcher.NewWatcher(setupLog, cliConf, allocator) if err != nil { setupLog.Error(err, "Can't start the watchers") @@ -60,7 +73,13 @@ func main() { defer discoveryManager.Close() discoveryManager.Watch(allocator.SetTargets) - srv, err := newServer(log, allocator, discoveryManager, cliConf) + k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf) + if err != nil { + setupLog.Error(err, "Can't start the k8s client") + os.Exit(1) + } + + srv, err := newServer(log, allocator, discoveryManager, k8sclient, cliConf.ListenAddr) if err != nil { setupLog.Error(err, "Can't start the server") } @@ -91,7 +110,7 @@ func main() { if err := srv.Shutdown(ctx); err != nil { setupLog.Error(err, "Cannot shutdown the server") } - srv, err = newServer(log, allocator, discoveryManager, cliConf) + srv, err = newServer(log, allocator, discoveryManager, k8sclient, cliConf.ListenAddr) if err != nil { setupLog.Error(err, "Error restarting the server with new config") } @@ -120,17 +139,13 @@ func main() { type server struct { logger logr.Logger - allocator *allocation.Allocator + allocator strategy.Allocator discoveryManager *lbdiscovery.Manager k8sClient *collector.Client server *http.Server } -func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManager *lbdiscovery.Manager, cliConf config.CLIConfig) (*server, error) { - k8sclient, err := configureFileDiscovery(log, allocator, discoveryManager, context.Background(), cliConf) - if err != nil { - return nil, err - } +func newServer(log logr.Logger, allocator strategy.Allocator, discoveryManager *lbdiscovery.Manager, k8sclient *collector.Client, listenAddr *string) (*server, error) { s := &server{ logger: log, allocator: allocator, @@ -142,11 +157,11 @@ func newServer(log logr.Logger, allocator *allocation.Allocator, discoveryManage router.HandleFunc("/jobs", s.JobHandler).Methods("GET") router.HandleFunc("/jobs/{job_id}/targets", s.TargetsHandler).Methods("GET") router.Path("/metrics").Handler(promhttp.Handler()) - s.server = &http.Server{Addr: *cliConf.ListenAddr, Handler: router} + s.server = &http.Server{Addr: *listenAddr, Handler: router} return s, nil } -func configureFileDiscovery(log logr.Logger, allocator *allocation.Allocator, discoveryManager *lbdiscovery.Manager, ctx context.Context, cliConfig config.CLIConfig) (*collector.Client, error) { +func configureFileDiscovery(log logr.Logger, allocator strategy.Allocator, discoveryManager *lbdiscovery.Manager, ctx context.Context, cliConfig config.CLIConfig) (*collector.Client, error) { cfg, err := config.Load(*cliConfig.ConfigFilePath) if err != nil { return nil, err @@ -178,9 +193,9 @@ func (s *server) Shutdown(ctx context.Context) error { } func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { - displayData := make(map[string]allocation.LinkJSON) + displayData := make(map[string]strategy.LinkJSON) for _, v := range s.allocator.TargetItems() { - displayData[v.JobName] = allocation.LinkJSON{v.Link.Link} + displayData[v.JobName] = strategy.LinkJSON{Link: v.Link.Link} } jsonHandler(w, r, displayData) } @@ -199,9 +214,9 @@ func (s *server) PrometheusMiddleware(next http.Handler) http.Handler { func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { q := r.URL.Query()["collector_id"] - var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem + var compareMap = make(map[string][]strategy.TargetItem) // CollectorName+jobName -> TargetItem for _, v := range s.allocator.TargetItems() { - compareMap[v.Collector.Name+v.JobName] = append(compareMap[v.Collector.Name+v.JobName], *v) + compareMap[v.CollectorName+v.JobName] = append(compareMap[v.CollectorName+v.JobName], *v) } params := mux.Vars(r) jobId, err := url.QueryUnescape(params["job_id"]) diff --git a/cmd/otel-allocator/watcher/file.go b/cmd/otel-allocator/watcher/file.go index 262bb71bfb..6adc3c33e9 100644 --- a/cmd/otel-allocator/watcher/file.go +++ b/cmd/otel-allocator/watcher/file.go @@ -5,6 +5,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" ) diff --git a/cmd/otel-allocator/watcher/main.go b/cmd/otel-allocator/watcher/main.go index dd09ca1b09..dbfc275b8b 100644 --- a/cmd/otel-allocator/watcher/main.go +++ b/cmd/otel-allocator/watcher/main.go @@ -4,14 +4,15 @@ import ( "fmt" "github.com/go-logr/logr" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" ) type Manager struct { Events chan Event Errors chan error - allocator *allocation.Allocator + allocator strategy.Allocator watchers []Watcher } @@ -44,7 +45,7 @@ func (e EventSource) String() string { return eventSourceToString[e] } -func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator *allocation.Allocator) (*Manager, error) { +func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator strategy.Allocator) (*Manager, error) { watcher := Manager{ allocator: allocator, Events: make(chan Event), From b4005be9b617245d4ba0d02f3c162febce85ba5c Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 7 Sep 2022 15:55:09 -0400 Subject: [PATCH 3/7] Fix tests --- .../least_weighted/allocator_test.go | 171 +++++++++--------- .../allocation/strategy/state.go | 4 +- 2 files changed, 86 insertions(+), 89 deletions(-) diff --git a/cmd/otel-allocator/allocation/least_weighted/allocator_test.go b/cmd/otel-allocator/allocation/least_weighted/allocator_test.go index c2a095adb3..fe5c7f7a9c 100644 --- a/cmd/otel-allocator/allocation/least_weighted/allocator_test.go +++ b/cmd/otel-allocator/allocation/least_weighted/allocator_test.go @@ -1,9 +1,13 @@ package least_weighted import ( + "fmt" "math" + "math/rand" "testing" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -11,24 +15,32 @@ import ( var logger = logf.Log.WithName("unit-tests") -// Tests the least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload -func TestFindNextCollector(t *testing.T) { - s := NewAllocator(logger) - - defaultCol := collector{Name: "default-col", NumTargets: 1} - maxCol := collector{Name: "max-col", NumTargets: 2} - leastCol := collector{Name: "least-col", NumTargets: 0} - s.collectors[maxCol.Name] = &maxCol - s.collectors[leastCol.Name] = &leastCol - s.collectors[defaultCol.Name] = &defaultCol +func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*strategy.TargetItem { + toReturn := map[string]*strategy.TargetItem{} + for i := startingIndex; i < n+startingIndex; i++ { + collector := fmt.Sprintf("collector-%d", i%numCollectors) + newTarget := strategy.NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", nil, collector) + toReturn[newTarget.Hash()] = newTarget + } + return toReturn +} - assert.Equal(t, "least-col", s.findNextCollector().Name) +func makeNCollectors(n int, targetsForEach int) map[string]*strategy.Collector { + toReturn := map[string]*strategy.Collector{} + for i := 0; i < n; i++ { + collector := fmt.Sprintf("collector-%d", i) + toReturn[collector] = &strategy.Collector{ + Name: collector, + NumTargets: targetsForEach, + } + } + return toReturn } func TestSetCollectors(t *testing.T) { - s := NewAllocator(logger) + s, _ := strategy.New("least-weighted", logger) - cols := []string{"col-1", "col-2", "col-3"} + cols := makeNCollectors(3, 0) s.SetCollectors(cols) expectedColLen := len(cols) @@ -36,40 +48,31 @@ func TestSetCollectors(t *testing.T) { assert.Len(t, collectors, expectedColLen) for _, i := range cols { - assert.NotNil(t, collectors[i]) + assert.NotNil(t, collectors[i.Name]) } } func TestAddingAndRemovingTargets(t *testing.T) { // prepare allocator with initial targets and collectors - s := NewAllocator(logger) + s, _ := strategy.New("least-weighted", logger) - cols := []string{"col-1", "col-2", "col-3"} + cols := makeNCollectors(3, 0) s.SetCollectors(cols) - labels := model.LabelSet{} - initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem - for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) - } + initTargets := makeNNewTargets(6, 3, 0) // test that targets and collectors are added properly - s.SetTargets(targetList) + s.SetTargets(initTargets) // verify expectedTargetLen := len(initTargets) assert.Len(t, s.TargetItems(), expectedTargetLen) // prepare second round of targets - tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"} - var newTargetList []TargetItem - for _, i := range tar { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) - } + tar := makeNNewTargets(4, 3, 0) // test that fewer targets are found - removed - s.SetTargets(newTargetList) + s.SetTargets(tar) // verify targetItems := s.TargetItems() @@ -78,7 +81,7 @@ func TestAddingAndRemovingTargets(t *testing.T) { // verify results map for _, i := range tar { - _, ok := targetItems["sample-name"+i+labels.Fingerprint().String()] + _, ok := targetItems[i.Hash()] assert.True(t, ok) } } @@ -86,9 +89,9 @@ func TestAddingAndRemovingTargets(t *testing.T) { // Tests that two targets with the same target url and job name but different label set are both added func TestAllocationCollision(t *testing.T) { // prepare allocator with initial targets and collectors - s := NewAllocator(logger) + s, _ := strategy.New("least-weighted", logger) - cols := []string{"col-1", "col-2", "col-3"} + cols := makeNCollectors(3, 0) s.SetCollectors(cols) firstLabels := model.LabelSet{ "test": "test1", @@ -96,10 +99,12 @@ func TestAllocationCollision(t *testing.T) { secondLabels := model.LabelSet{ "test": "test2", } + firstTarget := strategy.NewTargetItem("sample-name", "0.0.0.0:8000", firstLabels, "") + secondTarget := strategy.NewTargetItem("sample-name", "0.0.0.0:8000", secondLabels, "") - targetList := []TargetItem{ - {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: firstLabels}, - {JobName: "sample-name", TargetURL: "0.0.0.0:8000", Label: secondLabels}, + targetList := map[string]*strategy.TargetItem{ + firstTarget.Hash(): firstTarget, + secondTarget.Hash(): secondTarget, } // test that targets and collectors are added properly @@ -112,31 +117,27 @@ func TestAllocationCollision(t *testing.T) { // verify results map for _, i := range targetList { - _, ok := targetItems[i.hash()] + _, ok := targetItems[i.Hash()] assert.True(t, ok) } } func TestNoCollectorReassignment(t *testing.T) { - s := NewAllocator(logger) + s, _ := strategy.New("least-weighted", logger) - cols := []string{"col-1", "col-2", "col-3"} + cols := makeNCollectors(3, 0) s.SetCollectors(cols) - labels := model.LabelSet{} expectedColLen := len(cols) - assert.Len(t, s.collectors, expectedColLen) + assert.Len(t, s.Collectors(), expectedColLen) for _, i := range cols { - assert.NotNil(t, s.collectors[i]) - } - initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem - for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + assert.NotNil(t, s.Collectors()[i.Name]) } + initTargets := makeNNewTargets(6, 3, 0) + // test that targets and collectors are added properly - s.SetTargets(targetList) + s.SetTargets(initTargets) // verify expectedTargetLen := len(initTargets) @@ -144,7 +145,7 @@ func TestNoCollectorReassignment(t *testing.T) { assert.Len(t, targetItems, expectedTargetLen) // assign new set of collectors with the same names - newCols := []string{"col-1", "col-2", "col-3"} + newCols := makeNCollectors(3, 0) s.SetCollectors(newCols) newTargetItems := s.TargetItems() @@ -153,25 +154,20 @@ func TestNoCollectorReassignment(t *testing.T) { } func TestSmartCollectorReassignment(t *testing.T) { - s := NewAllocator(logger) + s, _ := strategy.New("least-weighted", logger) - cols := []string{"col-1", "col-2", "col-3"} + cols := makeNCollectors(3, 0) s.SetCollectors(cols) - labels := model.LabelSet{} expectedColLen := len(cols) - assert.Len(t, s.collectors, expectedColLen) + assert.Len(t, s.Collectors(), expectedColLen) for _, i := range cols { - assert.NotNil(t, s.collectors[i]) - } - initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} - var targetList []TargetItem - for _, i := range initTargets { - targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: labels}) + assert.NotNil(t, s.Collectors()[i.Name]) } + initTargets := makeNNewTargets(6, 3, 0) // test that targets and collectors are added properly - s.SetTargets(targetList) + s.SetTargets(initTargets) // verify expectedTargetLen := len(initTargets) @@ -179,7 +175,15 @@ func TestSmartCollectorReassignment(t *testing.T) { assert.Len(t, targetItems, expectedTargetLen) // assign new set of collectors with the same names - newCols := []string{"col-1", "col-2", "col-4"} + newCols := map[string]*strategy.Collector{ + "collector-1": { + Name: "collector-1", + }, "collector-2": { + Name: "collector-2", + }, "collector-4": { + Name: "collector-4", + }, + } s.SetCollectors(newCols) newTargetItems := s.TargetItems() @@ -187,10 +191,10 @@ func TestSmartCollectorReassignment(t *testing.T) { for key, targetItem := range targetItems { item, ok := newTargetItems[key] assert.True(t, ok, "all target items should be found in new target item list") - if targetItem.Collector.Name != "col-3" { - assert.Equal(t, targetItem.Collector.Name, item.Collector.Name) + if targetItem.CollectorName != "col-3" { + assert.Equal(t, targetItem.CollectorName, item.CollectorName) } else { - assert.Equal(t, "col-4", item.Collector.Name) + assert.Equal(t, "col-4", item.CollectorName) } } } @@ -199,19 +203,13 @@ func TestSmartCollectorReassignment(t *testing.T) { func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { // prepare allocator with 3 collectors and 'random' amount of targets - s := NewAllocator(logger) + s, _ := strategy.New("least-weighted", logger) - cols := []string{"col-1", "col-2", "col-3"} + cols := makeNCollectors(3, 0) s.SetCollectors(cols) - targets := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005", "prometheus:1006", - "prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1015", "prometheus:1016", - "prometheus:1021", "prometheus:1022", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} - var newTargetList []TargetItem - for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) - } - s.SetTargets(newTargetList) + targets := makeNNewTargets(27, 3, 0) + s.SetTargets(targets) // Divisor needed to get 15% divisor := 6.7 @@ -227,14 +225,17 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { } // removing targets at 'random' - targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", - "prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1016", - "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} - newTargetList = []TargetItem{} - for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + // Remove half of targets randomly + toDelete := len(targets) / 2 + counter := 0 + for index, _ := range targets { + shouldDelete := rand.Intn(toDelete) + if counter < shouldDelete { + delete(targets, index) + } + counter++ } - s.SetTargets(newTargetList) + s.SetTargets(targets) targetItemLen = len(s.TargetItems()) collectors = s.Collectors() @@ -246,14 +247,10 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { assert.InDelta(t, i.NumTargets, count, math.Round(percent)) } // adding targets at 'random' - targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", - "prometheus:1011", "prometheus:1012", "prometheus:1001", "prometheus:1014", "prometheus:1016", - "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1126", "prometheus:1227"} - newTargetList = []TargetItem{} - for _, i := range targets { - newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) + for _, item := range makeNNewTargets(13, 3, 100) { + targets[item.Hash()] = item } - s.SetTargets(newTargetList) + s.SetTargets(targets) targetItemLen = len(s.TargetItems()) collectors = s.Collectors() diff --git a/cmd/otel-allocator/allocation/strategy/state.go b/cmd/otel-allocator/allocation/strategy/state.go index 5177d8f52e..6f6d0a79f2 100644 --- a/cmd/otel-allocator/allocation/strategy/state.go +++ b/cmd/otel-allocator/allocation/strategy/state.go @@ -29,8 +29,8 @@ type TargetItem struct { CollectorName string } -func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) TargetItem { - return TargetItem{ +func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *TargetItem { + return &TargetItem{ JobName: jobName, Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, TargetURL: targetURL, From bc3b64fe4b2fe6f994448f1691ea9f4befea57d3 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 7 Sep 2022 16:32:51 -0400 Subject: [PATCH 4/7] Remove deadlocking problem --- cmd/otel-allocator/allocation/least_weighted/allocator.go | 6 +++--- cmd/otel-allocator/config/config.go | 7 +++++++ cmd/otel-allocator/main.go | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cmd/otel-allocator/allocation/least_weighted/allocator.go b/cmd/otel-allocator/allocation/least_weighted/allocator.go index e34be81e25..e2fc5fdb7b 100644 --- a/cmd/otel-allocator/allocation/least_weighted/allocator.go +++ b/cmd/otel-allocator/allocation/least_weighted/allocator.go @@ -117,7 +117,7 @@ func (allocator *LeastWeightedAllocator) handleTargets(diff utility.Changes[*str // Check for additions for k, target := range diff.Additions() { // Do nothing if the item is already there - if _, ok := allocator.TargetItems()[k]; ok { + if _, ok := allocator.state.TargetItems()[k]; ok { continue } else { // Assign new set of collectors with the one different name @@ -163,7 +163,7 @@ func (allocator *LeastWeightedAllocator) SetTargets(targets map[string]*strategy defer allocator.m.Unlock() // Check for target changes - targetsDiff := utility.DiffMaps(allocator.TargetItems(), targets) + targetsDiff := utility.DiffMaps(allocator.state.TargetItems(), targets) // If there are any additions or removals if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { allocator.handleTargets(targetsDiff) @@ -188,7 +188,7 @@ func (allocator *LeastWeightedAllocator) SetCollectors(collectors map[string]*st defer allocator.m.Unlock() // Check for collector changes - collectorsDiff := utility.DiffMaps(allocator.Collectors(), collectors) + collectorsDiff := utility.DiffMaps(allocator.state.Collectors(), collectors) if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { allocator.handleCollectors(collectorsDiff) } diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 97d729ac09..05d0e38405 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -36,6 +36,13 @@ type Config struct { AllocationStrategy *string `yaml:"allocation_strategy,omitempty"` } +func (c Config) GetAllocationStrategy() string { + if c.AllocationStrategy != nil { + return *c.AllocationStrategy + } + return "least-weighted" +} + type PrometheusCRWatcherConfig struct { Enabled *bool } diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 86665d196d..a69067bdc9 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" _ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/least_weighted" @@ -56,7 +57,7 @@ func main() { log := ctrl.Log.WithName("allocator") - allocator, err := strategy.New(*cfg.AllocationStrategy, log) + allocator, err := strategy.New(cfg.GetAllocationStrategy(), log) if err != nil { setupLog.Error(err, "Unable to initialize allocation strategy") os.Exit(1) From d2c7daae585a13679854b8e71b3ed81f9e21272d Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 8 Sep 2022 13:18:41 -0400 Subject: [PATCH 5/7] Update based on comments --- cmd/otel-allocator/allocation/http.go | 49 ++-- cmd/otel-allocator/allocation/http_test.go | 44 ++-- .../allocation/least_weighted.go | 195 ++++++++++++++++ .../allocation/least_weighted/allocator.go | 210 ------------------ ...locator_test.go => least_weighted_test.go} | 48 ++-- cmd/otel-allocator/allocation/strategy.go | 89 ++++++++ .../allocation/strategy/state.go | 91 -------- .../allocation/strategy/strategy.go | 45 ---- cmd/otel-allocator/collector/collector.go | 13 +- .../collector/collector_test.go | 11 +- .../{utility/utility.go => diff/diff.go} | 4 +- .../utility_test.go => diff/diff_test.go} | 4 +- cmd/otel-allocator/discovery/discovery.go | 8 +- .../discovery/discovery_test.go | 5 +- cmd/otel-allocator/main.go | 16 +- cmd/otel-allocator/watcher/main.go | 6 +- 16 files changed, 387 insertions(+), 451 deletions(-) create mode 100644 cmd/otel-allocator/allocation/least_weighted.go delete mode 100644 cmd/otel-allocator/allocation/least_weighted/allocator.go rename cmd/otel-allocator/allocation/{least_weighted/allocator_test.go => least_weighted_test.go} (82%) create mode 100644 cmd/otel-allocator/allocation/strategy.go delete mode 100644 cmd/otel-allocator/allocation/strategy/state.go delete mode 100644 cmd/otel-allocator/allocation/strategy/strategy.go rename cmd/otel-allocator/{utility/utility.go => diff/diff.go} (89%) rename cmd/otel-allocator/{utility/utility_test.go => diff/diff_test.go} (90%) diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index d59db9b451..6a914017e4 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -4,53 +4,62 @@ import ( "fmt" "net/url" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" - "github.com/prometheus/common/model" ) -func GetAllTargetsByJob(job string, cMap map[string][]strategy.TargetItem, allocator strategy.Allocator) map[string]strategy.CollectorJSON { - displayData := make(map[string]strategy.CollectorJSON) +type LinkJSON struct { + Link string `json:"_link"` +} + +type CollectorJSON struct { + Link string `json:"_link"` + Jobs []TargetGroupJSON `json:"targets"` +} + +type TargetGroupJSON struct { + Targets []string `json:"targets"` + Labels model.LabelSet `json:"labels"` +} + +func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allocator) map[string]CollectorJSON { + displayData := make(map[string]CollectorJSON) for _, j := range allocator.TargetItems() { if j.JobName == job { - var targetList []strategy.TargetItem + var targetList []TargetItem targetList = append(targetList, cMap[j.CollectorName+j.JobName]...) - var targetGroupList []strategy.TargetGroupJSON + var targetGroupList []TargetGroupJSON for _, t := range targetList { - targetGroupList = append(targetGroupList, strategy.TargetGroupJSON{ + targetGroupList = append(targetGroupList, TargetGroupJSON{ Targets: []string{t.TargetURL}, Labels: t.Label, }) } - displayData[j.CollectorName] = strategy.CollectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} + displayData[j.CollectorName] = CollectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} } } return displayData } -func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]strategy.TargetItem, allocator strategy.Allocator) []strategy.TargetGroupJSON { - var tgs []strategy.TargetGroupJSON +func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator Allocator) []TargetGroupJSON { + var tgs []TargetGroupJSON group := make(map[string]string) labelSet := make(map[string]model.LabelSet) - for colName, _ := range allocator.Collectors() { - if colName == collector { - for _, targetItemArr := range cMap { - for _, targetItem := range targetItemArr { - if targetItem.CollectorName == collector && targetItem.JobName == job { - group[targetItem.Label.String()] = targetItem.TargetURL - labelSet[targetItem.TargetURL] = targetItem.Label - } + if _, ok := allocator.Collectors()[collector]; ok { + for _, targetItemArr := range cMap { + for _, targetItem := range targetItemArr { + if targetItem.CollectorName == collector && targetItem.JobName == job { + group[targetItem.Label.String()] = targetItem.TargetURL + labelSet[targetItem.TargetURL] = targetItem.Label } } } } - for _, v := range group { - tgs = append(tgs, strategy.TargetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) + tgs = append(tgs, TargetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) } return tgs diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index d1476a8e08..94cf53a673 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -6,36 +6,30 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" - logf "sigs.k8s.io/controller-runtime/pkg/log" - - _ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/least_weighted" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" ) -var logger = logf.Log.WithName("unit-tests") - func TestGetAllTargetsByCollectorAndJob(t *testing.T) { - baseAllocator, _ := strategy.New("least-weighted", logger) - baseAllocator.SetCollectors(map[string]*strategy.Collector{"test-collector": {Name: "test-collector"}}) - statefulAllocator, _ := strategy.New("least-weighted", logger) - statefulAllocator.SetCollectors(map[string]*strategy.Collector{"test-collector-0": {Name: "test-collector-0"}}) + baseAllocator, _ := New("least-weighted", logger) + baseAllocator.SetCollectors(map[string]*Collector{"test-collector": {Name: "test-collector"}}) + statefulAllocator, _ := New("least-weighted", logger) + statefulAllocator.SetCollectors(map[string]*Collector{"test-collector-0": {Name: "test-collector-0"}}) type args struct { collector string job string - cMap map[string][]strategy.TargetItem - allocator strategy.Allocator + cMap map[string][]TargetItem + allocator Allocator } var tests = []struct { name string args args - want []strategy.TargetGroupJSON + want []TargetGroupJSON }{ { name: "Empty target map", args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]strategy.TargetItem{}, + cMap: map[string][]TargetItem{}, allocator: baseAllocator, }, want: nil, @@ -45,9 +39,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]strategy.TargetItem{ + cMap: map[string][]TargetItem{ "test-collectortest-job": { - strategy.TargetItem{ + TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -59,7 +53,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, allocator: baseAllocator, }, - want: []strategy.TargetGroupJSON{ + want: []TargetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -73,9 +67,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]strategy.TargetItem{ + cMap: map[string][]TargetItem{ "test-collectortest-job": { - strategy.TargetItem{ + TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -85,7 +79,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, }, "test-collectortest-job2": { - strategy.TargetItem{ + TargetItem{ JobName: "test-job2", Label: model.LabelSet{ "test-label": "test-value", @@ -97,7 +91,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, allocator: baseAllocator, }, - want: []strategy.TargetGroupJSON{ + want: []TargetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -111,9 +105,9 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { args: args{ collector: "test-collector", job: "test-job", - cMap: map[string][]strategy.TargetItem{ + cMap: map[string][]TargetItem{ "test-collectortest-job": { - strategy.TargetItem{ + TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -124,7 +118,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, }, "test-collectortest-job2": { - strategy.TargetItem{ + TargetItem{ JobName: "test-job", Label: model.LabelSet{ "test-label": "test-value", @@ -136,7 +130,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, allocator: baseAllocator, }, - want: []strategy.TargetGroupJSON{ + want: []TargetGroupJSON{ { Targets: []string{"test-url1"}, Labels: map[model.LabelName]model.LabelValue{ diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go new file mode 100644 index 0000000000..199baa902a --- /dev/null +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -0,0 +1,195 @@ +package allocation + +import ( + "fmt" + "net/url" + "sync" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" + + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + _ Allocator = &leastWeightedAllocator{} + strategyName = "least-weighted" +) + +/* + Load balancer will serve on an HTTP server exposing /jobs//targets + The targets are allocated using the least connection method + Load balancer will need information about the collectors in order to set the URLs + Keep a Map of what each collector currently holds and update it based on new scrape target updates +*/ + +// leastWeightedAllocator makes decisions to distribute work among +// a number of OpenTelemetry collectors based on the number of targets. +// Users need to call SetTargets when they have new targets in their +// clusters and call SetCollectors when the collectors have changed. +type leastWeightedAllocator struct { + // m protects collectors and targetItems for concurrent use. + m sync.RWMutex + // collectors is a map from a Collector's name to a Collector instance + collectors map[string]*Collector + // targetItems is a map from a target item's hash to the target items allocated state + targetItems map[string]*TargetItem + + log logr.Logger +} + +// TargetItems returns a shallow copy of the targetItems map. +func (allocator *leastWeightedAllocator) TargetItems() map[string]*TargetItem { + allocator.m.RLock() + defer allocator.m.RUnlock() + targetItemsCopy := make(map[string]*TargetItem) + for k, v := range allocator.targetItems { + targetItemsCopy[k] = v + } + return targetItemsCopy +} + +// Collectors returns a shallow copy of the collectors map. +func (allocator *leastWeightedAllocator) Collectors() map[string]*Collector { + allocator.m.RLock() + defer allocator.m.RUnlock() + collectorsCopy := make(map[string]*Collector) + for k, v := range allocator.collectors { + collectorsCopy[k] = v + } + return collectorsCopy +} + +// findNextCollector finds the next collector with fewer number of targets. +// This method is called from within SetTargets and SetCollectors, whose caller +// acquires the needed lock. This method assumes there are is at least 1 collector set +func (allocator *leastWeightedAllocator) findNextCollector() *Collector { + var col *Collector + for _, v := range allocator.collectors { + // If the initial collector is empty, set the initial collector to the first element of map + if col == nil { + col = v + } else if v.NumTargets < col.NumTargets { + col = v + } + } + return col +} + +// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, which acquire the needed lock. +// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap +func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetItem) { + chosenCollector := allocator.findNextCollector() + targetItem := &TargetItem{ + JobName: target.JobName, + Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, + TargetURL: target.TargetURL, + Label: target.Label, + CollectorName: chosenCollector.Name, + } + allocator.targetItems[targetItem.Hash()] = targetItem + chosenCollector.NumTargets++ + TargetsPerCollector.WithLabelValues(chosenCollector.Name, strategyName).Set(float64(chosenCollector.NumTargets)) +} + +func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*TargetItem]) { + // Check for removals + for k, target := range allocator.targetItems { + // if the current target is in the removals list + if _, ok := diff.Removals()[k]; ok { + c := allocator.collectors[target.CollectorName] + c.NumTargets-- + delete(allocator.targetItems, k) + TargetsPerCollector.WithLabelValues(target.CollectorName, strategyName).Set(float64(c.NumTargets)) + } + } + + // Check for additions + for k, target := range diff.Additions() { + // Do nothing if the item is already there + if _, ok := allocator.targetItems[k]; ok { + continue + } else { + // Add target to target pool and assign a collector + allocator.addTargetToTargetItems(target) + } + } +} + +func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Collector]) { + // Clear removed collectors + for _, k := range diff.Removals() { + delete(allocator.collectors, k.Name) + TargetsPerCollector.WithLabelValues(k.Name, strategyName).Set(0) + } + // Insert the new collectors + for _, i := range diff.Additions() { + allocator.collectors[i.Name] = NewCollector(i.Name) + } + + // Re-Allocate targets of the removed collectors + for _, item := range allocator.targetItems { + if _, ok := diff.Removals()[item.CollectorName]; ok { + allocator.addTargetToTargetItems(item) + } + } +} + +// SetTargets accepts a list of targets that will be used to make +// load balancing decisions. This method should be called when there are +// new targets discovered or existing targets are shutdown. +func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetItem) { + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", strategyName)) + defer timer.ObserveDuration() + + allocator.m.Lock() + defer allocator.m.Unlock() + + // Check for target changes + targetsDiff := diff.Maps(allocator.targetItems, targets) + // If there are any additions or removals + if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { + allocator.handleTargets(targetsDiff) + } + return +} + +// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. +// This method is called when Collectors are added or removed. +func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Collector) { + log := allocator.log.WithValues("component", "opentelemetry-targetallocator") + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", strategyName)) + defer timer.ObserveDuration() + + CollectorsAllocatable.WithLabelValues(strategyName).Set(float64(len(collectors))) + if len(collectors) == 0 { + log.Info("No collector instances present") + return + } + + allocator.m.Lock() + defer allocator.m.Unlock() + + // Check for collector changes + collectorsDiff := diff.Maps(allocator.collectors, collectors) + if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { + allocator.handleCollectors(collectorsDiff) + } + return +} + +func NewAllocator(log logr.Logger) Allocator { + return &leastWeightedAllocator{ + log: log, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*TargetItem), + } +} + +func init() { + err := Register(strategyName, NewAllocator) + if err != nil { + panic(err) + } +} diff --git a/cmd/otel-allocator/allocation/least_weighted/allocator.go b/cmd/otel-allocator/allocation/least_weighted/allocator.go deleted file mode 100644 index e2fc5fdb7b..0000000000 --- a/cmd/otel-allocator/allocation/least_weighted/allocator.go +++ /dev/null @@ -1,210 +0,0 @@ -package least_weighted - -import ( - "fmt" - "net/url" - "os" - "sync" - - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/utility" - - "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var ( - collectorsAllocatable = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_collectors_allocatable", - Help: "Number of collectors the allocator is able to allocate to.", - }) - timeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: "opentelemetry_allocator_time_to_allocate", - Help: "The time it takes to allocate", - }, []string{"method"}) - _ strategy.Allocator = &LeastWeightedAllocator{} -) - -/* - Load balancer will serve on an HTTP server exposing /jobs//targets - The targets are allocated using the least connection method - Load balancer will need information about the collectors in order to set the URLs - Keep a Map of what each collector currently holds and update it based on new scrape target updates -*/ - -// LeastWeightedAllocator makes decisions to distribute work among -// a number of OpenTelemetry collectors based on the number of targets. -// Users need to call SetTargets when they have new targets in their -// clusters and call SetCollectors when the collectors have changed. -type LeastWeightedAllocator struct { - // m protects collectors and targetItems for concurrent use. - m sync.RWMutex - state strategy.State - - log logr.Logger -} - -// TargetItems returns a shallow copy of the targetItems map. -func (allocator *LeastWeightedAllocator) TargetItems() map[string]*strategy.TargetItem { - allocator.m.RLock() - defer allocator.m.RUnlock() - targetItemsCopy := make(map[string]*strategy.TargetItem) - for k, v := range allocator.state.TargetItems() { - targetItemsCopy[k] = v - } - return targetItemsCopy -} - -// Collectors returns a shallow copy of the collectors map. -func (allocator *LeastWeightedAllocator) Collectors() map[string]*strategy.Collector { - allocator.m.RLock() - defer allocator.m.RUnlock() - collectorsCopy := make(map[string]*strategy.Collector) - for k, v := range allocator.state.Collectors() { - collectorsCopy[k] = v - } - return collectorsCopy -} - -// findNextCollector finds the next collector with fewer number of targets. -// This method is called from within SetTargets and SetCollectors, whose caller -// acquires the needed lock. -func (allocator *LeastWeightedAllocator) findNextCollector() *strategy.Collector { - var col *strategy.Collector - for _, v := range allocator.state.Collectors() { - // If the initial collector is empty, set the initial collector to the first element of map - if col == nil { - col = v - } else { - if v.NumTargets < col.NumTargets { - col = v - } - } - } - return col -} - -// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems -// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock. -// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap -func (allocator *LeastWeightedAllocator) addTargetToTargetItems(target *strategy.TargetItem) { - chosenCollector := allocator.findNextCollector() - targetItem := &strategy.TargetItem{ - JobName: target.JobName, - Link: strategy.LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, - TargetURL: target.TargetURL, - Label: target.Label, - CollectorName: chosenCollector.Name, - } - allocator.state.SetTargetItem(targetItem.Hash(), targetItem) - chosenCollector.NumTargets++ - strategy.TargetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets)) -} - -func (allocator *LeastWeightedAllocator) handleTargets(diff utility.Changes[*strategy.TargetItem]) { - // Check for removals - for k, target := range allocator.state.TargetItems() { - // if the current target is in the removals list - if _, ok := diff.Removals()[k]; ok { - c := allocator.state.Collectors()[target.CollectorName] - c.NumTargets-- - allocator.state.RemoveTargetItem(k) - strategy.TargetsPerCollector.WithLabelValues(target.CollectorName).Set(float64(c.NumTargets)) - } - } - - // Check for additions - for k, target := range diff.Additions() { - // Do nothing if the item is already there - if _, ok := allocator.state.TargetItems()[k]; ok { - continue - } else { - // Assign new set of collectors with the one different name - allocator.addTargetToTargetItems(target) - } - } -} - -func (allocator *LeastWeightedAllocator) handleCollectors(diff utility.Changes[*strategy.Collector]) { - // Clear existing collectors - for _, k := range diff.Removals() { - allocator.state.RemoveCollector(k.Name) - strategy.TargetsPerCollector.WithLabelValues(k.Name).Set(0) - } - // Insert the new collectors - for _, i := range diff.Additions() { - allocator.state.SetCollector(i.Name, &strategy.Collector{Name: i.Name, NumTargets: 0}) - } - - // find targets which need to be redistributed - var redistribute []*strategy.TargetItem - for _, item := range allocator.state.TargetItems() { - for _, s := range diff.Removals() { - if item.CollectorName == s.Name { - redistribute = append(redistribute, item) - } - } - } - // Re-Allocate the existing targets - for _, item := range redistribute { - allocator.addTargetToTargetItems(item) - } -} - -// SetTargets accepts a list of targets that will be used to make -// load balancing decisions. This method should be called when there are -// new targets discovered or existing targets are shutdown. -func (allocator *LeastWeightedAllocator) SetTargets(targets map[string]*strategy.TargetItem) { - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets")) - defer timer.ObserveDuration() - - allocator.m.Lock() - defer allocator.m.Unlock() - - // Check for target changes - targetsDiff := utility.DiffMaps(allocator.state.TargetItems(), targets) - // If there are any additions or removals - if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { - allocator.handleTargets(targetsDiff) - } - return -} - -// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. -// This method is called when Collectors are added or removed. -func (allocator *LeastWeightedAllocator) SetCollectors(collectors map[string]*strategy.Collector) { - log := allocator.log.WithValues("component", "opentelemetry-targetallocator") - timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors")) - defer timer.ObserveDuration() - - collectorsAllocatable.Set(float64(len(collectors))) - if len(collectors) == 0 { - log.Info("No collector instances present") - return - } - - allocator.m.Lock() - defer allocator.m.Unlock() - - // Check for collector changes - collectorsDiff := utility.DiffMaps(allocator.state.Collectors(), collectors) - if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { - allocator.handleCollectors(collectorsDiff) - } - return -} - -func NewAllocator(log logr.Logger) strategy.Allocator { - return &LeastWeightedAllocator{ - log: log, - state: strategy.NewState(make(map[string]*strategy.Collector), make(map[string]*strategy.TargetItem)), - } -} - -func init() { - err := strategy.Register("least-weighted", NewAllocator) - if err != nil { - os.Exit(1) - } -} diff --git a/cmd/otel-allocator/allocation/least_weighted/allocator_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go similarity index 82% rename from cmd/otel-allocator/allocation/least_weighted/allocator_test.go rename to cmd/otel-allocator/allocation/least_weighted_test.go index fe5c7f7a9c..c15c30a301 100644 --- a/cmd/otel-allocator/allocation/least_weighted/allocator_test.go +++ b/cmd/otel-allocator/allocation/least_weighted_test.go @@ -1,4 +1,4 @@ -package least_weighted +package allocation import ( "fmt" @@ -6,8 +6,6 @@ import ( "math/rand" "testing" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" - "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -15,21 +13,21 @@ import ( var logger = logf.Log.WithName("unit-tests") -func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*strategy.TargetItem { - toReturn := map[string]*strategy.TargetItem{} +func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*TargetItem { + toReturn := map[string]*TargetItem{} for i := startingIndex; i < n+startingIndex; i++ { collector := fmt.Sprintf("collector-%d", i%numCollectors) - newTarget := strategy.NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", nil, collector) + newTarget := NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", nil, collector) toReturn[newTarget.Hash()] = newTarget } return toReturn } -func makeNCollectors(n int, targetsForEach int) map[string]*strategy.Collector { - toReturn := map[string]*strategy.Collector{} +func makeNCollectors(n int, targetsForEach int) map[string]*Collector { + toReturn := map[string]*Collector{} for i := 0; i < n; i++ { collector := fmt.Sprintf("collector-%d", i) - toReturn[collector] = &strategy.Collector{ + toReturn[collector] = &Collector{ Name: collector, NumTargets: targetsForEach, } @@ -38,7 +36,7 @@ func makeNCollectors(n int, targetsForEach int) map[string]*strategy.Collector { } func TestSetCollectors(t *testing.T) { - s, _ := strategy.New("least-weighted", logger) + s, _ := New("least-weighted", logger) cols := makeNCollectors(3, 0) s.SetCollectors(cols) @@ -54,7 +52,7 @@ func TestSetCollectors(t *testing.T) { func TestAddingAndRemovingTargets(t *testing.T) { // prepare allocator with initial targets and collectors - s, _ := strategy.New("least-weighted", logger) + s, _ := New("least-weighted", logger) cols := makeNCollectors(3, 0) s.SetCollectors(cols) @@ -89,7 +87,7 @@ func TestAddingAndRemovingTargets(t *testing.T) { // Tests that two targets with the same target url and job name but different label set are both added func TestAllocationCollision(t *testing.T) { // prepare allocator with initial targets and collectors - s, _ := strategy.New("least-weighted", logger) + s, _ := New("least-weighted", logger) cols := makeNCollectors(3, 0) s.SetCollectors(cols) @@ -99,10 +97,10 @@ func TestAllocationCollision(t *testing.T) { secondLabels := model.LabelSet{ "test": "test2", } - firstTarget := strategy.NewTargetItem("sample-name", "0.0.0.0:8000", firstLabels, "") - secondTarget := strategy.NewTargetItem("sample-name", "0.0.0.0:8000", secondLabels, "") + firstTarget := NewTargetItem("sample-name", "0.0.0.0:8000", firstLabels, "") + secondTarget := NewTargetItem("sample-name", "0.0.0.0:8000", secondLabels, "") - targetList := map[string]*strategy.TargetItem{ + targetList := map[string]*TargetItem{ firstTarget.Hash(): firstTarget, secondTarget.Hash(): secondTarget, } @@ -123,7 +121,7 @@ func TestAllocationCollision(t *testing.T) { } func TestNoCollectorReassignment(t *testing.T) { - s, _ := strategy.New("least-weighted", logger) + s, _ := New("least-weighted", logger) cols := makeNCollectors(3, 0) s.SetCollectors(cols) @@ -154,9 +152,9 @@ func TestNoCollectorReassignment(t *testing.T) { } func TestSmartCollectorReassignment(t *testing.T) { - s, _ := strategy.New("least-weighted", logger) + s, _ := New("least-weighted", logger) - cols := makeNCollectors(3, 0) + cols := makeNCollectors(4, 0) s.SetCollectors(cols) expectedColLen := len(cols) @@ -165,7 +163,7 @@ func TestSmartCollectorReassignment(t *testing.T) { for _, i := range cols { assert.NotNil(t, s.Collectors()[i.Name]) } - initTargets := makeNNewTargets(6, 3, 0) + initTargets := makeNNewTargets(6, 4, 0) // test that targets and collectors are added properly s.SetTargets(initTargets) @@ -175,8 +173,10 @@ func TestSmartCollectorReassignment(t *testing.T) { assert.Len(t, targetItems, expectedTargetLen) // assign new set of collectors with the same names - newCols := map[string]*strategy.Collector{ - "collector-1": { + newCols := map[string]*Collector{ + "collector-0": { + Name: "collector-0", + }, "collector-1": { Name: "collector-1", }, "collector-2": { Name: "collector-2", @@ -191,10 +191,10 @@ func TestSmartCollectorReassignment(t *testing.T) { for key, targetItem := range targetItems { item, ok := newTargetItems[key] assert.True(t, ok, "all target items should be found in new target item list") - if targetItem.CollectorName != "col-3" { + if targetItem.CollectorName != "collector-3" { assert.Equal(t, targetItem.CollectorName, item.CollectorName) } else { - assert.Equal(t, "col-4", item.CollectorName) + assert.Equal(t, "collector-4", item.CollectorName) } } } @@ -203,7 +203,7 @@ func TestSmartCollectorReassignment(t *testing.T) { func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { // prepare allocator with 3 collectors and 'random' amount of targets - s, _ := strategy.New("least-weighted", logger) + s, _ := New("least-weighted", logger) cols := makeNCollectors(3, 0) s.SetCollectors(cols) diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go new file mode 100644 index 0000000000..f1654260de --- /dev/null +++ b/cmd/otel-allocator/allocation/strategy.go @@ -0,0 +1,89 @@ +package allocation + +import ( + "errors" + "fmt" + "github.com/prometheus/common/model" + "net/url" + + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type AllocatorProvider func(log logr.Logger) Allocator + +var ( + registry = map[string]AllocatorProvider{} + + // TargetsPerCollector records how many targets have been assigned to each collector + // It is currently the responsibility of the strategy to track this information. + TargetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_targets_per_collector", + Help: "The number of targets for each collector.", + }, []string{"collector_name", "strategy"}) + CollectorsAllocatable = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "opentelemetry_allocator_collectors_allocatable", + Help: "Number of collectors the allocator is able to allocate to.", + }, []string{"strategy"}) + TimeToAssign = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "opentelemetry_allocator_time_to_allocate", + Help: "The time it takes to allocate", + }, []string{"method", "strategy"}) +) + +func New(name string, log logr.Logger) (Allocator, error) { + if p, ok := registry[name]; ok { + return p(log), nil + } + return nil, errors.New(fmt.Sprintf("unregistered strategy: %s", name)) +} + +func Register(name string, provider AllocatorProvider) error { + if _, ok := registry[name]; ok { + return errors.New("already registered") + } + registry[name] = provider + return nil +} + +type Allocator interface { + SetCollectors(collectors map[string]*Collector) + SetTargets(targets map[string]*TargetItem) + TargetItems() map[string]*TargetItem + Collectors() map[string]*Collector +} + +type TargetItem struct { + JobName string + Link LinkJSON + TargetURL string + Label model.LabelSet + CollectorName string +} + +func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *TargetItem { + return &TargetItem{ + JobName: jobName, + Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, + TargetURL: targetURL, + Label: label, + CollectorName: collectorName, + } +} + +func (t TargetItem) Hash() string { + return t.JobName + t.TargetURL + t.Label.Fingerprint().String() +} + +// Collector Creates a struct that holds Collector information +// This struct will be parsed into endpoint with Collector and jobs info +// This struct can be extended with information like annotations and labels in the future +type Collector struct { + Name string + NumTargets int +} + +func NewCollector(name string) *Collector { + return &Collector{Name: name} +} diff --git a/cmd/otel-allocator/allocation/strategy/state.go b/cmd/otel-allocator/allocation/strategy/state.go deleted file mode 100644 index 6f6d0a79f2..0000000000 --- a/cmd/otel-allocator/allocation/strategy/state.go +++ /dev/null @@ -1,91 +0,0 @@ -package strategy - -import ( - "fmt" - "net/url" - - "github.com/prometheus/common/model" -) - -type LinkJSON struct { - Link string `json:"_link"` -} - -type CollectorJSON struct { - Link string `json:"_link"` - Jobs []TargetGroupJSON `json:"targets"` -} - -type TargetGroupJSON struct { - Targets []string `json:"targets"` - Labels model.LabelSet `json:"labels"` -} - -type TargetItem struct { - JobName string - Link LinkJSON - TargetURL string - Label model.LabelSet - CollectorName string -} - -func NewTargetItem(jobName string, targetURL string, label model.LabelSet, collectorName string) *TargetItem { - return &TargetItem{ - JobName: jobName, - Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(jobName))}, - TargetURL: targetURL, - Label: label, - CollectorName: collectorName, - } -} - -func (t TargetItem) Hash() string { - return t.JobName + t.TargetURL + t.Label.Fingerprint().String() -} - -// Collector Creates a struct that holds Collector information -// This struct will be parsed into endpoint with Collector and jobs info -// This struct can be extended with information like annotations and labels in the future -type Collector struct { - Name string - NumTargets int -} - -func NewCollector(name string) *Collector { - return &Collector{Name: name} -} - -type State struct { - // collectors is a map from a Collector's name to a Collector instance - collectors map[string]*Collector - // targetItems is a map from a target item's hash to the target items allocated state - targetItems map[string]*TargetItem -} - -func (s State) Collectors() map[string]*Collector { - return s.collectors -} - -func (s State) TargetItems() map[string]*TargetItem { - return s.targetItems -} - -func (s State) SetTargetItem(key string, value *TargetItem) { - s.targetItems[key] = value -} - -func (s State) SetCollector(key string, value *Collector) { - s.collectors[key] = value -} - -func (s State) RemoveCollector(key string) { - delete(s.collectors, key) -} - -func (s State) RemoveTargetItem(key string) { - delete(s.targetItems, key) -} - -func NewState(collectors map[string]*Collector, targetItems map[string]*TargetItem) State { - return State{collectors: collectors, targetItems: targetItems} -} diff --git a/cmd/otel-allocator/allocation/strategy/strategy.go b/cmd/otel-allocator/allocation/strategy/strategy.go deleted file mode 100644 index 11b6f256ef..0000000000 --- a/cmd/otel-allocator/allocation/strategy/strategy.go +++ /dev/null @@ -1,45 +0,0 @@ -package strategy - -import ( - "errors" - "fmt" - - "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -type AllocatorProvider func(log logr.Logger) Allocator - -var ( - registry = map[string]AllocatorProvider{} - - // TargetsPerCollector records how many targets have been assigned to each collector - // It is currently the responsibility of the strategy to track this information. - TargetsPerCollector = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "opentelemetry_allocator_targets_per_collector", - Help: "The number of targets for each collector.", - }, []string{"collector_name"}) -) - -func New(name string, log logr.Logger) (Allocator, error) { - if p, ok := registry[name]; ok { - return p(log), nil - } - return nil, errors.New(fmt.Sprintf("unregistered strategy: %s", name)) -} - -func Register(name string, provider AllocatorProvider) error { - if _, ok := registry[name]; ok { - return errors.New("already registered") - } - registry[name] = provider - return nil -} - -type Allocator interface { - SetCollectors(collectors map[string]*Collector) - SetTargets(targets map[string]*TargetItem) - TargetItems() map[string]*TargetItem - Collectors() map[string]*Collector -} diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index a4ebef8e6c..90a4236f0b 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -2,12 +2,11 @@ package collector import ( "context" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "os" "strconv" "time" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" - "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -50,8 +49,8 @@ func NewClient(logger logr.Logger, kubeConfig *rest.Config) (*Client, error) { }, nil } -func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors map[string]*strategy.Collector)) { - collectorMap := map[string]*strategy.Collector{} +func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func(collectors map[string]*allocation.Collector)) { + collectorMap := map[string]*allocation.Collector{} log := k.log.WithValues("component", "opentelemetry-targetallocator") opts := metav1.ListOptions{ @@ -65,7 +64,7 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func( for i := range pods.Items { pod := pods.Items[i] if pod.GetObjectMeta().GetDeletionTimestamp() == nil { - collectorMap[pod.Name] = strategy.NewCollector(pod.Name) + collectorMap[pod.Name] = allocation.NewCollector(pod.Name) } } @@ -87,7 +86,7 @@ func (k *Client) Watch(ctx context.Context, labelMap map[string]string, fn func( }() } -func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*strategy.Collector, fn func(collectors map[string]*strategy.Collector)) string { +func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap map[string]*allocation.Collector, fn func(collectors map[string]*allocation.Collector)) string { log := k.log.WithValues("component", "opentelemetry-targetallocator") for { collectorsDiscovered.Set(float64(len(collectorMap))) @@ -110,7 +109,7 @@ func runWatch(ctx context.Context, k *Client, c <-chan watch.Event, collectorMap switch event.Type { case watch.Added: - collectorMap[pod.Name] = strategy.NewCollector(pod.Name) + collectorMap[pod.Name] = allocation.NewCollector(pod.Name) case watch.Deleted: delete(collectorMap, pod.Name) } diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/collector_test.go index ff13f003ca..486a155e14 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/collector_test.go @@ -3,11 +3,10 @@ package collector import ( "context" "fmt" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "os" "testing" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,7 +15,7 @@ import ( ) var client Client -var collectors = map[string]*strategy.Collector{} +var collectors = map[string]*allocation.Collector{} func TestMain(m *testing.M) { client = Client{ @@ -39,7 +38,7 @@ func TestMain(m *testing.M) { os.Exit(1) } - go runWatch(context.Background(), &client, watcher.ResultChan(), map[string]*strategy.Collector{}, func(colMap map[string]*strategy.Collector) { getCollectors(colMap) }) + go runWatch(context.Background(), &client, watcher.ResultChan(), map[string]*allocation.Collector{}, func(colMap map[string]*allocation.Collector) { getCollectors(colMap) }) code := m.Run() @@ -49,7 +48,7 @@ func TestMain(m *testing.M) { } func TestWatchPodAddition(t *testing.T) { - expected := map[string]*strategy.Collector{ + expected := map[string]*allocation.Collector{ "test-pod1": { Name: "test-pod1", }, @@ -84,7 +83,7 @@ func TestWatchPodDeletion(t *testing.T) { assert.Equal(t, collectors, expected) } -func getCollectors(c map[string]*strategy.Collector) { +func getCollectors(c map[string]*allocation.Collector) { collectors = c } diff --git a/cmd/otel-allocator/utility/utility.go b/cmd/otel-allocator/diff/diff.go similarity index 89% rename from cmd/otel-allocator/utility/utility.go rename to cmd/otel-allocator/diff/diff.go index 30d26d755c..425ca30d86 100644 --- a/cmd/otel-allocator/utility/utility.go +++ b/cmd/otel-allocator/diff/diff.go @@ -1,4 +1,4 @@ -package utility +package diff type Changes[T any] struct { additions map[string]T @@ -13,7 +13,7 @@ func (c Changes[T]) Removals() map[string]T { return c.removals } -func DiffMaps[T any](current, new map[string]T) Changes[T] { +func Maps[T any](current, new map[string]T) Changes[T] { additions := map[string]T{} removals := map[string]T{} // Used as a set to check for removed items diff --git a/cmd/otel-allocator/utility/utility_test.go b/cmd/otel-allocator/diff/diff_test.go similarity index 90% rename from cmd/otel-allocator/utility/utility_test.go rename to cmd/otel-allocator/diff/diff_test.go index 47943b97eb..bcae530d55 100644 --- a/cmd/otel-allocator/utility/utility_test.go +++ b/cmd/otel-allocator/diff/diff_test.go @@ -1,4 +1,4 @@ -package utility +package diff import ( "reflect" @@ -55,7 +55,7 @@ func TestDiffMaps(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := DiffMaps(tt.args.current, tt.args.new); !reflect.DeepEqual(got, tt.want) { + if got := Maps(tt.args.current, tt.args.new); !reflect.DeepEqual(got, tt.want) { t.Errorf("DiffMaps() = %v, want %v", got, tt.want) } }) diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index 003c623404..defa9e8265 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/go-kit/log" "github.com/go-logr/logr" @@ -11,7 +12,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) @@ -60,7 +60,7 @@ func (m *Manager) ApplyConfig(source allocatorWatcher.EventSource, cfg *config.C return m.manager.ApplyConfig(discoveryCfg) } -func (m *Manager) Watch(fn func(targets map[string]*strategy.TargetItem)) { +func (m *Manager) Watch(fn func(targets map[string]*allocation.TargetItem)) { log := m.log.WithValues("component", "opentelemetry-targetallocator") go func() { @@ -70,14 +70,14 @@ func (m *Manager) Watch(fn func(targets map[string]*strategy.TargetItem)) { log.Info("Service Discovery watch event stopped: discovery manager closed") return case tsets := <-m.manager.SyncCh(): - targets := map[string]*strategy.TargetItem{} + targets := map[string]*allocation.TargetItem{} for jobName, tgs := range tsets { var count float64 = 0 for _, tg := range tgs { for _, t := range tg.Targets { count++ - item := &strategy.TargetItem{ + item := &allocation.TargetItem{ JobName: jobName, TargetURL: string(t[model.AddressLabel]), Label: t.Merge(tg.Labels), diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index bcb71deeae..5b63d4bfdc 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -3,12 +3,11 @@ package discovery import ( "context" "fmt" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "os" "sort" "testing" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" - gokitlog "github.com/go-kit/log" "github.com/prometheus/common/model" promconfig "github.com/prometheus/prometheus/config" @@ -34,7 +33,7 @@ func TestMain(m *testing.M) { manager = NewManager(ctrl.Log.WithName("test"), context.Background(), gokitlog.NewNopLogger()) results = make(chan []string) - manager.Watch(func(targets map[string]*strategy.TargetItem) { + manager.Watch(func(targets map[string]*allocation.TargetItem) { var result []string for _, t := range targets { result = append(result, t.TargetURL) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index a69067bdc9..ebc667aa6b 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -20,8 +20,6 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" - _ "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/least_weighted" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" @@ -57,7 +55,7 @@ func main() { log := ctrl.Log.WithName("allocator") - allocator, err := strategy.New(cfg.GetAllocationStrategy(), log) + allocator, err := allocation.New(cfg.GetAllocationStrategy(), log) if err != nil { setupLog.Error(err, "Unable to initialize allocation strategy") os.Exit(1) @@ -140,13 +138,13 @@ func main() { type server struct { logger logr.Logger - allocator strategy.Allocator + allocator allocation.Allocator discoveryManager *lbdiscovery.Manager k8sClient *collector.Client server *http.Server } -func newServer(log logr.Logger, allocator strategy.Allocator, discoveryManager *lbdiscovery.Manager, k8sclient *collector.Client, listenAddr *string) (*server, error) { +func newServer(log logr.Logger, allocator allocation.Allocator, discoveryManager *lbdiscovery.Manager, k8sclient *collector.Client, listenAddr *string) (*server, error) { s := &server{ logger: log, allocator: allocator, @@ -162,7 +160,7 @@ func newServer(log logr.Logger, allocator strategy.Allocator, discoveryManager * return s, nil } -func configureFileDiscovery(log logr.Logger, allocator strategy.Allocator, discoveryManager *lbdiscovery.Manager, ctx context.Context, cliConfig config.CLIConfig) (*collector.Client, error) { +func configureFileDiscovery(log logr.Logger, allocator allocation.Allocator, discoveryManager *lbdiscovery.Manager, ctx context.Context, cliConfig config.CLIConfig) (*collector.Client, error) { cfg, err := config.Load(*cliConfig.ConfigFilePath) if err != nil { return nil, err @@ -194,9 +192,9 @@ func (s *server) Shutdown(ctx context.Context) error { } func (s *server) JobHandler(w http.ResponseWriter, r *http.Request) { - displayData := make(map[string]strategy.LinkJSON) + displayData := make(map[string]allocation.LinkJSON) for _, v := range s.allocator.TargetItems() { - displayData[v.JobName] = strategy.LinkJSON{Link: v.Link.Link} + displayData[v.JobName] = allocation.LinkJSON{Link: v.Link.Link} } jsonHandler(w, r, displayData) } @@ -215,7 +213,7 @@ func (s *server) PrometheusMiddleware(next http.Handler) http.Handler { func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) { q := r.URL.Query()["collector_id"] - var compareMap = make(map[string][]strategy.TargetItem) // CollectorName+jobName -> TargetItem + var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem for _, v := range s.allocator.TargetItems() { compareMap[v.CollectorName+v.JobName] = append(compareMap[v.CollectorName+v.JobName], *v) } diff --git a/cmd/otel-allocator/watcher/main.go b/cmd/otel-allocator/watcher/main.go index dbfc275b8b..c752a87052 100644 --- a/cmd/otel-allocator/watcher/main.go +++ b/cmd/otel-allocator/watcher/main.go @@ -2,17 +2,17 @@ package watcher import ( "fmt" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/go-logr/logr" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation/strategy" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" ) type Manager struct { Events chan Event Errors chan error - allocator strategy.Allocator + allocator allocation.Allocator watchers []Watcher } @@ -45,7 +45,7 @@ func (e EventSource) String() string { return eventSourceToString[e] } -func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator strategy.Allocator) (*Manager, error) { +func NewWatcher(logger logr.Logger, config config.CLIConfig, allocator allocation.Allocator) (*Manager, error) { watcher := Manager{ allocator: allocator, Events: make(chan Event), From fb83da0ff3e5d8aa8b1afd07cd72b49f6908634b Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Fri, 9 Sep 2022 15:01:04 -0400 Subject: [PATCH 6/7] Unexported some fields, added blocker and comments about invariants --- cmd/otel-allocator/allocation/http.go | 22 +++++++++---------- cmd/otel-allocator/allocation/http_test.go | 8 +++---- .../allocation/least_weighted.go | 13 +++++++---- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/cmd/otel-allocator/allocation/http.go b/cmd/otel-allocator/allocation/http.go index 6a914017e4..88622aef6e 100644 --- a/cmd/otel-allocator/allocation/http.go +++ b/cmd/otel-allocator/allocation/http.go @@ -11,41 +11,41 @@ type LinkJSON struct { Link string `json:"_link"` } -type CollectorJSON struct { +type collectorJSON struct { Link string `json:"_link"` - Jobs []TargetGroupJSON `json:"targets"` + Jobs []targetGroupJSON `json:"targets"` } -type TargetGroupJSON struct { +type targetGroupJSON struct { Targets []string `json:"targets"` Labels model.LabelSet `json:"labels"` } -func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allocator) map[string]CollectorJSON { - displayData := make(map[string]CollectorJSON) +func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator Allocator) map[string]collectorJSON { + displayData := make(map[string]collectorJSON) for _, j := range allocator.TargetItems() { if j.JobName == job { var targetList []TargetItem targetList = append(targetList, cMap[j.CollectorName+j.JobName]...) - var targetGroupList []TargetGroupJSON + var targetGroupList []targetGroupJSON for _, t := range targetList { - targetGroupList = append(targetGroupList, TargetGroupJSON{ + targetGroupList = append(targetGroupList, targetGroupJSON{ Targets: []string{t.TargetURL}, Labels: t.Label, }) } - displayData[j.CollectorName] = CollectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} + displayData[j.CollectorName] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList} } } return displayData } -func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator Allocator) []TargetGroupJSON { - var tgs []TargetGroupJSON +func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]TargetItem, allocator Allocator) []targetGroupJSON { + var tgs []targetGroupJSON group := make(map[string]string) labelSet := make(map[string]model.LabelSet) if _, ok := allocator.Collectors()[collector]; ok { @@ -59,7 +59,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin } } for _, v := range group { - tgs = append(tgs, TargetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) + tgs = append(tgs, targetGroupJSON{Targets: []string{v}, Labels: labelSet[v]}) } return tgs diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index 94cf53a673..1b8cf97260 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -22,7 +22,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { var tests = []struct { name string args args - want []TargetGroupJSON + want []targetGroupJSON }{ { name: "Empty target map", @@ -53,7 +53,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, allocator: baseAllocator, }, - want: []TargetGroupJSON{ + want: []targetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -91,7 +91,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, allocator: baseAllocator, }, - want: []TargetGroupJSON{ + want: []targetGroupJSON{ { Targets: []string{"test-url"}, Labels: map[model.LabelName]model.LabelValue{ @@ -130,7 +130,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { }, allocator: baseAllocator, }, - want: []TargetGroupJSON{ + want: []targetGroupJSON{ { Targets: []string{"test-url1"}, Labels: map[model.LabelName]model.LabelValue{ diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 199baa902a..a3cd974d4b 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -11,10 +11,9 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -var ( - _ Allocator = &leastWeightedAllocator{} - strategyName = "least-weighted" -) +var _ Allocator = &leastWeightedAllocator{} + +const strategyName = "least-weighted" /* Load balancer will serve on an HTTP server exposing /jobs//targets @@ -63,6 +62,7 @@ func (allocator *leastWeightedAllocator) Collectors() map[string]*Collector { // findNextCollector finds the next collector with fewer number of targets. // This method is called from within SetTargets and SetCollectors, whose caller // acquires the needed lock. This method assumes there are is at least 1 collector set +// INVARIANT: allocator.collectors must have at least 1 collector set func (allocator *leastWeightedAllocator) findNextCollector() *Collector { var col *Collector for _, v := range allocator.collectors { @@ -79,6 +79,7 @@ func (allocator *leastWeightedAllocator) findNextCollector() *Collector { // addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems // This method is called from within SetTargets and SetCollectors, which acquire the needed lock. // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap +// INVARIANT: allocator.collectors must have at least 1 collector set func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetItem) { chosenCollector := allocator.findNextCollector() targetItem := &TargetItem{ @@ -146,6 +147,10 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetIt allocator.m.Lock() defer allocator.m.Unlock() + if len(allocator.collectors) == 0 { + allocator.log.Info("No collector instances present, cannot set targets") + return + } // Check for target changes targetsDiff := diff.Maps(allocator.targetItems, targets) // If there are any additions or removals From 41c1124ffed6a1f2c97ccad8337ae8ab37ee372b Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 12 Sep 2022 11:05:47 -0400 Subject: [PATCH 7/7] Comments and imports fixed --- apis/v1alpha1/zz_generated.deepcopy.go | 2 +- cmd/otel-allocator/allocation/least_weighted.go | 6 ++++++ cmd/otel-allocator/allocation/strategy.go | 2 +- cmd/otel-allocator/collector/collector.go | 3 ++- cmd/otel-allocator/collector/collector_test.go | 3 ++- cmd/otel-allocator/diff/diff.go | 4 ++++ cmd/otel-allocator/discovery/discovery.go | 2 +- cmd/otel-allocator/discovery/discovery_test.go | 2 +- cmd/otel-allocator/main.go | 3 +-- cmd/otel-allocator/watcher/main.go | 2 +- 10 files changed, 20 insertions(+), 9 deletions(-) diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index cba3b24b99..776bebf9fd 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,7 @@ package v1alpha1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index a3cd974d4b..9e037567d6 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -94,6 +94,9 @@ func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetIt TargetsPerCollector.WithLabelValues(chosenCollector.Name, strategyName).Set(float64(chosenCollector.NumTargets)) } +// handleTargets receives the new and removed targets and reconciles the current state. +// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector +// Any net-new additions are assigned to the next available collector func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*TargetItem]) { // Check for removals for k, target := range allocator.targetItems { @@ -118,6 +121,9 @@ func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*Target } } +// handleCollectors receives the new and removed collectors and reconciles the current state. +// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map +// Finally, any targets of removed collectors are reallocated to the next available collector. func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Collector]) { // Clear removed collectors for _, k := range diff.Removals() { diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index f1654260de..42166caf78 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -3,12 +3,12 @@ package allocation import ( "errors" "fmt" - "github.com/prometheus/common/model" "net/url" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" ) type AllocatorProvider func(log logr.Logger) Allocator diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index 90a4236f0b..e1cf66ec1f 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -2,7 +2,6 @@ package collector import ( "context" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "os" "strconv" "time" @@ -16,6 +15,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) const ( diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/collector_test.go index 486a155e14..059e616702 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/collector_test.go @@ -3,7 +3,6 @@ package collector import ( "context" "fmt" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "os" "testing" @@ -12,6 +11,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/fake" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) var client Client diff --git a/cmd/otel-allocator/diff/diff.go b/cmd/otel-allocator/diff/diff.go index 425ca30d86..44ec55e040 100644 --- a/cmd/otel-allocator/diff/diff.go +++ b/cmd/otel-allocator/diff/diff.go @@ -1,5 +1,7 @@ package diff +// Changes is the result of the difference between two maps – items that are added and items that are removed +// This map is used to reconcile state differences. type Changes[T any] struct { additions map[string]T removals map[string]T @@ -13,6 +15,8 @@ func (c Changes[T]) Removals() map[string]T { return c.removals } +// Maps generates Changes for two maps with the same type signature by checking for any removals and then checking for +// additions. func Maps[T any](current, new map[string]T) Changes[T] { additions := map[string]T{} removals := map[string]T{} diff --git a/cmd/otel-allocator/discovery/discovery.go b/cmd/otel-allocator/discovery/discovery.go index defa9e8265..2c0393f619 100644 --- a/cmd/otel-allocator/discovery/discovery.go +++ b/cmd/otel-allocator/discovery/discovery.go @@ -2,7 +2,6 @@ package discovery import ( "context" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/go-kit/log" "github.com/go-logr/logr" @@ -12,6 +11,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) diff --git a/cmd/otel-allocator/discovery/discovery_test.go b/cmd/otel-allocator/discovery/discovery_test.go index 5b63d4bfdc..7edacde407 100644 --- a/cmd/otel-allocator/discovery/discovery_test.go +++ b/cmd/otel-allocator/discovery/discovery_test.go @@ -3,7 +3,6 @@ package discovery import ( "context" "fmt" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "os" "sort" "testing" @@ -15,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" ctrl "sigs.k8s.io/controller-runtime" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher" ) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index ebc667aa6b..80d8ec4743 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -9,8 +9,6 @@ import ( "os/signal" "syscall" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" - gokitlog "github.com/go-kit/log" "github.com/go-logr/logr" "github.com/gorilla/mux" @@ -20,6 +18,7 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/collector" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" lbdiscovery "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/discovery" diff --git a/cmd/otel-allocator/watcher/main.go b/cmd/otel-allocator/watcher/main.go index c752a87052..dac48d5fb2 100644 --- a/cmd/otel-allocator/watcher/main.go +++ b/cmd/otel-allocator/watcher/main.go @@ -2,10 +2,10 @@ package watcher import ( "fmt" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/config" )