From 964a652c4cce9f24361d05301351112e079297f8 Mon Sep 17 00:00:00 2001
From: Matt H
Date: Thu, 12 Dec 2024 01:09:38 -0800
Subject: [PATCH] [exporter/loadbalancing] Add return_hostnames option to k8s
 resolver (#35411)

**Description:** Adds an optional configuration option to the k8s resolver which allows hostnames to be returned instead of IPs. This enables certain scenarios, such as using istio in sidecar mode. The requirements have been added to the documentation.

**Link to tracking Issue:** #18412

**Testing:** Added corresponding hostname-based tests for adding backends/endpoints as well as deleting them. Tests were also missing for the k8s handler, so some were added there as well, specifically covering the failure case where hostnames are requested but the returned endpoints do not have hostnames. Aside from unit tests, this was also run in our lab cluster, deleting pods and performing rollouts to our statefulset. Somewhat tangential to the PR itself, but istio now reports mTLS traffic with zero workarounds required, which was the motivation for the issue.

**Documentation:** Added documentation explaining the new option and the requirements needed to use it. Also added an "important" note specifically calling out that the k8s resolver needs RBAC to work.
---
 ...xporter-return-hostnames-k8s-resolver.yaml |  27 +++++
 exporter/loadbalancingexporter/README.md      |  16 +++
 exporter/loadbalancingexporter/config.go      |   7 +-
 .../loadbalancingexporter/loadbalancer.go     |   1 +
 .../loadbalancingexporter/resolver_k8s.go     |  25 ++--
 .../resolver_k8s_handler.go                   |  60 ++++++++--
 .../resolver_k8s_handler_test.go              | 105 ++++++++++++++++++
 .../resolver_k8s_test.go                      |  68 ++++++++++--
 8 files changed, 276 insertions(+), 33 deletions(-)
 create mode 100644 .chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
 create mode 100644 exporter/loadbalancingexporter/resolver_k8s_handler_test.go

diff --git a/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml b/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
new file mode 100644
index 000000000000..c8bd302960f2
--- /dev/null
+++ b/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: loadbalancingexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Adds an optional configuration option to the k8s resolver which returns hostnames instead of IPs for headless services pointing at statefulsets
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [18412]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/exporter/loadbalancingexporter/README.md b/exporter/loadbalancingexporter/README.md
index 5df7812fb204..a6d5e8606d8e 100644
--- a/exporter/loadbalancingexporter/README.md
+++ b/exporter/loadbalancingexporter/README.md
@@ -96,6 +96,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
 * `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
 * `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
 * `timeout` resolver timeout in Go duration format, e.g. `5s`, `30m`. If not specified, `1s` will be used.
+ * `return_hostnames` will return hostnames instead of IPs. This is useful in certain situations, such as using istio in sidecar mode. To use this feature, the `service` must be a headless `Service` pointing at a `StatefulSet`, and the `service` must be what is specified under `.spec.serviceName` in the `StatefulSet`.
 * The `aws_cloud_map` node accepts the following properties:
 * `namespace` The CloudMap namespace where the service is registered, e.g. `cloudmap`. If no `namespace` is specified, the Load Balancer exporter will fail to start.
 * `service_name` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`. If no `service_name` is specified, the Load Balancer exporter will fail to start.
@@ -231,6 +232,21 @@ service:
 ```
 
 Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
+> [!IMPORTANT]
+> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
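+
+As a minimal sketch of what those permissions look like (the `otel-collector` name is a placeholder; see the linked example for the complete manifests), the collector's service account needs read access to the service's `endpoints`:
+
+```yaml
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: otel-collector
+rules:
+  - apiGroups: [""]
+    resources: ["endpoints"]
+    verbs: ["get", "watch", "list"]
+```
 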
 ```yaml
 receivers:
diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go
index b9682df16892..cee37b4d14fe 100644
--- a/exporter/loadbalancingexporter/config.go
+++ b/exporter/loadbalancingexporter/config.go
@@ -69,9 +69,10 @@ type DNSResolver struct {
 
 // K8sSvcResolver defines the configuration for the Kubernetes service resolver
 type K8sSvcResolver struct {
-	Service string        `mapstructure:"service"`
-	Ports   []int32       `mapstructure:"ports"`
-	Timeout time.Duration `mapstructure:"timeout"`
+	Service         string        `mapstructure:"service"`
+	Ports           []int32       `mapstructure:"ports"`
+	Timeout         time.Duration `mapstructure:"timeout"`
+	ReturnHostnames bool          `mapstructure:"return_hostnames"`
 }
 
 type AWSCloudMapResolver struct {
diff --git a/exporter/loadbalancingexporter/loadbalancer.go b/exporter/loadbalancingexporter/loadbalancer.go
index 12ef0b5025fa..c14ad3d91183 100644
--- a/exporter/loadbalancingexporter/loadbalancer.go
+++ b/exporter/loadbalancingexporter/loadbalancer.go
@@ -102,6 +102,7 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
 			oCfg.Resolver.K8sSvc.Service,
 			oCfg.Resolver.K8sSvc.Ports,
 			oCfg.Resolver.K8sSvc.Timeout,
+			oCfg.Resolver.K8sSvc.ReturnHostnames,
 			telemetry,
 		)
 		if err != nil {
diff --git a/exporter/loadbalancingexporter/resolver_k8s.go b/exporter/loadbalancingexporter/resolver_k8s.go
index 87940260a880..040eb8120814 100644
--- a/exporter/loadbalancingexporter/resolver_k8s.go
+++ b/exporter/loadbalancingexporter/resolver_k8s.go
@@ -61,6 +61,7 @@ type k8sResolver struct {
 
 	endpoints         []string
 	onChangeCallbacks []func([]string)
+	returnNames       bool
 
 	stopCh     chan struct{}
 	updateLock sync.RWMutex
@@ -75,6 +76,7 @@ func newK8sResolver(clt kubernetes.Interface,
 	service string,
 	ports []int32,
 	timeout time.Duration,
+	returnNames bool,
 	tb *metadata.TelemetryBuilder,
 ) (*k8sResolver, error) {
 	if len(service) == 0 {
@@ -115,9 +117,10 @@ func newK8sResolver(clt kubernetes.Interface,
 
 	epsStore := &sync.Map{}
 	h := &handler{
-		endpoints: epsStore,
-		logger:    logger,
-		telemetry: tb,
+		endpoints:   epsStore,
+		logger:      logger,
+		telemetry:   tb,
+		returnNames: returnNames,
 	}
 	r := &k8sResolver{
 		logger:      logger,
@@ -131,6 +134,7 @@ func newK8sResolver(clt kubernetes.Interface,
 		stopCh:      make(chan struct{}),
 		lwTimeout:   timeout,
 		telemetry:   tb,
+		returnNames: returnNames,
 	}
 	h.callback = r.resolve
 
@@ -187,13 +191,20 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) {
 	defer r.shutdownWg.Done()
 
 	var backends []string
-	r.endpointsStore.Range(func(address, _ any) bool {
-		addr := address.(string)
+	var ep string
+	r.endpointsStore.Range(func(host, _ any) bool {
+		// When hostnames are requested, build the stable per-pod DNS name
+		// (<hostname>.<service>.<namespace>) instead of using the raw address.
+		if r.returnNames {
+			ep = fmt.Sprintf("%s.%s.%s", host, r.svcName, r.svcNs)
+		} else {
+			ep = host.(string)
+		}
 		if len(r.port) == 0 {
-			backends = append(backends, addr)
+			backends = append(backends, ep)
 		} else {
 			for _, port := range r.port {
-				backends = append(backends, net.JoinHostPort(addr, strconv.FormatInt(int64(port), 10)))
+				backends = append(backends, net.JoinHostPort(ep, strconv.FormatInt(int64(port), 10)))
 			}
 		}
 		return true
diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler.go b/exporter/loadbalancingexporter/resolver_k8s_handler.go
index 186111eba4d5..895c7fbfca2c 100644
--- a/exporter/loadbalancingexporter/resolver_k8s_handler.go
+++ b/exporter/loadbalancingexporter/resolver_k8s_handler.go
@@ -17,19 +17,31 @@ import (
 
 var _ cache.ResourceEventHandler = (*handler)(nil)
 
+const (
+	epMissingHostnamesMsg = "Endpoints object missing hostnames"
+)
+
 type handler struct {
-	endpoints *sync.Map
-	callback  func(ctx context.Context) ([]string, error)
-	logger    *zap.Logger
-	telemetry *metadata.TelemetryBuilder
+	endpoints   *sync.Map
+	callback    func(ctx context.Context) ([]string, error)
+	logger      *zap.Logger
+	telemetry   *metadata.TelemetryBuilder
+	returnNames bool
 }
 
 func (h handler) OnAdd(obj any, _ bool) {
 	var endpoints map[string]bool
+	var ok bool
 
 	switch object := obj.(type) {
 	case *corev1.Endpoints:
-		endpoints = convertToEndpoints(object)
+		ok, endpoints = convertToEndpoints(h.returnNames, object)
+		if !ok {
+			h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
+			h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
+			return
+		}
+
 	default: // unsupported
 		h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of new pods for the service", zap.Any("obj", obj))
 		h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -56,8 +68,14 @@ func (h handler) OnUpdate(oldObj, newObj any) {
 			return
 		}
 
-		oldEndpoints := convertToEndpoints(oldEps)
-		newEndpoints := convertToEndpoints(newEps)
+		_, oldEndpoints := convertToEndpoints(h.returnNames, oldEps)
+		hostnameOk, newEndpoints := convertToEndpoints(h.returnNames, newEps)
+		if !hostnameOk {
+			h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", newEps))
+			h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
+			return
+		}
+
 		changed := false
 
 		// Iterate through old endpoints and remove those that are not in the new list.
@@ -80,6 +98,7 @@ func (h handler) OnUpdate(oldObj, newObj any) {
 		} else {
 			h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps))
 		}
+
 	default: // unsupported
 		h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))
 		h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -89,13 +108,20 @@ func (h handler) OnUpdate(oldObj, newObj any) {
 
 func (h handler) OnDelete(obj any) {
 	var endpoints map[string]bool
+	var ok bool
+
 	switch object := obj.(type) {
 	case *cache.DeletedFinalStateUnknown:
 		h.OnDelete(object.Obj)
 		return
 	case *corev1.Endpoints:
 		if object != nil {
-			endpoints = convertToEndpoints(object)
+			ok, endpoints = convertToEndpoints(h.returnNames, object)
+			if !ok {
+				h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
+				h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
+				return
+			}
 		}
 	default: // unsupported
 		h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj))
@@ -110,14 +136,24 @@ func (h handler) OnDelete(obj any) {
 	}
 }
 
-func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool {
-	ipAddress := map[string]bool{}
+// convertToEndpoints flattens the given Endpoints objects into a set of hosts.
+// When retNames is true it collects per-pod hostnames and reports failure
+// (false, nil) if any address is missing one; otherwise it collects IPs.
+func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, map[string]bool) {
+	res := map[string]bool{}
 	for _, ep := range eps {
 		for _, subsets := range ep.Subsets {
 			for _, addr := range subsets.Addresses {
-				ipAddress[addr.IP] = true
+				if retNames {
+					if addr.Hostname == "" {
+						return false, nil
+					}
+					res[addr.Hostname] = true
+				} else {
+					res[addr.IP] = true
+				}
 			}
 		}
 	}
-	return ipAddress
+	return true, res
 }
diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler_test.go
b/exporter/loadbalancingexporter/resolver_k8s_handler_test.go new file mode 100644 index 000000000000..1a8cbd70c223 --- /dev/null +++ b/exporter/loadbalancingexporter/resolver_k8s_handler_test.go @@ -0,0 +1,105 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestConvertToEndpoints(tst *testing.T) { + // Create dummy Endpoints objects + endpoints1 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-endpoints-1", + Namespace: "test-namespace", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: "pod-1", + IP: "192.168.10.101", + }, + }, + }, + }, + } + endpoints2 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-endpoints-2", + Namespace: "test-namespace", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + Hostname: "pod-2", + IP: "192.168.10.102", + }, + }, + }, + }, + } + endpoints3 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-endpoints-3", + Namespace: "test-namespace", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "192.168.10.103", + }, + }, + }, + }, + } + + tests := []struct { + name string + returnNames bool + includedEndpoints []*corev1.Endpoints + expectedEndpoints map[string]bool + wantNil bool + }{ + { + name: "return hostnames", + returnNames: true, + includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2}, + expectedEndpoints: map[string]bool{"pod-1": true, "pod-2": true}, + wantNil: false, + }, + { + name: "return IPs", + returnNames: false, + includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2, endpoints3}, + expectedEndpoints: map[string]bool{"192.168.10.101": true, "192.168.10.102": true, "192.168.10.103": true}, + wantNil: false, + }, + { + name: "missing hostname", + returnNames: true, + includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints3}, + expectedEndpoints: nil, + wantNil: true, + }, + } + + for _, tt := range tests { + tst.Run(tt.name, func(tst *testing.T) { + ok, res := convertToEndpoints(tt.returnNames, tt.includedEndpoints...) 
+ if tt.wantNil { + assert.Nil(tst, res) + } else { + assert.Equal(tst, tt.expectedEndpoints, res) + } + assert.Equal(tst, !tt.wantNil, ok) + }) + } +} diff --git a/exporter/loadbalancingexporter/resolver_k8s_test.go b/exporter/loadbalancingexporter/resolver_k8s_test.go index 5a4e77dd593b..c344fc1c6bb8 100644 --- a/exporter/loadbalancingexporter/resolver_k8s_test.go +++ b/exporter/loadbalancingexporter/resolver_k8s_test.go @@ -21,10 +21,11 @@ import ( func TestK8sResolve(t *testing.T) { type args struct { - logger *zap.Logger - service string - ports []int32 - namespace string + logger *zap.Logger + service string + ports []int32 + namespace string + returnHostnames bool } type suiteContext struct { endpoint *corev1.Endpoints @@ -32,7 +33,7 @@ func TestK8sResolve(t *testing.T) { resolver *k8sResolver } setupSuite := func(t *testing.T, args args) (*suiteContext, func(*testing.T)) { - service, defaultNs, ports := args.service, args.namespace, args.ports + service, defaultNs, ports, returnHostnames := args.service, args.namespace, args.ports, args.returnHostnames endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: service, @@ -41,7 +42,10 @@ func TestK8sResolve(t *testing.T) { Subsets: []corev1.EndpointSubset{ { Addresses: []corev1.EndpointAddress{ - {IP: "192.168.10.100"}, + { + Hostname: "pod-0", + IP: "192.168.10.100", + }, }, }, }, @@ -50,14 +54,18 @@ func TestK8sResolve(t *testing.T) { for _, subset := range endpoint.Subsets { for _, address := range subset.Addresses { for _, port := range args.ports { - expectInit = append(expectInit, fmt.Sprintf("%s:%d", address.IP, port)) + if returnHostnames { + expectInit = append(expectInit, fmt.Sprintf("%s.%s.%s:%d", address.Hostname, service, defaultNs, port)) + } else { + expectInit = append(expectInit, fmt.Sprintf("%s:%d", address.IP, port)) + } } } } cl := fake.NewSimpleClientset(endpoint) _, tb := getTelemetryAssets(t) - res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout, tb) + res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout, returnHostnames, tb) require.NoError(t, err) require.NoError(t, res.start(context.Background())) @@ -81,7 +89,7 @@ func TestK8sResolve(t *testing.T) { verifyFn func(*suiteContext, args) error }{ { - name: "simulate append the backend ip address", + name: "add new IP to existing backends", args: args{ logger: zap.NewNop(), service: "lb", @@ -153,7 +161,45 @@ func TestK8sResolve(t *testing.T) { }, }, { - name: "simulate change the backend ip address", + name: "add new hostname to existing backends", + args: args{ + logger: zap.NewNop(), + service: "lb", + namespace: "default", + ports: []int32{8080, 9090}, + returnHostnames: true, + }, + simulateFn: func(suiteCtx *suiteContext, args args) error { + endpoint, exist := suiteCtx.endpoint.DeepCopy(), suiteCtx.endpoint.DeepCopy() + endpoint.Subsets = append(endpoint.Subsets, corev1.EndpointSubset{ + Addresses: []corev1.EndpointAddress{{IP: "10.10.0.11", Hostname: "pod-1"}}, + }) + patch := client.MergeFrom(exist) + data, err := patch.Data(endpoint) + if err != nil { + return err + } + _, err = suiteCtx.clientset.CoreV1().Endpoints(args.namespace). 
+ Patch(context.TODO(), args.service, types.MergePatchType, data, metav1.PatchOptions{}) + return err + }, + verifyFn: func(ctx *suiteContext, _ args) error { + if _, err := ctx.resolver.resolve(context.Background()); err != nil { + return err + } + + assert.Equal(t, []string{ + "pod-0.lb.default:8080", + "pod-0.lb.default:9090", + "pod-1.lb.default:8080", + "pod-1.lb.default:9090", + }, ctx.resolver.Endpoints(), "resolver failed, endpoints not equal") + + return nil + }, + }, + { + name: "change existing backend ip address", args: args{ logger: zap.NewNop(), service: "lb", @@ -281,7 +327,7 @@ func Test_newK8sResolver(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, tb := getTelemetryAssets(t) - got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout, tb) + got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout, false, tb) if tt.wantErr != nil { require.ErrorIs(t, err, tt.wantErr) } else {