Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added consistent hashing strategy #1087

Merged
merged 12 commits into from
Sep 16, 2022
189 changes: 189 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package allocation

import (
"fmt"
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash/v2"
"github.com/go-logr/logr"
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/prometheus/client_golang/prometheus"
"net/url"
"sync"
)

var _ Allocator = &consistentHashingAllocator{}

const consistentHashingStrategyName = "consistent-hashing"

type hasher struct{}

func (h hasher) Sum64(data []byte) uint64 {
return xxhash.Sum64(data)
}

type consistentHashingAllocator struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this code looks very similar to the least weighted strategy. It may be worth investigating in the future if we can combine these as I tried to do in #1068

// m protects Consistent for concurrent use.
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
m sync.RWMutex

hasher *consistent.Consistent

// collectors is a map from a Collector's name to a Collector instance
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*TargetItem

log logr.Logger
}

func newConsistentHashingAllocator(log logr.Logger) Allocator {
config := consistent.Config{
PartitionCount: 1061,
ReplicationFactor: 5,
Load: 1.1,
Comment on lines +43 to +45
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these configuration options be exposed to the operator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure... I think i'm going to open a follow up issue to make these configurable as it would require some refactoring of how configuration is passed down to the allocation strategies as right now it's only the string. Is that alright?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's fine. I think using the string for configuration might be fine, as long as it's well specified. Something like URL query params style might work:

allocationStrategy: "consistentHashing?load=1.5&partitionCount=3000"

That would make it a straightforward addition while leaving plenty of flexibility. Don't need to solve now though, can figure it out on the follow up issue.

Hasher: hasher{},
Comment on lines +43 to +46
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: should this be configurable

}
consistentHasher := consistent.New(nil, config)
return &consistentHashingAllocator{
hasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
log: log,
}
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap
// INVARIANT: c.collectors must have at least 1 collector set
func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) {
colOwner := c.hasher.LocateKey([]byte(target.Hash()))
targetItem := &TargetItem{
JobName: target.JobName,
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
CollectorName: colOwner.String(),
}
c.targetItems[targetItem.Hash()] = targetItem
c.collectors[colOwner.String()].NumTargets++
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
}

// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector
// Any net-new additions are assigned to the next available collector
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) {
// Check for removals
for k, target := range c.targetItems {
// if the current target is in the removals list
if _, ok := diff.Removals()[k]; ok {
col := c.collectors[target.CollectorName]
col.NumTargets--
delete(c.targetItems, k)
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets))
}
}

// Check for additions
for k, target := range diff.Additions() {
// Do nothing if the item is already there
if _, ok := c.targetItems[k]; ok {
continue
} else {
Comment on lines +97 to +100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? addTargetToTargetItems() has provision for when the new target has an existing collector attached to it, which I think would only happen when the target existed previously as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is still necessary. the provision in addTargetToTargetItems() is for when we reassign a collector after a new collector is added which shifts the amount of members in the hashring. In this case, we are iterating through the newly added targets and seeing if the target has already been added. Because we're using consistent hashing, the target would get unassigned and then reassigned to the same collector which seems like unnecessary work.

// Add target to target pool and assign a collector
c.addTargetToTargetItems(target)
}
}
}

// handleCollectors receives the new and removed collectors and reconciles the current state.
// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map
// Finally, any targets of removed collectors are reallocated to the next available collector.
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) {
// Clear removed collectors
for _, k := range diff.Removals() {
delete(c.collectors, k.Name)
c.hasher.Remove(k.Name)
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0)
}
// Insert the new collectors
for _, i := range diff.Additions() {
c.collectors[i.Name] = NewCollector(i.Name)
c.hasher.Add(c.collectors[i.Name])
}

// Re-Allocate all targets
for _, item := range c.targetItems {
c.addTargetToTargetItems(item)
}
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

c.m.Lock()
defer c.m.Unlock()

if len(c.collectors) == 0 {
c.log.Info("No collector instances present, cannot set targets")
return
}
// Check for target changes
targetsDiff := diff.Maps(c.targetItems, targets)
// If there are any additions or removals
if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 {
c.handleTargets(targetsDiff)
}
return
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) {
log := c.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
return
}

c.m.Lock()
defer c.m.Unlock()

// Check for collector changes
collectorsDiff := diff.Maps(c.collectors, collectors)
if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 {
c.handleCollectors(collectorsDiff)
}
return
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem {
c.m.RLock()
defer c.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
for k, v := range c.targetItems {
targetItemsCopy[k] = v
}
return targetItemsCopy
}

