Skip to content

Commit

Permalink
Fix OTEL_RESOURCE_ATTRIBUTES variable (#1085)
Browse files Browse the repository at this point in the history
Co-authored-by: Marc Tuduri <[email protected]>
  • Loading branch information
rafaelroquetto and marctc authored Aug 7, 2024
1 parent 5d75be2 commit 29c0403
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 30 deletions.
3 changes: 2 additions & 1 deletion pkg/export/alloy/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error)
if err != nil {
slog.Error("error fetching user defined attributes", "error", err)
}
envResourceAttrs := otel.ResourceAttrsFromEnv()

for spans := range in {
for i := range spans {
Expand All @@ -49,7 +50,7 @@ func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error)
}

for _, tc := range tr.cfg.Traces {
traces := otel.GenerateTraces(span, tr.hostID, traceAttrs)
traces := otel.GenerateTraces(span, tr.hostID, traceAttrs, envResourceAttrs)
err := tc.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
Expand Down
58 changes: 46 additions & 12 deletions pkg/export/otel/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
envProtocol = "OTEL_EXPORTER_OTLP_PROTOCOL"
envHeaders = "OTEL_EXPORTER_OTLP_HEADERS"
envTracesHeaders = "OTEL_EXPORTER_OTLP_TRACES_HEADERS"
envResourceAttrs = "OTEL_RESOURCE_ATTRIBUTES"
)

// Buckets defines the histograms bucket boundaries, and allows users to
Expand Down Expand Up @@ -86,7 +87,6 @@ func getResourceAttrs(hostID string, service *svc.ID) []attribute.KeyValue {
for k, v := range service.Metadata {
attrs = append(attrs, k.OTEL().String(v))
}

return attrs
}

Expand Down Expand Up @@ -333,24 +333,58 @@ func (l *LogrAdaptor) WithName(name string) logr.LogSink {
return &LogrAdaptor{inner: l.inner.With("name", name)}
}

// headersFromEnv returns a map of the headers as specified by the
// OTEL_EXPORTER_OTLP_*HEADERS group of variables. This is,
// a comma-separated list of key=values. For example:
// api-key=key,other-config-value=value
func headersFromEnv(varName string) map[string]string {
headersStr, ok := os.LookupEnv(varName)
headers := map[string]string{}

addToMap := func(k string, v string) {
headers[k] = v
}

parseOTELEnvVar(varName, addToMap)

return headers
}

type varHandler func(k string, v string)

// parseOTELEnvVar parses a comma separated group of variables
// in the format specified by OTEL_EXPORTER_OTLP_*HEADERS or
// OTEL_RESOURCE_ATTRIBUTES, i.e. a comma-separated list of
// key=values. For example: api-key=key,other-config-value=value
// The values are passed as parameters to the handler function
func parseOTELEnvVar(varName string, handler varHandler) {
envVar, ok := os.LookupEnv(varName)

if !ok {
return nil
return
}
headers := map[string]string{}

// split all the comma-separated key=value entries
for _, entry := range strings.Split(headersStr, ",") {
for _, entry := range strings.Split(envVar, ",") {
// split only by the first '=' appearance, as values might
// have base64 '=' padding symbols
keyVal := strings.SplitN(entry, "=", 2)
if len(keyVal) > 1 {
headers[strings.TrimSpace(keyVal[0])] = strings.TrimSpace(keyVal[1])
if len(keyVal) < 2 {
continue
}

k := strings.TrimSpace(keyVal[0])
v := strings.TrimSpace(keyVal[1])

if k == "" || v == "" {
continue
}

handler(strings.TrimSpace(keyVal[0]), strings.TrimSpace(keyVal[1]))
}
return headers
}

func ResourceAttrsFromEnv() []attribute.KeyValue {
var otelResourceAttrs []attribute.KeyValue
apply := func(k string, v string) {
otelResourceAttrs = append(otelResourceAttrs, attribute.String(k, v))
}

parseOTELEnvVar(envResourceAttrs, apply)
return otelResourceAttrs
}
61 changes: 61 additions & 0 deletions pkg/export/otel/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package otel

import (
"fmt"
"os"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -84,3 +86,62 @@ func TestOtlpOptions_AsTraceGRPC(t *testing.T) {
})
}
}

