From 18ededc0afadb2c636e4e89f6d2524744e07ec63 Mon Sep 17 00:00:00 2001 From: vthacker Date: Tue, 27 Feb 2024 15:46:52 -0800 Subject: [PATCH] deprecate LogMessage while indexing data --- .../java/com/slack/kaldb/IndexAPILog.java | 375 +++++++++--------- .../com/slack/kaldb/IndexingBenchmark.java | 35 +- .../java/com/slack/kaldb/QueryBenchmark.java | 339 ++++++++-------- .../com/slack/kaldb/chunk/ReadWriteChunk.java | 18 +- .../chunkManager/CachingChunkManager.java | 3 +- .../kaldb/chunkManager/ChunkManager.java | 4 +- .../chunkManager/IndexingChunkManager.java | 4 +- .../chunkManager/RecoveryChunkManager.java | 4 +- .../slack/kaldb/logstore/DocumentBuilder.java | 5 +- .../com/slack/kaldb/logstore/LogStore.java | 3 +- .../kaldb/logstore/LuceneIndexStoreImpl.java | 9 +- .../SchemaAwareLogDocumentBuilderImpl.java | 156 +++++++- .../slack/kaldb/recovery/RecoveryService.java | 7 +- .../com/slack/kaldb/server/KaldbIndexer.java | 7 +- .../kaldb/writer/LogMessageWriterImpl.java | 22 +- .../com/slack/kaldb/writer/SpanFormatter.java | 9 + .../kaldb/chunk/IndexingChunkImplTest.java | 26 +- .../kaldb/chunk/RecoveryChunkImplTest.java | 25 +- .../chunkManager/CachingChunkManagerTest.java | 3 +- .../IndexingChunkManagerTest.java | 174 ++++++-- .../RecoveryChunkManagerTest.java | 47 ++- ...MessageCountBasedRolloverStrategyTest.java | 9 +- .../ElasticsearchApiServiceTest.java | 6 +- .../kaldb/logstore/FieldConflictsTest.java | 10 +- .../logstore/LuceneIndexStoreImplTest.java | 14 +- .../ConvertFieldValueAndDuplicateTest.java | 26 +- .../schema/ConvertFieldValueTest.java | 8 +- .../kaldb/logstore/schema/DropPolicyTest.java | 20 +- .../schema/RaiseErrorFieldValueTest.java | 10 +- .../search/KaldbLocalQueryServiceTest.java | 42 +- .../search/LogIndexSearcherImplTest.java | 115 ++++-- .../SearchResultAggregatorImplTest.java | 4 +- .../logstore/search/StatsCollectorTest.java | 11 +- .../com/slack/kaldb/testlib/MessageUtil.java | 139 +++---- ...TemporaryLogStoreAndSearcherExtension.java | 
2 +- .../writer/LogMessageWriterImplTest.java | 27 +- .../writer/kafka/KaldbKafkaConsumerTest.java | 11 +- 37 files changed, 1007 insertions(+), 722 deletions(-) diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java b/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java index 0ec8e8c0dd..1e95d30005 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java +++ b/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java @@ -1,187 +1,188 @@ -package com.slack.kaldb; - -import com.google.protobuf.ByteString; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LuceneIndexStoreImpl; -import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; -import com.slack.kaldb.writer.LogMessageWriterImpl; -import com.slack.service.murron.Murron; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.text.SimpleDateFormat; -import java.time.Duration; -import java.util.Comparator; -import java.util.Random; -import java.util.stream.Stream; -import org.apache.commons.io.FileUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.record.TimestampType; -import org.apache.lucene.store.Directory; -import org.openjdk.jmh.annotations.*; - -@State(Scope.Thread) -public class IndexAPILog { - - private Random random; - private final Duration commitInterval = Duration.ofSeconds(5 * 60); - private final Duration refreshInterval = Duration.ofSeconds(5 * 60); - - private Path tempDirectory; - private MeterRegistry registry; - LuceneIndexStoreImpl logStore; - - private String apiLogFile; - private BufferedReader reader; - private static SimpleDateFormat df = new 
SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); - private int skipCount; - private int indexCount; - - @Setup(Level.Iteration) - public void createIndexer() throws Exception { - random = new Random(); - registry = new SimpleMeterRegistry(); - tempDirectory = - Files.createDirectories( - Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); - logStore = - LuceneIndexStoreImpl.makeLogStore( - tempDirectory.toFile(), - commitInterval, - refreshInterval, - true, - SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, - registry); - - apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); - reader = Files.newBufferedReader(Path.of(apiLogFile)); - skipCount = 0; - indexCount = 0; - } - - @TearDown(Level.Iteration) - public void tearDown() throws IOException { - Directory directory = logStore.getIndexWriter().getDirectory(); - String[] segmentFiles = directory.listAll(); - long indexedBytes = 0; - for (String segmentFile : segmentFiles) { - indexedBytes += directory.fileLength(segmentFile); - } - if (indexCount != 0) { - // Displaying indexCount only makes sense in measureAPILogIndexingSlingshotMode - System.out.println( - "Indexed = " - + indexCount - + " Skipped = " - + skipCount - + " Index size = " - + FileUtils.byteCountToDisplaySize(indexedBytes)); - } else { - System.out.println( - "Skipped = " - + skipCount - + " Index size = " - + FileUtils.byteCountToDisplaySize(indexedBytes)); - } - logStore.close(); - try (Stream walk = Files.walk(tempDirectory)) { - walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); - } - registry.close(); - if (reader != null) { - reader.close(); - } - } - - @Benchmark - public void measureAPILogIndexing() throws IOException { - String line = reader.readLine(); - if (line != null) { - // Work that ideally shouldn't count towards benchmark performance result - ConsumerRecord kafkaRecord = makeConsumerRecord(line); - if (kafkaRecord == null) { - 
// makeConsumerRecord will print why we skipped - return; - } - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - try { - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - } catch (Exception e) { - System.out.println("skipping - cannot transform " + e); - skipCount++; - } - } else { - System.out.println("resetting - reach EOF"); - reader = Files.newBufferedReader(Path.of(apiLogFile)); - } - } - - @Benchmark - public void measureAPILogIndexingSlingshotMode() throws IOException { - String line; - do { - line = reader.readLine(); - if (line != null) { - // Work that ideally shouldn't count towards benchmark performance result - ConsumerRecord kafkaRecord = makeConsumerRecord(line); - if (kafkaRecord == null) { - // makeConsumerRecord will print why we skipped - continue; - } - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - try { - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - indexCount++; - } catch (Exception e) { - System.out.println("skipping - cannot transform " + e); - } - } - } while (line != null); - } - - public ConsumerRecord makeConsumerRecord(String line) { - try { - // get start of messageBody - int messageDivision = line.indexOf("{"); - - // Everything will there is metadata - String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); - String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; - long timestamp = df.parse(ts).toInstant().toEpochMilli(); - - String message = line.substring(messageDivision); - Murron.MurronMessage testMurronMsg = - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) - .setType(splitLine[5]) - .setHost(splitLine[4]) - .setTimestamp(timestamp) - .build(); - return new 
ConsumerRecord<>( - "testTopic", - 1, - 10, - 0L, - TimestampType.CREATE_TIME, - 0L, - 0, - 0, - "testKey", - testMurronMsg.toByteString().toByteArray()); - } catch (Exception e) { - System.out.println("skipping - cannot parse input" + e); - skipCount++; - return null; - } - } -} +// package com.slack.kaldb; +// +// import com.google.protobuf.ByteString; +// import com.slack.kaldb.logstore.LogMessage; +// import com.slack.kaldb.logstore.LuceneIndexStoreImpl; +// import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; +// import com.slack.kaldb.writer.LogMessageWriterImpl; +// import com.slack.service.murron.Murron; +// import io.micrometer.core.instrument.MeterRegistry; +// import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +// import java.io.BufferedReader; +// import java.io.File; +// import java.io.IOException; +// import java.nio.charset.StandardCharsets; +// import java.nio.file.Files; +// import java.nio.file.Path; +// import java.nio.file.Paths; +// import java.text.SimpleDateFormat; +// import java.time.Duration; +// import java.util.Comparator; +// import java.util.Random; +// import java.util.stream.Stream; +// import org.apache.commons.io.FileUtils; +// import org.apache.kafka.clients.consumer.ConsumerRecord; +// import org.apache.kafka.common.record.TimestampType; +// import org.apache.lucene.store.Directory; +// import org.openjdk.jmh.annotations.*; +// +// @State(Scope.Thread) +// public class IndexAPILog { +// +// private Random random; +// private final Duration commitInterval = Duration.ofSeconds(5 * 60); +// private final Duration refreshInterval = Duration.ofSeconds(5 * 60); +// +// private Path tempDirectory; +// private MeterRegistry registry; +// LuceneIndexStoreImpl logStore; +// +// private String apiLogFile; +// private BufferedReader reader; +// private static SimpleDateFormat df = new SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); +// private int skipCount; +// private int indexCount; +// +// 
@Setup(Level.Iteration) +// public void createIndexer() throws Exception { +// random = new Random(); +// registry = new SimpleMeterRegistry(); +// tempDirectory = +// Files.createDirectories( +// Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); +// logStore = +// LuceneIndexStoreImpl.makeLogStore( +// tempDirectory.toFile(), +// commitInterval, +// refreshInterval, +// true, +// +// SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, +// registry); +// +// apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); +// reader = Files.newBufferedReader(Path.of(apiLogFile)); +// skipCount = 0; +// indexCount = 0; +// } +// +// @TearDown(Level.Iteration) +// public void tearDown() throws IOException { +// Directory directory = logStore.getIndexWriter().getDirectory(); +// String[] segmentFiles = directory.listAll(); +// long indexedBytes = 0; +// for (String segmentFile : segmentFiles) { +// indexedBytes += directory.fileLength(segmentFile); +// } +// if (indexCount != 0) { +// // Displaying indexCount only makes sense in measureAPILogIndexingSlingshotMode +// System.out.println( +// "Indexed = " +// + indexCount +// + " Skipped = " +// + skipCount +// + " Index size = " +// + FileUtils.byteCountToDisplaySize(indexedBytes)); +// } else { +// System.out.println( +// "Skipped = " +// + skipCount +// + " Index size = " +// + FileUtils.byteCountToDisplaySize(indexedBytes)); +// } +// logStore.close(); +// try (Stream walk = Files.walk(tempDirectory)) { +// walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); +// } +// registry.close(); +// if (reader != null) { +// reader.close(); +// } +// } +// +// @Benchmark +// public void measureAPILogIndexing() throws IOException { +// String line = reader.readLine(); +// if (line != null) { +// // Work that ideally shouldn't count towards benchmark performance result +// ConsumerRecord kafkaRecord = makeConsumerRecord(line); +// if 
(kafkaRecord == null) { +// // makeConsumerRecord will print why we skipped +// return; +// } +// // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic +// try { +// LogMessage localLogMessage = +// LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); +// logStore.addMessage(localLogMessage); +// } catch (Exception e) { +// System.out.println("skipping - cannot transform " + e); +// skipCount++; +// } +// } else { +// System.out.println("resetting - reach EOF"); +// reader = Files.newBufferedReader(Path.of(apiLogFile)); +// } +// } +// +// @Benchmark +// public void measureAPILogIndexingSlingshotMode() throws IOException { +// String line; +// do { +// line = reader.readLine(); +// if (line != null) { +// // Work that ideally shouldn't count towards benchmark performance result +// ConsumerRecord kafkaRecord = makeConsumerRecord(line); +// if (kafkaRecord == null) { +// // makeConsumerRecord will print why we skipped +// continue; +// } +// // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic +// try { +// LogMessage localLogMessage = +// LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); +// logStore.addMessage(localLogMessage); +// indexCount++; +// } catch (Exception e) { +// System.out.println("skipping - cannot transform " + e); +// } +// } +// } while (line != null); +// } +// +// public ConsumerRecord makeConsumerRecord(String line) { +// try { +// // get start of messageBody +// int messageDivision = line.indexOf("{"); +// +// // Everything will there is metadata +// String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); +// String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; +// long timestamp = df.parse(ts).toInstant().toEpochMilli(); +// +// String message = line.substring(messageDivision); +// Murron.MurronMessage testMurronMsg = +// Murron.MurronMessage.newBuilder() +// 
.setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) +// .setType(splitLine[5]) +// .setHost(splitLine[4]) +// .setTimestamp(timestamp) +// .build(); +// return new ConsumerRecord<>( +// "testTopic", +// 1, +// 10, +// 0L, +// TimestampType.CREATE_TIME, +// 0L, +// 0, +// 0, +// "testKey", +// testMurronMsg.toByteString().toByteArray()); +// } catch (Exception e) { +// System.out.println("skipping - cannot parse input" + e); +// skipCount++; +// return null; +// } +// } +// } diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java index 6b9bbd8fce..4dd2b455b2 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java @@ -4,11 +4,11 @@ import com.google.protobuf.ByteString; import com.slack.kaldb.logstore.DocumentBuilder; -import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.LuceneIndexStoreImpl; import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; -import com.slack.kaldb.writer.LogMessageWriterImpl; +import com.slack.kaldb.writer.MurronLogFormatter; import com.slack.service.murron.Murron; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.File; @@ -21,8 +21,6 @@ import java.util.Comparator; import java.util.Random; import java.util.stream.Stream; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.record.TimestampType; import org.apache.lucene.document.*; import org.apache.lucene.index.IndexWriter; import org.openjdk.jmh.annotations.*; @@ -37,9 +35,7 @@ public class IndexingBenchmark { private MeterRegistry registry; LuceneIndexStoreImpl logStore; private Random random; - - private ConsumerRecord kafkaRecord; - private LogMessage logMessage; + private Trace.Span 
logMessage; private Document luceneDocument; @Setup(Level.Iteration) @@ -130,22 +126,9 @@ public void createIndexer() throws Exception { .setTimestamp(timestamp) .build(); - kafkaRecord = - new ConsumerRecord<>( - "testTopic", - 1, - 10, - 0L, - TimestampType.CREATE_TIME, - 0L, - 0, - 0, - "testKey", - testMurronMsg.toByteString().toByteArray()); - - logMessage = LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); + logMessage = MurronLogFormatter.fromApiLog(testMurronMsg); - DocumentBuilder documentBuilder = + DocumentBuilder documentBuilder = SchemaAwareLogDocumentBuilderImpl.build(CONVERT_VALUE_AND_DUPLICATE_FIELD, true, registry); luceneDocument = documentBuilder.fromMessage(logMessage); @@ -160,14 +143,6 @@ public void tearDown() throws IOException { registry.close(); } - @Benchmark - public void measureIndexingAsKafkaSerializedDocument() throws Exception { - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - } - @Benchmark public void measureIndexingAsLogMessage() { logStore.addMessage(logMessage); diff --git a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java index 19c28c65de..57a0a46cdd 100644 --- a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java +++ b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java @@ -1,169 +1,170 @@ -package com.slack.kaldb; - -import brave.Tracer; -import brave.Tracing; -import com.google.protobuf.ByteString; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LuceneIndexStoreImpl; -import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; -import com.slack.kaldb.logstore.search.LogIndexSearcher; -import com.slack.kaldb.logstore.search.LogIndexSearcherImpl; -import 
com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; -import com.slack.kaldb.writer.LogMessageWriterImpl; -import com.slack.service.murron.Murron; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.text.SimpleDateFormat; -import java.time.Duration; -import java.util.Comparator; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; -import java.util.stream.Stream; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.record.TimestampType; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -@State(Scope.Thread) -public class QueryBenchmark { - private final Duration commitInterval = Duration.ofSeconds(5 * 60); - private final Duration refreshInterval = Duration.ofSeconds(5 * 60); - - private Path tempDirectory; - private MeterRegistry registry; - LuceneIndexStoreImpl logStore; - - private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); - private LogIndexSearcher logIndexSearcher; - - @Setup(Level.Trial) - public void createIndexer() throws Exception { - // active tracer required for search currently - Tracing.newBuilder().build(); - - // raises logging level of Tracer to Warning, to prevent excessive log messages - final Logger logger = Logger.getLogger(Tracer.class.getName()); - logger.setLevel(java.util.logging.Level.WARNING); - - Random random = 
new Random(); - registry = new SimpleMeterRegistry(); - tempDirectory = - Files.createDirectories( - Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); - logStore = - LuceneIndexStoreImpl.makeLogStore( - tempDirectory.toFile(), - commitInterval, - refreshInterval, - true, - SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, - registry); - - String apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); - - // startup multi-threaded log message population - ExecutorService executorService = Executors.newFixedThreadPool(6); - try (BufferedReader reader = Files.newBufferedReader(Path.of(apiLogFile))) { - String line; - do { - line = reader.readLine(); - if (line != null) { - String finalLine = line; - executorService.submit( - () -> { - // Work that ideally shouldn't count towards benchmark performance result - ConsumerRecord kafkaRecord = makeConsumerRecord(finalLine); - if (kafkaRecord == null) { - // makeConsumerRecord will print why we skipped - return; - } - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - try { - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - } catch (Exception e) { - // ignored - } - }); - } - } while (line != null); - } - executorService.shutdown(); - executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - - logStore.commit(); - logStore.refresh(); - logIndexSearcher = - new LogIndexSearcherImpl(logStore.getSearcherManager(), logStore.getSchema()); - } - - @TearDown(Level.Trial) - public void tearDown() throws IOException { - logStore.close(); - try (Stream walk = Files.walk(tempDirectory)) { - walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); - } - registry.close(); - } - - @Benchmark - public void measureLogSearcherSearch() { - logIndexSearcher.search( - "*", - "", - 0L, - Long.MAX_VALUE, - 
500, - new DateHistogramAggBuilder( - "1", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, "100d")); - } - - public ConsumerRecord makeConsumerRecord(String line) { - try { - // get start of messageBody - int messageDivision = line.indexOf("{"); - - // Everything will there is metadata - String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); - String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; - long timestamp = df.parse(ts).toInstant().toEpochMilli(); - - String message = line.substring(messageDivision); - Murron.MurronMessage testMurronMsg = - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) - .setType(splitLine[5]) - .setHost(splitLine[4]) - .setTimestamp(timestamp) - .build(); - return new ConsumerRecord<>( - "testTopic", - 1, - 10, - 0L, - TimestampType.CREATE_TIME, - 0L, - 0, - 0, - "testKey", - testMurronMsg.toByteString().toByteArray()); - } catch (Exception e) { - return null; - } - } -} +// package com.slack.kaldb; +// +// import brave.Tracer; +// import brave.Tracing; +// import com.google.protobuf.ByteString; +// import com.slack.kaldb.logstore.LogMessage; +// import com.slack.kaldb.logstore.LuceneIndexStoreImpl; +// import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; +// import com.slack.kaldb.logstore.search.LogIndexSearcher; +// import com.slack.kaldb.logstore.search.LogIndexSearcherImpl; +// import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; +// import com.slack.kaldb.writer.LogMessageWriterImpl; +// import com.slack.service.murron.Murron; +// import io.micrometer.core.instrument.MeterRegistry; +// import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +// import java.io.BufferedReader; +// import java.io.File; +// import java.io.IOException; +// import java.nio.charset.StandardCharsets; +// import java.nio.file.Files; +// import java.nio.file.Path; +// import java.nio.file.Paths; +// 
import java.text.SimpleDateFormat; +// import java.time.Duration; +// import java.util.Comparator; +// import java.util.Random; +// import java.util.concurrent.ExecutorService; +// import java.util.concurrent.Executors; +// import java.util.concurrent.TimeUnit; +// import java.util.logging.Logger; +// import java.util.stream.Stream; +// import org.apache.kafka.clients.consumer.ConsumerRecord; +// import org.apache.kafka.common.record.TimestampType; +// import org.openjdk.jmh.annotations.Benchmark; +// import org.openjdk.jmh.annotations.Level; +// import org.openjdk.jmh.annotations.Scope; +// import org.openjdk.jmh.annotations.Setup; +// import org.openjdk.jmh.annotations.State; +// import org.openjdk.jmh.annotations.TearDown; +// +// @State(Scope.Thread) +// public class QueryBenchmark { +// private final Duration commitInterval = Duration.ofSeconds(5 * 60); +// private final Duration refreshInterval = Duration.ofSeconds(5 * 60); +// +// private Path tempDirectory; +// private MeterRegistry registry; +// LuceneIndexStoreImpl logStore; +// +// private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); +// private LogIndexSearcher logIndexSearcher; +// +// @Setup(Level.Trial) +// public void createIndexer() throws Exception { +// // active tracer required for search currently +// Tracing.newBuilder().build(); +// +// // raises logging level of Tracer to Warning, to prevent excessive log messages +// final Logger logger = Logger.getLogger(Tracer.class.getName()); +// logger.setLevel(java.util.logging.Level.WARNING); +// +// Random random = new Random(); +// registry = new SimpleMeterRegistry(); +// tempDirectory = +// Files.createDirectories( +// Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); +// logStore = +// LuceneIndexStoreImpl.makeLogStore( +// tempDirectory.toFile(), +// commitInterval, +// refreshInterval, +// true, +// +// 
SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, +// registry); +// +// String apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); +// +// // startup multi-threaded log message population +// ExecutorService executorService = Executors.newFixedThreadPool(6); +// try (BufferedReader reader = Files.newBufferedReader(Path.of(apiLogFile))) { +// String line; +// do { +// line = reader.readLine(); +// if (line != null) { +// String finalLine = line; +// executorService.submit( +// () -> { +// // Work that ideally shouldn't count towards benchmark performance result +// ConsumerRecord kafkaRecord = makeConsumerRecord(finalLine); +// if (kafkaRecord == null) { +// // makeConsumerRecord will print why we skipped +// return; +// } +// // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic +// try { +// LogMessage localLogMessage = +// LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); +// logStore.addMessage(localLogMessage); +// } catch (Exception e) { +// // ignored +// } +// }); +// } +// } while (line != null); +// } +// executorService.shutdown(); +// executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); +// +// logStore.commit(); +// logStore.refresh(); +// logIndexSearcher = +// new LogIndexSearcherImpl(logStore.getSearcherManager(), logStore.getSchema()); +// } +// +// @TearDown(Level.Trial) +// public void tearDown() throws IOException { +// logStore.close(); +// try (Stream walk = Files.walk(tempDirectory)) { +// walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); +// } +// registry.close(); +// } +// +// @Benchmark +// public void measureLogSearcherSearch() { +// logIndexSearcher.search( +// "*", +// "", +// 0L, +// Long.MAX_VALUE, +// 500, +// new DateHistogramAggBuilder( +// "1", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, "100d")); +// } +// +// public ConsumerRecord makeConsumerRecord(String line) { +// try 
{ +// // get start of messageBody +// int messageDivision = line.indexOf("{"); +// +// // Everything will there is metadata +// String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); +// String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; +// long timestamp = df.parse(ts).toInstant().toEpochMilli(); +// +// String message = line.substring(messageDivision); +// Murron.MurronMessage testMurronMsg = +// Murron.MurronMessage.newBuilder() +// .setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) +// .setType(splitLine[5]) +// .setHost(splitLine[4]) +// .setTimestamp(timestamp) +// .build(); +// return new ConsumerRecord<>( +// "testTopic", +// 1, +// 10, +// 0L, +// TimestampType.CREATE_TIME, +// 0L, +// 0, +// 0, +// "testKey", +// testMurronMsg.toByteString().toByteArray()); +// } catch (Exception e) { +// return null; +// } +// } +// } diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java index 79adfe31be..b79c8fe252 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java @@ -6,7 +6,6 @@ import com.google.common.annotations.VisibleForTesting; import com.slack.kaldb.blobfs.BlobFs; -import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.LogStore; import com.slack.kaldb.logstore.LuceneIndexStoreImpl; import com.slack.kaldb.logstore.search.LogIndexSearcher; @@ -19,6 +18,8 @@ import com.slack.kaldb.metadata.search.SearchMetadataStore; import com.slack.kaldb.metadata.snapshot.SnapshotMetadata; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; +import com.slack.kaldb.writer.SpanFormatter; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; @@ -141,7 +142,7 @@ public static 
SearchMetadata toSearchMetadata(String snapshotName, SearchContext } /** Index the message in the logstore and update the chunk data time range. */ - public void addMessage(T message, String kafkaPartitionId, long offset) { + public void addMessage(Trace.Span message, String kafkaPartitionId, long offset) { if (!this.kafkaPartitionId.equals(kafkaPartitionId)) { throw new IllegalArgumentException( "All messages for this chunk should belong to partition: " @@ -151,13 +152,12 @@ public void addMessage(T message, String kafkaPartitionId, long offset) { } if (!readOnly) { logStore.addMessage(message); - // Update the chunk with the time range of the data in the chunk. - // TODO: This type conversion is a temporary hack, fix it by adding timestamp field to the - // message. - if (message instanceof LogMessage) { - chunkInfo.updateDataTimeRange(((LogMessage) message).getTimestamp().toEpochMilli()); - chunkInfo.updateMaxOffset(offset); - } + + chunkInfo.updateDataTimeRange(Instant.ofEpochMilli(message.getTimestamp() / 1000).toEpochMilli()); + // if we do this i.e also validate the timestamp tests + // that use dates from 2020 start failing so not touching this logic for now + // chunkInfo.updateDataTimeRange(SpanFormatter.getTimestampFromSpan(message).toEpochMilli()); + chunkInfo.updateMaxOffset(offset); } else { throw new IllegalStateException(String.format("Chunk %s is read only", chunkInfo)); } diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java index edb60ce143..d898dbee8f 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java @@ -9,6 +9,7 @@ import com.slack.kaldb.metadata.search.SearchMetadataStore; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; +import 
com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.MeterRegistry; import java.io.IOException; import org.apache.curator.x.async.AsyncCuratorFramework; @@ -120,7 +121,7 @@ public static CachingChunkManager fromConfig( } @Override - public void addMessage(T message, long msgSize, String kafkaPartitionId, long offset) + public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset) throws IOException { throw new UnsupportedOperationException( "Adding messages is not supported on a caching chunk manager"); diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java index 68f8019bce..75751ad375 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java @@ -3,12 +3,14 @@ import com.slack.kaldb.logstore.search.SearchQuery; import com.slack.kaldb.logstore.search.SearchResult; import com.slack.kaldb.metadata.schema.FieldType; +import com.slack.service.murron.trace.Trace; import java.io.IOException; import java.time.Duration; import java.util.Map; public interface ChunkManager { - void addMessage(T message, long msgSize, String kafkaPartitionId, long offset) throws IOException; + void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset) + throws IOException; SearchResult query(SearchQuery query, Duration queryTimeout); diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java index 7756cd5829..0ab3f9561c 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java @@ -25,6 +25,7 @@ import com.slack.kaldb.metadata.search.SearchMetadataStore; import 
com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.MeterRegistry; import java.io.File; import java.io.IOException; @@ -166,7 +167,8 @@ public IndexingChunkManager( *

TODO: Indexer should stop cleanly if the roll over fails or an exception. */ @Override - public void addMessage(final T message, long msgSize, String kafkaPartitionId, long offset) + public void addMessage( + final Trace.Span message, long msgSize, String kafkaPartitionId, long offset) throws IOException { if (stopIngestion) { // Currently, this flag is set on only a chunkRollOverException. diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java index f7fc03fc30..e8819fe8c1 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java @@ -19,6 +19,7 @@ import com.slack.kaldb.metadata.search.SearchMetadataStore; import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.MeterRegistry; import java.io.IOException; import java.time.Instant; @@ -78,7 +79,8 @@ public RecoveryChunkManager( } @Override - public void addMessage(final T message, long msgSize, String kafkaPartitionId, long offset) + public void addMessage( + final Trace.Span message, long msgSize, String kafkaPartitionId, long offset) throws IOException { if (readOnly) { LOG.warn("Ingestion is stopped since the chunk is in read only mode."); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java b/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java index 8b1d262598..621a43a794 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java @@ -1,6 +1,7 @@ package com.slack.kaldb.logstore; import com.slack.kaldb.metadata.schema.LuceneFieldDef; +import com.slack.service.murron.trace.Trace; import java.io.IOException; import 
java.util.concurrent.ConcurrentHashMap; import org.apache.lucene.document.Document; @@ -9,8 +10,8 @@ * DocumentBuilder defines the interfaces for classes that generate Lucene documents out of * messages. */ -public interface DocumentBuilder { - Document fromMessage(T message) throws IOException; +public interface DocumentBuilder { + Document fromMessage(Trace.Span message) throws IOException; ConcurrentHashMap getSchema(); } diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java index af86bcfc10..4fa1d6c2f1 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java @@ -1,6 +1,7 @@ package com.slack.kaldb.logstore; import com.slack.kaldb.metadata.schema.LuceneFieldDef; +import com.slack.service.murron.trace.Trace; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @@ -11,7 +12,7 @@ /* An interface that implements a read and write interface for the LogStore */ public interface LogStore extends Closeable { - void addMessage(T message); + void addMessage(Trace.Span message); // TODO: Instead of exposing the searcherManager, consider returning an instance of the searcher. 
SearcherManager getSearcherManager(); diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index f60d8a93b6..add4090e02 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -4,6 +4,7 @@ import com.slack.kaldb.metadata.schema.LuceneFieldDef; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.util.RuntimeHalterImpl; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; import java.io.File; @@ -52,7 +53,7 @@ public class LuceneIndexStoreImpl implements LogStore { public static final String FINAL_MERGES_TIMER = "kaldb_index_final_merges"; private final SearcherManager searcherManager; - private final DocumentBuilder documentBuilder; + private final DocumentBuilder documentBuilder; private final FSDirectory indexDirectory; private final Timer timer; private final SnapshotDeletionPolicy snapshotDeletionPolicy; @@ -108,9 +109,7 @@ public static LuceneIndexStoreImpl makeLogStore( } public LuceneIndexStoreImpl( - LuceneIndexStoreConfig config, - DocumentBuilder documentBuilder, - MeterRegistry registry) + LuceneIndexStoreConfig config, DocumentBuilder documentBuilder, MeterRegistry registry) throws IOException { this.documentBuilder = documentBuilder; @@ -254,7 +253,7 @@ private void handleNonFatal(Throwable ex) { } @Override - public void addMessage(LogMessage message) { + public void addMessage(Trace.Span message) { try { messagesReceivedCounter.increment(); if (indexWriter.isPresent()) { diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java index d98dd18b2e..08e0092c72 100644 --- 
a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java @@ -1,19 +1,30 @@ package com.slack.kaldb.logstore.schema; +import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_INDEX_NAME; +import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE; + import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.slack.kaldb.logstore.DocumentBuilder; import com.slack.kaldb.logstore.FieldDefMismatchException; import com.slack.kaldb.logstore.LogMessage; +import com.slack.kaldb.logstore.LogWireMessage; import com.slack.kaldb.metadata.schema.FieldType; import com.slack.kaldb.metadata.schema.LuceneFieldDef; import com.slack.kaldb.util.JsonUtil; +import com.slack.kaldb.writer.SpanFormatter; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.apache.logging.log4j.util.Strings; import org.apache.lucene.document.Document; import org.slf4j.Logger; @@ -31,7 +42,7 @@ * rarely an issue and helps with performance. If this is an issue, we need to scan the json twice * to ensure document is good to index. 
*/ -public class SchemaAwareLogDocumentBuilderImpl implements DocumentBuilder { +public class SchemaAwareLogDocumentBuilderImpl implements DocumentBuilder { private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareLogDocumentBuilderImpl.class); @@ -347,31 +358,144 @@ public static SchemaAwareLogDocumentBuilderImpl build( } @Override - public Document fromMessage(LogMessage message) throws JsonProcessingException { + public Document fromMessage(Trace.Span message) throws JsonProcessingException { Document doc = new Document(); - addField(doc, LogMessage.SystemField.INDEX.fieldName, message.getIndex(), "", 0); + + // today we rely on source to construct the document at search time so need to keep it + // consistent for now + Map jsonMap = new HashMap<>(); + if (!message.getParentId().isEmpty()) { + jsonMap.put( + LogMessage.ReservedField.PARENT_ID.fieldName, message.getParentId().toStringUtf8()); + addField( + doc, + LogMessage.ReservedField.PARENT_ID.fieldName, + message.getParentId().toStringUtf8(), + "", + 0); + } + if (!message.getTraceId().isEmpty()) { + jsonMap.put(LogMessage.ReservedField.TRACE_ID.fieldName, message.getTraceId().toStringUtf8()); + addField( + doc, + LogMessage.ReservedField.TRACE_ID.fieldName, + message.getTraceId().toStringUtf8(), + "", + 0); + } + if (!message.getName().isEmpty()) { + jsonMap.put(LogMessage.ReservedField.NAME.fieldName, message.getName()); + addField(doc, LogMessage.ReservedField.NAME.fieldName, message.getName(), "", 0); + } + if (message.getDuration() != 0) { + jsonMap.put( + LogMessage.ReservedField.DURATION_MS.fieldName, + Duration.of(message.getDuration(), ChronoUnit.MICROS).toMillis()); + addField( + doc, + LogMessage.ReservedField.DURATION_MS.fieldName, + Duration.of(message.getDuration(), ChronoUnit.MICROS).toMillis(), + "", + 0); + } + if (!message.getId().isEmpty()) { + jsonMap.put(LogMessage.SystemField.ID.fieldName, message.getId().toStringUtf8()); + addField(doc, LogMessage.SystemField.ID.fieldName,
message.getId().toStringUtf8(), "", 0); + } else { + throw new IllegalArgumentException("Span id is empty"); + } + + Instant timestamp = SpanFormatter.getTimestampFromSpan(message); addField( - doc, - LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, - message.getTimestamp().toEpochMilli(), - "", - 0); - addField(doc, LogMessage.ReservedField.TYPE.fieldName, message.getType(), "", 0); - addField(doc, LogMessage.SystemField.ID.fieldName, message.getId(), "", 0); - - final String msgString = JsonUtil.writeAsString(message.toWireMessage()); + doc, LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toEpochMilli(), "", 0); + + Map tags = + message.getTagsList().stream() + .map(keyValue -> Map.entry(keyValue.getKey(), keyValue)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, duplicate) -> first)); + + // TODO: this should just be top level Trace.Span fields. This is error prone - what if type is + // not a string? + String indexName = + tags.containsKey(LogMessage.ReservedField.SERVICE_NAME.fieldName) + ? tags.get(LogMessage.ReservedField.SERVICE_NAME.fieldName).getVStr() + : DEFAULT_INDEX_NAME; + String msgType = + tags.containsKey(LogMessage.ReservedField.TYPE.fieldName) + ?
tags.get(LogMessage.ReservedField.TYPE.fieldName).getVStr() + : DEFAULT_LOG_MESSAGE_TYPE; + + jsonMap.put(LogMessage.ReservedField.TYPE.fieldName, msgType); + addField(doc, LogMessage.ReservedField.TYPE.fieldName, msgType, "", 0); + + jsonMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName); + addField(doc, LogMessage.SystemField.INDEX.fieldName, indexName, "", 0); + addField(doc, LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName, "", 0); + + tags.remove(LogMessage.ReservedField.SERVICE_NAME.fieldName); + tags.remove(LogMessage.ReservedField.TYPE.fieldName); + + for (Trace.KeyValue keyValue : tags.values()) { + if (keyValue.getVType() == Trace.ValueType.STRING) { + addField(doc, keyValue.getKey(), keyValue.getVStr(), "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVStr()); + } else if (keyValue.getVType() == Trace.ValueType.BOOL) { + addField(doc, keyValue.getKey(), keyValue.getVBool(), "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVBool()); + } else if (keyValue.getVType() == Trace.ValueType.INT64) { + addField(doc, keyValue.getKey(), keyValue.getVInt64(), "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVInt64()); + } else if (keyValue.getVType() == Trace.ValueType.FLOAT64) { + addField(doc, keyValue.getKey(), keyValue.getVFloat64(), "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVFloat64()); + } else if (keyValue.getVType() == Trace.ValueType.BINARY) { + addField(doc, keyValue.getKey(), keyValue.getVBinary().toStringUtf8(), "", 0); + jsonMap.put(keyValue.getKey(), keyValue.getVBinary().toStringUtf8()); + } else { + LOG.warn( + "Skipping field with unknown value type {} with key {}", + keyValue.getVType(), + keyValue.getKey()); + } + } + + LogWireMessage logWireMessage = + new LogWireMessage(indexName, msgType, message.getId().toStringUtf8(), timestamp, jsonMap); + final String msgString = JsonUtil.writeAsString(logWireMessage); addField(doc, LogMessage.SystemField.SOURCE.fieldName, msgString, "", 0); if
(enableFullTextSearch) { addField(doc, LogMessage.SystemField.ALL.fieldName, msgString, "", 0); } - for (String key : message.getSource().keySet()) { - addField(doc, key, message.getSource().get(key), "", 0); - } - LOG.trace("Lucene document {} for message {}", doc, message); return doc; } + // @Override + // public Document fromMessage(Trace.Span message) throws JsonProcessingException { + // Document doc = new Document(); + // addField(doc, LogMessage.SystemField.INDEX.fieldName, message.getIndex(), "", 0); + // addField( + // doc, + // LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, + // message.getTimestamp().toEpochMilli(), + // "", + // 0); + // addField(doc, LogMessage.ReservedField.TYPE.fieldName, message.getType(), "", 0); + // addField(doc, LogMessage.SystemField.ID.fieldName, message.getId(), "", 0); + // + // final String msgString = JsonUtil.writeAsString(message.toWireMessage()); + // addField(doc, LogMessage.SystemField.SOURCE.fieldName, msgString, "", 0); + // if (enableFullTextSearch) { + // addField(doc, LogMessage.SystemField.ALL.fieldName, msgString, "", 0); + // } + // + // for (String key : message.getSource().keySet()) { + // addField(doc, key, message.getSource().get(key), "", 0); + // } + // LOG.trace("Lucene document {} for message {}", doc, message); + // return doc; + // } + @Override public ConcurrentHashMap getSchema() { return fieldDefMap; diff --git a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java index 1bc97b4d12..4ec16684db 100644 --- a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java +++ b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java @@ -1,7 +1,6 @@ package com.slack.kaldb.recovery; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.server.ValidateKaldbConfig.INDEXER_DATA_TRANSFORMER_MAP; import static com.slack.kaldb.util.TimeUtils.nanosToMillis; import
import com.google.common.annotations.VisibleForTesting; @@ -22,7 +21,6 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.proto.metadata.Metadata; -import com.slack.kaldb.writer.LogMessageTransformer; import com.slack.kaldb.writer.LogMessageWriterImpl; import com.slack.kaldb.writer.kafka.KaldbKafkaConsumer; import io.micrometer.core.instrument.Counter; @@ -305,10 +303,7 @@ boolean handleRecoveryTask(RecoveryTaskMetadata recoveryTaskMetadata) { kaldbConfig.getS3Config()); // Ingest data in parallel - LogMessageTransformer messageTransformer = - INDEXER_DATA_TRANSFORMER_MAP.get(kaldbConfig.getIndexerConfig().getDataTransformer()); - LogMessageWriterImpl logMessageWriterImpl = - new LogMessageWriterImpl(chunkManager, messageTransformer); + LogMessageWriterImpl logMessageWriterImpl = new LogMessageWriterImpl(chunkManager); KaldbKafkaConsumer kafkaConsumer = new KaldbKafkaConsumer( makeKafkaConfig( diff --git a/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java b/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java index 5da436f389..0cc80fa170 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java @@ -2,7 +2,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.server.ValidateKaldbConfig.INDEXER_DATA_TRANSFORMER_MAP; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.slack.kaldb.chunkManager.ChunkRollOverException; @@ -12,7 +11,6 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.util.RuntimeHalterImpl; -import com.slack.kaldb.writer.LogMessageTransformer; import com.slack.kaldb.writer.LogMessageWriterImpl; import 
com.slack.kaldb.writer.kafka.KaldbKafkaConsumer; import io.micrometer.core.instrument.MeterRegistry; @@ -67,10 +65,7 @@ public KaldbIndexer( // Create a chunk manager this.chunkManager = chunkManager; // set up indexing pipelne - LogMessageTransformer messageTransformer = - INDEXER_DATA_TRANSFORMER_MAP.get(indexerConfig.getDataTransformer()); - LogMessageWriterImpl logMessageWriterImpl = - new LogMessageWriterImpl(chunkManager, messageTransformer); + LogMessageWriterImpl logMessageWriterImpl = new LogMessageWriterImpl(chunkManager); this.kafkaConsumer = new KaldbKafkaConsumer(kafkaConfig, logMessageWriterImpl, meterRegistry); } diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java b/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java index cae310b7ae..a273702400 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java @@ -6,7 +6,6 @@ import com.slack.service.murron.Murron; import com.slack.service.murron.trace.Trace; import java.io.IOException; -import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; @@ -74,37 +73,34 @@ public class LogMessageWriterImpl implements MessageWriter { }; private final ChunkManager chunkManager; - private final LogMessageTransformer dataTransformer; - public LogMessageWriterImpl( - ChunkManager chunkManager, LogMessageTransformer dataTransformer) { + public LogMessageWriterImpl(ChunkManager chunkManager) { this.chunkManager = chunkManager; - this.dataTransformer = dataTransformer; } @Override public boolean insertRecord(ConsumerRecord record) throws IOException { if (record == null) return false; - final List logMessages; + final Trace.ListOfSpans listOfSpans; try { - logMessages = this.dataTransformer.toLogMessage(record); - // Ideally, we should return true when logMessages are empty.
But, fail the record, since we - // don't expect any empty records or we may have a bug in earlier code. - if (logMessages.isEmpty()) return false; + final Trace.Span span = Trace.Span.parseFrom(record.value()); + listOfSpans = Trace.ListOfSpans.newBuilder().addSpans(span).build(); + + if (listOfSpans.getSpansCount() == 0) return false; } catch (Exception e) { LOG.warn("Parsing consumer record: {} failed with an exception.", record, e); return false; } - final int avgMsgSize = record.serializedValueSize() / logMessages.size(); - for (LogMessage logMessage : logMessages) { + final int avgMsgSize = record.serializedValueSize() / listOfSpans.getSpansCount(); + for (Trace.Span span : listOfSpans.getSpansList()) { // Currently, ChunkManager.addMessage increments a failure counter to indicate an ingestion // error. We decided to throw the exception to a higher level since in a batch ingestion // the upper layers of the stack can't take any further action. If this becomes an issue // in future, propagate the exception upwards here or return a value. chunkManager.addMessage( - logMessage, avgMsgSize, String.valueOf(record.partition()), record.offset()); + span, avgMsgSize, String.valueOf(record.partition()), record.offset()); } return true; } diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java index 188e25aa60..8809d15314 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java @@ -147,6 +147,15 @@ public static boolean isValidTimestamp(Instant timestamp) { return true; } + public static Instant getTimestampFromSpan(Trace.Span span) { + Instant timestamp = Instant.ofEpochMilli(span.getTimestamp() / 1000); + if (isValidTimestamp(timestamp)) { + return timestamp; + } else { + return Instant.now(); + } + } + // TODO: Make this function more memory efficient?
public static LogMessage toLogMessage(Trace.Span span) { if (span == null) return null; diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java index 97e280dc06..3b5b7c5a37 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/IndexingChunkImplTest.java @@ -33,6 +33,7 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.testlib.MessageUtil; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; @@ -153,7 +154,7 @@ public void testAddAndSearchChunk() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -207,7 +208,7 @@ public void testAddAndSearchChunkInTimeRange() { MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); final long messageStartTimeMs = messages.get(0).getTimestamp().toEpochMilli(); int offset = 1; - for (LogMessage m : messages) { + for (Trace.Span m : MessageUtil.makeMessagesWithTimeDifference1(1, 100, 1000, startTime)) { chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); offset++; } @@ -249,7 +250,7 @@ public void testAddAndSearchChunkInTimeRange() { 1, 100, 1000, startTime.plus(2, ChronoUnit.DAYS)); final long newMessageStartTimeEpochMs = newMessages.get(0).getTimestamp().toEpochMilli(); for (LogMessage m : newMessages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ 
-315,7 +316,7 @@ public void testSearchInReadOnlyChunk() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -348,7 +349,7 @@ public void testAddMessageToReadOnlyChunk() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -362,7 +363,7 @@ public void testAddMessageToReadOnlyChunk() { .isThrownBy( () -> chunk.addMessage( - MessageUtil.makeMessage(101), TEST_KAFKA_PARTITION_ID, finalOffset)); + MessageUtil.withMessageId(101), TEST_KAFKA_PARTITION_ID, finalOffset)); } @Test @@ -370,7 +371,7 @@ public void testMessageFromDifferentPartitionFails() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -384,7 +385,7 @@ public void testMessageFromDifferentPartitionFails() { .isThrownBy( () -> chunk.addMessage( - MessageUtil.makeMessage(101), "differentKafkaPartition", finalOffset)); + MessageUtil.withMessageId(101), "differentKafkaPartition", finalOffset)); } @Test @@ -392,7 +393,7 @@ public void testCommitBeforeSnapshot() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } assertThat(chunk.isReadOnly()).isFalse(); @@ -496,7 
+497,8 @@ public void testAddInvalidMessagesToChunk() { LogMessage testMessage = MessageUtil.makeMessage(0, Map.of("username", 0)); // An Invalid message is dropped but failure counter is incremented. - chunk.addMessage(testMessage, TEST_KAFKA_PARTITION_ID, 1); + chunk.addMessage( + MessageUtil.convertLogMessageToSpan(testMessage), TEST_KAFKA_PARTITION_ID, 1); chunk.commit(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1); @@ -588,7 +590,7 @@ public void testSnapshotToNonExistentS3BucketFails() List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } @@ -650,7 +652,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java index 0c106d7ad3..6a4600afbd 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/RecoveryChunkImplTest.java @@ -139,7 +139,7 @@ public void testAddAndSearchChunk() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -197,7 +197,7 @@ public void testAddAndSearchChunkInTimeRange() { final long messageStartTimeMs = messages.get(0).getTimestamp().toEpochMilli(); 
int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -238,7 +238,7 @@ public void testAddAndSearchChunkInTimeRange() { 1, 100, 1000, startTime.plus(2, ChronoUnit.DAYS)); final long newMessageStartTimeEpochMs = newMessages.get(0).getTimestamp().toEpochMilli(); for (LogMessage m : newMessages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -304,7 +304,7 @@ public void testSearchInReadOnlyChunk() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -337,7 +337,7 @@ public void testAddMessageToReadOnlyChunk() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -351,7 +351,7 @@ public void testAddMessageToReadOnlyChunk() { .isThrownBy( () -> chunk.addMessage( - MessageUtil.makeMessage(101), TEST_KAFKA_PARTITION_ID, finalOffset)); + MessageUtil.withMessageId(101), TEST_KAFKA_PARTITION_ID, finalOffset)); } @Test @@ -359,7 +359,7 @@ public void testMessageFromDifferentPartitionFails() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } chunk.commit(); @@ -373,7 
+373,7 @@ public void testMessageFromDifferentPartitionFails() { .isThrownBy( () -> chunk.addMessage( - MessageUtil.makeMessage(101), "differentKafkaPartition", finalOffset)); + MessageUtil.withMessageId(101), "differentKafkaPartition", finalOffset)); } @Test @@ -381,7 +381,7 @@ public void testCommitBeforeSnapshot() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } assertThat(chunk.isReadOnly()).isFalse(); @@ -487,7 +487,8 @@ public void testAddInvalidMessagesToChunk() { LogMessage testMessage = MessageUtil.makeMessage(0, Map.of("username", 0)); // An Invalid message is dropped but failure counter is incremented. - chunk.addMessage(testMessage, TEST_KAFKA_PARTITION_ID, 1); + chunk.addMessage( + MessageUtil.convertLogMessageToSpan(testMessage), TEST_KAFKA_PARTITION_ID, 1); chunk.commit(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1); @@ -575,7 +576,7 @@ public void testSnapshotToNonExistentS3BucketFails() { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } @@ -626,7 +627,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100); int offset = 1; for (LogMessage m : messages) { - chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset); + chunk.addMessage(MessageUtil.convertLogMessageToSpan(m), TEST_KAFKA_PARTITION_ID, offset); offset++; } diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/CachingChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/CachingChunkManagerTest.java index 
62683577fb..b166c38610 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/CachingChunkManagerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/CachingChunkManagerTest.java @@ -157,7 +157,8 @@ public void shouldHandleLifecycle() throws Exception { public void testAddMessageIsUnsupported() throws TimeoutException { cachingChunkManager = initChunkManager(); MessageUtil.makeMessage(1); - assertThatThrownBy(() -> cachingChunkManager.addMessage(MessageUtil.makeMessage(1), 10, "1", 1)) + assertThatThrownBy( + () -> cachingChunkManager.addMessage(MessageUtil.withMessageId(1), 10, "1", 1)) .isInstanceOf(UnsupportedOperationException.class); } diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java index cb4cf97e9d..609e6c053e 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java @@ -59,6 +59,7 @@ import com.slack.kaldb.proto.service.KaldbSearch; import com.slack.kaldb.testlib.KaldbConfigUtil; import com.slack.kaldb.testlib.MessageUtil; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.File; import java.io.IOException; @@ -218,7 +219,11 @@ public void testDeleteOverMaxThresholdGreaterThanZero() throws IOException, Time int offset = 1; for (LogMessage m : messages.subList(0, 9)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } assertThat(chunkManager.getChunkList().size()).isEqualTo(1); @@ -230,7 +235,11 @@ public void testDeleteOverMaxThresholdGreaterThanZero() throws IOException, Time assertThat(chunk1.info().getChunkSnapshotTimeEpochMs()).isZero(); for 
(LogMessage m : messages.subList(9, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -299,7 +308,11 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0() int offset = 1; for (LogMessage m : messages.subList(0, 9)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } assertThat(chunkManager.getChunkList().size()).isEqualTo(1); @@ -343,7 +356,8 @@ public void closeDuringCleanerTask() int offset = 1; for (LogMessage m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), msgSize, TEST_KAFKA_PARTITION_ID, offset); offset++; chunkManager.getActiveChunk().commit(); @@ -386,7 +400,8 @@ public void testAddMessage() throws Exception { int offset = 1; for (LogMessage m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), msgSize, TEST_KAFKA_PARTITION_ID, offset); actualChunkSize += msgSize; offset++; } @@ -451,7 +466,7 @@ public void testAddMessage() throws Exception { final int veryHighOffset = 1000; assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(offset - 1); assertThat(veryHighOffset - offset).isGreaterThan(100); - LogMessage messageWithHighOffset = MessageUtil.makeMessage(101); + Trace.Span messageWithHighOffset = MessageUtil.withMessageId(101); chunkManager.addMessage( messageWithHighOffset, messageWithHighOffset.toString().length(), @@ -481,7 +496,7 @@ public void testAddMessage() 
throws Exception { assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); assertThat(lowerOffset - offset).isGreaterThan(100); assertThat(veryHighOffset - lowerOffset).isGreaterThan(100); - LogMessage messageWithLowerOffset = MessageUtil.makeMessage(102); + Trace.Span messageWithLowerOffset = MessageUtil.withMessageId(102); chunkManager.addMessage( messageWithLowerOffset, messageWithLowerOffset.toString().length(), @@ -507,7 +522,7 @@ public void testAddMessage() throws Exception { .isEqualTo(1); // Inserting a message from a different kafka partition fails - LogMessage messageWithInvalidTopic = MessageUtil.makeMessage(103); + Trace.Span messageWithInvalidTopic = MessageUtil.withMessageId(103); assertThatIllegalArgumentException() .isThrownBy( () -> @@ -609,7 +624,11 @@ public void testAddAndSearchMessageInMultipleSlices() throws Exception { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 15); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } chunkManager.getActiveChunk().commit(); @@ -638,7 +657,11 @@ public void testAddAndSearchMessageInSpecificChunks() throws Exception { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 15); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } chunkManager.getActiveChunk().commit(); @@ -728,13 +751,21 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { // Add a message int offset = 1; LogMessage msg1 = MessageUtil.makeMessage(1); - chunkManager.addMessage(msg1, msg1.toString().length(), 
TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(msg1), + msg1.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; // Add an invalid message LogMessage msg100 = MessageUtil.makeMessage(100, Map.of(LogMessage.ReservedField.HOSTNAME.fieldName, 20000)); - chunkManager.addMessage(msg100, msg100.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(msg100), + msg100.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; // Commit the new chunk so we can search it. @@ -784,14 +815,22 @@ public void testMessagesAddedToActiveChunks() throws Exception { LogMessage msg1 = msgs.get(0); LogMessage msg2 = msgs.get(1); int offset = 1; - chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(msg1), + msg1.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; ReadWriteChunk chunk1 = chunkManager.getActiveChunk(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(1); assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0); assertThat(getValue(LIVE_MESSAGES_INDEXED, metricsRegistry)).isEqualTo(1); - chunkManager.addMessage(msg2, msg2.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(msg2), + msg2.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; assertThat(chunkManager.getChunkList().size()).isEqualTo(1); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(2); @@ -805,7 +844,11 @@ public void testMessagesAddedToActiveChunks() throws Exception { LogMessage msg3 = msgs.get(2); LogMessage msg4 = msgs.get(3); - chunkManager.addMessage(msg3, msg3.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(msg3), + 
msg3.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; assertThat(chunkManager.getChunkList().size()).isEqualTo(2); @@ -820,7 +863,10 @@ public void testMessagesAddedToActiveChunks() throws Exception { checkMetadata(3, 2, 1, 2, 1); // Inserting in an older chunk throws an exception. So, additions go to active chunks only. assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> chunk1.addMessage(msg4, TEST_KAFKA_PARTITION_ID, 1)); + .isThrownBy( + () -> + chunk1.addMessage( + MessageUtil.convertLogMessageToSpan(msg4), TEST_KAFKA_PARTITION_ID, 1)); } @Test @@ -835,7 +881,11 @@ public void testMultiThreadedChunkRollover() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (LogMessage m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -869,7 +919,11 @@ public void testAddMessagesToChunkWithRollover() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (LogMessage m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -890,7 +944,11 @@ public void testAddMessagesToChunkWithRollover() throws Exception { // Add remaining messages to create a second chunk. 
for (LogMessage m : messages.subList(11, 25)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } chunkManager.getActiveChunk().commit(); @@ -960,7 +1018,11 @@ public void testAllChunkFailures() throws Exception { // Add 11 messages to initiate first roll over. int offset = 1; for (LogMessage m : messages.subList(0, 11)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } // Main chunk is already committed. Commit the new chunk so we can search it. @@ -979,7 +1041,11 @@ public void testAllChunkFailures() throws Exception { testChunkManagerSearch(chunkManager, "Message21", 0, 2, 2); for (LogMessage m : messages.subList(11, 25)) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } chunkManager.getActiveChunk().commit(); @@ -1040,7 +1106,11 @@ public void testCommitInvalidChunk() throws Exception { int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -1081,7 +1151,11 @@ public void testMultiChunkSearch() throws Exception { int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -1205,7 +1279,11 
@@ public void testChunkRollOverInProgressExceptionIsThrown() throws Exception { () -> { int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } }) @@ -1242,7 +1320,11 @@ public void testSuccessfulRollOverFinishesOnClose() throws Exception { // rollover. int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } ListenableFuture rollOverFuture = chunkManager.getRolloverFuture(); @@ -1294,7 +1376,11 @@ public void testFailedRollOverFinishesOnClose() throws Exception { // rollover. int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } await().until(() -> getCount(ROLLOVERS_FAILED, metricsRegistry) == 1); @@ -1339,7 +1425,11 @@ public void testRollOverFailure() int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -1363,7 +1453,10 @@ public void testRollOverFailure() MessageUtil.makeMessagesWithTimeDifference(11, 12, 1000, startTime); for (LogMessage m : newMessage) { chunkManager.addMessage( - m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset); + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + newOffset); newOffset++; } }) @@ -1388,7 
+1481,11 @@ public void testRollOverFailureWithDirectExecutor() // exception. int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -1409,7 +1506,10 @@ public void testRollOverFailureWithDirectExecutor() MessageUtil.makeMessagesWithTimeDifference(11, 12, 1000, startTime); for (LogMessage m : newMessage) { chunkManager.addMessage( - m, m.toString().length(), TEST_KAFKA_PARTITION_ID, newOffset); + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + newOffset); newOffset++; } }) @@ -1431,7 +1531,11 @@ public void testNewFieldAddedToSchema() throws IOException, TimeoutException { int offset = 1; for (LogMessage m : messages1) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } chunkManager.getActiveChunk().commit(); @@ -1442,7 +1546,10 @@ public void testNewFieldAddedToSchema() throws IOException, TimeoutException { LogMessage logMessage = LogMessage.fromWireMessage(MessageUtil.makeWireMessage(11, Map.of("schemaTest", true))); chunkManager.addMessage( - logMessage, logMessage.toString().length(), TEST_KAFKA_PARTITION_ID, offset++); + MessageUtil.convertLogMessageToSpan(logMessage), + logMessage.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset++); chunkManager.rollOverActiveChunk(); await().until(() -> getCount(RollOverChunkTask.ROLLOVERS_COMPLETED, metricsRegistry) == 2); @@ -1574,7 +1681,8 @@ private void insertMessages( int offset = 1; for (LogMessage m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + 
MessageUtil.convertLogMessageToSpan(m), msgSize, TEST_KAFKA_PARTITION_ID, offset); offset++; actualMessagesGauge++; actualBytesGauge += msgSize; diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java index 7df5dfd325..f9d2030cd5 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java @@ -40,6 +40,7 @@ import com.slack.kaldb.testlib.KaldbConfigUtil; import com.slack.kaldb.testlib.MessageUtil; import com.slack.kaldb.testlib.TemporaryLogStoreAndSearcherExtension; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.File; import java.io.IOException; @@ -167,7 +168,8 @@ public void testAddMessageAndRollover() throws Exception { int offset = 1; for (LogMessage m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), msgSize, TEST_KAFKA_PARTITION_ID, offset); actualChunkSize += msgSize; offset++; } @@ -214,7 +216,7 @@ public void testAddMessageAndRollover() throws Exception { final int veryHighOffset = 1000; assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(offset - 1); assertThat(veryHighOffset - offset).isGreaterThan(100); - LogMessage messageWithHighOffset = MessageUtil.makeMessage(101); + Trace.Span messageWithHighOffset = MessageUtil.withMessageId(101); chunkManager.addMessage( messageWithHighOffset, messageWithHighOffset.toString().length(), @@ -244,7 +246,7 @@ public void testAddMessageAndRollover() throws Exception { assertThat(chunkManager.getActiveChunk().info().getMaxOffset()).isEqualTo(veryHighOffset); assertThat(lowerOffset - offset).isGreaterThan(100); assertThat(veryHighOffset - 
lowerOffset).isGreaterThan(100); - LogMessage messageWithLowerOffset = MessageUtil.makeMessage(102); + Trace.Span messageWithLowerOffset = MessageUtil.withMessageId(102); chunkManager.addMessage( messageWithLowerOffset, messageWithLowerOffset.toString().length(), @@ -270,7 +272,7 @@ public void testAddMessageAndRollover() throws Exception { .isEqualTo(1); // Inserting a message from a different kafka partition fails - LogMessage messageWithInvalidTopic = MessageUtil.makeMessage(103); + Trace.Span messageWithInvalidTopic = MessageUtil.withMessageId(103); assertThatIllegalArgumentException() .isThrownBy( () -> @@ -304,10 +306,11 @@ public void testAddMessageAndRollover() throws Exception { // Can't add messages to current chunk after roll over. assertThatThrownBy( - () -> - currentChunk.addMessage( - MessageUtil.makeMessage(100000), TEST_KAFKA_PARTITION_ID, 100000)) - .isInstanceOf(IllegalStateException.class); + () -> + currentChunk.addMessage( + MessageUtil.convertLogMessageToSpan(MessageUtil.makeMessage(100000)), + TEST_KAFKA_PARTITION_ID, + 100000)); // Ensure data is cleaned up in the manager assertThat(chunkManager.getChunkList()).isEmpty(); @@ -361,13 +364,21 @@ public void testAddMessageWithPropertyTypeConflicts() throws Exception { // Add a valid message int offset = 1; LogMessage msg1 = MessageUtil.makeMessage(1); - chunkManager.addMessage(msg1, msg1.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(msg1), + msg1.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; // Add an invalid message LogMessage msg100 = MessageUtil.makeMessage(100, Map.of(LogMessage.ReservedField.HOSTNAME.fieldName, 20000)); - chunkManager.addMessage(msg100, msg100.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(msg100), + msg100.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); //noinspection UnusedAssignment offset++; 
@@ -404,7 +415,11 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception { int offset = 1; List messages = MessageUtil.makeMessagesWithTimeDifference(1, 20); for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } @@ -423,7 +438,10 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception { .isThrownBy( () -> chunkManager.addMessage( - MessageUtil.makeMessage(1000), 100, TEST_KAFKA_PARTITION_ID, 1000)); + MessageUtil.convertLogMessageToSpan(MessageUtil.makeMessage(1000)), + 100, + TEST_KAFKA_PARTITION_ID, + 1000)); // Check metadata. List snapshots = @@ -446,7 +464,10 @@ public void testAddMessagesWithFailedRollOverStopsIngestion() throws Exception { .isThrownBy( () -> chunkManager.addMessage( - MessageUtil.makeMessage(1000), 100, TEST_KAFKA_PARTITION_ID, 1000)); + MessageUtil.convertLogMessageToSpan(MessageUtil.makeMessage(1000)), + 100, + TEST_KAFKA_PARTITION_ID, + 1000)); chunkManager.awaitTerminated(DEFAULT_START_STOP_DURATION); chunkManager = null; diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java index 3b080ae35f..4c762add94 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java @@ -180,7 +180,8 @@ public void testDiskBasedRolloverWithMaxBytes() throws Exception { boolean shouldCheckOnNextMessage = false; for (LogMessage m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + 
MessageUtil.convertLogMessageToSpan(m), msgSize, TEST_KAFKA_PARTITION_ID, offset); offset++; Thread.sleep(DiskOrMessageCountBasedRolloverStrategy.DIRECTORY_SIZE_EXECUTOR_PERIOD_MS); if (chunkManager.getActiveChunk() != null) { @@ -268,7 +269,8 @@ public void testDiskBasedRolloverWithMaxMessages() throws Exception { int offset = 1; for (LogMessage m : messages) { final int msgSize = m.toString().length(); - chunkManager.addMessage(m, msgSize, TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), msgSize, TEST_KAFKA_PARTITION_ID, offset); offset++; if (chunkManager.getActiveChunk() != null) { chunkManager.getActiveChunk().commit(); @@ -349,7 +351,8 @@ public void testDirectorySizeWithNoValidSegments() throws IOException { @Test public void testDirectorySizeWithValidSegments() { strictLogStore.logStore.addMessage( - new LogMessage("foo", "bar", "baz", Instant.EPOCH, Map.of())); + MessageUtil.convertLogMessageToSpan( + new LogMessage("foo", "bar", "baz", Instant.EPOCH, Map.of()))); strictLogStore.logStore.commit(); FSDirectory directory = strictLogStore.logStore.getDirectory(); long directorySize = DiskOrMessageCountBasedRolloverStrategy.calculateDirectorySize(directory); diff --git a/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java index c56eec0c6e..e2ee1a429b 100644 --- a/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/elasticsearchApi/ElasticsearchApiServiceTest.java @@ -337,7 +337,11 @@ private void addMessagesToChunkManager(List messages) throws IOExcep IndexingChunkManager chunkManager = chunkManagerUtil.chunkManager; int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARTITION_ID, offset); + chunkManager.addMessage( + 
MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARTITION_ID, + offset); offset++; } chunkManager.getActiveChunk().commit(); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/FieldConflictsTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/FieldConflictsTest.java index 8029c28958..be2a4d02ac 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/FieldConflictsTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/FieldConflictsTest.java @@ -46,7 +46,7 @@ public void testFieldConflictingFieldTypeWithSameValue() { "host1-dc2.abc.com", conflictingFieldName, "1")); - strictLogStore.logStore.addMessage(msg1); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg1)); LogMessage msg2 = new LogMessage( @@ -63,7 +63,7 @@ public void testFieldConflictingFieldTypeWithSameValue() { "host1-dc2.abc.com", conflictingFieldName, 1)); - strictLogStore.logStore.addMessage(msg2); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg2)); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -119,7 +119,7 @@ public void testFieldConflictingFieldTypeWithDifferentValue() { "host1-dc2.abc.com", conflictingFieldName, "1")); - strictLogStore.logStore.addMessage(msg0); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg0)); LogMessage msg1 = new LogMessage( @@ -136,7 +136,7 @@ public void testFieldConflictingFieldTypeWithDifferentValue() { "host1-dc2.abc.com", conflictingFieldName, "one")); - strictLogStore.logStore.addMessage(msg1); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg1)); LogMessage msg2 = new LogMessage( @@ -153,7 +153,7 @@ public void testFieldConflictingFieldTypeWithDifferentValue() { "host1-dc2.abc.com", conflictingFieldName, 200)); - strictLogStore.logStore.addMessage(msg2); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg2)); strictLogStore.logStore.commit(); 
strictLogStore.logStore.refresh(); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java index 464a6c4ea3..2ce441303f 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java @@ -92,7 +92,7 @@ public void testSearchAndQueryDocsWithNestedJson() throws InterruptedException { "duplicate1", "nested", Map.of("key1", "value1", "duplicateproperty", "2"))); - logStore.logStore.addMessage(msg); + logStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg)); logStore.logStore.commit(); logStore.logStore.refresh(); @@ -159,7 +159,7 @@ public void testTimestampOrdering() { public void testIndexDocsWithUnsupportedPropertyTypes() { LogMessage msg = MessageUtil.makeMessage(100, Map.of("unsupportedProperty", Collections.emptyList())); - logStore.logStore.addMessage(msg); + logStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg)); addMessages(logStore.logStore, 1, 99, true); Collection results = findAllMessages(logStore.logSearcher, MessageUtil.TEST_DATASET_NAME, "identifier", 1000); @@ -173,7 +173,7 @@ public void testIndexDocsWithUnsupportedPropertyTypes() { @Test public void testIndexDocsWithTypeMismatchErrors() { LogMessage msg = MessageUtil.makeMessage(100, Map.of(ReservedField.HOSTNAME.fieldName, 1)); - logStore.logStore.addMessage(msg); + logStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg)); addMessages(logStore.logStore, 1, 99, true); Collection results = findAllMessages(logStore.logSearcher, MessageUtil.TEST_DATASET_NAME, "identifier", 1000); @@ -201,7 +201,7 @@ public TestsWithRaiseErrorFieldConflictPolicy() throws IOException {} public void failIndexingDocsWithListFieldType() { LogMessage msg = MessageUtil.makeMessage(100, Map.of("unsupportedProperty", Collections.emptyList())); - 
logStore.logStore.addMessage(msg); + logStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg)); addMessages(logStore.logStore, 1, 99, true); Collection results = findAllMessages(logStore.logSearcher, MessageUtil.TEST_DATASET_NAME, "identifier", 1000); @@ -216,7 +216,7 @@ public void failIndexingDocsWithListFieldType() { public void failIndexingDocsWithMismatchedTypeErrors() { LogMessage msg = MessageUtil.makeMessage(100, Map.of(ReservedField.HOSTNAME.fieldName, 20000)); - logStore.logStore.addMessage(msg); + logStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg)); addMessages(logStore.logStore, 1, 99, true); Collection results = findAllMessages(logStore.logSearcher, MessageUtil.TEST_DATASET_NAME, "identifier", 1000); @@ -232,7 +232,7 @@ public void indexLongUnbreakableField() { String hugeField = IntStream.range(1, 10000).boxed().map(String::valueOf).collect(Collectors.joining("")); LogMessage msg = MessageUtil.makeMessage(1, Map.of("hugefield", hugeField)); - logStore.logStore.addMessage(msg); + logStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg)); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, logStore.metricsRegistry)).isEqualTo(1); // UTF8 encoding is longer than the max length 32766 assertThat(getCount(MESSAGES_FAILED_COUNTER, logStore.metricsRegistry)).isEqualTo(1); @@ -256,7 +256,7 @@ public void testFieldSearch() throws InterruptedException { "foo-bar", ReservedField.HOSTNAME.fieldName, "host1-dc2.abc.com")); - logStore.logStore.addMessage(msg); + logStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg)); logStore.logStore.commit(); logStore.logStore.refresh(); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueAndDuplicateTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueAndDuplicateTest.java index b08a4bdbe3..e1d8066f6c 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueAndDuplicateTest.java +++ 
b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueAndDuplicateTest.java @@ -64,7 +64,7 @@ public void testListTypeInDocument() throws IOException { "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(23); assertThat(docBuilder.getSchema().size()).isEqualTo(23); assertThat(docBuilder.getSchema().keySet()) @@ -120,7 +120,7 @@ public void testListTypeInDocumentWithoutFullTextSearch() throws IOException { "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(22); assertThat(docBuilder.getSchema().size()).isEqualTo(22); assertThat(docBuilder.getSchema().keySet()) @@ -172,7 +172,7 @@ public void testConvertingAndDuplicatingConflictingField() throws JsonProcessing conflictingFieldName, "1")); - Document msg1Doc = convertFieldBuilder.fromMessage(msg1); + Document msg1Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); assertThat(msg1Doc.getFields().size()).isEqualTo(17); assertThat( msg1Doc.getFields().stream() @@ -202,7 +202,7 @@ public void testConvertingAndDuplicatingConflictingField() throws JsonProcessing "host1-dc2.abc.com", conflictingFieldName, 1)); - Document msg2Doc = convertFieldBuilder.fromMessage(msg2); + Document msg2Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(msg2Doc.getFields().size()).isEqualTo(19); String additionalCreatedFieldName = makeNewFieldOfType(conflictingFieldName, FieldType.INTEGER); // Value converted and new field is added. 
@@ -267,7 +267,7 @@ public void testConvertingAndDuplicatingConflictingBooleanField() throws JsonPro conflictingFieldName, true)); - Document msg1Doc = convertFieldBuilder.fromMessage(msg1); + Document msg1Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); assertThat(msg1Doc.getFields().size()).isEqualTo(17); assertThat( msg1Doc.getFields().stream() @@ -297,7 +297,7 @@ public void testConvertingAndDuplicatingConflictingBooleanField() throws JsonPro "host1-dc2.abc.com", conflictingFieldName, "random")); - Document msg2Doc = convertFieldBuilder.fromMessage(msg2); + Document msg2Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(msg2Doc.getFields().size()).isEqualTo(19); String additionalCreatedFieldName = makeNewFieldOfType(conflictingFieldName, FieldType.STRING); // Value converted and new field is added. @@ -355,7 +355,7 @@ public void testConvertingAndDuplicatingConflictingBooleanField() throws JsonPro "host1-dc2.abc.com", additionalCreatedFieldName, true)); - Document msg3Doc = convertFieldBuilder.fromMessage(msg3); + Document msg3Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg3)); assertThat(msg3Doc.getFields().size()).isEqualTo(19); assertThat( msg3Doc.getFields().stream() @@ -410,7 +410,7 @@ public void testValueTypeConversionWorksInDocument() throws JsonProcessingExcept conflictingFieldName, "1")); - Document msg1Doc = convertFieldBuilder.fromMessage(msg1); + Document msg1Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); assertThat(msg1Doc.getFields().size()).isEqualTo(17); assertThat( msg1Doc.getFields().stream() @@ -441,7 +441,7 @@ public void testValueTypeConversionWorksInDocument() throws JsonProcessingExcept "host1-dc2.abc.com", conflictingFieldName, conflictingFloatValue)); - Document msg2Doc = convertFieldBuilder.fromMessage(msg2); + Document msg2Doc = 
convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(msg2Doc.getFields().size()).isEqualTo(19); String additionalCreatedFieldName = makeNewFieldOfType(conflictingFieldName, FieldType.FLOAT); // Value converted and new field is added. @@ -511,7 +511,7 @@ public void testConversionInConvertAndDuplicateField() throws IOException { "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument1 = docBuilder.fromMessage(msg1); + Document testDocument1 = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); final int expectedDocFieldsAfterMsg1 = 23; assertThat(testDocument1.getFields().size()).isEqualTo(expectedDocFieldsAfterMsg1); final int expectedFieldsAfterMsg1 = 23; @@ -550,7 +550,7 @@ public void testConversionInConvertAndDuplicateField() throws IOException { "value1", "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument2 = docBuilder.fromMessage(msg2); + Document testDocument2 = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(testDocument2.getFields().size()).isEqualTo(expectedDocFieldsAfterMsg1 + 2); assertThat(docBuilder.getSchema().size()).isEqualTo(expectedFieldsAfterMsg1 + 1); assertThat(docBuilder.getSchema().get(floatStrConflictField).fieldType) @@ -622,7 +622,7 @@ public void testStringTextAliasing() throws JsonProcessingException { "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument1 = docBuilder.fromMessage(msg1); + Document testDocument1 = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); final int expectedDocFieldsAfterMsg1 = 23; assertThat(testDocument1.getFields().size()).isEqualTo(expectedDocFieldsAfterMsg1); final int expectedFieldsAfterMsg1 = 23; @@ -671,7 +671,7 @@ public void testStringTextAliasing() throws JsonProcessingException { Map.of( stringField, "nestedStringField", "leaf21", 3, "nestedList", 
List.of(1))))); - Document testDocument2 = docBuilder.fromMessage(msg2); + Document testDocument2 = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); // Nested string field adds 1 more sorted doc values field. assertThat(testDocument2.getFields().size()).isEqualTo(expectedDocFieldsAfterMsg1); assertThat(docBuilder.getSchema().size()).isEqualTo(expectedFieldsAfterMsg1 + 1); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueTest.java index 8842a2bd1e..bc41389f47 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/ConvertFieldValueTest.java @@ -56,7 +56,7 @@ public void testConvertingConflictingField() throws JsonProcessingException { conflictingFieldName, "1")); - Document msg1Doc = convertFieldBuilder.fromMessage(msg1); + Document msg1Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); assertThat(msg1Doc.getFields().size()).isEqualTo(17); assertThat( msg1Doc.getFields().stream() @@ -86,7 +86,7 @@ public void testConvertingConflictingField() throws JsonProcessingException { "host1-dc2.abc.com", conflictingFieldName, 1)); - Document msg2Doc = convertFieldBuilder.fromMessage(msg2); + Document msg2Doc = convertFieldBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(msg2Doc.getFields().size()).isEqualTo(17); // Value is converted for conflicting field. 
assertThat( @@ -144,7 +144,7 @@ public void testConversionUsingConvertField() throws IOException { "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument1 = docBuilder.fromMessage(msg1); + Document testDocument1 = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); final int expectedDocFieldsAfterMsg1 = 23; assertThat(testDocument1.getFields().size()).isEqualTo(expectedDocFieldsAfterMsg1); final int expectedFieldsAfterMsg1 = 23; @@ -183,7 +183,7 @@ public void testConversionUsingConvertField() throws IOException { "value1", "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument2 = docBuilder.fromMessage(msg2); + Document testDocument2 = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(testDocument2.getFields().size()).isEqualTo(expectedDocFieldsAfterMsg1); assertThat(docBuilder.getSchema().size()).isEqualTo(expectedFieldsAfterMsg1); assertThat(docBuilder.getSchema().get(floatStrConflictField).fieldType) diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java index 10448628aa..28ba9aa40b 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/DropPolicyTest.java @@ -40,7 +40,7 @@ public void testBasicDocumentCreation() throws IOException { assertThat(docBuilder.getSchema().size()).isEqualTo(17); assertThat(docBuilder.getSchema().keySet()).contains(LogMessage.SystemField.ALL.fieldName); final LogMessage message = MessageUtil.makeMessage(0); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(21); assertThat(docBuilder.getSchema().size()).isEqualTo(22); 
assertThat(docBuilder.getSchema().keySet()) @@ -86,7 +86,7 @@ public void testBasicDocumentCreationWithoutFullTextSearch() throws IOException assertThat(docBuilder.getSchema().keySet()) .doesNotContain(LogMessage.SystemField.ALL.fieldName); final LogMessage message = MessageUtil.makeMessage(0); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(20); assertThat(docBuilder.getSchema().size()).isEqualTo(21); assertThat(docBuilder.getSchema().keySet()) @@ -148,7 +148,7 @@ public void testNestedDocumentCreation() throws IOException { "nested", Map.of("nested1", "value1", "nested2", 2))); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(19); assertThat(docBuilder.getSchema().size()).isEqualTo(21); assertThat(docBuilder.getSchema().keySet()) @@ -199,7 +199,7 @@ public void testMaxRecursionNestedDocumentCreation() throws IOException { "nested22", Map.of("nested31", 31, "nested32", Map.of("nested41", 41)))))); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(25); assertThat(docBuilder.getSchema().size()).isEqualTo(24); assertThat(docBuilder.getSchema().keySet()) @@ -251,7 +251,7 @@ public void testMultiLevelNestedDocumentCreation() throws IOException { "nested", Map.of("leaf1", "value1", "nested", Map.of("leaf2", "value2", "leaf21", 3)))); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(19); assertThat(docBuilder.getSchema().size()).isEqualTo(21); 
assertThat(docBuilder.getSchema().keySet()) @@ -294,7 +294,7 @@ public void testMultiLevelNestedDocumentCreationWithoutFulltTextSearch() throws "nested", Map.of("leaf1", "value1", "nested", Map.of("leaf2", "value2", "leaf21", 3)))); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); assertThat(testDocument.getFields().size()).isEqualTo(18); assertThat(docBuilder.getSchema().size()).isEqualTo(20); assertThat(docBuilder.getSchema().keySet()) @@ -339,7 +339,7 @@ public void testDroppingConflictingField() throws JsonProcessingException { conflictingFieldName, "1")); - Document msg1Doc = docBuilder.fromMessage(msg1); + Document msg1Doc = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); assertThat(msg1Doc.getFields().size()).isEqualTo(17); assertThat( msg1Doc.getFields().stream() @@ -369,7 +369,7 @@ public void testDroppingConflictingField() throws JsonProcessingException { "host1-dc2.abc.com", conflictingFieldName, 1)); - Document msg2Doc = docBuilder.fromMessage(msg2); + Document msg2Doc = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(msg2Doc.getFields().size()).isEqualTo(15); // Conflicting field is dropped. 
assertThat( @@ -415,7 +415,7 @@ public void testConversionUsingDropFieldBuilder() throws IOException { "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument = docBuilder.fromMessage(message); + Document testDocument = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(message)); final int expectedFieldsInDocumentAfterMesssage = 23; assertThat(testDocument.getFields().size()).isEqualTo(expectedFieldsInDocumentAfterMesssage); final int fieldCountAfterIndexingFirstDocument = 23; @@ -454,7 +454,7 @@ public void testConversionUsingDropFieldBuilder() throws IOException { "value1", "nested", Map.of("leaf2", "value2", "leaf21", 3, "nestedList", List.of(1))))); - Document testDocument2 = docBuilder.fromMessage(msg2); + Document testDocument2 = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2)); assertThat(testDocument2.getFields().size()) .isEqualTo( expectedFieldsInDocumentAfterMesssage - 2); // 1 dropped field, 2 less indexed fields diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java index ca12ff4ec9..6d9d56db35 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/schema/RaiseErrorFieldValueTest.java @@ -52,7 +52,7 @@ public void testRaiseErrorOnConflictingField() throws JsonProcessingException { conflictingFieldName, 1)); - Document msg1Doc = docBuilder.fromMessage(msg1); + Document msg1Doc = docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1)); assertThat(msg1Doc.getFields().size()).isEqualTo(17); assertThat( msg1Doc.getFields().stream() @@ -84,7 +84,7 @@ public void testRaiseErrorOnConflictingField() throws JsonProcessingException { "newFieldValue", conflictingFieldName, "1")); - assertThatThrownBy(() -> docBuilder.fromMessage(msg2)) + assertThatThrownBy(() -> 
docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg2))) .isInstanceOf(FieldDefMismatchException.class); // NOTE: When a document indexing fails, we still register the types of the fields in this doc. // So, the fieldMap may contain an additional item than before. @@ -108,7 +108,7 @@ public void testRaiseErrorOnConflictingField() throws JsonProcessingException { 123, "newFieldText", "newFieldValue")); - assertThatThrownBy(() -> docBuilder.fromMessage(msg3)) + assertThatThrownBy(() -> docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg3))) .isInstanceOf(FieldDefMismatchException.class); // NOTE: When a document indexing fails, we still register the types of the fields in this doc. // So, the fieldMap may contain an additional item than before. @@ -153,7 +153,7 @@ public void testRaiseErrorOnConflictingReservedField() { hostNameField, 123)); - assertThatThrownBy(() -> docBuilder.fromMessage(msg1)) + assertThatThrownBy(() -> docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1))) .isInstanceOf(FieldDefMismatchException.class); assertThat(docBuilder.getSchema().size()).isEqualTo(17); assertThat(docBuilder.getSchema().keySet()).contains(hostNameField); @@ -188,7 +188,7 @@ public void testRaiseErrorOnConflictingReservedFieldWithoutFullTextSearch() { hostNameField, 123)); - assertThatThrownBy(() -> docBuilder.fromMessage(msg1)) + assertThatThrownBy(() -> docBuilder.fromMessage(MessageUtil.convertLogMessageToSpan(msg1))) .isInstanceOf(FieldDefMismatchException.class); assertThat(docBuilder.getSchema().size()).isEqualTo(16); assertThat(docBuilder.getSchema().keySet()).contains(hostNameField); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java index 1a09bf23f8..6c99c71d9f 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java +++ 
b/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java @@ -111,7 +111,11 @@ public void testKalDbSearch() throws IOException { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARITION_ID, + offset); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -179,7 +183,11 @@ public void testKalDbSearchNoData() throws IOException { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARITION_ID, + offset); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -225,7 +233,11 @@ public void testKalDbSearchNoHits() throws IOException { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARITION_ID, + offset); offset++; } // No need to commit the active chunk since the last chunk is already closed. 
@@ -273,7 +285,11 @@ public void testKalDbSearchNoHistogram() throws IOException { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARITION_ID, + offset); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -329,7 +345,11 @@ public void testKalDbBadArgSearch() throws Throwable { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARITION_ID, + offset); offset++; } // No need to commit the active chunk since the last chunk is already closed. @@ -364,7 +384,11 @@ public void testKalDbGrpcSearch() throws IOException { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARITION_ID, + offset); offset++; } // No need to commit the active chunk since the last chunk is already closed. 
@@ -443,7 +467,11 @@ public void testKalDbGrpcSearchThrowsException() throws IOException { List messages = MessageUtil.makeMessagesWithTimeDifference(1, 100, 1000, startTime); int offset = 1; for (LogMessage m : messages) { - chunkManager.addMessage(m, m.toString().length(), TEST_KAFKA_PARITION_ID, offset); + chunkManager.addMessage( + MessageUtil.convertLogMessageToSpan(m), + m.toString().length(), + TEST_KAFKA_PARITION_ID, + offset); offset++; } // No need to commit the active chunk since the last chunk is already closed. diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java index 966c030bc9..fb0f54ac64 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java @@ -25,6 +25,7 @@ import com.slack.kaldb.logstore.search.aggregations.MovingAvgAggBuilder; import com.slack.kaldb.logstore.search.aggregations.SumAggBuilder; import com.slack.kaldb.logstore.search.aggregations.TermsAggBuilder; +import com.slack.kaldb.testlib.MessageUtil; import com.slack.kaldb.testlib.TemporaryLogStoreAndSearcherExtension; import java.io.IOException; import java.time.Instant; @@ -70,18 +71,23 @@ public static void beforeClass() { private void loadTestData(Instant time) { strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(1, "apple", TEST_DATASET_NAME, time)); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(1, "apple", TEST_DATASET_NAME, time))); // todo - re-enable when multi-tenancy is supported - slackhq/kaldb/issues/223 // strictLogStore.logStore.addMessage( // makeMessageWithIndexAndTimestamp(2, "baby", "new" + TEST_INDEX_NAME, time.plusSeconds(1))); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(3, "apple baby", TEST_DATASET_NAME, time.plusSeconds(2))); + 
MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 3, "apple baby", TEST_DATASET_NAME, time.plusSeconds(2)))); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(4, "car", TEST_DATASET_NAME, time.plusSeconds(3))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(4, "car", TEST_DATASET_NAME, time.plusSeconds(3)))); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp( - 5, "apple baby car", TEST_DATASET_NAME, time.plusSeconds(4))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 5, "apple baby car", TEST_DATASET_NAME, time.plusSeconds(4)))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); } @@ -92,9 +98,12 @@ public void testTimeBoundSearch() { LocalDateTime.ofEpochSecond(1593365471, 0, ZoneOffset.UTC) .atZone(ZoneOffset.UTC) .toInstant(); - strictLogStore.logStore.addMessage(makeMessageWithIndexAndTimestamp(1, "test1", "test", time)); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(1, "test1", "test", time.plusSeconds(100))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(1, "test1", "test", time))); + strictLogStore.logStore.addMessage( + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(1, "test1", "test", time.plusSeconds(100)))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -171,8 +180,12 @@ public void testTimeBoundSearch() { @Disabled // todo - re-enable when multi-tenancy is supported - slackhq/kaldb/issues/223 public void testIndexBoundSearch() { Instant time = Instant.ofEpochSecond(1593365471); - strictLogStore.logStore.addMessage(makeMessageWithIndexAndTimestamp(1, "test1", "idx", time)); - strictLogStore.logStore.addMessage(makeMessageWithIndexAndTimestamp(1, "test1", "idx1", time)); + strictLogStore.logStore.addMessage( + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(1, "test1", "idx", time))); + 
strictLogStore.logStore.addMessage( + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(1, "test1", "idx1", time))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -266,8 +279,9 @@ public void testSearchMultipleItemsAndIndices() { public void testAllQueryWithFullTextSearchEnabled() { Instant time = Instant.now(); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp( - 1, "apple", TEST_DATASET_NAME, time, Map.of("customField", "value"))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 1, "apple", TEST_DATASET_NAME, time, Map.of("customField", "value")))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -309,8 +323,9 @@ public void testAllQueryWithFullTextSearchEnabled() { public void testAllQueryWithFullTextSearchDisabled() { Instant time = Instant.now(); strictLogStoreWithoutFts.logStore.addMessage( - makeMessageWithIndexAndTimestamp( - 1, "apple", TEST_DATASET_NAME, time, Map.of("customField", "value"))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 1, "apple", TEST_DATASET_NAME, time, Map.of("customField", "value")))); strictLogStoreWithoutFts.logStore.commit(); strictLogStoreWithoutFts.logStore.refresh(); @@ -352,11 +367,13 @@ public void testAllQueryWithFullTextSearchDisabled() { public void testExistsQuery() { Instant time = Instant.now(); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp( - 1, "apple", TEST_DATASET_NAME, time, Map.of("customField", "value"))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 1, "apple", TEST_DATASET_NAME, time, Map.of("customField", "value")))); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp( - 1, "apple", TEST_DATASET_NAME, time, Map.of("customField1", "value"))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 1, "apple", TEST_DATASET_NAME, time, Map.of("customField1", 
"value")))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -398,11 +415,16 @@ public void testExistsQuery() { public void testRangeQuery() { Instant time = Instant.now(); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(1, "apple", TEST_DATASET_NAME, time, Map.of("val", 1))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 1, "apple", TEST_DATASET_NAME, time, Map.of("val", 1)))); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(2, "bear", TEST_DATASET_NAME, time, Map.of("val", 2))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 2, "bear", TEST_DATASET_NAME, time, Map.of("val", 2)))); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(3, "car", TEST_DATASET_NAME, time, Map.of("val", 3))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(3, "car", TEST_DATASET_NAME, time, Map.of("val", 3)))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -433,12 +455,23 @@ public void testRangeQuery() { public void testQueryParsingFieldTypes() { Instant time = Instant.now(); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp( - 1, - "apple", - TEST_DATASET_NAME, - time, - Map.of("boolval", true, "intval", 1, "longval", 2L, "floatval", 3F, "doubleval", 4D))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 1, + "apple", + TEST_DATASET_NAME, + time, + Map.of( + "boolval", + true, + "intval", + 1, + "longval", + 2L, + "floatval", + 3F, + "doubleval", + 4D)))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -530,9 +563,12 @@ public void testSearchMultipleCommits() { Instant time = Instant.ofEpochSecond(1593365471); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(1, "apple", TEST_DATASET_NAME, time)); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(1, "apple", 
TEST_DATASET_NAME, time))); strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(2, "apple baby", TEST_DATASET_NAME, time.plusSeconds(2))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 2, "apple baby", TEST_DATASET_NAME, time.plusSeconds(2)))); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); @@ -554,7 +590,8 @@ public void testSearchMultipleCommits() { // Add car but don't commit. So, no results for car. strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp(3, "car", TEST_DATASET_NAME, time.plusSeconds(3))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp(3, "car", TEST_DATASET_NAME, time.plusSeconds(3)))); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, strictLogStore.metricsRegistry)).isEqualTo(3); assertThat(getCount(MESSAGES_FAILED_COUNTER, strictLogStore.metricsRegistry)).isEqualTo(0); @@ -603,8 +640,9 @@ public void testSearchMultipleCommits() { // Add another message to search, refresh but don't commit. 
strictLogStore.logStore.addMessage( - makeMessageWithIndexAndTimestamp( - 4, "apple baby car", TEST_DATASET_NAME, time.plusSeconds(4))); + MessageUtil.convertLogMessageToSpan( + makeMessageWithIndexAndTimestamp( + 4, "apple baby car", TEST_DATASET_NAME, time.plusSeconds(4)))); strictLogStore.logStore.refresh(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, strictLogStore.metricsRegistry)).isEqualTo(4); @@ -788,7 +826,8 @@ public void testFullIndexSearchForMaxAgg() { InternalMax internalMax = (InternalMax) Objects.requireNonNull(allIndexItems.internalAggregation); - // NOTE: 1.593365475E12 is the epoch seconds above, with 4 more seconds added on due to the test + // NOTE: 1.593365475E12 is the epoch seconds above, with 4 more seconds added on due to the + // test // data, but in // milliseconds and in scientific notation assertThat(internalMax.getValue()).isEqualTo(Double.parseDouble("1.593365475E12")); @@ -982,7 +1021,7 @@ public void testFullTextSearch() { final LogMessage msg1 = makeMessageWithIndexAndTimestamp( 1, "apple", TEST_DATASET_NAME, time.plusSeconds(4), Map.of("field1", "1234")); - strictLogStore.logStore.addMessage(msg1); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg1)); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); // Search using _all field. @@ -1047,7 +1086,7 @@ public void testFullTextSearch() { final LogMessage msg2 = makeMessageWithIndexAndTimestamp( 2, "apple baby", TEST_DATASET_NAME, time.plusSeconds(4), Map.of("field1", "1234")); - strictLogStore.logStore.addMessage(msg2); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg2)); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); // Search using _all field. 
@@ -1112,7 +1151,7 @@ public void testFullTextSearch() { final LogMessage msg3 = makeMessageWithIndexAndTimestamp( 3, "baby car 1234", TEST_DATASET_NAME, time.plusSeconds(4)); - strictLogStore.logStore.addMessage(msg3); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg3)); strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); // Search using _all field. @@ -1259,17 +1298,17 @@ public void testDisabledFullTextSearch() { final LogMessage msg1 = makeMessageWithIndexAndTimestamp( 1, "apple", TEST_DATASET_NAME, time.plusSeconds(4), Map.of("field1", "1234")); - strictLogStoreWithoutFts.logStore.addMessage(msg1); + strictLogStoreWithoutFts.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg1)); final LogMessage msg2 = makeMessageWithIndexAndTimestamp( 2, "apple baby", TEST_DATASET_NAME, time.plusSeconds(4), Map.of("field2", "1234")); - strictLogStoreWithoutFts.logStore.addMessage(msg2); + strictLogStoreWithoutFts.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg2)); final LogMessage msg3 = makeMessageWithIndexAndTimestamp( 3, "baby car 1234", TEST_DATASET_NAME, time.plusSeconds(4)); - strictLogStoreWithoutFts.logStore.addMessage(msg3); + strictLogStoreWithoutFts.logStore.addMessage(MessageUtil.convertLogMessageToSpan(msg3)); strictLogStoreWithoutFts.logStore.commit(); strictLogStoreWithoutFts.logStore.refresh(); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java index 6266bd8d79..6a34796345 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/SearchResultAggregatorImplTest.java @@ -471,7 +471,7 @@ private InternalAggregation makeHistogram( tempFolder.getCanonicalPath(), false); MeterRegistry metricsRegistry = new SimpleMeterRegistry(); - DocumentBuilder documentBuilder = 
+ DocumentBuilder documentBuilder = SchemaAwareLogDocumentBuilderImpl.build( SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.DROP_FIELD, true, @@ -483,7 +483,7 @@ private InternalAggregation makeHistogram( new LogIndexSearcherImpl(logStore.getSearcherManager(), logStore.getSchema()); for (LogMessage logMessage : logMessages) { - logStore.addMessage(logMessage); + logStore.addMessage(MessageUtil.convertLogMessageToSpan(logMessage)); } logStore.commit(); logStore.refresh(); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/StatsCollectorTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/StatsCollectorTest.java index f581ba2444..9f34c9db40 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/StatsCollectorTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/StatsCollectorTest.java @@ -13,6 +13,7 @@ import brave.Tracing; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; +import com.slack.kaldb.testlib.MessageUtil; import com.slack.kaldb.testlib.TemporaryLogStoreAndSearcherExtension; import java.io.IOException; import java.time.Instant; @@ -48,11 +49,11 @@ public void testStatsCollectorWithPerMinuteMessages() { LogMessage m5 = makeMessageWithIndexAndTimestamp( 5, "apple baby car", TEST_DATASET_NAME, time.plusSeconds(4 * 60)); - strictLogStore.logStore.addMessage(m1); - strictLogStore.logStore.addMessage(m2); - strictLogStore.logStore.addMessage(m3); - strictLogStore.logStore.addMessage(m4); - strictLogStore.logStore.addMessage(m5); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(m1)); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(m2)); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(m3)); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(m4)); + strictLogStore.logStore.addMessage(MessageUtil.convertLogMessageToSpan(m5)); 
strictLogStore.logStore.commit(); strictLogStore.logStore.refresh(); diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java b/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java index 0c6349ddc2..27e79524d7 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java @@ -1,18 +1,15 @@ package com.slack.kaldb.testlib; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.protobuf.ByteString; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.LogWireMessage; -import com.slack.kaldb.util.JsonUtil; -import java.io.IOException; -import java.net.ServerSocket; +import com.slack.service.murron.trace.Trace; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.IntStream; +import java.util.concurrent.TimeUnit; public class MessageUtil { // TODO: Add Timer @@ -35,27 +32,6 @@ public static LogWireMessage makeWireMessage(int i, Map properti return makeWireMessage(i, Instant.now(), properties); } - public static String makeLogMessageJSON(int i, Instant timeStamp) throws JsonProcessingException { - String id = DEFAULT_MESSAGE_PREFIX + i; - Map fieldMap = new HashMap<>(); - fieldMap.put("type", TEST_MESSAGE_TYPE); - fieldMap.put("index", TEST_DATASET_NAME); - fieldMap.put("id", id); - - Map sourceFieldMap = new HashMap<>(); - sourceFieldMap.put(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timeStamp.toEpochMilli()); - String message = String.format("The identifier in this message is %s", id); - sourceFieldMap.put(LogMessage.ReservedField.MESSAGE.fieldName, message); - sourceFieldMap.put(TEST_SOURCE_INT_PROPERTY, i); - sourceFieldMap.put(TEST_SOURCE_LONG_PROPERTY, (long) i); - sourceFieldMap.put(TEST_SOURCE_DOUBLE_PROPERTY, (double) i); - sourceFieldMap.put(TEST_SOURCE_FLOAT_PROPERTY, 
(float) i); - sourceFieldMap.put(TEST_SOURCE_STRING_PROPERTY, String.format("String-%s", i)); - fieldMap.put("source", sourceFieldMap); - - return JsonUtil.writeAsString(fieldMap); - } - public static LogWireMessage makeWireMessage( int i, Instant timeStamp, Map properties) { String id = DEFAULT_MESSAGE_PREFIX + i; @@ -107,49 +83,6 @@ public static LogMessage makeMessage(int i, Instant timestamp) { return LogMessage.fromWireMessage(makeWireMessage(i, timestamp, Map.of())); } - public static LogMessage makeMessage(int i, Instant timestamp, Map properties) { - return LogMessage.fromWireMessage(makeWireMessage(i, timestamp, properties)); - } - - public static String makeSerializedMessage(int i) { - try { - return JsonUtil.writeAsString(makeWireMessage(i)); - } catch (JsonProcessingException j) { - return null; - } - } - - public static String makeSerializedBadMessage(int i) { - LogWireMessage msg = - new LogWireMessage(TEST_DATASET_NAME, null, "Message" + i, Instant.now(), null); - try { - return JsonUtil.writeAsString(msg); - } catch (JsonProcessingException e) { - return null; - } - } - - public static List makeSerializedMessages(int low, int high) { - return IntStream.rangeClosed(low, high) - .boxed() - .map(MessageUtil::makeSerializedMessage) - .collect(Collectors.toList()); - } - - public static List makeSerializedBadMessages(int low, int high) { - return IntStream.rangeClosed(low, high) - .boxed() - .map(MessageUtil::makeSerializedBadMessage) - .collect(Collectors.toList()); - } - - public static List makeMessages(int low, int high) { - return IntStream.rangeClosed(low, high) - .boxed() - .map(MessageUtil::makeMessage) - .collect(Collectors.toList()); - } - public static List makeMessagesWithTimeDifference(int low, int high) { return makeMessagesWithTimeDifference(low, high, 1); } @@ -169,9 +102,67 @@ public static List makeMessagesWithTimeDifference( return result; } - // TODO: Move this to TestKafkaServer class. 
- public int getPort() throws IOException { - ServerSocket socket = new ServerSocket(0); - return socket.getLocalPort(); + public static List makeMessagesWithTimeDifference1( + int low, int high, long timeDeltaMills, Instant start) { + List result = new ArrayList<>(); + for (int i = 0; i <= (high - low); i++) { + String id = DEFAULT_MESSAGE_PREFIX + (low + i); + + Instant timeStamp = start.plusNanos(1000 * 1000 * timeDeltaMills * i); + String message = String.format("The identifier in this message is %s", id); + + // fieldMap.put(TEST_SOURCE_LONG_PROPERTY, (long) i); + // fieldMap.put(TEST_SOURCE_FLOAT_PROPERTY, (float) i); + Trace.Span span = + Trace.Span.newBuilder() + .setTimestamp( + TimeUnit.MICROSECONDS.convert(timeStamp.toEpochMilli(), TimeUnit.MILLISECONDS)) + .setId(ByteString.copyFromUtf8(id)) + .addTags( + Trace.KeyValue.newBuilder() + .setVStr(message) + .setKey("message") + .setVType(Trace.ValueType.STRING) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVInt64(i) + .setKey(TEST_SOURCE_INT_PROPERTY) + .setVType(Trace.ValueType.INT64) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVFloat64(i) + .setKey(TEST_SOURCE_DOUBLE_PROPERTY) + .setVType(Trace.ValueType.FLOAT64) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVStr(String.format("String-%s", i)) + .setKey(TEST_SOURCE_STRING_PROPERTY) + .setVType(Trace.ValueType.STRING) + .build()) + .build(); + + result.add(span); + } + return result; + } + + public static Trace.Span convertLogMessageToSpan(LogMessage logMessage) { + Trace.Span.Builder spanBuilder = Trace.Span.newBuilder(); + spanBuilder.setId(ByteString.copyFromUtf8(logMessage.getId())); + spanBuilder.setTimestamp( + TimeUnit.MICROSECONDS.convert( + logMessage.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS)); + // TODO + return spanBuilder.build(); + } + + public static Trace.Span withMessageId(int i) { + String id = DEFAULT_MESSAGE_PREFIX + i; + Trace.Span.Builder spanBuilder = Trace.Span.newBuilder(); + 
spanBuilder.setId(ByteString.copyFromUtf8(id)); + return spanBuilder.build(); } } diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java index ff24b10841..a000b4051d 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/TemporaryLogStoreAndSearcherExtension.java @@ -32,7 +32,7 @@ public static List addMessages( List messages = MessageUtil.makeMessagesWithTimeDifference(low, high); for (LogMessage m : messages) { - logStore.addMessage(m); + logStore.addMessage(MessageUtil.convertLogMessageToSpan(m)); } if (requireCommit) { logStore.commit(); diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java index d9e73e139d..0178395b86 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java @@ -109,18 +109,14 @@ private static ConsumerRecord consumerRecordWithValue(byte[] rec @Test public void insertNullRecord() throws IOException { - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(null)).isFalse(); } @Test public void testMalformedMurronApiRecord() throws IOException { - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); ConsumerRecord apiRecord = consumerRecordWithMurronMessage( @@ -174,8 +170,7 @@ public void 
testAvgMessageSizeCalculationOnSpanIngestion() throws Exception { .collect(Collectors.toList()); IndexingChunkManager chunkManager = localChunkManagerUtil.chunkManager; - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl(chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManager); for (Trace.Span span : spans) { ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); @@ -240,9 +235,7 @@ public void testUseIncorrectDataTransformer() throws IOException { .build(); ConsumerRecord spanRecord = consumerRecordWithMurronMessage(testMurronMsg); - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(spanRecord)).isFalse(); } @@ -269,9 +262,7 @@ public void testIngestTraceSpan() throws IOException { msgType); ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(1); @@ -324,9 +315,7 @@ public void parseAndIndexBulkApiRequestTest() throws IOException { IngestDocument ingestDocument = convertRequestToDocument(indexRequest); Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument); ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new 
LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); } @@ -359,9 +348,7 @@ public void parseAndIndexBulkApiRequestTest() throws IOException { @Test public void testNullTraceSpan() throws IOException { - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(null)).isFalse(); } diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java index 2185beb1bf..ee1656b687 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java @@ -2,7 +2,6 @@ import static com.slack.kaldb.chunkManager.RecoveryChunkManager.LIVE_MESSAGES_INDEXED; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.server.ValidateKaldbConfig.INDEXER_DATA_TRANSFORMER_MAP; import static com.slack.kaldb.testlib.ChunkManagerUtil.makeChunkManagerUtil; import static com.slack.kaldb.testlib.MetricsUtil.getCount; import static com.slack.kaldb.testlib.MetricsUtil.getValue; @@ -93,8 +92,7 @@ public void setUp() throws Exception { chunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); LogMessageWriterImpl logMessageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + new LogMessageWriterImpl(chunkManagerUtil.chunkManager); KaldbConfigs.KafkaConfig kafkaConfig = KaldbConfigs.KafkaConfig.newBuilder() .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC) @@ -302,9 +300,7 @@ public void setUp() throws Exception { chunkManagerUtil.chunkManager.startAsync(); 
chunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); - logMessageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, INDEXER_DATA_TRANSFORMER_MAP.get("spans")); + logMessageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); } @AfterEach @@ -406,8 +402,7 @@ public static TestKafkaServer.KafkaComponents getKafkaTestServer(S3MockExtension localChunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); LogMessageWriterImpl logMessageWriter = - new LogMessageWriterImpl( - localChunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + new LogMessageWriterImpl(localChunkManagerUtil.chunkManager); AdminClient adminClient = AdminClient.create(