Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added consistent hashing strategy #1087

Merged
merged 12 commits into from
Sep 16, 2022
9 changes: 8 additions & 1 deletion apis/v1alpha1/opentelemetrycollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@ type OpenTelemetryCollectorSpec struct {

// OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator.
type OpenTelemetryTargetAllocator struct {
// AllocationStrategy determines which strategy the target allocator should use for allocation
// Replicas is the number of pod instances for the underlying TargetAllocator. This should only be set to a value
// other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy
// that can be run in a high availability mode is consistent-hashing.
// +optional
Replicas *int32 `json:"replicas,omitempty"`
// AllocationStrategy determines which strategy the target allocator should use for allocation.
// The current options are least-weighted and consistent-hashing. The default option is least-weighted
// +optional
AllocationStrategy string `json:"allocationStrategy,omitempty"`
// ServiceAccount indicates the name of an existing service account to use with this instance.
// +optional
Expand Down
9 changes: 6 additions & 3 deletions apis/v1alpha1/opentelemetrycollector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ func (r *OpenTelemetryCollector) Default() {
r.Labels["app.kubernetes.io/managed-by"] = "opentelemetry-operator"
}

// We can default to one because dependent objects Deployment and HorizontalPodAutoScaler
// default to 1 as well.
one := int32(1)
if r.Spec.Replicas == nil {
// We can default to one because dependent objects Deployment and HorizontalPodAutoScaler
// default to 1 as well.
one := int32(1)
r.Spec.Replicas = &one
}
if r.Spec.TargetAllocator.Enabled && r.Spec.TargetAllocator.Replicas == nil {
r.Spec.TargetAllocator.Replicas = &one
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to share the pointer here? If these values are ever modified is it always by assigning a new pointer or is it ever possible that the underlying value will be changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay to modify because this is just for the webhook. The object passed to the operator wouldn't have a spec with a shared pointer (this worked in my testing)

}
}

// +kubebuilder:webhook:verbs=create;update,path=/validate-opentelemetry-io-v1alpha1-opentelemetrycollector,mutating=false,failurePolicy=fail,groups=opentelemetry.io,resources=opentelemetrycollectors,versions=v1alpha1,name=vopentelemetrycollectorcreateupdate.kb.io,sideEffects=none,admissionReviewVersions=v1
Expand Down
7 changes: 6 additions & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,9 @@ spec:
properties:
allocationStrategy:
description: AllocationStrategy determines which strategy the
target allocator should use for allocation
target allocator should use for allocation. The current options
are least-weighted and consistent-hashing. The default option
is least-weighted
type: string
enabled:
description: Enabled indicates whether to use a target allocation
Expand All @@ -831,6 +833,14 @@ spec:
custom resources as targets or not.
type: boolean
type: object
replicas:
description: Replicas is the number of pod instances for the underlying
TargetAllocator. This should only be set to a value other than
1 if a strategy that allows for high availability is chosen.
Currently, the only allocation strategy that can be run in a
high availability mode is consistent-hashing.
format: int32
type: integer
serviceAccount:
description: ServiceAccount indicates the name of an existing
service account to use with this instance.
Expand Down
196 changes: 196 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package allocation

import (
"fmt"
"net/url"
"sync"

"github.com/buraksezer/consistent"
"github.com/cespare/xxhash/v2"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
)

var _ Allocator = &consistentHashingAllocator{}

const consistentHashingStrategyName = "consistent-hashing"

type hasher struct{}

func (h hasher) Sum64(data []byte) uint64 {
return xxhash.Sum64(data)
}

type consistentHashingAllocator struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this code looks very similar to the least weighted strategy. It may be worth investigating in the future if we can combine these as I tried to do in #1068

// m protects consistentHasher, collectors and targetItems for concurrent use.
m sync.RWMutex

consistentHasher *consistent.Consistent

// collectors is a map from a Collector's name to a Collector instance
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*TargetItem

log logr.Logger
}

func newConsistentHashingAllocator(log logr.Logger) Allocator {
config := consistent.Config{
PartitionCount: 1061,
ReplicationFactor: 5,
Load: 1.1,
Comment on lines +43 to +45
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these configuration options be exposed to the operator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure... I think i'm going to open a follow up issue to make these configurable as it would require some refactoring of how configuration is passed down to the allocation strategies as right now it's only the string. Is that alright?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's fine. I think using the string for configuration might be fine, as long as it's well specified. Something like URL query params style might work:

allocationStrategy: "consistentHashing?load=1.5&partitionCount=3000"

That would make it a straightforward addition while leaving plenty of flexibility. Don't need to solve now though, can figure it out on the follow up issue.

Hasher: hasher{},
Comment on lines +43 to +46
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: should this be configurable

}
consistentHasher := consistent.New(nil, config)
return &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
log: log,
}
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap
// INVARIANT: c.collectors must have at least 1 collector set
func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[target.CollectorName]; ok {
previousColName.NumTargets--
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(target.Hash()))
targetItem := &TargetItem{
JobName: target.JobName,
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
CollectorName: colOwner.String(),
}
c.targetItems[targetItem.Hash()] = targetItem
c.collectors[colOwner.String()].NumTargets++
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
}

// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector
// Any net-new additions are assigned to the next available collector
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) {
// Check for removals
for k, target := range c.targetItems {
// if the current target is in the removals list
if _, ok := diff.Removals()[k]; ok {
col := c.collectors[target.CollectorName]
col.NumTargets--
delete(c.targetItems, k)
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets))
}
}

// Check for additions
for k, target := range diff.Additions() {
// Do nothing if the item is already there
if _, ok := c.targetItems[k]; ok {
continue
} else {
Comment on lines +97 to +100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? addTargetToTargetItems() has provision for when the new target has an existing collector attached to it, which I think would only happen when the target existed previously as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is still necessary. the provision in addTargetToTargetItems() is for when we reassign a collector after a new collector is added which shifts the amount of members in the hashring. In this case, we are iterating through the newly added targets and seeing if the target has already been added. Because we're using consistent hashing, the target would get unassigned and then reassigned to the same collector which seems like unnecessary work.

// Add target to target pool and assign a collector
c.addTargetToTargetItems(target)
}
}
}

// handleCollectors receives the new and removed collectors and reconciles the current state.
// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map
// Finally, update all targets' collectors to match the consistent hashing.
func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) {
// Clear removed collectors
for _, k := range diff.Removals() {
delete(c.collectors, k.Name)
c.consistentHasher.Remove(k.Name)
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0)
}
// Insert the new collectors
for _, i := range diff.Additions() {
c.collectors[i.Name] = NewCollector(i.Name)
c.consistentHasher.Add(c.collectors[i.Name])
}

// Re-Allocate all targets
for _, item := range c.targetItems {
c.addTargetToTargetItems(item)
}
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

c.m.Lock()
defer c.m.Unlock()

if len(c.collectors) == 0 {
c.log.Info("No collector instances present, cannot set targets")
return
}
// Check for target changes
targetsDiff := diff.Maps(c.targetItems, targets)
// If there are any additions or removals
if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 {
c.handleTargets(targetsDiff)
}
return
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) {
log := c.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
return
}

c.m.Lock()
defer c.m.Unlock()

// Check for collector changes
collectorsDiff := diff.Maps(c.collectors, collectors)
if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 {
c.handleCollectors(collectorsDiff)
}
return
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem {
c.m.RLock()
defer c.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
for k, v := range c.targetItems {
targetItemsCopy[k] = v
}
return targetItemsCopy
}

// Collectors returns a shallow copy of the collectors map.
func (c *consistentHashingAllocator) Collectors() map[string]*Collector {
c.m.RLock()
defer c.m.RUnlock()
collectorsCopy := make(map[string]*Collector)
for k, v := range c.collectors {
collectorsCopy[k] = v
}
return collectorsCopy
}
92 changes: 92 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package allocation

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCanSetSingleTarget(t *testing.T) {
cols := makeNCollectors(3, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(1, 3, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, 1)
for _, item := range actualTargetItems {
assert.Equal(t, "collector-2", item.CollectorName)
}
}

func TestRelativelyEvenDistribution(t *testing.T) {
numCols := 15
numItems := 10000
cols := makeNCollectors(numCols, 0, 0)
var expectedPerCollector = float64(numItems / numCols)
expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(numItems, 0, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets)
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}

func TestFullReallocation(t *testing.T) {
cols := makeNCollectors(10, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(10000, 10, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, 10000)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, 10)
newCols := makeNCollectors(10, 0, 10)
c.SetCollectors(newCols)
updatedTargetItems := c.TargetItems()
assert.Len(t, updatedTargetItems, 10000)
updatedCollectors := c.Collectors()
assert.Len(t, updatedCollectors, 10)
for _, item := range updatedTargetItems {
_, ok := updatedCollectors[item.CollectorName]
assert.True(t, ok, "Some items weren't reallocated correctly")
}
}

func TestNumRemapped(t *testing.T) {
numItems := 10_000
numInitialCols := 15
numFinalCols := 16
expectedDelta := float64((numFinalCols - numInitialCols) * (numItems / numFinalCols))
cols := makeNCollectors(numInitialCols, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(numItems, numInitialCols, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numInitialCols)
newCols := makeNCollectors(numFinalCols, 0, 0)
c.SetCollectors(newCols)
updatedTargetItems := c.TargetItems()
assert.Len(t, updatedTargetItems, numItems)
updatedCollectors := c.Collectors()
assert.Len(t, updatedCollectors, numFinalCols)
countRemapped := 0
countNotRemapped := 0
for _, item := range updatedTargetItems {
previousItem, ok := actualTargetItems[item.Hash()]
assert.True(t, ok)
if previousItem.CollectorName != item.CollectorName {
countRemapped++
} else {
countNotRemapped++
}
}
assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta)
}
4 changes: 2 additions & 2 deletions cmd/otel-allocator/allocation/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
"test-label": "test-value",
"foo": "bar",
},
TargetURL: "test-url",
TargetURL: "test-url",
CollectorName: "test-collector",
},
TargetItem{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
},
TargetURL: "test-url",
TargetURL: "test-url",
CollectorName: "test-collector",
},
},
Expand Down
Loading