func TestParseOTELEnvVar(t *testing.T) {
type testCase struct {
envVar string
expected map[string]string
}

testCases := []testCase{
{envVar: "foo=bar", expected: map[string]string{"foo": "bar"}},
{envVar: "foo=bar,", expected: map[string]string{"foo": "bar"}},
{envVar: "foo=bar,baz", expected: map[string]string{"foo": "bar"}},
{envVar: "foo=bar,baz=baz", expected: map[string]string{"foo": "bar", "baz": "baz"}},
{envVar: "foo=bar,baz=baz ", expected: map[string]string{"foo": "bar", "baz": "baz"}},
{envVar: " foo=bar, baz=baz ", expected: map[string]string{"foo": "bar", "baz": "baz"}},
{envVar: " foo = bar , baz =baz ", expected: map[string]string{"foo": "bar", "baz": "baz"}},
{envVar: " foo = bar , baz =baz= ", expected: map[string]string{"foo": "bar", "baz": "baz="}},
{envVar: ",a=b , c=d,=", expected: map[string]string{"a": "b", "c": "d"}},
{envVar: "=", expected: map[string]string{}},
{envVar: "====", expected: map[string]string{}},
{envVar: "a====b", expected: map[string]string{"a": "===b"}},
{envVar: "", expected: map[string]string{}},
}

const dummyVar = "foo"

for _, tc := range testCases {
t.Run(fmt.Sprint(tc), func(t *testing.T) {
actual := map[string]string{}

apply := func(k string, v string) {
actual[k] = v
}

err := os.Setenv(dummyVar, tc.envVar)

assert.NoError(t, err)

parseOTELEnvVar(dummyVar, apply)

assert.True(t, reflect.DeepEqual(actual, tc.expected))

err = os.Unsetenv(dummyVar)

assert.NoError(t, err)
})
}
}

