Skip to content

Commit

Permalink
feat: support messaging.destination.name
Browse files Browse the repository at this point in the history
  • Loading branch information
kyungeunni committed Jun 20, 2024
1 parent dae54a3 commit 0a654bc
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 103 deletions.
71 changes: 40 additions & 31 deletions input/elasticapm/internal/modeldecoder/v2/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,38 +424,47 @@ func TestDecodeMapToSpanModel(t *testing.T) {
})

t.Run("messaging", func(t *testing.T) {
attrs := map[string]interface{}{
"messaging.system": "kafka",
"messaging.destination": "myTopic",
"net.peer.ip": "10.20.30.40",
"net.peer.port": json.Number("123"),
}

var input span
var event modelpb.APMEvent
modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues())
input.OTel.Attributes = attrs
input.OTel.SpanKind.Set("PRODUCER")
input.Type.Reset()
mapToSpanModel(&input, &event)
for _, attrs := range []map[string]any{
{
"messaging.system": "kafka",
"messaging.destination": "myTopic",
"net.peer.ip": "10.20.30.40",
"net.peer.port": json.Number("123"),
},
{
"messaging.system": "kafka",
"messaging.destination.name": "myTopic",
"net.peer.ip": "10.20.30.40",
"net.peer.port": json.Number("123"),
},
} {

assert.Equal(t, "messaging", event.Span.Type)
assert.Equal(t, "kafka", event.Span.Subtype)
assert.Equal(t, "send", event.Span.Action)
assert.Equal(t, "PRODUCER", event.Span.Kind)
assert.Equal(t, &modelpb.Destination{
Address: "10.20.30.40",
Port: 123,
}, event.Destination)
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
Type: "messaging",
Name: "kafka",
Resource: "kafka/myTopic",
}, event.Span.DestinationService, protocmp.Transform()))
assert.Empty(t, cmp.Diff(&modelpb.ServiceTarget{
Type: "kafka",
Name: "myTopic",
}, event.Service.Target, protocmp.Transform()))
var input span
var event modelpb.APMEvent
modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues())
input.OTel.Attributes = attrs
input.OTel.SpanKind.Set("PRODUCER")
input.Type.Reset()
mapToSpanModel(&input, &event)

assert.Equal(t, "messaging", event.Span.Type)
assert.Equal(t, "kafka", event.Span.Subtype)
assert.Equal(t, "send", event.Span.Action)
assert.Equal(t, "PRODUCER", event.Span.Kind)
assert.Equal(t, &modelpb.Destination{
Address: "10.20.30.40",
Port: 123,
}, event.Destination)
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
Type: "messaging",
Name: "kafka",
Resource: "kafka/myTopic",
}, event.Span.DestinationService, protocmp.Transform()))
assert.Empty(t, cmp.Diff(&modelpb.ServiceTarget{
Type: "kafka",
Name: "myTopic",
}, event.Service.Target, protocmp.Transform()))
}
})

