From 991dee9cf3d40283844c39181e9345c1f0e46dfb Mon Sep 17 00:00:00 2001
From: Matt Hoey <matt.hoey@missionlane.com>
Date: Mon, 23 Sep 2024 16:55:54 -0700
Subject: [PATCH] [exporter/loadbalancing] Add return_hostnames option to k8s
 resolver

Resolves #18412
---
 ...xporter-return-hostnames-k8s-resolver.yaml |  27 +++++
 exporter/loadbalancingexporter/README.md      |   3 +
 exporter/loadbalancingexporter/config.go      |   7 +-
 .../loadbalancingexporter/loadbalancer.go     |   1 +
 .../loadbalancingexporter/resolver_k8s.go     |  24 ++--
 .../resolver_k8s_handler.go                   |  57 ++++++++--
 .../resolver_k8s_handler_test.go              | 106 ++++++++++++++++++
 .../resolver_k8s_test.go                      |  69 ++++++++++--
 8 files changed, 261 insertions(+), 33 deletions(-)
 create mode 100644 .chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
 create mode 100644 exporter/loadbalancingexporter/resolver_k8s_handler_test.go

diff --git a/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml b/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
new file mode 100644
index 000000000000..c8bd302960f2
--- /dev/null
+++ b/.chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: loadbalancingexporter
+
+# A brief description of the change.  Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Adds an optional configuration to the k8s resolver which returns hostnames instead of IPs for headless services pointing at statefulsets
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [18412]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/exporter/loadbalancingexporter/README.md b/exporter/loadbalancingexporter/README.md
index 5df7812fb204..a6d5e8606d8e 100644
--- a/exporter/loadbalancingexporter/README.md
+++ b/exporter/loadbalancingexporter/README.md
@@ -96,6 +96,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
   * `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
   * `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
   * `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.
+  * `return_hostnames` will return hostnames instead of IPs. This is useful in certain situations like using istio in sidecar mode. To use this feature, the `service` must be a headless `Service`, pointing at a `StatefulSet`, and the `service` must be what is specified under `.spec.serviceName` in the `StatefulSet`.
 * The `aws_cloud_map` node accepts the following properties:
   * `namespace` The CloudMap namespace where the service is register, e.g. `cloudmap`. If no `namespace` is specified, this will fail to start the Load Balancer exporter.
   * `service_name` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`.  If no `service_name` is specified, this will fail to start the Load Balancer exporter.
@@ -231,6 +232,8 @@ service:
 ```
 
 Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
+> [!IMPORTANT]
+> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
 
 ```yaml
 receivers:
diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go
index b9682df16892..cee37b4d14fe 100644
--- a/exporter/loadbalancingexporter/config.go
+++ b/exporter/loadbalancingexporter/config.go
@@ -69,9 +69,10 @@ type DNSResolver struct {
 
 // K8sSvcResolver defines the configuration for the DNS resolver
 type K8sSvcResolver struct {
-	Service string        `mapstructure:"service"`
-	Ports   []int32       `mapstructure:"ports"`
-	Timeout time.Duration `mapstructure:"timeout"`
+	Service         string        `mapstructure:"service"`
+	Ports           []int32       `mapstructure:"ports"`
+	Timeout         time.Duration `mapstructure:"timeout"`
+	ReturnHostnames bool          `mapstructure:"return_hostnames"`
 }
 
 type AWSCloudMapResolver struct {
diff --git a/exporter/loadbalancingexporter/loadbalancer.go b/exporter/loadbalancingexporter/loadbalancer.go
index 12ef0b5025fa..c14ad3d91183 100644
--- a/exporter/loadbalancingexporter/loadbalancer.go
+++ b/exporter/loadbalancingexporter/loadbalancer.go
@@ -102,6 +102,7 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
 			oCfg.Resolver.K8sSvc.Service,
 			oCfg.Resolver.K8sSvc.Ports,
 			oCfg.Resolver.K8sSvc.Timeout,
+			oCfg.Resolver.K8sSvc.ReturnHostnames,
 			telemetry,
 		)
 		if err != nil {
diff --git a/exporter/loadbalancingexporter/resolver_k8s.go b/exporter/loadbalancingexporter/resolver_k8s.go
index 87940260a880..040eb8120814 100644
--- a/exporter/loadbalancingexporter/resolver_k8s.go
+++ b/exporter/loadbalancingexporter/resolver_k8s.go
@@ -61,6 +61,7 @@ type k8sResolver struct {
 
 	endpoints         []string
 	onChangeCallbacks []func([]string)
+	returnNames       bool
 
 	stopCh             chan struct{}
 	updateLock         sync.RWMutex
@@ -75,6 +76,7 @@ func newK8sResolver(clt kubernetes.Interface,
 	service string,
 	ports []int32,
 	timeout time.Duration,
+	returnNames bool,
 	tb *metadata.TelemetryBuilder,
 ) (*k8sResolver, error) {
 	if len(service) == 0 {
@@ -115,9 +117,10 @@ func newK8sResolver(clt kubernetes.Interface,
 
 	epsStore := &sync.Map{}
 	h := &handler{
-		endpoints: epsStore,
-		logger:    logger,
-		telemetry: tb,
+		endpoints:   epsStore,
+		logger:      logger,
+		telemetry:   tb,
+		returnNames: returnNames,
 	}
 	r := &k8sResolver{
 		logger:         logger,
@@ -131,6 +134,7 @@ func newK8sResolver(clt kubernetes.Interface,
 		stopCh:         make(chan struct{}),
 		lwTimeout:      timeout,
 		telemetry:      tb,
+		returnNames:    returnNames,
 	}
 	h.callback = r.resolve
 
@@ -187,13 +191,19 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) {
 	defer r.shutdownWg.Done()
 
 	var backends []string
-	r.endpointsStore.Range(func(address, _ any) bool {
-		addr := address.(string)
+	var ep string
+	r.endpointsStore.Range(func(host, _ any) bool {
+		switch r.returnNames {
+		case true:
+			ep = fmt.Sprintf("%s.%s.%s", host, r.svcName, r.svcNs)
+		default:
+			ep = host.(string)
+		}
 		if len(r.port) == 0 {
-			backends = append(backends, addr)
+			backends = append(backends, ep)
 		} else {
 			for _, port := range r.port {
-				backends = append(backends, net.JoinHostPort(addr, strconv.FormatInt(int64(port), 10)))
+				backends = append(backends, net.JoinHostPort(ep, strconv.FormatInt(int64(port), 10)))
 			}
 		}
 		return true
diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler.go b/exporter/loadbalancingexporter/resolver_k8s_handler.go
index 186111eba4d5..895c7fbfca2c 100644
--- a/exporter/loadbalancingexporter/resolver_k8s_handler.go
+++ b/exporter/loadbalancingexporter/resolver_k8s_handler.go
@@ -17,19 +17,31 @@ import (
 
 var _ cache.ResourceEventHandler = (*handler)(nil)
 
+const (
+	epMissingHostnamesMsg = "Endpoints object missing hostnames"
+)
+
 type handler struct {
-	endpoints *sync.Map
-	callback  func(ctx context.Context) ([]string, error)
-	logger    *zap.Logger
-	telemetry *metadata.TelemetryBuilder
+	endpoints   *sync.Map
+	callback    func(ctx context.Context) ([]string, error)
+	logger      *zap.Logger
+	telemetry   *metadata.TelemetryBuilder
+	returnNames bool
 }
 
 func (h handler) OnAdd(obj any, _ bool) {
 	var endpoints map[string]bool
+	var ok bool
 
 	switch object := obj.(type) {
 	case *corev1.Endpoints:
-		endpoints = convertToEndpoints(object)
+		ok, endpoints = convertToEndpoints(h.returnNames, object)
+		if !ok {
+			h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
+			h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
+			return
+		}
+
 	default: // unsupported
 		h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
 		h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -56,8 +68,14 @@ func (h handler) OnUpdate(oldObj, newObj any) {
 			return
 		}
 
-		oldEndpoints := convertToEndpoints(oldEps)
-		newEndpoints := convertToEndpoints(newEps)
+		_, oldEndpoints := convertToEndpoints(h.returnNames, oldEps)
+		hostnameOk, newEndpoints := convertToEndpoints(h.returnNames, newEps)
+		if !hostnameOk {
+			h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", newEps))
+			h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
+			return
+		}
+
 		changed := false
 
 		// Iterate through old endpoints and remove those that are not in the new list.
@@ -80,6 +98,7 @@ func (h handler) OnUpdate(oldObj, newObj any) {
 		} else {
 			h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps))
 		}
+
 	default: // unsupported
 		h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))
 		h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -89,13 +108,20 @@ func (h handler) OnUpdate(oldObj, newObj any) {
 
 func (h handler) OnDelete(obj any) {
 	var endpoints map[string]bool
+	var ok bool
+
 	switch object := obj.(type) {
 	case *cache.DeletedFinalStateUnknown:
 		h.OnDelete(object.Obj)
 		return
 	case *corev1.Endpoints:
 		if object != nil {
-			endpoints = convertToEndpoints(object)
+			ok, endpoints = convertToEndpoints(h.returnNames, object)
+			if !ok {
+				h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
+				h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
+				return
+			}
 		}
 	default: // unsupported
 		h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj))
@@ -110,14 +136,21 @@ func (h handler) OnDelete(obj any) {
 	}
 }
 
-func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool {
-	ipAddress := map[string]bool{}
+func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, map[string]bool) {
+	res := map[string]bool{}
 	for _, ep := range eps {
 		for _, subsets := range ep.Subsets {
 			for _, addr := range subsets.Addresses {
-				ipAddress[addr.IP] = true
+				if retNames {
+					if addr.Hostname == "" {
+						return false, nil
+					}
+					res[addr.Hostname] = true
+				} else {
+					res[addr.IP] = true
+				}
 			}
 		}
 	}
-	return ipAddress
+	return true, res
 }
diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler_test.go b/exporter/loadbalancingexporter/resolver_k8s_handler_test.go
new file mode 100644
index 000000000000..9a7a6011c30d
--- /dev/null
+++ b/exporter/loadbalancingexporter/resolver_k8s_handler_test.go
@@ -0,0 +1,106 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package loadbalancingexporter
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func TestConvertToEndpoints(tst *testing.T) {
+	// Create dummy Endpoints objects
+	endpoints1 := &corev1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-endpoints-1",
+			Namespace: "test-namespace",
+		},
+		Subsets: []corev1.EndpointSubset{
+			{
+				Addresses: []corev1.EndpointAddress{
+					{
+						Hostname: "pod-1",
+						IP:       "192.168.10.101",
+					},
+				},
+			},
+		},
+	}
+	endpoints2 := &corev1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-endpoints-2",
+			Namespace: "test-namespace",
+		},
+		Subsets: []corev1.EndpointSubset{
+			{
+				Addresses: []corev1.EndpointAddress{
+					{
+						Hostname: "pod-2",
+						IP:       "192.168.10.102",
+					},
+				},
+			},
+		},
+	}
+	endpoints3 := &corev1.Endpoints{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-endpoints-3",
+			Namespace: "test-namespace",
+		},
+		Subsets: []corev1.EndpointSubset{
+			{
+				Addresses: []corev1.EndpointAddress{
+					{
+						IP: "192.168.10.103",
+					},
+				},
+			},
+		},
+	}
+
+	tests := []struct {
+		name              string
+		returnNames       bool
+		includedEndpoints []*corev1.Endpoints
+		expectedEndpoints map[string]bool
+		wantNil           bool
+	}{
+		{
+			name:              "return hostnames",
+			returnNames:       true,
+			includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2},
+			expectedEndpoints: map[string]bool{"pod-1": true, "pod-2": true},
+			wantNil:           false,
+		},
+		{
+			name:              "return IPs",
+			returnNames:       false,
+			includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2, endpoints3},
+			expectedEndpoints: map[string]bool{"192.168.10.101": true, "192.168.10.102": true, "192.168.10.103": true},
+			wantNil:           false,
+		},
+		{
+			name:              "missing hostname",
+			returnNames:       true,
+			includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints3},
+			expectedEndpoints: nil,
+			wantNil:           true,
+		},
+	}
+
+	for _, tt := range tests {
+		tst.Run(tt.name, func(tst *testing.T) {
+			ok, res := convertToEndpoints(tt.returnNames, tt.includedEndpoints...)
+			if tt.wantNil {
+				assert.Nil(tst, res)
+			} else {
+				assert.Equal(tst, tt.expectedEndpoints, res)
+			}
+			assert.Equal(tst, !tt.wantNil, ok)
+		})
+	}
+
+}
diff --git a/exporter/loadbalancingexporter/resolver_k8s_test.go b/exporter/loadbalancingexporter/resolver_k8s_test.go
index 5a4e77dd593b..671c7447bf1e 100644
--- a/exporter/loadbalancingexporter/resolver_k8s_test.go
+++ b/exporter/loadbalancingexporter/resolver_k8s_test.go
@@ -21,10 +21,11 @@ import (
 
 func TestK8sResolve(t *testing.T) {
 	type args struct {
-		logger    *zap.Logger
-		service   string
-		ports     []int32
-		namespace string
+		logger          *zap.Logger
+		service         string
+		ports           []int32
+		namespace       string
+		returnHostnames bool
 	}
 	type suiteContext struct {
 		endpoint  *corev1.Endpoints
@@ -32,7 +33,7 @@ func TestK8sResolve(t *testing.T) {
 		resolver  *k8sResolver
 	}
 	setupSuite := func(t *testing.T, args args) (*suiteContext, func(*testing.T)) {
-		service, defaultNs, ports := args.service, args.namespace, args.ports
+		service, defaultNs, ports, returnHostnames := args.service, args.namespace, args.ports, args.returnHostnames
 		endpoint := &corev1.Endpoints{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      service,
@@ -41,7 +42,10 @@ func TestK8sResolve(t *testing.T) {
 			Subsets: []corev1.EndpointSubset{
 				{
 					Addresses: []corev1.EndpointAddress{
-						{IP: "192.168.10.100"},
+						{
+							Hostname: "pod-0",
+							IP:       "192.168.10.100",
+						},
 					},
 				},
 			},
@@ -50,14 +54,18 @@ func TestK8sResolve(t *testing.T) {
 		for _, subset := range endpoint.Subsets {
 			for _, address := range subset.Addresses {
 				for _, port := range args.ports {
-					expectInit = append(expectInit, fmt.Sprintf("%s:%d", address.IP, port))
+					if returnHostnames {
+						expectInit = append(expectInit, fmt.Sprintf("%s.%s.%s:%d", address.Hostname, service, defaultNs, port))
+					} else {
+						expectInit = append(expectInit, fmt.Sprintf("%s:%d", address.IP, port))
+					}
 				}
 			}
 		}
 
 		cl := fake.NewSimpleClientset(endpoint)
 		_, tb := getTelemetryAssets(t)
-		res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout, tb)
+		res, err := newK8sResolver(cl, zap.NewNop(), service, ports, defaultListWatchTimeout, returnHostnames, tb)
 		require.NoError(t, err)
 
 		require.NoError(t, res.start(context.Background()))
@@ -81,7 +89,7 @@ func TestK8sResolve(t *testing.T) {
 		verifyFn   func(*suiteContext, args) error
 	}{
 		{
-			name: "simulate append the backend ip address",
+			name: "add new IP to existing backends",
 			args: args{
 				logger:    zap.NewNop(),
 				service:   "lb",
@@ -153,7 +161,46 @@ func TestK8sResolve(t *testing.T) {
 			},
 		},
 		{
-			name: "simulate change the backend ip address",
+			name: "add new hostname to existing backends",
+			args: args{
+				logger:          zap.NewNop(),
+				service:         "lb",
+				namespace:       "default",
+				ports:           []int32{8080, 9090},
+				returnHostnames: true,
+			},
+			simulateFn: func(suiteCtx *suiteContext, args args) error {
+				endpoint, exist := suiteCtx.endpoint.DeepCopy(), suiteCtx.endpoint.DeepCopy()
+				endpoint.Subsets = append(endpoint.Subsets, corev1.EndpointSubset{
+					Addresses: []corev1.EndpointAddress{{IP: "10.10.0.11", Hostname: "pod-1"}},
+				})
+				patch := client.MergeFrom(exist)
+				data, err := patch.Data(endpoint)
+				if err != nil {
+					return err
+				}
+				_, err = suiteCtx.clientset.CoreV1().Endpoints(args.namespace).
+					Patch(context.TODO(), args.service, types.MergePatchType, data, metav1.PatchOptions{})
+				return err
+
+			},
+			verifyFn: func(ctx *suiteContext, _ args) error {
+				if _, err := ctx.resolver.resolve(context.Background()); err != nil {
+					return err
+				}
+
+				assert.Equal(t, []string{
+					"pod-0.lb.default:8080",
+					"pod-0.lb.default:9090",
+					"pod-1.lb.default:8080",
+					"pod-1.lb.default:9090",
+				}, ctx.resolver.Endpoints(), "resolver failed, endpoints not equal")
+
+				return nil
+			},
+		},
+		{
+			name: "change existing backend ip address",
 			args: args{
 				logger:    zap.NewNop(),
 				service:   "lb",
@@ -281,7 +328,7 @@ func Test_newK8sResolver(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			_, tb := getTelemetryAssets(t)
-			got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout, tb)
+			got, err := newK8sResolver(fake.NewSimpleClientset(), tt.args.logger, tt.args.service, tt.args.ports, defaultListWatchTimeout, false, tb)
 			if tt.wantErr != nil {
 				require.ErrorIs(t, err, tt.wantErr)
 			} else {