Skip to content

Commit

Permalink
[processor/resource][processor/attributes] Option to read client addr…
Browse files Browse the repository at this point in the history
…ess (#34048)

We have options to extract headers and auth info from the client. This
change adds an option to extract client address by specifying
`client.address` value in the `from_context` field.

The code to extract address from the client info is taken from the
k8sattributes receiver.

Fixes
#34051

---------

Co-authored-by: Curtis Robert <[email protected]>
  • Loading branch information
dmitryax and crobert-1 authored Jul 16, 2024
1 parent 00e4501 commit d3ba8ea
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 36 deletions.
22 changes: 22 additions & 0 deletions .chloggen/attribute-from-client-address.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/resource, processor/attributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add an option to extract value from a client address by specifying `client.address` value in the `from_context` field.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34051]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 7 additions & 2 deletions internal/coreinternal/attraction/attraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/clientutil"
)

// Settings specifies the processor settings.
Expand Down Expand Up @@ -341,14 +343,17 @@ func (ap *AttrProc) Process(ctx context.Context, logger *zap.Logger, attrs pcomm

func getAttributeValueFromContext(ctx context.Context, key string) (pcommon.Value, bool) {
const (
metadataPrefix = "metadata."
authPrefix = "auth."
metadataPrefix = "metadata."
authPrefix = "auth."
clientAddressKey = "client.address"
)

ci := client.FromContext(ctx)
var vals []string

switch {
case key == clientAddressKey:
vals = []string{clientutil.Address(ci)}
case strings.HasPrefix(key, metadataPrefix):
mdKey := strings.TrimPrefix(key, metadataPrefix)
vals = ci.Metadata.Get(mdKey)
Expand Down
10 changes: 10 additions & 0 deletions internal/coreinternal/attraction/attraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
"net"
"regexp"
"testing"

Expand Down Expand Up @@ -958,6 +959,9 @@ func TestFromContext(t *testing.T) {
Auth: mockInfoAuth{
"source_auth_val": "auth_val",
},
Addr: &net.IPAddr{
IP: net.IPv4(192, 168, 1, 1),
},
})

testCases := []struct {
Expand Down Expand Up @@ -1008,6 +1012,12 @@ func TestFromContext(t *testing.T) {
expectedAttributes: map[string]any{},
action: &ActionKeyValue{Key: "dest", FromContext: "auth.unknown_val", Action: INSERT},
},
{
name: "with_address",
ctx: mdCtx,
expectedAttributes: map[string]any{"dest": "192.168.1.1"},
action: &ActionKeyValue{Key: "dest", FromContext: "client.address", Action: INSERT},
},
}

for _, tc := range testCases {
Expand Down
40 changes: 40 additions & 0 deletions internal/coreinternal/clientutil/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package clientutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/clientutil"

import (
"net"
"strings"

"go.opentelemetry.io/collector/client"
)

// Address returns the address of the client connecting to the collector.
func Address(client client.Info) string {
if client.Addr == nil {
return ""
}
switch addr := client.Addr.(type) {
case *net.UDPAddr:
return addr.IP.String()
case *net.TCPAddr:
return addr.IP.String()
case *net.IPAddr:
return addr.IP.String()
}

// If this is not a known address type, check for known "untyped" formats.
// 1.1.1.1:<port>

lastColonIndex := strings.LastIndex(client.Addr.String(), ":")
if lastColonIndex != -1 {
ipString := client.Addr.String()[:lastColonIndex]
ip := net.ParseIP(ipString)
if ip != nil {
return ip.String()
}
}

return client.Addr.String()
}
79 changes: 79 additions & 0 deletions internal/coreinternal/clientutil/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package clientutil

import (
"net"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/client"
)

type fakeAddr string

func (s fakeAddr) String() string {
return string(s)
}

func (fakeAddr) Network() string {
return "tcp"
}

func TestAddress(t *testing.T) {
tests := []struct {
name string
client client.Info
want string
}{
{
name: "UDPAddr",
client: client.Info{
Addr: &net.UDPAddr{
IP: net.IPv4(192, 0, 2, 1),
Port: 1234,
},
},
want: "192.0.2.1",
},
{
name: "TCPAddr",
client: client.Info{
Addr: &net.TCPAddr{
IP: net.IPv4(192, 0, 2, 2),
Port: 1234,
},
},
want: "192.0.2.2",
},
{
name: "IPAddr",
client: client.Info{
Addr: &net.IPAddr{
IP: net.IPv4(192, 0, 2, 3),
},
},
want: "192.0.2.3",
},
{
name: "fake_addr_with_port",
client: client.Info{
Addr: fakeAddr("1.1.1.1:3200"),
},
want: "1.1.1.1",
},
{
name: "fake_addr_without_port",
client: client.Info{
Addr: fakeAddr("1.1.1.1"),
},
want: "1.1.1.1",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, Address(tt.client))
})
}
}
3 changes: 2 additions & 1 deletion processor/attributesprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ For the actions `insert`, `update` and `upsert`,
# If the key is prefixed with `metadata.`, the values are searched
# in the receiver's transport protocol additional information like gRPC Metadata or HTTP Headers.
# If the key is prefixed with `auth.`, the values are searched
# in the authentication information set by the server authenticator.
# in the authentication information set by the server authenticator.
# Refer to the server authenticator's documentation part of your pipeline for more information about which attributes are available.
# If the key is `client.address`, the value will be set to the client address.
# If the key doesn't exist, no action is performed.
# If the key has multiple values the values will be joined with `;` separator.
from_context: <other key>
Expand Down
9 changes: 9 additions & 0 deletions processor/k8sattributesprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21.0
require (
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.104.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest v0.104.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -132,7 +133,15 @@ retract (
v0.65.0
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest => ../../internal/k8stest

// ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules
replace cloud.google.com/go v0.54.0 => cloud.google.com/go v0.110.10

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
36 changes: 3 additions & 33 deletions processor/k8sattributesprocessor/pod_association.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr
import (
"context"
"net"
"strings"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/clientutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
)

Expand All @@ -22,7 +22,7 @@ func extractPodID(ctx context.Context, attrs pcommon.Map, associations []kube.As
return extractPodIDNoAssociations(ctx, attrs)
}

connectionIP := connectionIP(ctx)
connectionIP := clientutil.Address(client.FromContext(ctx))
for _, asso := range associations {
skip := false

Expand Down Expand Up @@ -81,7 +81,7 @@ func extractPodIDNoAssociations(ctx context.Context, attrs pcommon.Map) kube.Pod
}
}

connectionIP := connectionIP(ctx)
connectionIP := clientutil.Address(client.FromContext(ctx))
if connectionIP != "" {
return kube.PodIdentifier{
kube.PodIdentifierAttributeFromConnection(connectionIP),
Expand All @@ -98,36 +98,6 @@ func extractPodIDNoAssociations(ctx context.Context, attrs pcommon.Map) kube.Pod
return kube.PodIdentifier{}
}

func connectionIP(ctx context.Context) string {
c := client.FromContext(ctx)
if c.Addr == nil {
return ""
}
switch addr := c.Addr.(type) {
case *net.UDPAddr:
return addr.IP.String()
case *net.TCPAddr:
return addr.IP.String()
case *net.IPAddr:
return addr.IP.String()
}

// If this is not a known address type, check for known "untyped" formats.
// 1.1.1.1:<port>

lastColonIndex := strings.LastIndex(c.Addr.String(), ":")
if lastColonIndex != -1 {
ipString := c.Addr.String()[:lastColonIndex]
ip := net.ParseIP(ipString)
if ip != nil {
return ip.String()
}
}

return c.Addr.String()

}

func stringAttributeFromMap(attrs pcommon.Map, key string) string {
if val, ok := attrs.Get(key); ok {
if val.Type() == pcommon.ValueTypeStr {
Expand Down

0 comments on commit d3ba8ea

Please sign in to comment.