t.Run("network", func(t *testing.T) {
Expand Down
12 changes: 10 additions & 2 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,11 @@ func TranslateTransaction(
event.Network.Carrier.Icc = stringval

// messaging.*
case "message_bus.destination", semconv.AttributeMessagingDestination:
//
// messaging.destination is now called messaging.destination.name in the latest semconv
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging
// keep both of them for the backward compatibility
case "message_bus.destination", semconv.AttributeMessagingDestination, "messaging.destination.name":
isMessaging = true
messagingQueueName = stringval
case semconv.AttributeMessagingSystem:
Expand Down Expand Up @@ -784,7 +788,11 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
event.Session.Id = stringval

// messaging.*
case "message_bus.destination", semconv.AttributeMessagingDestination:
//
// messaging.destination is now called messaging.destination.name in the latest semconv
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging
// keep both of them for the backward compatibility
case "message_bus.destination", semconv.AttributeMessagingDestination, "messaging.destination.name":
message.QueueName = stringval
isMessaging = true
case semconv.AttributeMessagingOperation:
Expand Down
207 changes: 137 additions & 70 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,27 +677,37 @@ func TestMessagingTransaction(t *testing.T) {
}

func TestMessagingSpan(t *testing.T) {
event := transformSpanWithAttributes(t, map[string]interface{}{
"messaging.system": "kafka",
"messaging.destination": "myTopic",
"net.peer.ip": "10.20.30.40",
"net.peer.port": 123,
}, func(s ptrace.Span) {
s.SetKind(ptrace.SpanKindProducer)
})
assert.Equal(t, "messaging", event.Span.Type)
assert.Equal(t, "kafka", event.Span.Subtype)
assert.Equal(t, "send", event.Span.Action)
assert.Empty(t, event.Labels)
assert.Equal(t, &modelpb.Destination{
Address: "10.20.30.40",
Port: 123,
}, event.Destination)
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
Type: "messaging",
Name: "kafka",
Resource: "kafka/myTopic",
}, event.Span.DestinationService, protocmp.Transform()))
for _, attr := range []map[string]any{
{
"messaging.system": "kafka",
"messaging.destination": "myTopic",
"net.peer.ip": "10.20.30.40",
"net.peer.port": 123,
},
{
"messaging.system": "kafka",
"messaging.destination.name": "myTopic",
"net.peer.ip": "10.20.30.40",
"net.peer.port": 123,
},
} {
event := transformSpanWithAttributes(t, attr, func(s ptrace.Span) {
s.SetKind(ptrace.SpanKindProducer)
})
assert.Equal(t, "messaging", event.Span.Type)
assert.Equal(t, "kafka", event.Span.Subtype)
assert.Equal(t, "send", event.Span.Action)
assert.Empty(t, event.Labels)
assert.Equal(t, &modelpb.Destination{
Address: "10.20.30.40",
Port: 123,
}, event.Destination)
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
Type: "messaging",
Name: "kafka",
Resource: "kafka/myTopic",
}, event.Span.DestinationService, protocmp.Transform()))
}
}

func TestMessagingSpan_DestinationResource(t *testing.T) {
Expand All @@ -708,40 +718,66 @@ func TestMessagingSpan_DestinationResource(t *testing.T) {
assert.Empty(t, cmp.Diff(expectedDestinationService, event.Span.DestinationService, protocmp.Transform()))
}

setAttr := func(t *testing.T, baseAttr map[string]any, key string, val any) map[string]any {
t.Helper()
newAttr := make(map[string]any)
// Copy from the original map to the target map
for key, value := range baseAttr {
newAttr[key] = value
}
newAttr[key] = val
return newAttr
}

t.Run("system_destination_peerservice_peeraddress", func(t *testing.T) {
test(t, &modelpb.Destination{
Address: "127.0.0.1",
}, &modelpb.DestinationService{
Type: "messaging",
Name: "testsvc",
Resource: "127.0.0.1/testtopic",
}, map[string]interface{}{
"messaging.system": "kafka",
"messaging.destination": "testtopic",
"peer.service": "testsvc",
"peer.address": "127.0.0.1",
})
baseAttr := map[string]any{
"messaging.system": "kafka",
"peer.service": "testsvc",
"peer.address": "127.0.0.1",
}
for _, attr := range []map[string]any{
setAttr(t, baseAttr, "messaging.destination", "testtopic"),
setAttr(t, baseAttr, "messaging.destination.name", "testtopic"),
} {
test(t, &modelpb.Destination{
Address: "127.0.0.1",
}, &modelpb.DestinationService{
Type: "messaging",
Name: "testsvc",
Resource: "127.0.0.1/testtopic",
}, attr)
}
})
t.Run("system_destination_peerservice", func(t *testing.T) {
test(t, nil, &modelpb.DestinationService{
Type: "messaging",
Name: "testsvc",
Resource: "testsvc/testtopic",
}, map[string]interface{}{
"messaging.system": "kafka",
"messaging.destination": "testtopic",
"peer.service": "testsvc",
})
baseAttr := map[string]any{
"messaging.system": "kafka",
"peer.service": "testsvc",
}
for _, attr := range []map[string]any{
setAttr(t, baseAttr, "messaging.destination", "testtopic"),
setAttr(t, baseAttr, "messaging.destination.name", "testtopic"),
} {
test(t, nil, &modelpb.DestinationService{
Type: "messaging",
Name: "testsvc",
Resource: "testsvc/testtopic",
}, attr)
}
})
t.Run("system_destination", func(t *testing.T) {
test(t, nil, &modelpb.DestinationService{
Type: "messaging",
Name: "kafka",
Resource: "kafka/testtopic",
}, map[string]interface{}{
"messaging.system": "kafka",
"messaging.destination": "testtopic",
})
baseAttr := map[string]any{
"messaging.system": "kafka",
}
for _, attr := range []map[string]any{
setAttr(t, baseAttr, "messaging.destination", "testtopic"),
setAttr(t, baseAttr, "messaging.destination.name", "testtopic"),
} {
test(t, nil, &modelpb.DestinationService{
Type: "messaging",
Name: "kafka",
Resource: "kafka/testtopic",
}, attr)
}
})
}

