From ca6f86aae8607d3098860e9b27ca1c0ea94ef604 Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Thu, 9 Nov 2023 21:09:45 -0800 Subject: [PATCH] Add splitExportTraceServiceRequest API to OTelProtoDecoder (#3600) * Add splitExportTraceServiceRequest API to OTelProtoDecoder Signed-off-by: Krishna Kondaka * Renamed the API Signed-off-by: Krishna Kondaka * Fixed code and modified test case Signed-off-by: Krishna Kondaka * Fixed check style test Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../plugins/otel/codec/OTelProtoCodec.java | 99 +++++++ .../otel/codec/OTelProtoCodecTest.java | 54 ++++ .../test-request-multiple-traces.json | 249 ++++++++++++++++++ 3 files changed, 402 insertions(+) create mode 100644 data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-traces.json diff --git a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java index dba17b0851..29a58be6df 100644 --- a/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java +++ b/data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodec.java @@ -166,6 +166,26 @@ public List parseExportTraceServiceRequest(final ExportTraceServiceRequest .flatMap(rs -> parseResourceSpans(rs).stream()).collect(Collectors.toList()); } + public Map splitExportTraceServiceRequestByTraceId(final ExportTraceServiceRequest exportTraceServiceRequest) { + Map result = new HashMap<>(); + Map resultBuilderMap = new HashMap<>(); + for (final ResourceSpans resourceSpans: exportTraceServiceRequest.getResourceSpansList()) { + for (Map.Entry entry: splitResourceSpansByTraceId(resourceSpans).entrySet()) { + String traceId = entry.getKey(); + + if (resultBuilderMap.containsKey(traceId)) { + resultBuilderMap.get(traceId).addResourceSpans(entry.getValue()); + } else { + resultBuilderMap.put(traceId, ExportTraceServiceRequest.newBuilder().addResourceSpans(entry.getValue())); + } + } + } + for (Map.Entry entry: resultBuilderMap.entrySet()) { + result.put(entry.getKey(), entry.getValue().build()); + } + return result; + } + public List parseExportLogsServiceRequest(final ExportLogsServiceRequest exportLogsServiceRequest) { return exportLogsServiceRequest.getResourceLogsList().stream() .flatMap(rs -> parseResourceLogs(rs).stream()).collect(Collectors.toList()); @@ -199,6 +219,38 @@ protected Collection parseResourceLogs(ResourceLogs rs) { return Stream.concat(mappedInstrumentationLibraryLogs, mappedScopeListLogs).collect(Collectors.toList()); } + protected Map splitResourceSpansByTraceId(final ResourceSpans resourceSpans) { + final Resource resource = resourceSpans.getResource(); + Map result = new HashMap<>(); + Map resultBuilderMap = new HashMap<>(); + + if (resourceSpans.getScopeSpansList().size() > 0) { + for (Map.Entry> entry: splitScopeSpansByTraceId(resourceSpans.getScopeSpansList()).entrySet()) { + ResourceSpans.Builder b = ResourceSpans.newBuilder().setResource(resource).addAllScopeSpans(entry.getValue()); + resultBuilderMap.put(entry.getKey(), b); + } + } + + if (resourceSpans.getInstrumentationLibrarySpansList().size() > 0) { + for (Map.Entry> entry: splitInstrumentationLibrarySpansByTraceId(resourceSpans.getInstrumentationLibrarySpansList()).entrySet()) { + ResourceSpans.Builder resourceSpansBuilder; + String traceId = entry.getKey(); + if (resultBuilderMap.containsKey(traceId)) { + resourceSpansBuilder = resultBuilderMap.get(traceId); + } else { + resourceSpansBuilder = ResourceSpans.newBuilder().setResource(resource); + resultBuilderMap.put(traceId, resourceSpansBuilder); + } + resourceSpansBuilder.addAllInstrumentationLibrarySpans(entry.getValue()); + } + } + for (Map.Entry entry: resultBuilderMap.entrySet()) { + result.put(entry.getKey(), entry.getValue().build()); + } + + return result; + } + protected List parseResourceSpans(final ResourceSpans resourceSpans) { final String serviceName = getServiceName(resourceSpans.getResource()).orElse(null); final Map resourceAttributes = getResourceAttributes(resourceSpans.getResource()); @@ -223,6 +275,21 @@ private List parseScopeSpans(final List scopeSpansList, final .collect(Collectors.toList()); } + private Map> splitScopeSpansByTraceId(final List scopeSpansList) { + Map> result = new HashMap<>(); + for (ScopeSpans ss: scopeSpansList) { + for (Map.Entry> entry: splitSpansByTraceId(ss.getSpansList()).entrySet()) { + ScopeSpans.Builder scopeSpansBuilder = ScopeSpans.newBuilder().setScope(ss.getScope()).addAllSpans(entry.getValue()); + String traceId = entry.getKey(); + if (!result.containsKey(traceId)) { + result.put(traceId, new ArrayList<>()); + } + result.get(traceId).add(scopeSpansBuilder.build()); + } + } + return result; + } + private List parseInstrumentationLibrarySpans(final List instrumentationLibrarySpansList, final String serviceName, final Map resourceAttributes) { return instrumentationLibrarySpansList.stream() @@ -233,6 +300,38 @@ private List parseInstrumentationLibrarySpans(final List> splitInstrumentationLibrarySpansByTraceId(final List instrumentationLibrarySpansList) { + Map> result = new HashMap<>(); + for (InstrumentationLibrarySpans is: instrumentationLibrarySpansList) { + for (Map.Entry> entry: splitSpansByTraceId(is.getSpansList()).entrySet()) { + String traceId = entry.getKey(); + InstrumentationLibrarySpans.Builder ilSpansBuilder = InstrumentationLibrarySpans.newBuilder().setInstrumentationLibrary(is.getInstrumentationLibrary()).addAllSpans(entry.getValue()); + if (!result.containsKey(traceId)) { + result.put(traceId, new ArrayList<>()); + } + result.get(traceId).add(ilSpansBuilder.build()); + } + } + return result; + } + + + private Map> splitSpansByTraceId(final List spans) { + Map> result = new HashMap<>(); + for (io.opentelemetry.proto.trace.v1.Span span: spans) { + String traceId = convertByteStringToString(span.getTraceId()); + List spanList; + if (result.containsKey(traceId)) { + spanList = result.get(traceId); + } else { + spanList = new ArrayList<>(); + result.put(traceId, spanList); + } + spanList.add(span); + } + return result; + } + private List parseSpans(final List spans, final T scope, final Function> scopeAttributesGetter, final String serviceName, final Map resourceAttributes) { diff --git a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java index 6c9a167ad5..a68bf57e6d 100644 --- a/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java +++ b/data-prepper-plugins/otel-proto-common/src/test/java/org/opensearch/dataprepper/plugins/otel/codec/OTelProtoCodecTest.java @@ -80,6 +80,7 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -98,6 +99,7 @@ public class OTelProtoCodecTest { private static final String TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE = "test-histogram-metrics.json"; private static final String TEST_REQUEST_LOGS_JSON_FILE = "test-request-log.json"; private static final String TEST_REQUEST_LOGS_IS_JSON_FILE = "test-request-log-is.json"; + private static final String TEST_REQUEST_MULTIPLE_TRACES_FILE = "test-request-multiple-traces.json"; private static final Long TIME = TimeUnit.MILLISECONDS.toNanos(ZonedDateTime.of( @@ -153,6 +155,58 @@ private String getFileAsJsonString(String requestJsonFileName) throws IOExceptio @Nested class OTelProtoDecoderTest { + @Test + public void testSplitExportTraceServiceRequestWithMultipleTraces() throws Exception { + final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_MULTIPLE_TRACES_FILE); + final Map map = decoderUnderTest.splitExportTraceServiceRequestByTraceId(exportTraceServiceRequest); + assertThat(map.size(), is(equalTo(3))); + for (Map.Entry entry: map.entrySet()) { + String expectedTraceId = new String(Hex.decodeHex(entry.getKey()), StandardCharsets.UTF_8); + ExportTraceServiceRequest request = entry.getValue(); + if (expectedTraceId.equals("TRACEID1")) { + assertThat(request.getResourceSpansList().size(), equalTo(1)); + ResourceSpans rs = request.getResourceSpansList().get(0); + assertThat(rs.getScopeSpansList().size(), equalTo(1)); + assertThat(rs.getInstrumentationLibrarySpansList().size(), equalTo(0)); + ScopeSpans ss = rs.getScopeSpansList().get(0); + assertThat(ss.getSpansList().size(), equalTo(1)); + io.opentelemetry.proto.trace.v1.Span span = ss.getSpansList().get(0); + String spanId = span.getSpanId().toStringUtf8(); + assertTrue(spanId.equals("TRACEID1-SPAN1")); + } else if (expectedTraceId.equals("TRACEID2")) { + assertThat(request.getResourceSpansList().size(), equalTo(1)); + ResourceSpans rs = request.getResourceSpansList().get(0); + assertThat(rs.getScopeSpansList().size(), equalTo(2)); + assertThat(rs.getInstrumentationLibrarySpansList().size(), equalTo(2)); + + ScopeSpans ss = rs.getScopeSpansList().get(0); + assertThat(ss.getSpansList().size(), equalTo(1)); + io.opentelemetry.proto.trace.v1.Span span = ss.getSpansList().get(0); + String spanId = span.getSpanId().toStringUtf8(); + assertTrue(spanId.equals("TRACEID2-SPAN1")); + + ss = rs.getScopeSpansList().get(1); + assertThat(ss.getSpansList().size(), equalTo(1)); + span = ss.getSpansList().get(0); + spanId = span.getSpanId().toStringUtf8(); + assertTrue(spanId.equals("TRACEID2-SPAN2")); + + } else if (expectedTraceId.equals("TRACEID3")) { + assertThat(request.getResourceSpansList().size(), equalTo(1)); + ResourceSpans rs = request.getResourceSpansList().get(0); + assertThat(rs.getScopeSpansList().size(), equalTo(1)); + assertThat(rs.getInstrumentationLibrarySpansList().size(), equalTo(0)); + ScopeSpans ss = rs.getScopeSpansList().get(0); + assertThat(ss.getSpansList().size(), equalTo(1)); + io.opentelemetry.proto.trace.v1.Span span = ss.getSpansList().get(0); + String spanId = span.getSpanId().toStringUtf8(); + assertTrue(spanId.equals("TRACEID3-SPAN1")); + } else { + assertTrue("Failed".equals("Unknown TraceId")); + } + } + } + @Test public void testParseExportTraceServiceRequest() throws IOException { final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_TRACE_JSON_FILE); diff --git a/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-traces.json b/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-traces.json new file mode 100644 index 0000000000..461a5c935a --- /dev/null +++ b/data-prepper-plugins/otel-proto-common/src/test/resources/test-request-multiple-traces.json @@ -0,0 +1,249 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "service.name", + "value": { + "stringValue": "analytics-service1" + } + }, + { + "key": "telemetry.sdk.language", + "value": { + "stringValue": "java" + } + }, + { + "key": "telemetry.sdk.name", + "value": { + "stringValue": "opentelemetry" + } + }, + { + "key": "telemetry.sdk.version", + "value": { + "stringValue": "0.8.0-SNAPSHOT" + } + }, + { + "key": "array", + "value": { + "arrayValue": { + "values": [ + { + "stringValue": "test string" + }, + { + "boolValue": false + }, + { + "intValue": 0 + } + ] + } + } + }, + { + "key": "kvList", + "value": { + "kvlistValue": { + "values": [ + { + "key": "key1", + "value": { + "stringValue": "value1" + } + }, + { + "key": "key2", + "value": { + "stringValue": "value2" + } + } + ] + } + } + } + ], + "droppedAttributesCount": 0 + }, + "scopeSpans": [ + { + "scope": { + "name": "io.opentelemetry.auto.spring-webmvc-3.1", + "version": "" + }, + "spans": [ + { + "traceId": "VFJBQ0VJRDE=", + "spanId": "VFJBQ0VJRDEtU1BBTjE=", + "traceState": "", + "parentSpanId": "yxwHNNFJQP0=", + "name": "LoggingController.save", + "kind": "SPAN_KIND_INTERNAL", + "startTimeUnixNano": "1597902043168792500", + "endTimeUnixNano": "1597902043215953100", + "attributes": [], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "code": "STATUS_CODE_OK", + "message": "" + } + }, + { + "traceId": "VFJBQ0VJRDI=", + "spanId": "VFJBQ0VJRDItU1BBTjE=", + "traceState": "", + "parentSpanId": "yxwHNNFJQP0=", + "name": "LoggingController.save", + "kind": "SPAN_KIND_INTERNAL", + "startTimeUnixNano": "1597902043168792500", + "endTimeUnixNano": "1597902043215953100", + "attributes": [], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "code": "STATUS_CODE_OK", + "message": "" + } + } + ] + }, + { + "scope": { + "name": "io.opentelemetry.auto.apache-httpasyncclient-4.0", + "version": "" + }, + "spans": [ + { + "traceId": "VFJBQ0VJRDI=", + "spanId": "VFJBQ0VJRDItU1BBTjI=", + "traceState": "", + "parentSpanId": "XYZAgv/Pv40=", + "name": "HTTP PUT", + "kind": "SPAN_KIND_CLIENT", + "startTimeUnixNano": "1597902043175204700", + "endTimeUnixNano": "1597902043205117100", + "attributes": [ + { + "key": "http.status_code", + "value": { + "intValue": "200" + } + }, + { + "key": "http.url", + "value": { + "stringValue": "/logs/_doc/service_1?timeout\\u003d1m" + } + }, + { + "key": "http.method", + "value": { + "stringValue": "PUT" + } + } + ], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "code": "STATUS_CODE_OK", + "message": "" + } + }, + { + "traceId": "VFJBQ0VJRDM=", + "spanId": "VFJBQ0VJRDMtU1BBTjE=", + "traceState": "", + "parentSpanId": "yxwHNNFJQP0=", + "name": "LoggingController.save", + "kind": "SPAN_KIND_INTERNAL", + "startTimeUnixNano": "1597902043168792500", + "endTimeUnixNano": "1597902043215953100", + "attributes": [], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "code": "STATUS_CODE_OK", + "message": "" + } + } + ] + } + ], + "instrumentationLibrarySpans": [ + { + "instrumentationLibrary": { + "name": "io.opentelemetry.auto.spring-webmvc-3.1", + "version": "" + }, + "spans": [ + { + "traceId": "VFJBQ0VJRDI=", + "spanId": "VFJBQ0VJRDItU1BBTjE=", + "traceState": "", + "parentSpanId": "yxwHNNFJQP0=", + "name": "LoggingController.save", + "kind": "SPAN_KIND_INTERNAL", + "startTimeUnixNano": "1597902043168792500", + "endTimeUnixNano": "1597902043215953100", + "attributes": [], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "code": "STATUS_CODE_OK", + "message": "" + } + } + ] + }, + { + "instrumentationLibrary": { + "name": "io.opentelemetry.auto.spring-webmvc-3.1", + "version": "" + }, + "spans": [ + { + "traceId": "VFJBQ0VJRDI=", + "spanId": "VFJBQ0VJRDItU1BBTjI=", + "traceState": "", + "parentSpanId": "yxwHNNFJQP0=", + "name": "LoggingController.save", + "kind": "SPAN_KIND_INTERNAL", + "startTimeUnixNano": "1597902043168792500", + "endTimeUnixNano": "1597902043215953100", + "attributes": [], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "code": "STATUS_CODE_OK", + "message": "" + } + } + ] + } + ] + } + ] +}