[exporter/loadbalancing] Add return_hostnames option to k8s resolver (#35411)

**Description:** Adds an optional configuration option to the k8s
resolver which allows hostnames to be returned instead of IPs. This
enables certain scenarios, such as using Istio in sidecar mode. The
requirements have been added to the documentation.
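
For illustration, a collector configuration using the new option might look like the following sketch; the service, namespace, and port values are placeholders, and only `return_hostnames` is new in this PR:

```yaml
exporters:
  loadbalancing:
    protocol:
      otlp:
        tls:
          insecure: true
    resolver:
      k8s:
        # headless Service backing a StatefulSet, as required for hostnames
        service: lb-svc.lb-ns
        ports:
          - 4317
        # new option: resolve to <hostname>.<service>.<namespace> instead of IPs
        return_hostnames: true
```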

**Link to tracking Issue:** #18412

**Testing:** Added corresponding hostname-based tests for adding and
deleting backends/endpoints. Tests were also missing for the k8s
handler, so some were added there as well, specifically covering the
failure case where hostnames are requested but the returned endpoints
do not have them.

Aside from unit tests, this change was also run in our lab cluster by
deleting pods and performing rollouts on our StatefulSet.

Somewhat tangential to the PR itself, but Istio now reports mTLS traffic
with zero workarounds required, which was the motivation for the issue.

**Documentation:** Added documentation explaining the new option and the
requirements needed to use it. Also added an "important" callout
specifically noting that the k8s resolver needs RBAC to work.
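
For reference, a minimal sketch of the kind of RBAC the k8s resolver typically needs, assuming it only requires read access to `Endpoints` in the target namespace; the names below are hypothetical and the linked example in the README remains the authoritative source:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: otelcol-k8s-resolver   # hypothetical name
  namespace: lb-ns             # namespace of the resolved Service
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: otelcol-k8s-resolver
  namespace: lb-ns
subjects:
  - kind: ServiceAccount
    name: otelcol              # service account the collector runs as
    namespace: lb-ns
roleRef:
  kind: Role
  name: otelcol-k8s-resolver
  apiGroup: rbac.authorization.k8s.io
```
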
snuggie12 authored Dec 12, 2024
1 parent ab0f6a2 commit 964a652
Showing 8 changed files with 259 additions and 33 deletions.
27 changes: 27 additions & 0 deletions .chloggen/lbexporter-return-hostnames-k8s-resolver.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds an optional configuration option to the k8s resolver which returns hostnames instead of IPs for headless services pointing at StatefulSets

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [18412]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 3 additions & 0 deletions exporter/loadbalancingexporter/README.md
@@ -96,6 +96,7 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.
* `return_hostnames` will return hostnames instead of IPs. This is useful in certain situations like using istio in sidecar mode. To use this feature, the `service` must be a headless `Service`, pointing at a `StatefulSet`, and the `service` must be what is specified under `.spec.serviceName` in the `StatefulSet`.
* The `aws_cloud_map` node accepts the following properties:
* `namespace` The CloudMap namespace where the service is registered, e.g. `cloudmap`. If no `namespace` is specified, this will fail to start the Load Balancer exporter.
* `service_name` The name of the service that you specified when you registered the instance, e.g. `otelcollectors`. If no `service_name` is specified, this will fail to start the Load Balancer exporter.
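
To illustrate the `return_hostnames` requirements described above, here is a minimal sketch of a headless `Service` and matching `StatefulSet`; all names are hypothetical, and only the `clusterIP: None` and `serviceName` relationships matter:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: lb-svc
  namespace: lb-ns
spec:
  clusterIP: None              # headless Service, required for per-pod hostnames
  selector:
    app: otelcol-backend
  ports:
    - name: otlp
      port: 4317
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: otelcol-backend
  namespace: lb-ns
spec:
  serviceName: lb-svc          # must match the `service` configured in the resolver
  replicas: 3
  selector:
    matchLabels:
      app: otelcol-backend
  template:
    metadata:
      labels:
        app: otelcol-backend
    spec:
      containers:
        - name: otelcol
          image: otel/opentelemetry-collector-contrib:latest
          ports:
            - containerPort: 4317
```
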
@@ -231,6 +232,8 @@ service:
```
Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
> [!IMPORTANT]
> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
```yaml
receivers:
7 changes: 4 additions & 3 deletions exporter/loadbalancingexporter/config.go
@@ -69,9 +69,10 @@ type DNSResolver struct {

// K8sSvcResolver defines the configuration for the k8s service resolver
type K8sSvcResolver struct {
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
Timeout time.Duration `mapstructure:"timeout"`
Service string `mapstructure:"service"`
Ports []int32 `mapstructure:"ports"`
Timeout time.Duration `mapstructure:"timeout"`
ReturnHostnames bool `mapstructure:"return_hostnames"`
}

type AWSCloudMapResolver struct {
1 change: 1 addition & 0 deletions exporter/loadbalancingexporter/loadbalancer.go
@@ -102,6 +102,7 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
oCfg.Resolver.K8sSvc.Service,
oCfg.Resolver.K8sSvc.Ports,
oCfg.Resolver.K8sSvc.Timeout,
oCfg.Resolver.K8sSvc.ReturnHostnames,
telemetry,
)
if err != nil {
24 changes: 17 additions & 7 deletions exporter/loadbalancingexporter/resolver_k8s.go
@@ -61,6 +61,7 @@ type k8sResolver struct {

endpoints []string
onChangeCallbacks []func([]string)
returnNames bool

stopCh chan struct{}
updateLock sync.RWMutex
@@ -75,6 +76,7 @@ func newK8sResolver(clt kubernetes.Interface,
service string,
ports []int32,
timeout time.Duration,
returnNames bool,
tb *metadata.TelemetryBuilder,
) (*k8sResolver, error) {
if len(service) == 0 {
@@ -115,9 +117,10 @@ func newK8sResolver(clt kubernetes.Interface,

epsStore := &sync.Map{}
h := &handler{
endpoints: epsStore,
logger: logger,
telemetry: tb,
endpoints: epsStore,
logger: logger,
telemetry: tb,
returnNames: returnNames,
}
r := &k8sResolver{
logger: logger,
@@ -131,6 +134,7 @@
stopCh: make(chan struct{}),
lwTimeout: timeout,
telemetry: tb,
returnNames: returnNames,
}
h.callback = r.resolve

@@ -187,13 +191,19 @@ func (r *k8sResolver) resolve(ctx context.Context) ([]string, error) {
defer r.shutdownWg.Done()

var backends []string
r.endpointsStore.Range(func(address, _ any) bool {
addr := address.(string)
var ep string
r.endpointsStore.Range(func(host, _ any) bool {
switch r.returnNames {
case true:
ep = fmt.Sprintf("%s.%s.%s", host, r.svcName, r.svcNs)
default:
ep = host.(string)
}
if len(r.port) == 0 {
backends = append(backends, addr)
backends = append(backends, ep)
} else {
for _, port := range r.port {
backends = append(backends, net.JoinHostPort(addr, strconv.FormatInt(int64(port), 10)))
backends = append(backends, net.JoinHostPort(ep, strconv.FormatInt(int64(port), 10)))
}
}
return true
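
As a standalone sketch (not the resolver's actual code path), the snippet below shows how the returned backend strings are composed when hostnames are enabled; the host, service, namespace, and port values are illustrative:

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

func main() {
	// With return_hostnames enabled, each endpoint hostname is expanded to
	// <hostname>.<service>.<namespace> before the port is appended.
	host, svc, ns := "otelcol-backend-0", "lb-svc", "lb-ns"
	ep := fmt.Sprintf("%s.%s.%s", host, svc, ns)
	backend := net.JoinHostPort(ep, strconv.FormatInt(4317, 10))
	fmt.Println(backend) // otelcol-backend-0.lb-svc.lb-ns:4317
}
```
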
57 changes: 45 additions & 12 deletions exporter/loadbalancingexporter/resolver_k8s_handler.go
@@ -17,19 +17,31 @@ import (

var _ cache.ResourceEventHandler = (*handler)(nil)

const (
epMissingHostnamesMsg = "Endpoints object missing hostnames"
)

type handler struct {
endpoints *sync.Map
callback func(ctx context.Context) ([]string, error)
logger *zap.Logger
telemetry *metadata.TelemetryBuilder
endpoints *sync.Map
callback func(ctx context.Context) ([]string, error)
logger *zap.Logger
telemetry *metadata.TelemetryBuilder
returnNames bool
}

func (h handler) OnAdd(obj any, _ bool) {
var endpoints map[string]bool
var ok bool

switch object := obj.(type) {
case *corev1.Endpoints:
endpoints = convertToEndpoints(object)
ok, endpoints = convertToEndpoints(h.returnNames, object)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -56,8 +68,14 @@ func (h handler) OnUpdate(oldObj, newObj any) {
return
}

oldEndpoints := convertToEndpoints(oldEps)
newEndpoints := convertToEndpoints(newEps)
_, oldEndpoints := convertToEndpoints(h.returnNames, oldEps)
hostnameOk, newEndpoints := convertToEndpoints(h.returnNames, newEps)
if !hostnameOk {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", newEps))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

changed := false

// Iterate through old endpoints and remove those that are not in the new list.
@@ -80,6 +98,7 @@ } else {
} else {
h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps))
}

default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
@@ -89,13 +108,20 @@

func (h handler) OnDelete(obj any) {
var endpoints map[string]bool
var ok bool

switch object := obj.(type) {
case *cache.DeletedFinalStateUnknown:
h.OnDelete(object.Obj)
return
case *corev1.Endpoints:
if object != nil {
endpoints = convertToEndpoints(object)
ok, endpoints = convertToEndpoints(h.returnNames, object)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}
}
default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the removal of the pods for a service", zap.Any("obj", obj))
@@ -110,14 +136,21 @@ }
}
}

func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool {
ipAddress := map[string]bool{}
func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, map[string]bool) {
res := map[string]bool{}
for _, ep := range eps {
for _, subsets := range ep.Subsets {
for _, addr := range subsets.Addresses {
ipAddress[addr.IP] = true
if retNames {
if addr.Hostname == "" {
return false, nil
}
res[addr.Hostname] = true
} else {
res[addr.IP] = true
}
}
}
}
return ipAddress
return true, res
}
105 changes: 105 additions & 0 deletions exporter/loadbalancingexporter/resolver_k8s_handler_test.go
@@ -0,0 +1,105 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package loadbalancingexporter

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestConvertToEndpoints(tst *testing.T) {
// Create dummy Endpoints objects
endpoints1 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-1",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
Hostname: "pod-1",
IP: "192.168.10.101",
},
},
},
},
}
endpoints2 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-2",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
Hostname: "pod-2",
IP: "192.168.10.102",
},
},
},
},
}
endpoints3 := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-endpoints-3",
Namespace: "test-namespace",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.10.103",
},
},
},
},
}

tests := []struct {
name string
returnNames bool
includedEndpoints []*corev1.Endpoints
expectedEndpoints map[string]bool
wantNil bool
}{
{
name: "return hostnames",
returnNames: true,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2},
expectedEndpoints: map[string]bool{"pod-1": true, "pod-2": true},
wantNil: false,
},
{
name: "return IPs",
returnNames: false,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints2, endpoints3},
expectedEndpoints: map[string]bool{"192.168.10.101": true, "192.168.10.102": true, "192.168.10.103": true},
wantNil: false,
},
{
name: "missing hostname",
returnNames: true,
includedEndpoints: []*corev1.Endpoints{endpoints1, endpoints3},
expectedEndpoints: nil,
wantNil: true,
},
}

for _, tt := range tests {
tst.Run(tt.name, func(tst *testing.T) {
ok, res := convertToEndpoints(tt.returnNames, tt.includedEndpoints...)
if tt.wantNil {
assert.Nil(tst, res)
} else {
assert.Equal(tst, tt.expectedEndpoints, res)
}
assert.Equal(tst, !tt.wantNil, ok)
})
}
}