From 0a654bc94a75e5b9b7bd7961af0975eb039f0db8 Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Thu, 20 Jun 2024 14:08:46 +0900 Subject: [PATCH] feat: support messaging.destination.name --- .../internal/modeldecoder/v2/span_test.go | 71 +++--- input/otlp/traces.go | 12 +- input/otlp/traces_test.go | 207 ++++++++++++------ 3 files changed, 187 insertions(+), 103 deletions(-) diff --git a/input/elasticapm/internal/modeldecoder/v2/span_test.go b/input/elasticapm/internal/modeldecoder/v2/span_test.go index ff931ed9..463df398 100644 --- a/input/elasticapm/internal/modeldecoder/v2/span_test.go +++ b/input/elasticapm/internal/modeldecoder/v2/span_test.go @@ -424,38 +424,47 @@ func TestDecodeMapToSpanModel(t *testing.T) { }) t.Run("messaging", func(t *testing.T) { - attrs := map[string]interface{}{ - "messaging.system": "kafka", - "messaging.destination": "myTopic", - "net.peer.ip": "10.20.30.40", - "net.peer.port": json.Number("123"), - } - - var input span - var event modelpb.APMEvent - modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues()) - input.OTel.Attributes = attrs - input.OTel.SpanKind.Set("PRODUCER") - input.Type.Reset() - mapToSpanModel(&input, &event) + for _, attrs := range []map[string]any{ + { + "messaging.system": "kafka", + "messaging.destination": "myTopic", + "net.peer.ip": "10.20.30.40", + "net.peer.port": json.Number("123"), + }, + { + "messaging.system": "kafka", + "messaging.destination.name": "myTopic", + "net.peer.ip": "10.20.30.40", + "net.peer.port": json.Number("123"), + }, + } { - assert.Equal(t, "messaging", event.Span.Type) - assert.Equal(t, "kafka", event.Span.Subtype) - assert.Equal(t, "send", event.Span.Action) - assert.Equal(t, "PRODUCER", event.Span.Kind) - assert.Equal(t, &modelpb.Destination{ - Address: "10.20.30.40", - Port: 123, - }, event.Destination) - assert.Empty(t, cmp.Diff(&modelpb.DestinationService{ - Type: "messaging", - Name: "kafka", - Resource: "kafka/myTopic", - }, event.Span.DestinationService, protocmp.Transform())) - assert.Empty(t, cmp.Diff(&modelpb.ServiceTarget{ - Type: "kafka", - Name: "myTopic", - }, event.Service.Target, protocmp.Transform())) + var input span + var event modelpb.APMEvent + modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues()) + input.OTel.Attributes = attrs + input.OTel.SpanKind.Set("PRODUCER") + input.Type.Reset() + mapToSpanModel(&input, &event) + + assert.Equal(t, "messaging", event.Span.Type) + assert.Equal(t, "kafka", event.Span.Subtype) + assert.Equal(t, "send", event.Span.Action) + assert.Equal(t, "PRODUCER", event.Span.Kind) + assert.Equal(t, &modelpb.Destination{ + Address: "10.20.30.40", + Port: 123, + }, event.Destination) + assert.Empty(t, cmp.Diff(&modelpb.DestinationService{ + Type: "messaging", + Name: "kafka", + Resource: "kafka/myTopic", + }, event.Span.DestinationService, protocmp.Transform())) + assert.Empty(t, cmp.Diff(&modelpb.ServiceTarget{ + Type: "kafka", + Name: "myTopic", + }, event.Service.Target, protocmp.Transform())) + } }) t.Run("network", func(t *testing.T) { diff --git a/input/otlp/traces.go b/input/otlp/traces.go index 0f2bead1..9abb2d9c 100644 --- a/input/otlp/traces.go +++ b/input/otlp/traces.go @@ -439,7 +439,11 @@ func TranslateTransaction( event.Network.Carrier.Icc = stringval // messaging.* - case "message_bus.destination", semconv.AttributeMessagingDestination: + // + // messaging.destination is now called messaging.destination.name in the latest semconv + // https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging + // keep both of them for the backward compatibility + case "message_bus.destination", semconv.AttributeMessagingDestination, "messaging.destination.name": isMessaging = true messagingQueueName = stringval case semconv.AttributeMessagingSystem: @@ -784,7 +788,11 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode event.Session.Id = stringval // messaging.* - case "message_bus.destination", semconv.AttributeMessagingDestination: + // + // messaging.destination is now called messaging.destination.name in the latest semconv + // https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging + // keep both of them for the backward compatibility + case "message_bus.destination", semconv.AttributeMessagingDestination, "messaging.destination.name": message.QueueName = stringval isMessaging = true case semconv.AttributeMessagingOperation: diff --git a/input/otlp/traces_test.go b/input/otlp/traces_test.go index 56046700..84f9ecfe 100644 --- a/input/otlp/traces_test.go +++ b/input/otlp/traces_test.go @@ -677,27 +677,37 @@ func TestMessagingTransaction(t *testing.T) { } func TestMessagingSpan(t *testing.T) { - event := transformSpanWithAttributes(t, map[string]interface{}{ - "messaging.system": "kafka", - "messaging.destination": "myTopic", - "net.peer.ip": "10.20.30.40", - "net.peer.port": 123, - }, func(s ptrace.Span) { - s.SetKind(ptrace.SpanKindProducer) - }) - assert.Equal(t, "messaging", event.Span.Type) - assert.Equal(t, "kafka", event.Span.Subtype) - assert.Equal(t, "send", event.Span.Action) - assert.Empty(t, event.Labels) - assert.Equal(t, &modelpb.Destination{ - Address: "10.20.30.40", - Port: 123, - }, event.Destination) - assert.Empty(t, cmp.Diff(&modelpb.DestinationService{ - Type: "messaging", - Name: "kafka", - Resource: "kafka/myTopic", - }, event.Span.DestinationService, protocmp.Transform())) + for _, attr := range []map[string]any{ + { + "messaging.system": "kafka", + "messaging.destination": "myTopic", + "net.peer.ip": "10.20.30.40", + "net.peer.port": 123, + }, + { + "messaging.system": "kafka", + "messaging.destination.name": "myTopic", + "net.peer.ip": "10.20.30.40", + "net.peer.port": 123, + }, + } { + event := transformSpanWithAttributes(t, attr, func(s ptrace.Span) { + s.SetKind(ptrace.SpanKindProducer) + }) + assert.Equal(t, "messaging", event.Span.Type) + assert.Equal(t, "kafka", event.Span.Subtype) + assert.Equal(t, "send", event.Span.Action) + assert.Empty(t, event.Labels) + assert.Equal(t, &modelpb.Destination{ + Address: "10.20.30.40", + Port: 123, + }, event.Destination) + assert.Empty(t, cmp.Diff(&modelpb.DestinationService{ + Type: "messaging", + Name: "kafka", + Resource: "kafka/myTopic", + }, event.Span.DestinationService, protocmp.Transform())) + } } func TestMessagingSpan_DestinationResource(t *testing.T) { @@ -708,40 +718,66 @@ func TestMessagingSpan_DestinationResource(t *testing.T) { assert.Empty(t, cmp.Diff(expectedDestinationService, event.Span.DestinationService, protocmp.Transform())) } + setAttr := func(t *testing.T, baseAttr map[string]any, key string, val any) map[string]any { + t.Helper() + newAttr := make(map[string]any) + // Copy from the original map to the target map + for key, value := range baseAttr { + newAttr[key] = value + } + newAttr[key] = val + return newAttr + } + t.Run("system_destination_peerservice_peeraddress", func(t *testing.T) { - test(t, &modelpb.Destination{ - Address: "127.0.0.1", - }, &modelpb.DestinationService{ - Type: "messaging", - Name: "testsvc", - Resource: "127.0.0.1/testtopic", - }, map[string]interface{}{ - "messaging.system": "kafka", - "messaging.destination": "testtopic", - "peer.service": "testsvc", - "peer.address": "127.0.0.1", - }) + baseAttr := map[string]any{ + "messaging.system": "kafka", + "peer.service": "testsvc", + "peer.address": "127.0.0.1", + } + for _, attr := range []map[string]any{ + setAttr(t, baseAttr, "messaging.destination", "testtopic"), + setAttr(t, baseAttr, "messaging.destination.name", "testtopic"), + } { + test(t, &modelpb.Destination{ + Address: "127.0.0.1", + }, &modelpb.DestinationService{ + Type: "messaging", + Name: "testsvc", + Resource: "127.0.0.1/testtopic", + }, attr) + } }) t.Run("system_destination_peerservice", func(t *testing.T) { - test(t, nil, &modelpb.DestinationService{ - Type: "messaging", - Name: "testsvc", - Resource: "testsvc/testtopic", - }, map[string]interface{}{ - "messaging.system": "kafka", - "messaging.destination": "testtopic", - "peer.service": "testsvc", - }) + baseAttr := map[string]any{ + "messaging.system": "kafka", + "peer.service": "testsvc", + } + for _, attr := range []map[string]any{ + setAttr(t, baseAttr, "messaging.destination", "testtopic"), + setAttr(t, baseAttr, "messaging.destination.name", "testtopic"), + } { + test(t, nil, &modelpb.DestinationService{ + Type: "messaging", + Name: "testsvc", + Resource: "testsvc/testtopic", + }, attr) + } }) t.Run("system_destination", func(t *testing.T) { - test(t, nil, &modelpb.DestinationService{ - Type: "messaging", - Name: "kafka", - Resource: "kafka/testtopic", - }, map[string]interface{}{ - "messaging.system": "kafka", - "messaging.destination": "testtopic", - }) + baseAttr := map[string]any{ + "messaging.system": "kafka", + } + for _, attr := range []map[string]any{ + setAttr(t, baseAttr, "messaging.destination", "testtopic"), + setAttr(t, baseAttr, "messaging.destination.name", "testtopic"), + } { + test(t, nil, &modelpb.DestinationService{ + Type: "messaging", + Name: "kafka", + Resource: "kafka/testtopic", + }, attr) + } }) } @@ -776,6 +812,9 @@ func TestTransactionTypePriorities(t *testing.T) { attribs["messaging.destination"] = "foobar" assert.Equal(t, "messaging", transactionWithAttribs(attribs).Transaction.Type) + delete(attribs, "messaging.destination") + attribs["messaging.destination.name"] = "foobar" + assert.Equal(t, "messaging", transactionWithAttribs(attribs).Transaction.Type) } func TestSpanTypePriorities(t *testing.T) { @@ -797,6 +836,10 @@ func TestSpanTypePriorities(t *testing.T) { attribs["messaging.destination"] = "foobar" assert.Equal(t, "messaging", spanWithAttribs(attribs).Span.Type) + delete(attribs, "messaging.destination") + attribs["messaging.destination.name"] = "foobar" + assert.Equal(t, "messaging", spanWithAttribs(attribs).Span.Type) + attribs["db.statement"] = "SELECT * FROM FOO" assert.Equal(t, "db", spanWithAttribs(attribs).Span.Type) } @@ -1655,6 +1698,17 @@ func TestServiceTarget(t *testing.T) { event := transformSpanWithAttributes(t, input) assert.Empty(t, cmp.Diff(expected, event.Service.Target, protocmp.Transform())) } + + setAttr := func(t *testing.T, baseAttr map[string]any, key string, val any) map[string]any { + t.Helper() + newAttr := make(map[string]any) + // Copy from the original map to the target map + for key, value := range baseAttr { + newAttr[key] = value + } + newAttr[key] = val + return newAttr + } t.Run("db_spans_with_peerservice_system", func(t *testing.T) { test(t, &modelpb.ServiceTarget{ Type: "postgresql", @@ -1762,35 +1816,48 @@ func TestServiceTarget(t *testing.T) { }) t.Run("messaging_spans_with_peerservice_system_destination", func(t *testing.T) { - test(t, &modelpb.ServiceTarget{ - Name: "myTopic", - Type: "kafka", - }, map[string]interface{}{ - "peer.service": "testsvc", - "messaging.system": "kafka", - "messaging.destination": "myTopic", - }) + baseAttr := map[string]any{ + "peer.service": "testsvc", + "messaging.system": "kafka", + } + for _, attr := range []map[string]any{ + setAttr(t, baseAttr, "messaging.destination", "myTopic"), + setAttr(t, baseAttr, "messaging.destination.name", "myTopic"), + } { + test(t, &modelpb.ServiceTarget{ + Name: "myTopic", + Type: "kafka", + }, attr) + } }) t.Run("messaging_spans_with_peerservice_system_destination_tempdestination", func(t *testing.T) { - test(t, &modelpb.ServiceTarget{ - Name: "testsvc", - Type: "kafka", - }, map[string]interface{}{ + baseAttr := map[string]any{ "peer.service": "testsvc", "messaging.temp_destination": true, "messaging.system": "kafka", - "messaging.destination": "myTopic", - }) + } + for _, attr := range []map[string]any{ + setAttr(t, baseAttr, "messaging.destination", "myTopic"), + setAttr(t, baseAttr, "messaging.destination.name", "myTopic"), + } { + test(t, &modelpb.ServiceTarget{ + Name: "testsvc", + Type: "kafka", + }, attr) + } }) t.Run("messaging_spans_with_destination", func(t *testing.T) { - test(t, &modelpb.ServiceTarget{ - Name: "myTopic", - Type: "messaging", - }, map[string]interface{}{ - "messaging.destination": "myTopic", - }) + for _, attr := range []map[string]any{ + {"messaging.destination": "myTopic"}, + {"messaging.destination.name": "myTopic"}, + } { + test(t, &modelpb.ServiceTarget{ + Name: "myTopic", + Type: "messaging", + }, attr) + } }) }