Skip to content

Commit

Permalink
Parse binary data as-is (#776)
Browse files Browse the repository at this point in the history
* Parse binary data as-is

* fix test
  • Loading branch information
vthacker authored Feb 20, 2024
1 parent 2c82356 commit 4fada1f
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.slack.kaldb.bulkIngestApi.opensearch;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.slack.kaldb.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
Expand Down Expand Up @@ -79,7 +80,8 @@ public static long getTimestampFromIngestDocument(IngestDocument ingestDocument)
.toEpochMilli();
}

protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {
@VisibleForTesting
public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) {

long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);

Expand Down Expand Up @@ -139,7 +141,8 @@ protected static Map<String, List<Trace.Span>> convertIndexRequestToTraceFormat(
}

// only parse IndexRequests
protected static IngestDocument convertRequestToDocument(IndexRequest indexRequest) {
@VisibleForTesting
public static IngestDocument convertRequestToDocument(IndexRequest indexRequest) {
String index = indexRequest.index();
String id = indexRequest.id();
String routing = indexRequest.routing();
Expand All @@ -153,7 +156,8 @@ protected static IngestDocument convertRequestToDocument(IndexRequest indexReque
// and transform it
}

protected static List<IndexRequest> parseBulkRequest(byte[] postBody) throws IOException {
@VisibleForTesting
public static List<IndexRequest> parseBulkRequest(byte[] postBody) throws IOException {
List<IndexRequest> indexRequests = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();
// calls parse under the hood
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public static LogMessage toLogMessage(Trace.Span span) {
} else if (valueType == 3) {
jsonMap.put(key, tag.getVFloat64());
} else if (valueType == 4) {
jsonMap.put(key, encodeBinaryTagValue(tag.getVBinary()));
jsonMap.put(key, tag.getVBinary().toStringUtf8());
} else {
LOG.warn("Skipping field with unknown value type {} with key {}", valueType, key);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.slack.kaldb.writer;

import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument;
import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_FAILED_COUNTER;
import static com.slack.kaldb.logstore.LuceneIndexStoreImpl.MESSAGES_RECEIVED_COUNTER;
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION;
Expand All @@ -13,6 +14,7 @@
import brave.Tracing;
import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.google.protobuf.ByteString;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.kaldb.chunkManager.IndexingChunkManager;
import com.slack.kaldb.logstore.LogMessage;
import com.slack.kaldb.logstore.search.SearchQuery;
Expand All @@ -24,6 +26,7 @@
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
Expand All @@ -38,6 +41,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ingest.IngestDocument;

public class LogMessageWriterImplTest {

Expand Down Expand Up @@ -289,6 +294,69 @@ public void testIngestTraceSpan() throws IOException {
.isEqualTo(1);
}

@Test
public void parseAndIndexBulkApiRequestTest() throws IOException {
// crux of the test - encoding and decoding of binary fields
// ByteString inputBytes = ByteString.copyFrom("{\"key1\":
// \"value1\"}".toString().getBytes());
//
// String output = SpanFormatter.encodeBinaryTagValue(inputBytes);
// System.out.println(output);

String inputDocuments =
"""
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2", "tags" : [] }
{ "index" : { "_index" : "test", "_id" : "2" } }
{ "field1" : "value1", "field2" : "value2", "tags" : ["tagValue1", "tagValue2"] }
{ "index" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value1", "field2" : "value2", "message" : {} }
{ "index" : { "_index" : "test", "_id" : "4" } }
{ "field1" : "value1", "field2" : "value2", "message" : { "nestedField1" : "nestedValue1", "nestedField2" : "nestedValue2" } }
""";

byte[] rawRequest = inputDocuments.getBytes(StandardCharsets.UTF_8);

List<IndexRequest> indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest);
assertThat(indexRequests.size()).isEqualTo(4);

for (IndexRequest indexRequest : indexRequests) {
IngestDocument ingestDocument = convertRequestToDocument(indexRequest);
Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument);
ConsumerRecord<String, byte[]> spanRecord = consumerRecordWithValue(span.toByteArray());
LogMessageWriterImpl messageWriter =
new LogMessageWriterImpl(
chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer);
assertThat(messageWriter.insertRecord(spanRecord)).isTrue();
}

assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(4);
assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0);
chunkManagerUtil.chunkManager.getActiveChunk().commit();

SearchResult<LogMessage> results = searchChunkManager("test", "_id:1");
assertThat(results.hits.size()).isEqualTo(1);
Object value = results.hits.get(0).getSource().get("tags");
assertThat(value).isEqualTo("[]");

results = searchChunkManager("test", "_id:2");
assertThat(results.hits.size()).isEqualTo(1);
value = results.hits.get(0).getSource().get("tags");
// ArrayList#toString in SpanFormatter#convertKVtoProto for the binary field type case
assertThat(value).isEqualTo("[tagValue1, tagValue2]");

results = searchChunkManager("test", "_id:3");
assertThat(results.hits.size()).isEqualTo(1);
value = results.hits.get(0).getSource().get("message");
assertThat(value).isEqualTo("{}");

results = searchChunkManager("test", "_id:4");
assertThat(results.hits.size()).isEqualTo(1);
value = results.hits.get(0).getSource().get("message");
// HashMap#toString in SpanFormatter#convertKVtoProto for the binary field type case
assertThat(value).isEqualTo("{nestedField2=nestedValue2, nestedField1=nestedValue1}");
}

@Test
public void testNullTraceSpan() throws IOException {
LogMessageWriterImpl messageWriter =
Expand Down
23 changes: 4 additions & 19 deletions kaldb/src/test/java/com/slack/kaldb/writer/SpanFormatterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
import static com.slack.kaldb.testlib.SpanUtil.BINARY_TAG_VALUE;
import static org.assertj.core.api.Assertions.assertThat;

import com.google.protobuf.ByteString;
import com.slack.kaldb.logstore.LogMessage;
import com.slack.kaldb.testlib.SpanUtil;
import com.slack.service.murron.trace.Trace;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -59,10 +56,7 @@ public void testNonRootSpanToLogMessage() {
assertThat(source.get("int")).isEqualTo(1000L);
assertThat(source.get("float")).isEqualTo(1001.2);
String binaryTagValue = (String) source.get("binary");
assertThat(binaryTagValue)
.isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE)));
assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8))
.isEqualTo(BINARY_TAG_VALUE);
assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE);
}