// Collectors returns a shallow copy of the collectors map.
func (c *consistentHashingAllocator) Collectors() map[string]*Collector {
c.m.RLock()
defer c.m.RUnlock()
collectorsCopy := make(map[string]*Collector)
for k, v := range c.collectors {
collectorsCopy[k] = v
}
return collectorsCopy
}
91 changes: 91 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package allocation

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestCanSetSingleTarget(t *testing.T) {
cols := makeNCollectors(3, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(1, 3, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, 1)
for _, item := range actualTargetItems {
assert.Equal(t, "collector-2", item.CollectorName)
}
}

func TestRelativelyEvenDistribution(t *testing.T) {
numCols := 15
numItems := 10000
cols := makeNCollectors(numCols, 0, 0)
var expectedPerCollector = float64(numItems / numCols)
expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(numItems, numCols, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets)
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}

func TestFullReallocation(t *testing.T) {
cols := makeNCollectors(10, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(10000, 10, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, 10000)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, 10)
newCols := makeNCollectors(10, 0, 10)
c.SetCollectors(newCols)
updatedTargetItems := c.TargetItems()
assert.Len(t, updatedTargetItems, 10000)
updatedCollectors := c.Collectors()
assert.Len(t, updatedCollectors, 10)
for _, item := range updatedTargetItems {
_, ok := updatedCollectors[item.CollectorName]
assert.True(t, ok, "Some items weren't reallocated correctly")
}
}

func TestNumRemapped(t *testing.T) {
numItems := 10_000
numInitialCols := 15
numFinalCols := 16
expectedDelta := float64((numFinalCols - numInitialCols) * (numItems / numFinalCols))
cols := makeNCollectors(numInitialCols, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(numItems, numInitialCols, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numInitialCols)
newCols := makeNCollectors(numFinalCols, 0, 0)
c.SetCollectors(newCols)
updatedTargetItems := c.TargetItems()
assert.Len(t, updatedTargetItems, numItems)
updatedCollectors := c.Collectors()
assert.Len(t, updatedCollectors, numFinalCols)
countRemapped := 0
countNotRemapped := 0
for _, item := range updatedTargetItems {
previousItem, ok := actualTargetItems[item.Hash()]
assert.True(t, ok)
if previousItem.CollectorName != item.CollectorName {
countRemapped++
} else {
countNotRemapped++
}
}
assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta)
}
23 changes: 8 additions & 15 deletions cmd/otel-allocator/allocation/least_weighted.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var _ Allocator = &leastWeightedAllocator{}

const strategyName = "least-weighted"
const leastWeightedStrategyName = "least-weighted"

/*
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets
Expand Down Expand Up @@ -91,7 +91,7 @@ func (allocator *leastWeightedAllocator) addTargetToTargetItems(target *TargetIt
}
allocator.targetItems[targetItem.Hash()] = targetItem
chosenCollector.NumTargets++
TargetsPerCollector.WithLabelValues(chosenCollector.Name, strategyName).Set(float64(chosenCollector.NumTargets))
TargetsPerCollector.WithLabelValues(chosenCollector.Name, leastWeightedStrategyName).Set(float64(chosenCollector.NumTargets))
}

// handleTargets receives the new and removed targets and reconciles the current state.
Expand All @@ -105,7 +105,7 @@ func (allocator *leastWeightedAllocator) handleTargets(diff diff.Changes[*Target
c := allocator.collectors[target.CollectorName]
c.NumTargets--
delete(allocator.targetItems, k)
TargetsPerCollector.WithLabelValues(target.CollectorName, strategyName).Set(float64(c.NumTargets))
TargetsPerCollector.WithLabelValues(target.CollectorName, leastWeightedStrategyName).Set(float64(c.NumTargets))
}
}

Expand All @@ -128,7 +128,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
// Clear removed collectors
for _, k := range diff.Removals() {
delete(allocator.collectors, k.Name)
TargetsPerCollector.WithLabelValues(k.Name, strategyName).Set(0)
TargetsPerCollector.WithLabelValues(k.Name, leastWeightedStrategyName).Set(0)
}
// Insert the new collectors
for _, i := range diff.Additions() {
Expand All @@ -147,7 +147,7 @@ func (allocator *leastWeightedAllocator) handleCollectors(diff diff.Changes[*Col
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetItem) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", strategyName))
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", leastWeightedStrategyName))
defer timer.ObserveDuration()

allocator.m.Lock()
Expand All @@ -170,10 +170,10 @@ func (allocator *leastWeightedAllocator) SetTargets(targets map[string]*TargetIt
// This method is called when Collectors are added or removed.
func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Collector) {
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", strategyName))
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", leastWeightedStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(strategyName).Set(float64(len(collectors)))
CollectorsAllocatable.WithLabelValues(leastWeightedStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
return
Expand All @@ -190,17 +190,10 @@ func (allocator *leastWeightedAllocator) SetCollectors(collectors map[string]*Co
return
}

func NewAllocator(log logr.Logger) Allocator {
func newLeastWeightedAllocator(log logr.Logger) Allocator {
return &leastWeightedAllocator{
log: log,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
}
}

func init() {
err := Register(strategyName, NewAllocator)
if err != nil {
panic(err)
}
}
Loading