From df48eb9085e21c4fd41e38db3da3658f7e54b9c5 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 12 Sep 2022 18:25:46 -0400 Subject: [PATCH 1/9] Added consistent hashing strategy --- .../allocation/consistent_hashing.go | 189 ++++++++++++++++++ .../allocation/consistent_hashing_test.go | 91 +++++++++ .../allocation/least_weighted.go | 23 +-- .../allocation/least_weighted_test.go | 26 ++- cmd/otel-allocator/allocation/strategy.go | 18 ++ cmd/otel-allocator/go.mod | 1 + cmd/otel-allocator/go.sum | 2 + 7 files changed, 325 insertions(+), 25 deletions(-) create mode 100644 cmd/otel-allocator/allocation/consistent_hashing.go create mode 100644 cmd/otel-allocator/allocation/consistent_hashing_test.go diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go new file mode 100644 index 0000000000..57c7226c99 --- /dev/null +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -0,0 +1,189 @@ +package allocation + +import ( + "fmt" + "github.com/buraksezer/consistent" + "github.com/cespare/xxhash/v2" + "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" + "github.com/prometheus/client_golang/prometheus" + "net/url" + "sync" +) + +var _ Allocator = &consistentHashingAllocator{} + +const consistentHashingStrategyName = "consistent-hashing" + +type hasher struct{} + +func (h hasher) Sum64(data []byte) uint64 { + return xxhash.Sum64(data) +} + +type consistentHashingAllocator struct { + // m protects Consistent for concurrent use. + m sync.RWMutex + + hasher *consistent.Consistent + + // collectors is a map from a Collector's name to a Collector instance + collectors map[string]*Collector + + // targetItems is a map from a target item's hash to the target items allocated state + targetItems map[string]*TargetItem + + log logr.Logger +} + +func newConsistentHashingAllocator(log logr.Logger) Allocator { + config := consistent.Config{ + PartitionCount: 1061, + ReplicationFactor: 5, + Load: 1.1, + Hasher: hasher{}, + } + consistentHasher := consistent.New(nil, config) + return &consistentHashingAllocator{ + hasher: consistentHasher, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*TargetItem), + log: log, + } +} + +// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems +// This method is called from within SetTargets and SetCollectors, which acquire the needed lock. +// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap +// INVARIANT: c.collectors must have at least 1 collector set +func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) { + colOwner := c.hasher.LocateKey([]byte(target.Hash())) + targetItem := &TargetItem{ + JobName: target.JobName, + Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, + TargetURL: target.TargetURL, + Label: target.Label, + CollectorName: colOwner.String(), + } + c.targetItems[targetItem.Hash()] = targetItem + c.collectors[colOwner.String()].NumTargets++ + TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets)) +} + +// handleTargets receives the new and removed targets and reconciles the current state. +// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector +// Any net-new additions are assigned to the next available collector +func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) { + // Check for removals + for k, target := range c.targetItems { + // if the current target is in the removals list + if _, ok := diff.Removals()[k]; ok { + col := c.collectors[target.CollectorName] + col.NumTargets-- + delete(c.targetItems, k) + TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets)) + } + } + + // Check for additions + for k, target := range diff.Additions() { + // Do nothing if the item is already there + if _, ok := c.targetItems[k]; ok { + continue + } else { + // Add target to target pool and assign a collector + c.addTargetToTargetItems(target) + } + } +} + +// handleCollectors receives the new and removed collectors and reconciles the current state. +// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map +// Finally, any targets of removed collectors are reallocated to the next available collector. +func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) { + // Clear removed collectors + for _, k := range diff.Removals() { + delete(c.collectors, k.Name) + c.hasher.Remove(k.Name) + TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0) + } + // Insert the new collectors + for _, i := range diff.Additions() { + c.collectors[i.Name] = NewCollector(i.Name) + c.hasher.Add(c.collectors[i.Name]) + } + + // Re-Allocate all targets + for _, item := range c.targetItems { + c.addTargetToTargetItems(item) + } +} + +// SetTargets accepts a list of targets that will be used to make +// load balancing decisions. This method should be called when there are +// new targets discovered or existing targets are shutdown. +func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) { + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName)) + defer timer.ObserveDuration() + + c.m.Lock() + defer c.m.Unlock() + + if len(c.collectors) == 0 { + c.log.Info("No collector instances present, cannot set targets") + return + } + // Check for target changes + targetsDiff := diff.Maps(c.targetItems, targets) + // If there are any additions or removals + if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { + c.handleTargets(targetsDiff) + } + return +} + +// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. +// This method is called when Collectors are added or removed. +func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) { + log := c.log.WithValues("component", "opentelemetry-targetallocator") + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName)) + defer timer.ObserveDuration() + + CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors))) + if len(collectors) == 0 { + log.Info("No collector instances present") + return + } + + c.m.Lock() + defer c.m.Unlock() + + // Check for collector changes + collectorsDiff := diff.Maps(c.collectors, collectors) + if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { + c.handleCollectors(collectorsDiff) + } + return +} + +// TargetItems returns a shallow copy of the targetItems map. +func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem { + c.m.RLock() + defer c.m.RUnlock() + targetItemsCopy := make(map[string]*TargetItem) + for k, v := range c.targetItems { + targetItemsCopy[k] = v + } + return targetItemsCopy +} + +// Collectors returns a shallow copy of the collectors map. +func (c *consistentHashingAllocator) Collectors() map[string]*Collector { + c.m.RLock() + defer c.m.RUnlock() + collectorsCopy := make(map[string]*Collector) + for k, v := range c.collectors { + collectorsCopy[k] = v + } + return collectorsCopy +} diff --git a/cmd/otel-allocator/allocation/consistent_hashing_test.go b/cmd/otel-allocator/allocation/consistent_hashing_test.go new file mode 100644 index 0000000000..b40f50e6b5 --- /dev/null +++ b/cmd/otel-allocator/allocation/consistent_hashing_test.go @@ -0,0 +1,91 @@ +package allocation + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestCanSetSingleTarget(t *testing.T) { + cols := makeNCollectors(3, 0, 0) + c := newConsistentHashingAllocator(logger) + c.SetCollectors(cols) + c.SetTargets(makeNNewTargets(1, 3, 0)) + actualTargetItems := c.TargetItems() + assert.Len(t, actualTargetItems, 1) + for _, item := range actualTargetItems { + assert.Equal(t, "collector-2", item.CollectorName) + } +} + +func TestRelativelyEvenDistribution(t *testing.T) { + numCols := 15 + numItems := 10000 + cols := makeNCollectors(numCols, 0, 0) + var expectedPerCollector = float64(numItems / numCols) + expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector + c := newConsistentHashingAllocator(logger) + c.SetCollectors(cols) + c.SetTargets(makeNNewTargets(numItems, numCols, 0)) + actualTargetItems := c.TargetItems() + assert.Len(t, actualTargetItems, numItems) + actualCollectors := c.Collectors() + assert.Len(t, actualCollectors, numCols) + for _, col := range actualCollectors { + t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets) + assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta) + } +} + +func TestFullReallocation(t *testing.T) { + cols := makeNCollectors(10, 0, 0) + c := newConsistentHashingAllocator(logger) + c.SetCollectors(cols) + c.SetTargets(makeNNewTargets(10000, 10, 0)) + actualTargetItems := c.TargetItems() + assert.Len(t, actualTargetItems, 10000) + actualCollectors := c.Collectors() + assert.Len(t, actualCollectors, 10) + newCols := makeNCollectors(10, 0, 10) + c.SetCollectors(newCols) + updatedTargetItems := c.TargetItems() + assert.Len(t, updatedTargetItems, 10000) + updatedCollectors := c.Collectors() + assert.Len(t, updatedCollectors, 10) + for _, item := range updatedTargetItems { + _, ok := updatedCollectors[item.CollectorName] + assert.True(t, ok, "Some items weren't reallocated correctly") + } +} + +func TestNumRemapped(t *testing.T) { + numItems := 10_000 + numInitialCols := 15 + numFinalCols := 16 + expectedDelta := float64((numFinalCols - numInitialCols) * (numItems / numFinalCols)) + cols := makeNCollectors(numInitialCols, 0, 0) + c := newConsistentHashingAllocator(logger) + c.SetCollectors(cols) + c.SetTargets(makeNNewTargets(numItems, numInitialCols, 0)) + actualTargetItems := c.TargetItems() + assert.Len(t, actualTargetItems, numItems) + actualCollectors := c.Collectors() + assert.Len(t, actualCollectors, numInitialCols) + newCols := makeNCollectors(numFinalCols, 0, 0) + c.SetCollectors(newCols) + updatedTargetItems := c.TargetItems() + assert.Len(t, updatedTargetItems, numItems) + updatedCollectors := c.Collectors() + assert.Len(t, updatedCollectors, numFinalCols) + countRemapped := 0 + countNotRemapped := 0 + for _, item := range updatedTargetItems { + previousItem, ok := actualTargetItems[item.Hash()] + assert.True(t, ok) + if previousItem.CollectorName != item.CollectorName { + countRemapped++ + } else { + countNotRemapped++ + } + } + assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta) +} diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index 9e037567d6..3824d1a1d6 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -13,7 +13,7 @@ import ( var _ Allocator = &leastWeightedAllocator{} -const strategyName = "least-weighted" +const leastWeightedStrategyName = "least-weighted" /* Load balancer will serve on an HTTP server exposing /jobs//targets @@ -91,7 +91,7 @@ func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetIt } allocator.targetItems[targetItem.Hash()] = targetItem chosenCollector.NumTargets++ - TargetsPerCollector.WithLabelValues(chosenCollector.Name, strategyName).Set(float64(chosenCollector.NumTargets)) + TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets)) } // handleTargets receives the new and removed targets and reconciles the current state. @@ -105,7 +105,7 @@ func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*Target c := allocator.collectors[target.CollectorName] c.NumTargets-- delete(allocator.targetItems, k) - TargetsPerCollector.WithLabelValues(target.CollectorName, strategyName).Set(float64(c.NumTargets)) + TargetsPerCollector.WithLabelValues(target.CollectorName, leastWeightedStrategyName).Set(float64(c.NumTargets)) } } @@ -128,7 +128,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col // Clear removed collectors for _, k := range diff.Removals() { delete(allocator.collectors, k.Name) - TargetsPerCollector.WithLabelValues(k.Name, strategyName).Set(0) + TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0) } // Insert the new collectors for _, i := range diff.Additions() { @@ -147,7 +147,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col // load balancing decisions. This method should be called when there are // new targets discovered or existing targets are shutdown. func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetItem) { - timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", strategyName)) + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName)) defer timer.ObserveDuration() allocator.m.Lock() @@ -170,10 +170,10 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetIt // This method is called when Collectors are added or removed. func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Collector) { log := allocator.log.WithValues("component", "opentelemetry-targetallocator") - timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", strategyName)) + timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", leastWeightedStrategyName)) defer timer.ObserveDuration() - CollectorsAllocatable.WithLabelValues(strategyName).Set(float64(len(collectors))) + CollectorsAllocatable.WithLabelValues(leastWeightedStrategyName).Set(float64(len(collectors))) if len(collectors) == 0 { log.Info("No collector instances present") return @@ -190,17 +190,10 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co return } -func NewAllocator(log logr.Logger) Allocator { +func newLeastWeightedAllocator(log logr.Logger) Allocator { return &leastWeightedAllocator{ log: log, collectors: make(map[string]*Collector), targetItems: make(map[string]*TargetItem), } } - -func init() { - err := Register(strategyName, NewAllocator) - if err != nil { - panic(err) - } -} diff --git a/cmd/otel-allocator/allocation/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go index c15c30a301..6b31188b29 100644 --- a/cmd/otel-allocator/allocation/least_weighted_test.go +++ b/cmd/otel-allocator/allocation/least_weighted_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "math/rand" + "strconv" "testing" "github.com/prometheus/common/model" @@ -17,15 +18,20 @@ func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*Ta toReturn := map[string]*TargetItem{} for i := startingIndex; i < n+startingIndex; i++ { collector := fmt.Sprintf("collector-%d", i%numCollectors) - newTarget := NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", nil, collector) + label := model.LabelSet{ + "collector": model.LabelValue(collector), + "i": model.LabelValue(strconv.Itoa(i)), + "total": model.LabelValue(strconv.Itoa(n + startingIndex)), + } + newTarget := NewTargetItem(fmt.Sprintf("test-job-%d", i), "test-url", label, collector) toReturn[newTarget.Hash()] = newTarget } return toReturn } -func makeNCollectors(n int, targetsForEach int) map[string]*Collector { +func makeNCollectors(n int, targetsForEach int, startingIndex int) map[string]*Collector { toReturn := map[string]*Collector{} - for i := 0; i < n; i++ { + for i := startingIndex; i < n+startingIndex; i++ { collector := fmt.Sprintf("collector-%d", i) toReturn[collector] = &Collector{ Name: collector, @@ -38,7 +44,7 @@ func makeNCollectors(n int, targetsForEach int) map[string]*Collector { func TestSetCollectors(t *testing.T) { s, _ := New("least-weighted", logger) - cols := makeNCollectors(3, 0) + cols := makeNCollectors(3, 0, 0) s.SetCollectors(cols) expectedColLen := len(cols) @@ -54,7 +60,7 @@ func TestAddingAndRemovingTargets(t *testing.T) { // prepare allocator with initial targets and collectors s, _ := New("least-weighted", logger) - cols := makeNCollectors(3, 0) + cols := makeNCollectors(3, 0, 0) s.SetCollectors(cols) initTargets := makeNNewTargets(6, 3, 0) @@ -89,7 +95,7 @@ func TestAllocationCollision(t *testing.T) { // prepare allocator with initial targets and collectors s, _ := New("least-weighted", logger) - cols := makeNCollectors(3, 0) + cols := makeNCollectors(3, 0, 0) s.SetCollectors(cols) firstLabels := model.LabelSet{ "test": "test1", @@ -123,7 +129,7 @@ func TestAllocationCollision(t *testing.T) { func TestNoCollectorReassignment(t *testing.T) { s, _ := New("least-weighted", logger) - cols := makeNCollectors(3, 0) + cols := makeNCollectors(3, 0, 0) s.SetCollectors(cols) expectedColLen := len(cols) @@ -143,7 +149,7 @@ func TestNoCollectorReassignment(t *testing.T) { assert.Len(t, targetItems, expectedTargetLen) // assign new set of collectors with the same names - newCols := makeNCollectors(3, 0) + newCols := makeNCollectors(3, 0, 0) s.SetCollectors(newCols) newTargetItems := s.TargetItems() @@ -154,7 +160,7 @@ func TestNoCollectorReassignment(t *testing.T) { func TestSmartCollectorReassignment(t *testing.T) { s, _ := New("least-weighted", logger) - cols := makeNCollectors(4, 0) + cols := makeNCollectors(4, 0, 0) s.SetCollectors(cols) expectedColLen := len(cols) @@ -205,7 +211,7 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { // prepare allocator with 3 collectors and 'random' amount of targets s, _ := New("least-weighted", logger) - cols := makeNCollectors(3, 0) + cols := makeNCollectors(3, 0, 0) s.SetCollectors(cols) targets := makeNNewTargets(27, 3, 0) diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 42166caf78..3732546e0e 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -3,6 +3,7 @@ package allocation import ( "errors" "fmt" + "github.com/buraksezer/consistent" "net/url" "github.com/go-logr/logr" @@ -76,6 +77,8 @@ func (t TargetItem) Hash() string { return t.JobName + t.TargetURL + t.Label.Fingerprint().String() } +var _ consistent.Member = Collector{} + // Collector Creates a struct that holds Collector information // This struct will be parsed into endpoint with Collector and jobs info // This struct can be extended with information like annotations and labels in the future @@ -84,6 +87,21 @@ type Collector struct { NumTargets int } +func (c Collector) String() string { + return c.Name +} + func NewCollector(name string) *Collector { return &Collector{Name: name} } + +func init() { + err := Register(leastWeightedStrategyName, newLeastWeightedAllocator) + if err != nil { + panic(err) + } + err = Register(consistentHashingStrategyName, newConsistentHashingAllocator) + if err != nil { + panic(err) + } +} diff --git a/cmd/otel-allocator/go.mod b/cmd/otel-allocator/go.mod index af1eec2eec..93fafa28ef 100644 --- a/cmd/otel-allocator/go.mod +++ b/cmd/otel-allocator/go.mod @@ -42,6 +42,7 @@ require ( github.com/aws/aws-sdk-go v1.44.41 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect + github.com/buraksezer/consistent v0.9.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe // indirect github.com/containerd/containerd v1.5.7 // indirect diff --git a/cmd/otel-allocator/go.sum b/cmd/otel-allocator/go.sum index e3d28aeff3..5230d2b703 100644 --- a/cmd/otel-allocator/go.sum +++ b/cmd/otel-allocator/go.sum @@ -305,6 +305,8 @@ github.com/buger/jsonparser v0.0.0-20180808090653-f4dd9f5a6b44/go.mod h1:bbYlZJ7 github.com/bugsnag/bugsnag-go v0.0.0-20141110184014-b1d153021fcd/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8= github.com/bugsnag/osext v0.0.0-20130617224835-0dd3f918b21b/go.mod h1:obH5gd0BsqsP2LwDJ9aOkm/6J86V6lyAXCoQWGw3K50= github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= +github.com/buraksezer/consistent v0.9.0 h1:Zfs6bX62wbP3QlbPGKUhqDw7SmNkOzY5bHZIYXYpR5g= +github.com/buraksezer/consistent v0.9.0/go.mod h1:6BrVajWq7wbKZlTOUPs/XVfR8c0maujuPowduSpZqmw= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= github.com/cactus/go-statsd-client/statsd v0.0.0-20191106001114-12b4e2b38748/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= From 64607dc0a10ba204b6a76c9e2993f095fa0befdb Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 13 Sep 2022 14:19:40 -0400 Subject: [PATCH 2/9] testing fix, metrics fix --- .../allocation/consistent_hashing.go | 25 +++++++++++-------- .../allocation/consistent_hashing_test.go | 2 +- .../allocation/least_weighted_test.go | 11 ++++++-- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index 57c7226c99..b375b70026 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -22,10 +22,10 @@ func (h hasher) Sum64(data []byte) uint64 { } type consistentHashingAllocator struct { - // m protects Consistent for concurrent use. + // m protects consistentHasher, collectors and targetItems for concurrent use. m sync.RWMutex - hasher *consistent.Consistent + consistentHasher *consistent.Consistent // collectors is a map from a Collector's name to a Collector instance collectors map[string]*Collector @@ -45,10 +45,10 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator { } consistentHasher := consistent.New(nil, config) return &consistentHashingAllocator{ - hasher: consistentHasher, - collectors: make(map[string]*Collector), - targetItems: make(map[string]*TargetItem), - log: log, + consistentHasher: consistentHasher, + collectors: make(map[string]*Collector), + targetItems: make(map[string]*TargetItem), + log: log, } } @@ -57,7 +57,12 @@ func newConsistentHashingAllocator(log logr.Logger) Allocator { // This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap // INVARIANT: c.collectors must have at least 1 collector set func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) { - colOwner := c.hasher.LocateKey([]byte(target.Hash())) + // Check if this is a reassignment, if so, decrement the previous collector's NumTargets + if previousColName, ok := c.collectors[target.CollectorName]; ok { + previousColName.NumTargets-- + TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets)) + } + colOwner := c.consistentHasher.LocateKey([]byte(target.Hash())) targetItem := &TargetItem{ JobName: target.JobName, Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, @@ -99,18 +104,18 @@ func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem // handleCollectors receives the new and removed collectors and reconciles the current state. // Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map -// Finally, any targets of removed collectors are reallocated to the next available collector. +// Finally, update all targets' collectors to match the consistent hashing. func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) { // Clear removed collectors for _, k := range diff.Removals() { delete(c.collectors, k.Name) - c.hasher.Remove(k.Name) + c.consistentHasher.Remove(k.Name) TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0) } // Insert the new collectors for _, i := range diff.Additions() { c.collectors[i.Name] = NewCollector(i.Name) - c.hasher.Add(c.collectors[i.Name]) + c.consistentHasher.Add(c.collectors[i.Name]) } // Re-Allocate all targets diff --git a/cmd/otel-allocator/allocation/consistent_hashing_test.go b/cmd/otel-allocator/allocation/consistent_hashing_test.go index b40f50e6b5..1f38d893cf 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing_test.go +++ b/cmd/otel-allocator/allocation/consistent_hashing_test.go @@ -25,7 +25,7 @@ func TestRelativelyEvenDistribution(t *testing.T) { expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector c := newConsistentHashingAllocator(logger) c.SetCollectors(cols) - c.SetTargets(makeNNewTargets(numItems, numCols, 0)) + c.SetTargets(makeNNewTargets(numItems, 0, 0)) actualTargetItems := c.TargetItems() assert.Len(t, actualTargetItems, numItems) actualCollectors := c.Collectors() diff --git a/cmd/otel-allocator/allocation/least_weighted_test.go b/cmd/otel-allocator/allocation/least_weighted_test.go index 6b31188b29..d6b98b3d09 100644 --- a/cmd/otel-allocator/allocation/least_weighted_test.go +++ b/cmd/otel-allocator/allocation/least_weighted_test.go @@ -14,10 +14,17 @@ import ( var logger = logf.Log.WithName("unit-tests") +func colIndex(index, numCols int) int { + if numCols == 0 { + return -1 + } + return index % numCols +} + func makeNNewTargets(n int, numCollectors int, startingIndex int) map[string]*TargetItem { toReturn := map[string]*TargetItem{} for i := startingIndex; i < n+startingIndex; i++ { - collector := fmt.Sprintf("collector-%d", i%numCollectors) + collector := fmt.Sprintf("collector-%d", colIndex(i, numCollectors)) label := model.LabelSet{ "collector": model.LabelValue(collector), "i": model.LabelValue(strconv.Itoa(i)), @@ -169,7 +176,7 @@ func TestSmartCollectorReassignment(t *testing.T) { for _, i := range cols { assert.NotNil(t, s.Collectors()[i.Name]) } - initTargets := makeNNewTargets(6, 4, 0) + initTargets := makeNNewTargets(6, 0, 0) // test that targets and collectors are added properly s.SetTargets(initTargets) From 1ce7876cd21c7a9c81ea4edeec4a083dfd58d347 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 14 Sep 2022 10:39:59 -0400 Subject: [PATCH 3/9] Add configurable replicas --- apis/v1alpha1/opentelemetrycollector_types.go | 4 ++++ apis/v1alpha1/opentelemetrycollector_webhook.go | 9 ++++++--- apis/v1alpha1/zz_generated.deepcopy.go | 9 +++++++-- .../opentelemetry.io_opentelemetrycollectors.yaml | 6 ++++++ .../bases/opentelemetry.io_opentelemetrycollectors.yaml | 6 ++++++ docs/api.md | 9 +++++++++ pkg/targetallocator/deployment.go | 4 +--- 7 files changed, 39 insertions(+), 8 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index ac79522da0..0239730268 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -123,6 +123,10 @@ type OpenTelemetryTargetAllocator struct { // All CR instances which the ServiceAccount has access to will be retrieved. This includes other namespaces. // +optional PrometheusCR OpenTelemetryTargetAllocatorPrometheusCR `json:"prometheusCR,omitempty"` + // Replicas is the number of pod instances for the underlying TargetAllocator, this can only be set to values other + // than 1 if a strategy that allows for high availability is chosen. + // +optional + Replicas *int32 `json:"replicas,omitempty"` } type OpenTelemetryTargetAllocatorPrometheusCR struct { diff --git a/apis/v1alpha1/opentelemetrycollector_webhook.go b/apis/v1alpha1/opentelemetrycollector_webhook.go index 6a4b1e936f..f6138d6a5b 100644 --- a/apis/v1alpha1/opentelemetrycollector_webhook.go +++ b/apis/v1alpha1/opentelemetrycollector_webhook.go @@ -56,12 +56,15 @@ func (r *OpenTelemetryCollector) Default() { r.Labels["app.kubernetes.io/managed-by"] = "opentelemetry-operator" } + // We can default to one because dependent objects Deployment and HorizontalPodAutoScaler + // default to 1 as well. + one := int32(1) if r.Spec.Replicas == nil { - // We can default to one because dependent objects Deployment and HorizontalPodAutoScaler - // default to 1 as well. - one := int32(1) r.Spec.Replicas = &one } + if r.Spec.TargetAllocator.Enabled && r.Spec.TargetAllocator.Replicas == nil { + r.Spec.TargetAllocator.Replicas = &one + } } // +kubebuilder:webhook:verbs=create;update,path=/validate-opentelemetry-io-v1alpha1-opentelemetrycollector,mutating=false,failurePolicy=fail,groups=opentelemetry.io,resources=opentelemetrycollectors,versions=v1alpha1,name=vopentelemetrycollectorcreateupdate.kb.io,sideEffects=none,admissionReviewVersions=v1 diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 776bebf9fd..d1c24913ff 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,7 @@ package v1alpha1 import ( - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -322,7 +322,7 @@ func (in *OpenTelemetryCollectorSpec) DeepCopyInto(out *OpenTelemetryCollectorSp (*out)[key] = val } } - out.TargetAllocator = in.TargetAllocator + in.TargetAllocator.DeepCopyInto(&out.TargetAllocator) if in.VolumeMounts != nil { in, out := &in.VolumeMounts, &out.VolumeMounts *out = make([]v1.VolumeMount, len(*in)) @@ -409,6 +409,11 @@ func (in *OpenTelemetryCollectorStatus) DeepCopy() *OpenTelemetryCollectorStatus func (in *OpenTelemetryTargetAllocator) DeepCopyInto(out *OpenTelemetryTargetAllocator) { *out = *in out.PrometheusCR = in.PrometheusCR + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OpenTelemetryTargetAllocator. diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index 01dc8f2e2b..feb10100db 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -707,6 +707,12 @@ spec: custom resources as targets or not. type: boolean type: object + replicas: + description: Replicas is the number of pod instances for the underlying + TargetAllocator, this can only be set to values other than 1 + if a strategy that allows for high availability is chosen. + format: int32 + type: integer serviceAccount: description: ServiceAccount indicates the name of an existing service account to use with this instance. diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index b2f0bf3d0c..82fc80f7bd 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -705,6 +705,12 @@ spec: custom resources as targets or not. type: boolean type: object + replicas: + description: Replicas is the number of pod instances for the underlying + TargetAllocator, this can only be set to values other than 1 + if a strategy that allows for high availability is chosen. + format: int32 + type: integer serviceAccount: description: ServiceAccount indicates the name of an existing service account to use with this instance. diff --git a/docs/api.md b/docs/api.md index 6dcb8b56ec..9f7c7d886c 100644 --- a/docs/api.md +++ b/docs/api.md @@ -2896,6 +2896,15 @@ TargetAllocator indicates a value which determines whether to spawn a target all PrometheusCR defines the configuration for the retrieval of PrometheusOperator CRDs ( servicemonitor.monitoring.coreos.com/v1 and podmonitor.monitoring.coreos.com/v1 ) retrieval. All CR instances which the ServiceAccount has access to will be retrieved. This includes other namespaces.
false + + replicas + integer + + Replicas is the number of pod instances for the underlying TargetAllocator, this can only be set to values other than 1 if a strategy that allows for high availability is chosen.
+
+ Format: int32
+ + false serviceAccount string diff --git a/pkg/targetallocator/deployment.go b/pkg/targetallocator/deployment.go index f7be218dda..0eb9198534 100644 --- a/pkg/targetallocator/deployment.go +++ b/pkg/targetallocator/deployment.go @@ -30,8 +30,6 @@ func Deployment(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTele labels := Labels(otelcol) labels["app.kubernetes.io/name"] = naming.TargetAllocator(otelcol) - var replicas int32 = 1 - return appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: naming.TargetAllocator(otelcol), @@ -39,7 +37,7 @@ func Deployment(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTele Labels: labels, }, Spec: appsv1.DeploymentSpec{ - Replicas: &replicas, + Replicas: otelcol.Spec.TargetAllocator.Replicas, Selector: &metav1.LabelSelector{ MatchLabels: labels, }, From 15d2caba191790f0bea575dfc730cc8a039c2a9a Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 14 Sep 2022 10:50:25 -0400 Subject: [PATCH 4/9] Fix linting --- apis/v1alpha1/opentelemetrycollector_types.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 0239730268..d8f6d1a62f 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -108,6 +108,10 @@ type OpenTelemetryCollectorSpec struct { // OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator. type OpenTelemetryTargetAllocator struct { + // Replicas is the number of pod instances for the underlying TargetAllocator, this can only be set to values other + // than 1 if a strategy that allows for high availability is chosen. + // +optional + Replicas *int32 `json:"replicas,omitempty"` // AllocationStrategy determines which strategy the target allocator should use for allocation AllocationStrategy string `json:"allocationStrategy,omitempty"` // ServiceAccount indicates the name of an existing service account to use with this instance. @@ -123,10 +127,6 @@ type OpenTelemetryTargetAllocator struct { // All CR instances which the ServiceAccount has access to will be retrieved. This includes other namespaces. // +optional PrometheusCR OpenTelemetryTargetAllocatorPrometheusCR `json:"prometheusCR,omitempty"` - // Replicas is the number of pod instances for the underlying TargetAllocator, this can only be set to values other - // than 1 if a strategy that allows for high availability is chosen. - // +optional - Replicas *int32 `json:"replicas,omitempty"` } type OpenTelemetryTargetAllocatorPrometheusCR struct { From d8be4273e13b4a92e50dd95d26e50e1920869c48 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 14 Sep 2022 11:18:57 -0400 Subject: [PATCH 5/9] Fix test and add a new one --- pkg/collector/reconcile/deployment_test.go | 59 +++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/pkg/collector/reconcile/deployment_test.go b/pkg/collector/reconcile/deployment_test.go index 3867900754..4db2319790 100644 --- a/pkg/collector/reconcile/deployment_test.go +++ b/pkg/collector/reconcile/deployment_test.go @@ -127,6 +127,7 @@ func TestExpectedDeployments(t *testing.T) { t.Run("should not update target allocator deployment replicas when collector max replicas is set", func(t *testing.T) { replicas, maxReplicas := int32(2), int32(10) + oneReplica := int32(1) param := Params{ Client: k8sClient, Instance: v1alpha1.OpenTelemetryCollector{ @@ -144,7 +145,8 @@ func TestExpectedDeployments(t *testing.T) { Replicas: &replicas, Mode: v1alpha1.ModeStatefulSet, TargetAllocator: v1alpha1.OpenTelemetryTargetAllocator{ - Enabled: true, + Enabled: true, + Replicas: &oneReplica, }, Config: ` receivers: @@ -177,6 +179,61 @@ func TestExpectedDeployments(t *testing.T) { assert.Equal(t, *allocator.Spec.Replicas, int32(1)) }) + t.Run("should update target allocator deployment replicas when changed", func(t *testing.T) { + initialReplicas, finalReplicas := int32(1), int32(2) + param := Params{ + Client: k8sClient, + Instance: v1alpha1.OpenTelemetryCollector{ + TypeMeta: metav1.TypeMeta{ + Kind: "opentelemetry.io", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + UID: instanceUID, + }, + Spec: v1alpha1.OpenTelemetryCollectorSpec{ + Replicas: &initialReplicas, + Mode: v1alpha1.ModeStatefulSet, + TargetAllocator: v1alpha1.OpenTelemetryTargetAllocator{ + Enabled: true, + Replicas: &initialReplicas, + }, + Config: ` + receivers: + jaeger: + protocols: + grpc: + processors: + + exporters: + logging: + + service: + pipelines: + traces: + receivers: [jaeger] + processors: [] + exporters: [logging] + + `, + }, + }, + Scheme: testScheme, + Log: logger, + } + expected := []v1.Deployment{} + allocator := targetallocator.Deployment(param.Config, param.Log, param.Instance) + expected = append(expected, allocator) + + assert.Len(t, expected, 1) + assert.Equal(t, *allocator.Spec.Replicas, int32(1)) + param.Instance.Spec.TargetAllocator.Replicas = &finalReplicas + finalAllocator := targetallocator.Deployment(param.Config, param.Log, param.Instance) + assert.Equal(t, *finalAllocator.Spec.Replicas, int32(2)) + }) + t.Run("should update deployment", func(t *testing.T) { createObjectIfNotExists(t, "test-collector", &expectedDeploy) err := expectedDeployments(context.Background(), param, []v1.Deployment{expectedDeploy}) From ed23ab3fc01bf69e67f0b7f1ed9b618eecb054b6 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Wed, 14 Sep 2022 11:48:57 -0400 Subject: [PATCH 6/9] Rename var to rerun tests --- pkg/collector/reconcile/deployment_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/collector/reconcile/deployment_test.go b/pkg/collector/reconcile/deployment_test.go index 4db2319790..d5d794d889 100644 --- a/pkg/collector/reconcile/deployment_test.go +++ b/pkg/collector/reconcile/deployment_test.go @@ -180,7 +180,7 @@ func TestExpectedDeployments(t *testing.T) { }) t.Run("should update target allocator deployment replicas when changed", func(t *testing.T) { - initialReplicas, finalReplicas := int32(1), int32(2) + initialReplicas, nextReplicas := int32(1), int32(2) param := Params{ Client: k8sClient, Instance: v1alpha1.OpenTelemetryCollector{ @@ -229,7 +229,7 @@ func TestExpectedDeployments(t *testing.T) { assert.Len(t, expected, 1) assert.Equal(t, *allocator.Spec.Replicas, int32(1)) - param.Instance.Spec.TargetAllocator.Replicas = &finalReplicas + param.Instance.Spec.TargetAllocator.Replicas = &nextReplicas finalAllocator := targetallocator.Deployment(param.Config, param.Log, param.Instance) assert.Equal(t, *finalAllocator.Spec.Replicas, int32(2)) }) From abca69e16a921bb3b4f3310a4d429ac64b78a405 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 15 Sep 2022 11:27:48 -0400 Subject: [PATCH 7/9] Comments and things --- apis/v1alpha1/opentelemetrycollector_types.go | 7 +++++-- apis/v1alpha1/zz_generated.deepcopy.go | 2 +- .../opentelemetry.io_opentelemetrycollectors.yaml | 10 +++++++--- cmd/otel-allocator/allocation/consistent_hashing.go | 8 +++++--- .../allocation/consistent_hashing_test.go | 3 ++- cmd/otel-allocator/allocation/http_test.go | 4 ++-- cmd/otel-allocator/allocation/strategy.go | 2 +- .../opentelemetry.io_opentelemetrycollectors.yaml | 10 +++++++--- docs/api.md | 4 ++-- 9 files changed, 32 insertions(+), 18 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index d8f6d1a62f..adc644e04e 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -108,11 +108,14 @@ type OpenTelemetryCollectorSpec struct { // OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator. type OpenTelemetryTargetAllocator struct { - // Replicas is the number of pod instances for the underlying TargetAllocator, this can only be set to values other - // than 1 if a strategy that allows for high availability is chosen. + // Replicas is the number of pod instances for the underlying TargetAllocator, this should only be set to a value + // other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy + // that can be run in a high availability mode is consistent-hashing. // +optional Replicas *int32 `json:"replicas,omitempty"` // AllocationStrategy determines which strategy the target allocator should use for allocation + // The current options are least-weighted and consistent-hashing. The default option is least-weighted + // +optional AllocationStrategy string `json:"allocationStrategy,omitempty"` // ServiceAccount indicates the name of an existing service account to use with this instance. // +optional diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index d1c24913ff..9d7def8fa7 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,7 @@ package v1alpha1 import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index feb10100db..55feba7458 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -685,7 +685,9 @@ spec: properties: allocationStrategy: description: AllocationStrategy determines which strategy the - target allocator should use for allocation + target allocator should use for allocation The current options + are least-weighted and consistent-hashing. The default option + is least-weighted type: string enabled: description: Enabled indicates whether to use a target allocation @@ -709,8 +711,10 @@ spec: type: object replicas: description: Replicas is the number of pod instances for the underlying - TargetAllocator, this can only be set to values other than 1 - if a strategy that allows for high availability is chosen. + TargetAllocator, this should only be set to a value other than + 1 if a strategy that allows for high availability is chosen. + Currently, the only allocation strategy that can be run in a + high availability mode is consistent-hashing. format: int32 type: integer serviceAccount: diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index b375b70026..31a78e35ab 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -2,13 +2,15 @@ package allocation import ( "fmt" + "net/url" + "sync" + "github.com/buraksezer/consistent" "github.com/cespare/xxhash/v2" "github.com/go-logr/logr" - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" "github.com/prometheus/client_golang/prometheus" - "net/url" - "sync" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" ) var _ Allocator = &consistentHashingAllocator{} diff --git a/cmd/otel-allocator/allocation/consistent_hashing_test.go b/cmd/otel-allocator/allocation/consistent_hashing_test.go index 1f38d893cf..8793af53be 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing_test.go +++ b/cmd/otel-allocator/allocation/consistent_hashing_test.go @@ -1,8 +1,9 @@ package allocation import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestCanSetSingleTarget(t *testing.T) { diff --git a/cmd/otel-allocator/allocation/http_test.go b/cmd/otel-allocator/allocation/http_test.go index 6e2241654f..c54a7a4dcc 100644 --- a/cmd/otel-allocator/allocation/http_test.go +++ b/cmd/otel-allocator/allocation/http_test.go @@ -159,7 +159,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { "test-label": "test-value", "foo": "bar", }, - TargetURL: "test-url", + TargetURL: "test-url", CollectorName: "test-collector", }, TargetItem{ @@ -167,7 +167,7 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) { Label: model.LabelSet{ "test-label": "test-value", }, - TargetURL: "test-url", + TargetURL: "test-url", CollectorName: "test-collector", }, }, diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 3732546e0e..949a397fd4 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -3,9 +3,9 @@ package allocation import ( "errors" "fmt" - "github.com/buraksezer/consistent" "net/url" + "github.com/buraksezer/consistent" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index 82fc80f7bd..6d16cf8e58 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -683,7 +683,9 @@ spec: properties: allocationStrategy: description: AllocationStrategy determines which strategy the - target allocator should use for allocation + target allocator should use for allocation The current options + are least-weighted and consistent-hashing. The default option + is least-weighted type: string enabled: description: Enabled indicates whether to use a target allocation @@ -707,8 +709,10 @@ spec: type: object replicas: description: Replicas is the number of pod instances for the underlying - TargetAllocator, this can only be set to values other than 1 - if a strategy that allows for high availability is chosen. + TargetAllocator, this should only be set to a value other than + 1 if a strategy that allows for high availability is chosen. + Currently, the only allocation strategy that can be run in a + high availability mode is consistent-hashing. format: int32 type: integer serviceAccount: diff --git a/docs/api.md b/docs/api.md index 9f7c7d886c..4f44cdaa64 100644 --- a/docs/api.md +++ b/docs/api.md @@ -2872,7 +2872,7 @@ TargetAllocator indicates a value which determines whether to spawn a target all allocationStrategy string - AllocationStrategy determines which strategy the target allocator should use for allocation
+ AllocationStrategy determines which strategy the target allocator should use for allocation The current options are least-weighted and consistent-hashing. The default option is least-weighted
false @@ -2900,7 +2900,7 @@ TargetAllocator indicates a value which determines whether to spawn a target all replicas integer - Replicas is the number of pod instances for the underlying TargetAllocator, this can only be set to values other than 1 if a strategy that allows for high availability is chosen.
+ Replicas is the number of pod instances for the underlying TargetAllocator, this should only be set to a value other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy that can be run in a high availability mode is consistent-hashing.

Format: int32
From 19ed4941c70ce9da9d54870f1c329f902c2cd457 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 15 Sep 2022 13:53:00 -0400 Subject: [PATCH 8/9] Grammar --- apis/v1alpha1/opentelemetrycollector_types.go | 4 ++-- .../manifests/opentelemetry.io_opentelemetrycollectors.yaml | 4 ++-- .../crd/bases/opentelemetry.io_opentelemetrycollectors.yaml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index aaa9054b7d..1c9382fe21 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -116,12 +116,12 @@ type OpenTelemetryCollectorSpec struct { // OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator. type OpenTelemetryTargetAllocator struct { - // Replicas is the number of pod instances for the underlying TargetAllocator, this should only be set to a value + // Replicas is the number of pod instances for the underlying TargetAllocator. This should only be set to a value // other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy // that can be run in a high availability mode is consistent-hashing. // +optional Replicas *int32 `json:"replicas,omitempty"` - // AllocationStrategy determines which strategy the target allocator should use for allocation + // AllocationStrategy determines which strategy the target allocator should use for allocation. // The current options are least-weighted and consistent-hashing. The default option is least-weighted // +optional AllocationStrategy string `json:"allocationStrategy,omitempty"` diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index f7d10eadfc..1572235e13 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -809,7 +809,7 @@ spec: properties: allocationStrategy: description: AllocationStrategy determines which strategy the - target allocator should use for allocation The current options + target allocator should use for allocation. The current options are least-weighted and consistent-hashing. The default option is least-weighted type: string @@ -835,7 +835,7 @@ spec: type: object replicas: description: Replicas is the number of pod instances for the underlying - TargetAllocator, this should only be set to a value other than + TargetAllocator. This should only be set to a value other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy that can be run in a high availability mode is consistent-hashing. diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index e2ac769a29..27e6e0180e 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -807,7 +807,7 @@ spec: properties: allocationStrategy: description: AllocationStrategy determines which strategy the - target allocator should use for allocation The current options + target allocator should use for allocation. The current options are least-weighted and consistent-hashing. The default option is least-weighted type: string @@ -833,7 +833,7 @@ spec: type: object replicas: description: Replicas is the number of pod instances for the underlying - TargetAllocator, this should only be set to a value other than + TargetAllocator. This should only be set to a value other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy that can be run in a high availability mode is consistent-hashing. From df0794e526a8beb6dd216bce74cda3939b591aa0 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 15 Sep 2022 14:37:42 -0400 Subject: [PATCH 9/9] docs :facepalm: --- docs/api.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/api.md b/docs/api.md index 1ceb75ba34..dac36db06f 100644 --- a/docs/api.md +++ b/docs/api.md @@ -3116,7 +3116,7 @@ TargetAllocator indicates a value which determines whether to spawn a target all allocationStrategy string - AllocationStrategy determines which strategy the target allocator should use for allocation The current options are least-weighted and consistent-hashing. The default option is least-weighted
+ AllocationStrategy determines which strategy the target allocator should use for allocation. The current options are least-weighted and consistent-hashing. The default option is least-weighted
false @@ -3144,7 +3144,7 @@ TargetAllocator indicates a value which determines whether to spawn a target all replicas integer - Replicas is the number of pod instances for the underlying TargetAllocator, this should only be set to a value other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy that can be run in a high availability mode is consistent-hashing.
+ Replicas is the number of pod instances for the underlying TargetAllocator. This should only be set to a value other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy that can be run in a high availability mode is consistent-hashing.

Format: int32