From 98da843d6c1faadffd7dec17ee762710eb279d71 Mon Sep 17 00:00:00 2001 From: Jared Tan Date: Mon, 23 Oct 2023 16:02:32 +0800 Subject: [PATCH] [exporter/elasticsearch] add missing scope info in span/log attributes (#27288) **Description:** **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27282 **Testing:** **Documentation:** --------- Signed-off-by: Jared Tan Signed-off-by: Dominik Rosiek Signed-off-by: Raphael Silva Signed-off-by: Alex Boten Signed-off-by: Pavol Loffay Co-authored-by: Antoine Toulme Co-authored-by: Raj Nishtala <113392743+rnishtala-sumo@users.noreply.github.com> Co-authored-by: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Co-authored-by: Povilas Versockas Co-authored-by: Priyanshu Raj <55045459+rpriyanshu9@users.noreply.github.com> Co-authored-by: sakulali Co-authored-by: Raphael Philipe Mendes da Silva Co-authored-by: Anthony Mirabella Co-authored-by: Yotam loewenbach <48534558+yotamloe@users.noreply.github.com> Co-authored-by: Alex Boten Co-authored-by: bryan-aguilar <46550959+bryan-aguilar@users.noreply.github.com> Co-authored-by: Daniel Jaglowski Co-authored-by: Jina Jain Co-authored-by: Yang Song Co-authored-by: Dmitrii Anoshin Co-authored-by: Curtis Robert <92119472+crobert-1@users.noreply.github.com> Co-authored-by: Abhishek Saharn <102726227+asaharn@users.noreply.github.com> Co-authored-by: Ramachandran A G Co-authored-by: Ziqi Zhao Co-authored-by: Ramachandran A G <106139410+ag-ramachandran@users.noreply.github.com> Co-authored-by: Faith Chikwekwe Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Co-authored-by: Daniel Kuiper <44123852+kuiperda@users.noreply.github.com> Co-authored-by: Carlos Castro Co-authored-by: Christian Co-authored-by: ArchangelSDY Co-authored-by: Pavol Loffay Co-authored-by: Paulo Janotti Co-authored-by: Nathan Burke Co-authored-by: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Co-authored-by: shalper2 <99686388+shalper2@users.noreply.github.com> Co-authored-by: OpenTelemetry Bot <107717825+opentelemetrybot@users.noreply.github.com> Co-authored-by: Martin Majlis <122797378+martin-majlis-s1@users.noreply.github.com> Co-authored-by: hovavza <147598197+hovavza@users.noreply.github.com> --- ...porter_add_missing_scope_info_in_span.yaml | 27 +++++++++++++++++++ cmd/otelcontribcol/exporters_test.go | 1 + .../elasticsearchexporter/logs_exporter.go | 7 ++--- .../logs_exporter_test.go | 9 ++++--- exporter/elasticsearchexporter/model.go | 20 +++++++++++--- exporter/elasticsearchexporter/model_test.go | 8 ++++-- .../elasticsearchexporter/trace_exporter.go | 7 ++--- .../traces_exporter_test.go | 3 ++- 8 files changed, 66 insertions(+), 16 deletions(-) create mode 100644 .chloggen/elasticsearchexporter_add_missing_scope_info_in_span.yaml diff --git a/.chloggen/elasticsearchexporter_add_missing_scope_info_in_span.yaml b/.chloggen/elasticsearchexporter_add_missing_scope_info_in_span.yaml new file mode 100644 index 000000000000..9e39172464a8 --- /dev/null +++ b/.chloggen/elasticsearchexporter_add_missing_scope_info_in_span.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "add missing scope info in span attributes" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27282] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/cmd/otelcontribcol/exporters_test.go b/cmd/otelcontribcol/exporters_test.go index 949e7e41048d..893cbbe5c81b 100644 --- a/cmd/otelcontribcol/exporters_test.go +++ b/cmd/otelcontribcol/exporters_test.go @@ -83,6 +83,7 @@ func TestDefaultExporters(t *testing.T) { cfg := expFactories["awscloudwatchlogs"].CreateDefaultConfig().(*awscloudwatchlogsexporter.Config) cfg.Endpoint = "http://" + endpoint cfg.Region = "local" + // disable queue/retry to validate passing the test data synchronously cfg.QueueSettings.Enabled = false cfg.RetrySettings.Enabled = false diff --git a/exporter/elasticsearchexporter/logs_exporter.go b/exporter/elasticsearchexporter/logs_exporter.go index 542eab5259c8..7511988bc2ba 100644 --- a/exporter/elasticsearchexporter/logs_exporter.go +++ b/exporter/elasticsearchexporter/logs_exporter.go @@ -83,9 +83,10 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo resource := rl.Resource() ills := rl.ScopeLogs() for j := 0; j < ills.Len(); j++ { + scope := ills.At(j).Scope() logs := ills.At(j).LogRecords() for k := 0; k < logs.Len(); k++ { - if err := e.pushLogRecord(ctx, resource, logs.At(k)); err != nil { + if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -99,7 +100,7 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo return errors.Join(errs...) } -func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord) error { +func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { prefix := getFromBothResourceAndAttribute(indexPrefix, resource, record) @@ -108,7 +109,7 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } - document, err := e.model.encodeLog(resource, record) + document, err := e.model.encodeLog(resource, record, scope) if err != nil { return fmt.Errorf("Failed to encode log event: %w", err) } diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index b8da4be16e2c..dd7d80e6b2e7 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -403,9 +403,12 @@ func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string // send trace with span & resource attributes func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string) { logs := newLogsWithAttributeAndResourceMap(attrMp, resMp) - resSpans := logs.ResourceLogs().At(0) - logRecords := resSpans.ScopeLogs().At(0).LogRecords().At(0) + resLogs := logs.ResourceLogs().At(0) + logRecords := resLogs.ScopeLogs().At(0).LogRecords().At(0) - err := exporter.pushLogRecord(context.TODO(), resSpans.Resource(), logRecords) + scopeLogs := resLogs.ScopeLogs().AppendEmpty() + scope := scopeLogs.Scope() + + err := exporter.pushLogRecord(context.TODO(), resLogs.Resource(), logRecords, scope) require.NoError(t, err) } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 98409e4cd4b3..9e9e43bc663b 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -17,8 +17,8 @@ import ( ) type mappingModel interface { - encodeLog(pcommon.Resource, plog.LogRecord) ([]byte, error) - encodeSpan(pcommon.Resource, ptrace.Span) ([]byte, error) + encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error) + encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error) } // encodeModel tries to keep the event as close to the original open telemetry semantics as is. @@ -38,7 +38,7 @@ const ( attributeField = "attribute" ) -func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord) ([]byte, error) { +func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) ([]byte, error) { var document objmodel.Document document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. document.AddTraceID("TraceId", record.TraceID()) @@ -49,6 +49,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord document.AddAttribute("Body", record.Body()) document.AddAttributes("Attributes", record.Attributes()) document.AddAttributes("Resource", resource.Attributes()) + document.AddAttributes("Scope", scopeToAttributes(scope)) if m.dedup { document.Dedup() @@ -61,7 +62,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord return buf.Bytes(), err } -func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span) ([]byte, error) { +func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) { var document objmodel.Document document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. document.AddTimestamp("EndTimestamp", span.EndTimestamp()) @@ -76,6 +77,7 @@ func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span) ([ document.AddAttributes("Resource", resource.Attributes()) document.AddEvents("Events", span.Events()) document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds + document.AddAttributes("Scope", scopeToAttributes(scope)) if m.dedup { document.Dedup() @@ -107,3 +109,13 @@ func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string { func durationAsMicroseconds(start, end time.Time) int64 { return (end.UnixNano() - start.UnixNano()) / 1000 } + +func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map { + attrs := pcommon.NewMap() + attrs.PutStr("name", scope.Name()) + attrs.PutStr("version", scope.Version()) + for k, v := range scope.Attributes().AsRaw() { + attrs.PutStr(k, v.(string)) + } + return attrs +} diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 76673f36db45..4ee35f74e819 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -13,12 +13,12 @@ import ( semconv "go.opentelemetry.io/collector/semconv/v1.18.0" ) -var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":0}` +var expectedSpanBody = `{"@timestamp":"2023-04-19T03:04:05.000000006Z","Attributes.service.instance.id":"23","Duration":1000000,"EndTimestamp":"2023-04-19T03:04:06.000000006Z","Events.fooEvent.evnetMockBar":"bar","Events.fooEvent.evnetMockFoo":"foo","Events.fooEvent.time":"2023-04-19T03:04:05.000000006Z","Kind":"SPAN_KIND_CLIENT","Link":"[{\"attribute\":{},\"spanID\":\"\",\"traceID\":\"01020304050607080807060504030200\"}]","Name":"client span","Resource.cloud.platform":"aws_elastic_beanstalk","Resource.cloud.provider":"aws","Resource.deployment.environment":"BETA","Resource.service.instance.id":"23","Resource.service.name":"some-service","Resource.service.version":"env-version-1234","Scope.lib-foo":"lib-bar","Scope.name":"io.opentelemetry.rabbitmq-2.7","Scope.version":"1.30.0-alpha","SpanId":"1920212223242526","TraceId":"01020304050607080807060504030201","TraceStatus":0}` func TestEncodeSpan(t *testing.T) { model := &encodeModel{dedup: true, dedot: false} td := mockResourceSpans() - spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)) + spanByte, err := model.encodeSpan(td.ResourceSpans().At(0).Resource(), td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0), td.ResourceSpans().At(0).ScopeSpans().At(0).Scope()) assert.NoError(t, err) assert.Equal(t, expectedSpanBody, string(spanByte)) } @@ -40,6 +40,10 @@ func mockResourceSpans() ptrace.Traces { tEnd := time.Date(2023, 4, 19, 3, 4, 6, 6, time.UTC) scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() + scopeSpans.Scope().SetName("io.opentelemetry.rabbitmq-2.7") + scopeSpans.Scope().SetVersion("1.30.0-alpha") + scopeSpans.Scope().Attributes().PutStr("lib-foo", "lib-bar") + span := scopeSpans.Spans().AppendEmpty() span.SetName("client span") span.SetSpanID([8]byte{0x19, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26}) diff --git a/exporter/elasticsearchexporter/trace_exporter.go b/exporter/elasticsearchexporter/trace_exporter.go index 1ee1d3d98d95..ef421951ed4c 100644 --- a/exporter/elasticsearchexporter/trace_exporter.go +++ b/exporter/elasticsearchexporter/trace_exporter.go @@ -76,9 +76,10 @@ func (e *elasticsearchTracesExporter) pushTraceData( resource := il.Resource() scopeSpans := il.ScopeSpans() for j := 0; j < scopeSpans.Len(); j++ { + scope := scopeSpans.At(j).Scope() spans := scopeSpans.At(j).Spans() for k := 0; k < spans.Len(); k++ { - if err := e.pushTraceRecord(ctx, resource, spans.At(k)); err != nil { + if err := e.pushTraceRecord(ctx, resource, spans.At(k), scope); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -91,7 +92,7 @@ func (e *elasticsearchTracesExporter) pushTraceData( return errors.Join(errs...) } -func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span) error { +func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { prefix := getFromBothResourceAndAttribute(indexPrefix, resource, span) @@ -100,7 +101,7 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } - document, err := e.model.encodeSpan(resource, span) + document, err := e.model.encodeSpan(resource, span, scope) if err != nil { return fmt.Errorf("Failed to encode trace record: %w", err) } diff --git a/exporter/elasticsearchexporter/traces_exporter_test.go b/exporter/elasticsearchexporter/traces_exporter_test.go index 2a5586641905..70113b95365d 100644 --- a/exporter/elasticsearchexporter/traces_exporter_test.go +++ b/exporter/elasticsearchexporter/traces_exporter_test.go @@ -404,7 +404,8 @@ func mustSendTracesWithAttributes(t *testing.T, exporter *elasticsearchTracesExp traces := newTracesWithAttributeAndResourceMap(attrMp, resMp) resSpans := traces.ResourceSpans().At(0) span := resSpans.ScopeSpans().At(0).Spans().At(0) + scope := resSpans.ScopeSpans().At(0).Scope() - err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span) + err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span, scope) require.NoError(t, err) }