Expand Down Expand Up @@ -776,6 +812,9 @@ func TestTransactionTypePriorities(t *testing.T) {

attribs["messaging.destination"] = "foobar"
assert.Equal(t, "messaging", transactionWithAttribs(attribs).Transaction.Type)
delete(attribs, "messaging.destination")
attribs["messaging.destination.name"] = "foobar"
assert.Equal(t, "messaging", transactionWithAttribs(attribs).Transaction.Type)
}

func TestSpanTypePriorities(t *testing.T) {
Expand All @@ -797,6 +836,10 @@ func TestSpanTypePriorities(t *testing.T) {
attribs["messaging.destination"] = "foobar"
assert.Equal(t, "messaging", spanWithAttribs(attribs).Span.Type)

delete(attribs, "messaging.destination")
attribs["messaging.destination.name"] = "foobar"
assert.Equal(t, "messaging", spanWithAttribs(attribs).Span.Type)

attribs["db.statement"] = "SELECT * FROM FOO"
assert.Equal(t, "db", spanWithAttribs(attribs).Span.Type)
}
Expand Down Expand Up @@ -1655,6 +1698,17 @@ func TestServiceTarget(t *testing.T) {
event := transformSpanWithAttributes(t, input)
assert.Empty(t, cmp.Diff(expected, event.Service.Target, protocmp.Transform()))
}

setAttr := func(t *testing.T, baseAttr map[string]any, key string, val any) map[string]any {
t.Helper()
newAttr := make(map[string]any)
// Copy from the original map to the target map
for key, value := range baseAttr {
newAttr[key] = value
}
newAttr[key] = val
return newAttr
}
t.Run("db_spans_with_peerservice_system", func(t *testing.T) {
test(t, &modelpb.ServiceTarget{
Type: "postgresql",
Expand Down Expand Up @@ -1762,35 +1816,48 @@ func TestServiceTarget(t *testing.T) {
})

t.Run("messaging_spans_with_peerservice_system_destination", func(t *testing.T) {
test(t, &modelpb.ServiceTarget{
Name: "myTopic",
Type: "kafka",
}, map[string]interface{}{
"peer.service": "testsvc",
"messaging.system": "kafka",
"messaging.destination": "myTopic",
})
baseAttr := map[string]any{
"peer.service": "testsvc",
"messaging.system": "kafka",
}
for _, attr := range []map[string]any{
setAttr(t, baseAttr, "messaging.destination", "myTopic"),
setAttr(t, baseAttr, "messaging.destination.name", "myTopic"),
} {
test(t, &modelpb.ServiceTarget{
Name: "myTopic",
Type: "kafka",
}, attr)
}
})

t.Run("messaging_spans_with_peerservice_system_destination_tempdestination", func(t *testing.T) {
test(t, &modelpb.ServiceTarget{
Name: "testsvc",
Type: "kafka",
}, map[string]interface{}{
baseAttr := map[string]any{
"peer.service": "testsvc",
"messaging.temp_destination": true,
"messaging.system": "kafka",
"messaging.destination": "myTopic",
})
}
for _, attr := range []map[string]any{
setAttr(t, baseAttr, "messaging.destination", "myTopic"),
setAttr(t, baseAttr, "messaging.destination.name", "myTopic"),
} {
test(t, &modelpb.ServiceTarget{
Name: "testsvc",
Type: "kafka",
}, attr)
}
})

t.Run("messaging_spans_with_destination", func(t *testing.T) {
test(t, &modelpb.ServiceTarget{
Name: "myTopic",
Type: "messaging",
}, map[string]interface{}{
"messaging.destination": "myTopic",
})
for _, attr := range []map[string]any{
{"messaging.destination": "myTopic"},
{"messaging.destination.name": "myTopic"},
} {
test(t, &modelpb.ServiceTarget{
Name: "myTopic",
Type: "messaging",
}, attr)
}
})
}

Expand Down

0 comments on commit 0a654bc

Please sign in to comment.