@Test
Expand Down Expand Up @@ -104,10 +98,7 @@ public void testRootSpanToLogMessage() {
assertThat(source.get("int")).isEqualTo(1000L);
assertThat(source.get("float")).isEqualTo(1001.2);
String binaryTagValue = (String) source.get("binary");
assertThat(binaryTagValue)
.isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE)));
assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8))
.isEqualTo(BINARY_TAG_VALUE);
assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE);
}

@Test
Expand Down Expand Up @@ -180,10 +171,7 @@ public void testListOfSpansConversion() {
assertThat(source.get("int")).isEqualTo(1000L);
assertThat(source.get("float")).isEqualTo(1001.2);
String binaryTagValue = (String) source.get("binary");
assertThat(binaryTagValue)
.isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE)));
assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8))
.isEqualTo(BINARY_TAG_VALUE);
assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE);
}
}

Expand Down Expand Up @@ -230,10 +218,7 @@ public void testSpanWithoutKeyFieldsToLogMessage() {
assertThat(source.get("int")).isEqualTo(1000L);
assertThat(source.get("float")).isEqualTo(1001.2);
String binaryTagValue = (String) source.get("binary");
assertThat(binaryTagValue)
.isEqualTo(SpanFormatter.encodeBinaryTagValue(ByteString.copyFromUtf8(BINARY_TAG_VALUE)));
assertThat(new String(Base64.getDecoder().decode(binaryTagValue), StandardCharsets.UTF_8))
.isEqualTo(BINARY_TAG_VALUE);
assertThat(binaryTagValue).isEqualTo(BINARY_TAG_VALUE);
}

@Test
Expand Down

0 comments on commit 4fada1f

Please sign in to comment.