func TestParseOTELEnvVar_nil(t *testing.T) {
actual := map[string]string{}

apply := func(k string, v string) {
actual[k] = v
}

parseOTELEnvVar("NOT_SET_VAR", apply)

assert.True(t, reflect.DeepEqual(actual, map[string]string{}))
}
3 changes: 2 additions & 1 deletion pkg/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ func (mr *MetricsReporter) setupGraphMeters(m *Metrics, meter instrument.Meter)
func (mr *MetricsReporter) newMetricSet(service *svc.ID) (*Metrics, error) {
mlog := mlog().With("service", service)
mlog.Debug("creating new Metrics reporter")
resources := resource.NewWithAttributes(semconv.SchemaURL, getAppResourceAttrs(mr.hostID, service)...)
resourceAttributes := append(getAppResourceAttrs(mr.hostID, service), ResourceAttrsFromEnv()...)
resources := resource.NewWithAttributes(semconv.SchemaURL, resourceAttributes...)

opts := []metric.Option{
metric.WithResource(resources),
Expand Down
31 changes: 31 additions & 0 deletions pkg/export/otel/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,37 @@ func TestAppMetrics_ByInstrumentation(t *testing.T) {

}

func TestAppMetrics_ResourceAttributes(t *testing.T) {
defer restoreEnvAfterExecution()()

require.NoError(t, os.Setenv(envResourceAttrs, "deployment.environment=production,source=upstream.beyla"))

ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

otlp, err := collector.Start(ctx)
require.NoError(t, err)

now := syncedClock{now: time.Now()}
timeNow = now.Now

otelExporter := makeExporter(ctx, t, []string{instrumentations.InstrumentationHTTP}, otlp)
require.NoError(t, err)

metrics := make(chan []request.Span, 1)
go otelExporter(metrics)

metrics <- []request.Span{
{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
}

res := readNChan(t, otlp.Records(), 1, timeout)
assert.Len(t, res, 1)
attributes := res[0].ResourceAttributes
assert.Equal(t, "production", attributes["deployment.environment"])
assert.Equal(t, "upstream.beyla", attributes["source"])
}

func TestMetricsConfig_Enabled(t *testing.T) {
assert.True(t, (&MetricsConfig{Features: []string{FeatureApplication, FeatureNetwork}, CommonEndpoint: "foo"}).Enabled())
assert.True(t, (&MetricsConfig{Features: []string{FeatureApplication}, MetricsEndpoint: "foo"}).Enabled())
Expand Down
14 changes: 9 additions & 5 deletions pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,15 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
return
}

envResourceAttrs := ResourceAttrsFromEnv()

for spans := range in {
for i := range spans {
span := &spans[i]
if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) {
continue
}
traces := GenerateTraces(span, tr.ctxInfo.HostID, traceAttrs)
traces := GenerateTraces(span, tr.ctxInfo.HostID, traceAttrs, envResourceAttrs)
err := exp.ConsumeTraces(tr.ctx, traces)
if err != nil {
slog.Error("error sending trace to consumer", "error", err)
Expand Down Expand Up @@ -363,16 +365,18 @@ func getRetrySettings(cfg TracesConfig) configretry.BackOffConfig {
}

// GenerateTraces creates a ptrace.Traces from a request.Span
func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}) ptrace.Traces {
func GenerateTraces(span *request.Span, hostID string, userAttrs map[attr.Name]struct{}, envResourceAttrs []attribute.KeyValue) ptrace.Traces {
t := span.Timings()
start := spanStartTime(t)
hasSubSpans := t.Start.After(start)
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ss := rs.ScopeSpans().AppendEmpty()
resourceAttrs := attrsToMap(getAppResourceAttrs(hostID, &span.ServiceID))
resourceAttrs.PutStr(string(semconv.OTelLibraryNameKey), reporterName)
resourceAttrs.CopyTo(rs.Resource().Attributes())
resourceAttrs := getAppResourceAttrs(hostID, &span.ServiceID)
resourceAttrs = append(resourceAttrs, envResourceAttrs...)
resourceAttrsMap := attrsToMap(resourceAttrs)
resourceAttrsMap.PutStr(string(semconv.OTelLibraryNameKey), reporterName)
resourceAttrsMap.CopyTo(rs.Resource().Attributes())

traceID := pcommon.TraceID(span.TraceID)
spanID := pcommon.SpanID(randomSpanID())
Expand Down
34 changes: 23 additions & 11 deletions pkg/export/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestGenerateTraces(t *testing.T) {
TraceID: traceID,
SpanID: spanID,
}
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestGenerateTraces(t *testing.T) {
SpanID: spanID,
TraceID: traceID,
}
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand Down Expand Up @@ -410,7 +410,7 @@ func TestGenerateTraces(t *testing.T) {
Route: "/test",
Status: 200,
}
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestGenerateTraces(t *testing.T) {
SpanID: spanID,
TraceID: traceID,
}
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -472,7 +472,7 @@ func TestGenerateTraces(t *testing.T) {
ParentSpanID: parentSpanID,
TraceID: traceID,
}
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -492,7 +492,7 @@ func TestGenerateTraces(t *testing.T) {
Method: "GET",
Route: "/test",
}
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -508,7 +508,7 @@ func TestGenerateTraces(t *testing.T) {
func TestGenerateTracesAttributes(t *testing.T) {
t.Run("test SQL trace generation, no statement", func(t *testing.T) {
span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"")
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -529,7 +529,7 @@ func TestGenerateTracesAttributes(t *testing.T) {

t.Run("test SQL trace generation, unknown attribute", func(t *testing.T) {
span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"")
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{"db.operation.name": {}})
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{"db.operation.name": {}}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -550,7 +550,7 @@ func TestGenerateTracesAttributes(t *testing.T) {

t.Run("test SQL trace generation, unknown attribute", func(t *testing.T) {
span := makeSQLRequestSpan("SELECT password FROM credentials WHERE username=\"bill\"")
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{attr.DBQueryText: {}})
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{attr.DBQueryText: {}}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -570,7 +570,7 @@ func TestGenerateTracesAttributes(t *testing.T) {
})
t.Run("test Kafka trace generation", func(t *testing.T) {
span := request.Span{Type: request.EventTypeKafkaClient, Method: "process", Path: "important-topic", OtherNamespace: "test"}
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{})
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{}, []attribute.KeyValue{})

assert.Equal(t, 1, traces.ResourceSpans().Len())
assert.Equal(t, 1, traces.ResourceSpans().At(0).ScopeSpans().Len())
Expand All @@ -586,6 +586,18 @@ func TestGenerateTracesAttributes(t *testing.T) {
ensureTraceStrAttr(t, attrs, semconv.MessagingClientIDKey, "test")

})
t.Run("test env var resource attributes", func(t *testing.T) {
defer restoreEnvAfterExecution()()
require.NoError(t, os.Setenv(envResourceAttrs, "deployment.environment=productions,source.upstream=beyla"))
span := request.Span{Type: request.EventTypeHTTP, Method: "GET", Route: "/test", Status: 200}
traces := GenerateTraces(&span, "host-id", map[attr.Name]struct{}{}, ResourceAttrsFromEnv())

assert.Equal(t, 1, traces.ResourceSpans().Len())
rs := traces.ResourceSpans().At(0)
attrs := rs.Resource().Attributes()
ensureTraceStrAttr(t, attrs, attribute.Key("deployment.environment"), "productions")
ensureTraceStrAttr(t, attrs, attribute.Key("source.upstream"), "beyla")
})
}

func TestAttrsToMap(t *testing.T) {
Expand Down Expand Up @@ -1143,7 +1155,7 @@ func generateTracesForSpans(t *testing.T, tr *tracesOTELReceiver, spans []reques
if span.IgnoreSpan == request.IgnoreTraces || !tr.acceptSpan(span) {
continue
}
res = append(res, GenerateTraces(span, "host-id", traceAttrs))
res = append(res, GenerateTraces(span, "host-id", traceAttrs, []attribute.KeyValue{}))
}

return res
Expand Down

0 comments on commit 29c0403

Please sign in to comment.