From 4fada1fe4d06f529af2bd65cb4973117ac36bb90 Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Tue, 20 Feb 2024 14:10:39 -0800 Subject: [PATCH] Parse binary data as-is (#776) * Parse binary data as-is * fix test --- .../opensearch/BulkApiRequestParser.java | 10 ++- .../com/slack/kaldb/writer/SpanFormatter.java | 2 +- .../writer/LogMessageWriterImplTest.java | 68 +++++++++++++++++++ .../slack/kaldb/writer/SpanFormatterTest.java | 23 ++----- 4 files changed, 80 insertions(+), 23 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java index 1f815fc64e..850b657438 100644 --- a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java @@ -1,5 +1,6 @@ package com.slack.kaldb.bulkIngestApi.opensearch; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import com.slack.kaldb.writer.SpanFormatter; import com.slack.service.murron.trace.Trace; @@ -79,7 +80,8 @@ public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) .toEpochMilli(); } - protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) { + @VisibleForTesting + public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) { long timestampInMillis = getTimestampFromIngestDocument(ingestDocument); @@ -139,7 +141,8 @@ protected static Map> convertIndexRequestToTraceFormat( } // only parse IndexRequests - protected static IngestDocument convertRequestToDocument(IndexRequest indexRequest) { + @VisibleForTesting + public static IngestDocument convertRequestToDocument(IndexRequest indexRequest) { String index = indexRequest.index(); String id = indexRequest.id(); String routing = indexRequest.routing(); @@ -153,7 +156,8 @@ protected static IngestDocument convertRequestToDocument(IndexRequest indexReque // and transform it } - protected static List parseBulkRequest(byte[] postBody) throws IOException { + @VisibleForTesting + public static List parseBulkRequest(byte[] postBody) throws IOException { List indexRequests = new ArrayList<>(); BulkRequest bulkRequest = new BulkRequest(); // calls parse under the hood diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java index cac5069f19..188e25aa60 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java @@ -195,7 +195,7 @@ public static LogMessage toLogMessage(Trace.Span span) { } else if (valueType == 3) { jsonMap.put(key, tag.getVFloat64()); } else if (valueType == 4) { - jsonMap.put(key, encodeBinaryTagValue(tag.getVBinary())); + jsonMap.put(key, tag.getVBinary().toStringUtf8()); } else { LOG.warn("Skipping field with unknown value type {} with key {}", valueType, key); } diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java index d4a38a52d2..d9e73e139d 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java @@ -1,5 +1,6 @@ package com.slack.kaldb.writer; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument; import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_FAILED_COUNTER; import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_RECEIVED_COUNTER; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; @@ -13,6 +14,7 @@ import brave.Tracing; import com.adobe.testing.s3mock.junit5.S3MockExtension; import com.google.protobuf.ByteString; +import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser; import com.slack.kaldb.chunkManager.IndexingChunkManager; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.search.SearchQuery; @@ -24,6 +26,7 @@ import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.util.Collections; @@ -38,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.ingest.IngestDocument; public class LogMessageWriterImplTest { @@ -289,6 +294,69 @@ public void testIngestTraceSpan() throws IOException { .isEqualTo(1); } + @Test + public void parseAndIndexBulkApiRequestTest() throws IOException { + // crux of the test - encoding and decoding of binary fields + // ByteString inputBytes = ByteString.copyFrom("{\"key1\": + // \"value1\"}".toString().getBytes()); + // + // String output = SpanFormatter.encodeBinaryTagValue(inputBytes); + // System.out.println(output); + + String inputDocuments = + """ + { "index" : { "_index" : "test", "_id" : "1" } } + { "field1" : "value1", "field2" : "value2", "tags" : [] } + { "index" : { "_index" : "test", "_id" : "2" } } + { "field1" : "value1", "field2" : "value2", "tags" : ["tagValue1", "tagValue2"] } + { "index" : { "_index" : "test", "_id" : "3" } } + { "field1" : "value1", "field2" : "value2", "message" : {} } + { "index" : { "_index" : "test", "_id" : "4" } } + { "field1" : "value1", "field2" : "value2", "message" : { "nestedField1" : "nestedValue1", "nestedField2" : "nestedValue2" } } + """; + + byte[] rawRequest = inputDocuments.getBytes(StandardCharsets.UTF_8); + + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); + assertThat(indexRequests.size()).isEqualTo(4); + + for (IndexRequest indexRequest : indexRequests) { + IngestDocument ingestDocument = convertRequestToDocument(indexRequest); + Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument); + ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); + LogMessageWriterImpl messageWriter = + new LogMessageWriterImpl( + chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer); + assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); + } + + assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(4); + assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0); + chunkManagerUtil.chunkManager.getActiveChunk().commit(); + + SearchResult results = searchChunkManager("test", "_id:1"); + assertThat(results.hits.size()).isEqualTo(1); + Object value = results.hits.get(0).getSource().get("tags"); + assertThat(value).isEqualTo("[]"); + + results = searchChunkManager("test", "_id:2"); + assertThat(results.hits.size()).isEqualTo(1); + value = results.hits.get(0).getSource().get("tags"); + // ArrayList#toString in SpanFormatter#convertKVtoProto for the binary field type case + assertThat(value).isEqualTo("[tagValue1, tagValue2]"); + + results = searchChunkManager("test", "_id:3"); + assertThat(results.hits.size()).isEqualTo(1); + value = results.hits.get(0).getSource().get("message"); + assertThat(value).isEqualTo("{}"); + + results = searchChunkManager("test", "_id:4"); + assertThat(results.hits.size()).isEqualTo(1); + value = results.hits.get(0).getSource().get("message"); + // HashMap#toString in SpanFormatter#convertKVtoProto for the binary field type case + assertThat(value).isEqualTo("{nestedField2=nestedValue2, nestedField1=nestedValue1}"); + } + @Test public void testNullTraceSpan() throws IOException { LogMessageWriterImpl messageWriter = diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java index f8976b36bf..6a89898129 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java @@ -3,15 +3,12 @@ import static com.slack.kaldb.testlib.SpanUtil.BINARY_TAG_VALUE; import static org.assertj.core.api.Assertions.assertThat; -import com.google.protobuf.ByteString; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.testlib.SpanUtil; import com.slack.service.murron.trace.Trace; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; @@ -59,10 +56,7 @@ public void testNonRootSpanToLogMessage() { assertThat(source.get("int")).isEqualTo(1000L); assertThat(source.get("float")).isEqualTo(1001.2); String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue) - .isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE))); - assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8)) - .isEqualTo(BINARY_TAG_VALUE); + assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); } @Test @@ -104,10 +98,7 @@ public void testRootSpanToLogMessage() { assertThat(source.get("int")).isEqualTo(1000L); assertThat(source.get("float")).isEqualTo(1001.2); String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue) - .isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE))); - assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8)) - .isEqualTo(BINARY_TAG_VALUE); + assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); } @Test @@ -180,10 +171,7 @@ public void testListOfSpansConversion() { assertThat(source.get("int")).isEqualTo(1000L); assertThat(source.get("float")).isEqualTo(1001.2); String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue) - .isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE))); - assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8)) - .isEqualTo(BINARY_TAG_VALUE); + assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); } } @@ -230,10 +218,7 @@ public void testSpanWithoutKeyFieldsToLogMessage() { assertThat(source.get("int")).isEqualTo(1000L); assertThat(source.get("float")).isEqualTo(1001.2); String binaryTagValue = (String) source.get("binary"); - assertThat(binaryTagValue) - .isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE))); - assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8)) - .isEqualTo(BINARY_TAG_VALUE); + assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE); } @Test