diff --git a/.chloggen/add_fallback_strategy_for_per_node_strategy.yaml b/.chloggen/add_fallback_strategy_for_per_node_strategy.yaml
new file mode 100755
index 0000000000..ba864fb365
--- /dev/null
+++ b/.chloggen/add_fallback_strategy_for_per_node_strategy.yaml
@@ -0,0 +1,21 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
+component: target allocator
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Added the allocation_fallback_strategy option as a fallback strategy for the per-node allocation strategy; it can be enabled with the operator.targetallocator.fallbackstrategy feature flag
+
+# One or more tracking issues related to the change
+issues: [3477]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  With the per-node allocation strategy, targets that are not attached to a node are not
+  allocated. Because the per-node strategy is required when running as a daemonset, some
+  targets can otherwise never be assigned in a daemonset deployment.
+  The operator.targetallocator.fallbackstrategy feature flag has been added; when enabled,
+  consistent-hashing is used as the fallback allocation strategy, currently for "per-node" only.
diff --git a/cmd/otel-allocator/allocation/allocator.go b/cmd/otel-allocator/allocation/allocator.go
index cbe5d1d31d..b0a9125ba9 100644
--- a/cmd/otel-allocator/allocation/allocator.go
+++ b/cmd/otel-allocator/allocation/allocator.go
@@ -76,6 +76,11 @@ func (a *allocator) SetFilter(filter Filter) {
 	a.filter = filter
 }
 
+// SetFallbackStrategy sets the fallback strategy to use.
+func (a *allocator) SetFallbackStrategy(strategy Strategy) {
+	a.strategy.SetFallbackStrategy(strategy)
+}
+
 // SetTargets accepts a list of targets that will be used to make
 // load balancing decisions. This method should be called when there are
 // new targets discovered or existing targets are shutdown.
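The changelog and the SetFallbackStrategy hook above introduce a new user-facing option. As a rough illustration (a hypothetical fragment, not taken from this diff), a target allocator configuration that opts into the fallback combines the existing allocation_strategy key with the new allocation_fallback_strategy key defined later in config.go; the values are registered strategy names:

```yaml
# Hypothetical targetallocator config fragment; the key names come from the yaml
# tags in cmd/otel-allocator/config/config.go, the values from the allocation package.
allocation_strategy: per-node
allocation_fallback_strategy: consistent-hashing
filter_strategy: relabel-config
```

With this configuration, the per-node strategy (see per_node.go below) hands any target that has no node name to the consistent-hashing fallback instead of leaving it unassigned.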
diff --git a/cmd/otel-allocator/allocation/consistent_hashing.go b/cmd/otel-allocator/allocation/consistent_hashing.go index 9659b73a0d..c8a16903bc 100644 --- a/cmd/otel-allocator/allocation/consistent_hashing.go +++ b/cmd/otel-allocator/allocation/consistent_hashing.go @@ -83,3 +83,5 @@ func (s *consistentHashingStrategy) SetCollectors(collectors map[string]*Collect s.consistentHasher = consistent.New(members, s.config) } + +func (s *consistentHashingStrategy) SetFallbackStrategy(fallbackStrategy Strategy) {} diff --git a/cmd/otel-allocator/allocation/least_weighted.go b/cmd/otel-allocator/allocation/least_weighted.go index caa2febbd9..49d935715d 100644 --- a/cmd/otel-allocator/allocation/least_weighted.go +++ b/cmd/otel-allocator/allocation/least_weighted.go @@ -54,3 +54,5 @@ func (s *leastWeightedStrategy) GetCollectorForTarget(collectors map[string]*Col } func (s *leastWeightedStrategy) SetCollectors(_ map[string]*Collector) {} + +func (s *leastWeightedStrategy) SetFallbackStrategy(fallbackStrategy Strategy) {} diff --git a/cmd/otel-allocator/allocation/per_node.go b/cmd/otel-allocator/allocation/per_node.go index a5e2bfa3f8..3d9c76d90d 100644 --- a/cmd/otel-allocator/allocation/per_node.go +++ b/cmd/otel-allocator/allocation/per_node.go @@ -25,21 +25,31 @@ const perNodeStrategyName = "per-node" var _ Strategy = &perNodeStrategy{} type perNodeStrategy struct { - collectorByNode map[string]*Collector + collectorByNode map[string]*Collector + fallbackStrategy Strategy } func newPerNodeStrategy() Strategy { return &perNodeStrategy{ - collectorByNode: make(map[string]*Collector), + collectorByNode: make(map[string]*Collector), + fallbackStrategy: nil, } } +func (s *perNodeStrategy) SetFallbackStrategy(fallbackStrategy Strategy) { + s.fallbackStrategy = fallbackStrategy +} + func (s *perNodeStrategy) GetName() string { return perNodeStrategyName } func (s *perNodeStrategy) GetCollectorForTarget(collectors map[string]*Collector, item *target.Item) (*Collector, error) { targetNodeName := item.GetNodeName() + if targetNodeName == "" && s.fallbackStrategy != nil { + return s.fallbackStrategy.GetCollectorForTarget(collectors, item) + } + collector, ok := s.collectorByNode[targetNodeName] if !ok { return nil, fmt.Errorf("could not find collector for node %s", targetNodeName) @@ -54,4 +64,8 @@ func (s *perNodeStrategy) SetCollectors(collectors map[string]*Collector) { s.collectorByNode[collector.NodeName] = collector } } + + if s.fallbackStrategy != nil { + s.fallbackStrategy.SetCollectors(collectors) + } } diff --git a/cmd/otel-allocator/allocation/per_node_test.go b/cmd/otel-allocator/allocation/per_node_test.go index ebbe3f31e6..4d17e6bbb3 100644 --- a/cmd/otel-allocator/allocation/per_node_test.go +++ b/cmd/otel-allocator/allocation/per_node_test.go @@ -26,7 +26,17 @@ import ( var loggerPerNode = logf.Log.WithName("unit-tests") -// Tests that two targets with the same target url and job name but different label set are both added. +func GetTargetsWithNodeName(targets []*target.Item) (targetsWithNodeName []*target.Item) { + for _, item := range targets { + if item.GetNodeName() != "" { + targetsWithNodeName = append(targetsWithNodeName, item) + } + } + return targetsWithNodeName +} + +// Tests that four targets, with one of them lacking node labels, are assigned except for the +// target that lacks node labels. 
func TestAllocationPerNode(t *testing.T) { // prepare allocator with initial targets and collectors s, _ := New("per-node", loggerPerNode) @@ -93,6 +103,77 @@ func TestAllocationPerNode(t *testing.T) { } } +// Tests that four targets, with one of them missing node labels, are all assigned. +func TestAllocationPerNodeUsingFallback(t *testing.T) { + // prepare allocator with initial targets and collectors + s, _ := New("per-node", loggerPerNode, WithFallbackStrategy(consistentHashingStrategyName)) + + cols := MakeNCollectors(4, 0) + s.SetCollectors(cols) + firstLabels := labels.Labels{ + {Name: "test", Value: "test1"}, + {Name: "__meta_kubernetes_pod_node_name", Value: "node-0"}, + } + secondLabels := labels.Labels{ + {Name: "test", Value: "test2"}, + {Name: "__meta_kubernetes_node_name", Value: "node-1"}, + } + // no label, should be allocated by the fallback strategy + thirdLabels := labels.Labels{ + {Name: "test", Value: "test3"}, + } + // endpointslice target kind and name + fourthLabels := labels.Labels{ + {Name: "test", Value: "test4"}, + {Name: "__meta_kubernetes_endpointslice_address_target_kind", Value: "Node"}, + {Name: "__meta_kubernetes_endpointslice_address_target_name", Value: "node-3"}, + } + + firstTarget := target.NewItem("sample-name", "0.0.0.0:8000", firstLabels, "") + secondTarget := target.NewItem("sample-name", "0.0.0.0:8000", secondLabels, "") + thirdTarget := target.NewItem("sample-name", "0.0.0.0:8000", thirdLabels, "") + fourthTarget := target.NewItem("sample-name", "0.0.0.0:8000", fourthLabels, "") + + targetList := map[string]*target.Item{ + firstTarget.Hash(): firstTarget, + secondTarget.Hash(): secondTarget, + thirdTarget.Hash(): thirdTarget, + fourthTarget.Hash(): fourthTarget, + } + + // test that targets and collectors are added properly + s.SetTargets(targetList) + + // verify length + actualItems := s.TargetItems() + + // all targets should be allocated + expectedTargetLen := len(targetList) + assert.Len(t, actualItems, expectedTargetLen) + + // verify allocation to nodes + for targetHash, item := range targetList { + actualItem, found := actualItems[targetHash] + + assert.True(t, found, "target with hash %s not found", item.Hash()) + + itemsForCollector := s.GetTargetsForCollectorAndJob(actualItem.CollectorName, actualItem.JobName) + + // first two should be assigned one to each collector; if third target, it should be assigned + // according to the fallback strategy which may assign it to the otherwise empty collector or + // one of the others, depending on the strategy and collector loop order + if targetHash == thirdTarget.Hash() { + assert.Empty(t, item.GetNodeName()) + assert.NotZero(t, len(itemsForCollector)) + continue + } + + // Only check targets that have been assigned using the per-node (not fallback) strategy here + assert.Len(t, GetTargetsWithNodeName(itemsForCollector), 1) + assert.Equal(t, actualItem, GetTargetsWithNodeName(itemsForCollector)[0]) + } +} + func TestTargetsWithNoCollectorsPerNode(t *testing.T) { // prepare allocator with initial targets and collectors c, _ := New("per-node", loggerPerNode) diff --git a/cmd/otel-allocator/allocation/strategy.go b/cmd/otel-allocator/allocation/strategy.go index 29ae7fd99a..47fafd5662 100644 --- a/cmd/otel-allocator/allocation/strategy.go +++ b/cmd/otel-allocator/allocation/strategy.go @@ -29,6 +29,8 @@ import ( type AllocatorProvider func(log logr.Logger, opts ...AllocationOption) Allocator var ( + strategies = map[string]Strategy{} + registry = map[string]AllocatorProvider{} // 
TargetsPerCollector records how many targets have been assigned to each collector. @@ -67,6 +69,16 @@ func WithFilter(filter Filter) AllocationOption { } } +func WithFallbackStrategy(fallbackStrategy string) AllocationOption { + var strategy, ok = strategies[fallbackStrategy] + if fallbackStrategy != "" && !ok { + panic(fmt.Errorf("unregistered strategy used as fallback: %s", fallbackStrategy)) + } + return func(allocator Allocator) { + allocator.SetFallbackStrategy(strategy) + } +} + func RecordTargetsKept(targets map[string]*target.Item) { targetsRemaining.Add(float64(len(targets))) } @@ -101,6 +113,7 @@ type Allocator interface { Collectors() map[string]*Collector GetTargetsForCollectorAndJob(collector string, job string) []*target.Item SetFilter(filter Filter) + SetFallbackStrategy(strategy Strategy) } type Strategy interface { @@ -110,6 +123,8 @@ type Strategy interface { // SetCollectors call. Strategies which don't need this information can just ignore it. SetCollectors(map[string]*Collector) GetName() string + // Add fallback strategy for strategies whose main allocation method can sometimes leave targets unassigned + SetFallbackStrategy(Strategy) } var _ consistent.Member = Collector{} @@ -136,22 +151,18 @@ func NewCollector(name, node string) *Collector { } func init() { - err := Register(leastWeightedStrategyName, func(log logr.Logger, opts ...AllocationOption) Allocator { - return newAllocator(log, newleastWeightedStrategy(), opts...) - }) - if err != nil { - panic(err) - } - err = Register(consistentHashingStrategyName, func(log logr.Logger, opts ...AllocationOption) Allocator { - return newAllocator(log, newConsistentHashingStrategy(), opts...) - }) - if err != nil { - panic(err) + strategies = map[string]Strategy{ + leastWeightedStrategyName: newleastWeightedStrategy(), + consistentHashingStrategyName: newConsistentHashingStrategy(), + perNodeStrategyName: newPerNodeStrategy(), } - err = Register(perNodeStrategyName, func(log logr.Logger, opts ...AllocationOption) Allocator { - return newAllocator(log, newPerNodeStrategy(), opts...) - }) - if err != nil { - panic(err) + + for strategyName, strategy := range strategies { + err := Register(strategyName, func(log logr.Logger, opts ...AllocationOption) Allocator { + return newAllocator(log, strategy, opts...) 
+ }) + if err != nil { + panic(err) + } } } diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 946fba268f..ee55fe0a32 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -46,16 +46,17 @@ const ( ) type Config struct { - ListenAddr string `yaml:"listen_addr,omitempty"` - KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"` - ClusterConfig *rest.Config `yaml:"-"` - RootLogger logr.Logger `yaml:"-"` - CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"` - PromConfig *promconfig.Config `yaml:"config"` - AllocationStrategy string `yaml:"allocation_strategy,omitempty"` - FilterStrategy string `yaml:"filter_strategy,omitempty"` - PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"` - HTTPS HTTPSServerConfig `yaml:"https,omitempty"` + ListenAddr string `yaml:"listen_addr,omitempty"` + KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"` + ClusterConfig *rest.Config `yaml:"-"` + RootLogger logr.Logger `yaml:"-"` + CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"` + PromConfig *promconfig.Config `yaml:"config"` + AllocationStrategy string `yaml:"allocation_strategy,omitempty"` + AllocationFallbackStrategy string `yaml:"allocation_fallback_strategy,omitempty"` + FilterStrategy string `yaml:"filter_strategy,omitempty"` + PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"` + HTTPS HTTPSServerConfig `yaml:"https,omitempty"` } type PrometheusCRConfig struct { @@ -165,8 +166,9 @@ func unmarshal(cfg *Config, configFile string) error { func CreateDefaultConfig() Config { return Config{ - AllocationStrategy: DefaultAllocationStrategy, - FilterStrategy: DefaultFilterStrategy, + AllocationStrategy: DefaultAllocationStrategy, + AllocationFallbackStrategy: "", + FilterStrategy: DefaultFilterStrategy, PrometheusCR: PrometheusCRConfig{ ScrapeInterval: DefaultCRScrapeInterval, }, diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index f9531d6740..be2418902e 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -81,7 +81,13 @@ func main() { log := ctrl.Log.WithName("allocator") allocatorPrehook = prehook.New(cfg.FilterStrategy, log) - allocator, err = allocation.New(cfg.AllocationStrategy, log, allocation.WithFilter(allocatorPrehook)) + + var allocationOptions []allocation.AllocationOption + allocationOptions = append(allocationOptions, allocation.WithFilter(allocatorPrehook)) + if cfg.AllocationFallbackStrategy != "" { + allocationOptions = append(allocationOptions, allocation.WithFallbackStrategy(cfg.AllocationFallbackStrategy)) + } + allocator, err = allocation.New(cfg.AllocationStrategy, log, allocationOptions...) 
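The main.go change above appends WithFallbackStrategy only when allocation_fallback_strategy is set. For illustration, here is a minimal self-contained sketch of the same wiring when constructing the allocator directly; the import path and the discard logger are assumptions, while New, AllocationOption, WithFallbackStrategy, and the strategy names are the APIs shown in this diff:

```go
package main

import (
	"fmt"

	"github.com/go-logr/logr"

	// Import path assumed from the repository layout visible in this diff.
	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation"
)

func main() {
	log := logr.Discard()

	// Mirror the conditional wiring in main.go: the fallback option is only
	// appended when a fallback strategy is configured. WithFallbackStrategy
	// panics if the named strategy was never registered in init().
	fallback := "consistent-hashing"
	var opts []allocation.AllocationOption
	if fallback != "" {
		opts = append(opts, allocation.WithFallbackStrategy(fallback))
	}

	alloc, err := allocation.New("per-node", log, opts...)
	if err != nil {
		fmt.Println("unable to initialize allocation strategy:", err)
		return
	}
	_ = alloc
}
```

The test in per_node_test.go above exercises the option the same way, passing WithFallbackStrategy(consistentHashingStrategyName) to New.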
if err != nil { setupLog.Error(err, "Unable to initialize allocation strategy") os.Exit(1) diff --git a/cmd/otel-allocator/server/mocks_test.go b/cmd/otel-allocator/server/mocks_test.go index e44b178fa8..8620d70367 100644 --- a/cmd/otel-allocator/server/mocks_test.go +++ b/cmd/otel-allocator/server/mocks_test.go @@ -32,6 +32,7 @@ func (m *mockAllocator) SetTargets(_ map[string]*target.Item) func (m *mockAllocator) Collectors() map[string]*allocation.Collector { return nil } func (m *mockAllocator) GetTargetsForCollectorAndJob(_ string, _ string) []*target.Item { return nil } func (m *mockAllocator) SetFilter(_ allocation.Filter) {} +func (m *mockAllocator) SetFallbackStrategy(_ allocation.Strategy) {} func (m *mockAllocator) TargetItems() map[string]*target.Item { return m.targetItems diff --git a/internal/manifests/targetallocator/configmap.go b/internal/manifests/targetallocator/configmap.go index b17df29151..36defd088e 100644 --- a/internal/manifests/targetallocator/configmap.go +++ b/internal/manifests/targetallocator/configmap.go @@ -90,6 +90,11 @@ func ConfigMap(params Params) (*corev1.ConfigMap, error) { } else { taConfig["allocation_strategy"] = v1beta1.TargetAllocatorAllocationStrategyConsistentHashing } + + if featuregate.EnableTargetAllocatorFallbackStrategy.IsEnabled() { + taConfig["allocation_fallback_strategy"] = v1beta1.TargetAllocatorAllocationStrategyConsistentHashing + } + taConfig["filter_strategy"] = taSpec.FilterStrategy if taSpec.PrometheusCR.Enabled { diff --git a/internal/manifests/targetallocator/configmap_test.go b/internal/manifests/targetallocator/configmap_test.go index de863874db..967eef25e8 100644 --- a/internal/manifests/targetallocator/configmap_test.go +++ b/internal/manifests/targetallocator/configmap_test.go @@ -297,6 +297,63 @@ prometheus_cr: assert.Equal(t, expectedLabels, actual.Labels) assert.Equal(t, expectedData, actual.Data) }) + + t.Run("should return expected target allocator config map allocation fallback strategy", func(t *testing.T) { + expectedLabels["app.kubernetes.io/component"] = "opentelemetry-targetallocator" + expectedLabels["app.kubernetes.io/name"] = "my-instance-targetallocator" + + cfg := config.New(config.WithCertManagerAvailability(certmanager.Available)) + + flgs := featuregate.Flags(colfg.GlobalRegistry()) + err := flgs.Parse([]string{"--feature-gates=operator.targetallocator.fallbackstrategy"}) + require.NoError(t, err) + + testParams := Params{ + Collector: collector, + TargetAllocator: targetAllocator, + Config: cfg, + } + + expectedData := map[string]string{ + targetAllocatorFilename: `allocation_fallback_strategy: consistent-hashing +allocation_strategy: consistent-hashing +collector_selector: + matchlabels: + app.kubernetes.io/component: opentelemetry-collector + app.kubernetes.io/instance: default.my-instance + app.kubernetes.io/managed-by: opentelemetry-operator + app.kubernetes.io/part-of: opentelemetry + matchexpressions: [] +config: + scrape_configs: + - job_name: otel-collector + scrape_interval: 10s + static_configs: + - targets: + - 0.0.0.0:8888 + - 0.0.0.0:9999 +filter_strategy: relabel-config +https: + ca_file_path: /tls/ca.crt + enabled: true + listen_addr: :8443 + tls_cert_file_path: /tls/tls.crt + tls_key_file_path: /tls/tls.key +prometheus_cr: + enabled: true + pod_monitor_selector: null + scrape_interval: 30s + service_monitor_selector: null +`, + } + + actual, err := ConfigMap(testParams) + assert.NoError(t, err) + + assert.Equal(t, "my-instance-targetallocator", actual.Name) + assert.Equal(t, 
expectedLabels, actual.Labels) + assert.Equal(t, expectedData, actual.Data) + }) } func TestGetScrapeConfigsFromOtelConfig(t *testing.T) { diff --git a/pkg/featuregate/featuregate.go b/pkg/featuregate/featuregate.go index 03a6f8392a..e08b0fb0c3 100644 --- a/pkg/featuregate/featuregate.go +++ b/pkg/featuregate/featuregate.go @@ -67,6 +67,14 @@ var ( featuregate.WithRegisterDescription("enables mTLS between the target allocator and the collector"), featuregate.WithRegisterFromVersion("v0.111.0"), ) + // EnableTargetAllocatorFallbackStrategy is the feature gate that enables consistent-hashing as the fallback + // strategy for allocation strategies that might not assign all jobs (per-node). + EnableTargetAllocatorFallbackStrategy = featuregate.GlobalRegistry().MustRegister( + "operator.targetallocator.fallbackstrategy", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("enables fallback allocation strategy for the target allocator"), + featuregate.WithRegisterFromVersion("v0.114.0"), + ) // EnableConfigDefaulting is the feature gate that enables the operator to default the endpoint for known components. EnableConfigDefaulting = featuregate.GlobalRegistry().MustRegister( "operator.collector.default.config",