diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java b/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java deleted file mode 100644 index 0ec8e8c0dd..0000000000 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexAPILog.java +++ /dev/null @@ -1,187 +0,0 @@ -package com.slack.kaldb; - -import com.google.protobuf.ByteString; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LuceneIndexStoreImpl; -import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; -import com.slack.kaldb.writer.LogMessageWriterImpl; -import com.slack.service.murron.Murron; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.text.SimpleDateFormat; -import java.time.Duration; -import java.util.Comparator; -import java.util.Random; -import java.util.stream.Stream; -import org.apache.commons.io.FileUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.record.TimestampType; -import org.apache.lucene.store.Directory; -import org.openjdk.jmh.annotations.*; - -@State(Scope.Thread) -public class IndexAPILog { - - private Random random; - private final Duration commitInterval = Duration.ofSeconds(5 * 60); - private final Duration refreshInterval = Duration.ofSeconds(5 * 60); - - private Path tempDirectory; - private MeterRegistry registry; - LuceneIndexStoreImpl logStore; - - private String apiLogFile; - private BufferedReader reader; - private static SimpleDateFormat df = new SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); - private int skipCount; - private int indexCount; - - @Setup(Level.Iteration) - public void createIndexer() throws Exception { - random = new Random(); - registry = new SimpleMeterRegistry(); - tempDirectory = - Files.createDirectories( - Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); - logStore = - LuceneIndexStoreImpl.makeLogStore( - tempDirectory.toFile(), - commitInterval, - refreshInterval, - true, - SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, - registry); - - apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); - reader = Files.newBufferedReader(Path.of(apiLogFile)); - skipCount = 0; - indexCount = 0; - } - - @TearDown(Level.Iteration) - public void tearDown() throws IOException { - Directory directory = logStore.getIndexWriter().getDirectory(); - String[] segmentFiles = directory.listAll(); - long indexedBytes = 0; - for (String segmentFile : segmentFiles) { - indexedBytes += directory.fileLength(segmentFile); - } - if (indexCount != 0) { - // Displaying indexCount only makes sense in measureAPILogIndexingSlingshotMode - System.out.println( - "Indexed = " - + indexCount - + " Skipped = " - + skipCount - + " Index size = " - + FileUtils.byteCountToDisplaySize(indexedBytes)); - } else { - System.out.println( - "Skipped = " - + skipCount - + " Index size = " - + FileUtils.byteCountToDisplaySize(indexedBytes)); - } - logStore.close(); - try (Stream walk = Files.walk(tempDirectory)) { - walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); - } - registry.close(); - if (reader != null) { - reader.close(); - } - } - - @Benchmark - public void measureAPILogIndexing() throws IOException { - String line = reader.readLine(); - if (line != null) { - // Work that ideally shouldn't count towards benchmark performance result - ConsumerRecord kafkaRecord = makeConsumerRecord(line); - if (kafkaRecord == null) { - // makeConsumerRecord will print why we skipped - return; - } - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - try { - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - } catch (Exception e) { - System.out.println("skipping - cannot transform " + e); - skipCount++; - } - } else { - System.out.println("resetting - reach EOF"); - reader = Files.newBufferedReader(Path.of(apiLogFile)); - } - } - - @Benchmark - public void measureAPILogIndexingSlingshotMode() throws IOException { - String line; - do { - line = reader.readLine(); - if (line != null) { - // Work that ideally shouldn't count towards benchmark performance result - ConsumerRecord kafkaRecord = makeConsumerRecord(line); - if (kafkaRecord == null) { - // makeConsumerRecord will print why we skipped - continue; - } - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - try { - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - indexCount++; - } catch (Exception e) { - System.out.println("skipping - cannot transform " + e); - } - } - } while (line != null); - } - - public ConsumerRecord makeConsumerRecord(String line) { - try { - // get start of messageBody - int messageDivision = line.indexOf("{"); - - // Everything will there is metadata - String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); - String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; - long timestamp = df.parse(ts).toInstant().toEpochMilli(); - - String message = line.substring(messageDivision); - Murron.MurronMessage testMurronMsg = - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) - .setType(splitLine[5]) - .setHost(splitLine[4]) - .setTimestamp(timestamp) - .build(); - return new ConsumerRecord<>( - "testTopic", - 1, - 10, - 0L, - TimestampType.CREATE_TIME, - 0L, - 0, - 0, - "testKey", - testMurronMsg.toByteString().toByteArray()); - } catch (Exception e) { - System.out.println("skipping - cannot parse input" + e); - skipCount++; - return null; - } - } -} diff --git a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java deleted file mode 100644 index 6b9bbd8fce..0000000000 --- a/benchmarks/src/main/java/com/slack/kaldb/IndexingBenchmark.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.slack.kaldb; - -import static com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD; - -import com.google.protobuf.ByteString; -import com.slack.kaldb.logstore.DocumentBuilder; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LuceneIndexStoreImpl; -import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; -import com.slack.kaldb.writer.LogMessageWriterImpl; -import com.slack.service.murron.Murron; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Duration; -import java.util.Comparator; -import java.util.Random; -import java.util.stream.Stream; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.record.TimestampType; -import org.apache.lucene.document.*; -import org.apache.lucene.index.IndexWriter; -import org.openjdk.jmh.annotations.*; - -@State(Scope.Thread) -public class IndexingBenchmark { - - private final Duration commitInterval = Duration.ofSeconds(5 * 60); - private final Duration refreshInterval = Duration.ofSeconds(5 * 60); - - private Path tempDirectory; - private MeterRegistry registry; - LuceneIndexStoreImpl logStore; - private Random random; - - private ConsumerRecord kafkaRecord; - private LogMessage logMessage; - private Document luceneDocument; - - @Setup(Level.Iteration) - public void createIndexer() throws Exception { - random = new Random(); - registry = new SimpleMeterRegistry(); - tempDirectory = - Files.createDirectories( - Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); - logStore = - LuceneIndexStoreImpl.makeLogStore( - tempDirectory.toFile(), - commitInterval, - refreshInterval, - true, - CONVERT_VALUE_AND_DUPLICATE_FIELD, - registry); - - String message = - """ - { - "ip_address": "127.0.0.1", - "http_method": "POST", - "method": "callbacks.test", - "enterprise": "E1234ABCD56", - "team": "T98765XYZ12", - "user": "U000111222A", - "status": "ok", - "http_params": "param1=value1¶m2=value2¶m3=false", - "ua": "Hello-World-Web\\/vef2bd:1234", - "unique_id": "YBBccDDuu17CxYza6abcDEFzYzz", - "request_queue_time": 2262, - "microtime_elapsed": 1418, - "mysql_query_count": 0, - "mysql_query_time": 0, - "mysql_conns_count": 0, - "mysql_conns_time": 0, - "mysql_rows_count": 0, - "mysql_rows_affected": 0, - "my_queries_count": 11, - "my_queries_time": 6782, - "frl_time": 0, - "init_time": 1283, - "api_dispatch_time": 0, - "api_output_time": 0, - "api_output_size": 0, - "api_strict": false, - "decrypt_reqs_time": 0, - "decrypt_reqs_count": 0, - "encrypt_reqs_time": 0, - "encrypt_reqs_count": 0, - "grpc_req_count": 0, - "grpc_req_time": 0, - "service_req_count": 0, - "service_req_time": 0, - "trace": "#route_main() -> lib_controller.php:12#Controller::handlePost() -> Controller.php:58#CallbackApiController::handleRequest() -> api.php:100#local_callbacks_api_main_inner() -> api.php:250#api_dispatch() -> lib_api.php:000#api_callbacks_service_verifyToken() -> api__callbacks_service.php:1500#api_output_fb_thrift() -> lib_api_output.php:390#_api_output_log_call()", - "client_connection_state": "unset", - "ms_requests_count": 0, - "ms_requests_time": 0, - "token_type": "cookie", - "another_param": "", - "another_value": "", - "auth": true, - "ab_id": "1234abc12d:host-abc-dev-region-1234", - "external_user": "W012XYZAB", - "timestamp": "2021-02-05 10:41:52.340", - "sha": "unknown", - "php_version": "5.11.0", - "paramX": "yet.another.value", - "php_type": "api", - "bucket_type_something": 0, - "cluster_name": "cluster", - "cluster_param": "normal", - "env": "env-value", - "last_param": "lastvalue", - "level": "info" - } - """; - - String indexName = "hhvm-api_log"; - String host = "company-www-php-dev-cluster-abc-x8ab"; - long timestamp = 1612550512340953000L; - Murron.MurronMessage testMurronMsg = - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFrom(message.getBytes(StandardCharsets.UTF_8))) - .setType(indexName) - .setHost(host) - .setTimestamp(timestamp) - .build(); - - kafkaRecord = - new ConsumerRecord<>( - "testTopic", - 1, - 10, - 0L, - TimestampType.CREATE_TIME, - 0L, - 0, - 0, - "testKey", - testMurronMsg.toByteString().toByteArray()); - - logMessage = LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - - DocumentBuilder documentBuilder = - SchemaAwareLogDocumentBuilderImpl.build(CONVERT_VALUE_AND_DUPLICATE_FIELD, true, registry); - - luceneDocument = documentBuilder.fromMessage(logMessage); - } - - @TearDown(Level.Iteration) - public void tearDown() throws IOException { - logStore.close(); - try (Stream walk = Files.walk(tempDirectory)) { - walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); - } - registry.close(); - } - - @Benchmark - public void measureIndexingAsKafkaSerializedDocument() throws Exception { - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - } - - @Benchmark - public void measureIndexingAsLogMessage() { - logStore.addMessage(logMessage); - } - - @Benchmark - public void measureIndexingAsLuceneDocument() { - IndexWriter indexWriter = logStore.getIndexWriter(); - try { - indexWriter.addDocument(luceneDocument); - } catch (IOException e) { - e.printStackTrace(); - } - } -} diff --git a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java b/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java deleted file mode 100644 index 19c28c65de..0000000000 --- a/benchmarks/src/main/java/com/slack/kaldb/QueryBenchmark.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.slack.kaldb; - -import brave.Tracer; -import brave.Tracing; -import com.google.protobuf.ByteString; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.logstore.LuceneIndexStoreImpl; -import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl; -import com.slack.kaldb.logstore.search.LogIndexSearcher; -import com.slack.kaldb.logstore.search.LogIndexSearcherImpl; -import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; -import com.slack.kaldb.writer.LogMessageWriterImpl; -import com.slack.service.murron.Murron; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.text.SimpleDateFormat; -import java.time.Duration; -import java.util.Comparator; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; -import java.util.stream.Stream; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.record.TimestampType; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -@State(Scope.Thread) -public class QueryBenchmark { - private final Duration commitInterval = Duration.ofSeconds(5 * 60); - private final Duration refreshInterval = Duration.ofSeconds(5 * 60); - - private Path tempDirectory; - private MeterRegistry registry; - LuceneIndexStoreImpl logStore; - - private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-mm-ddHH:mm:ss.SSSzzz"); - private LogIndexSearcher logIndexSearcher; - - @Setup(Level.Trial) - public void createIndexer() throws Exception { - // active tracer required for search currently - Tracing.newBuilder().build(); - - // raises logging level of Tracer to Warning, to prevent excessive log messages - final Logger logger = Logger.getLogger(Tracer.class.getName()); - logger.setLevel(java.util.logging.Level.WARNING); - - Random random = new Random(); - registry = new SimpleMeterRegistry(); - tempDirectory = - Files.createDirectories( - Paths.get("jmh-output", String.valueOf(random.nextInt(Integer.MAX_VALUE)))); - logStore = - LuceneIndexStoreImpl.makeLogStore( - tempDirectory.toFile(), - commitInterval, - refreshInterval, - true, - SchemaAwareLogDocumentBuilderImpl.FieldConflictPolicy.CONVERT_VALUE_AND_DUPLICATE_FIELD, - registry); - - String apiLogFile = System.getProperty("jmh.api.log.file", "api_logs.txt"); - - // startup multi-threaded log message population - ExecutorService executorService = Executors.newFixedThreadPool(6); - try (BufferedReader reader = Files.newBufferedReader(Path.of(apiLogFile))) { - String line; - do { - line = reader.readLine(); - if (line != null) { - String finalLine = line; - executorService.submit( - () -> { - // Work that ideally shouldn't count towards benchmark performance result - ConsumerRecord kafkaRecord = makeConsumerRecord(finalLine); - if (kafkaRecord == null) { - // makeConsumerRecord will print why we skipped - return; - } - // Mimic LogMessageWriterImpl#insertRecord kinda without the chunk rollover logic - try { - LogMessage localLogMessage = - LogMessageWriterImpl.apiLogTransformer.toLogMessage(kafkaRecord).get(0); - logStore.addMessage(localLogMessage); - } catch (Exception e) { - // ignored - } - }); - } - } while (line != null); - } - executorService.shutdown(); - executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - - logStore.commit(); - logStore.refresh(); - logIndexSearcher = - new LogIndexSearcherImpl(logStore.getSearcherManager(), logStore.getSchema()); - } - - @TearDown(Level.Trial) - public void tearDown() throws IOException { - logStore.close(); - try (Stream walk = Files.walk(tempDirectory)) { - walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); - } - registry.close(); - } - - @Benchmark - public void measureLogSearcherSearch() { - logIndexSearcher.search( - "*", - "", - 0L, - Long.MAX_VALUE, - 500, - new DateHistogramAggBuilder( - "1", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, "100d")); - } - - public ConsumerRecord makeConsumerRecord(String line) { - try { - // get start of messageBody - int messageDivision = line.indexOf("{"); - - // Everything will there is metadata - String[] splitLine = line.substring(0, messageDivision - 1).split("\\s+"); - String ts = splitLine[0] + splitLine[1] + splitLine[2] + splitLine[3]; - long timestamp = df.parse(ts).toInstant().toEpochMilli(); - - String message = line.substring(messageDivision); - Murron.MurronMessage testMurronMsg = - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFrom((message).getBytes(StandardCharsets.UTF_8))) - .setType(splitLine[5]) - .setHost(splitLine[4]) - .setTimestamp(timestamp) - .build(); - return new ConsumerRecord<>( - "testTopic", - 1, - 10, - 0L, - TimestampType.CREATE_TIME, - 0L, - 0, - 0, - "testKey", - testMurronMsg.toByteString().toByteArray()); - } catch (Exception e) { - return null; - } - } -} diff --git a/config/config.yaml b/config/config.yaml index a261caccb0..6f53a74f47 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -8,7 +8,6 @@ indexerConfig: refreshDurationSecs: ${INDEXER_REFRESH_DURATION_SECS:-11} enableFullTextSearch: ${INDEXER_ENABLE_FULL_TEXT_SEARCH:-false} staleDurationSecs: ${INDEXER_STALE_DURATION_SECS:-7200} - dataTransformer: ${INDEXER_DATA_TRANSFORMER:-trace_span} dataDirectory: ${INDEXER_DATA_DIR:-/tmp} maxOffsetDelayMessages: ${INDEXER_MAX_OFFSET_DELAY_MESSAGES:-10000000} defaultQueryTimeoutMs: ${KALDB_INDEX_DEFAULT_QUERY_TIMEOUT_MS:-2500} diff --git a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java index 1bc97b4d12..4ec16684db 100644 --- a/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java +++ b/kaldb/src/main/java/com/slack/kaldb/recovery/RecoveryService.java @@ -1,7 +1,6 @@ package com.slack.kaldb.recovery; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.server.ValidateKaldbConfig.INDEXER_DATA_TRANSFORMER_MAP; import static com.slack.kaldb.util.TimeUtils.nanosToMillis; import com.google.common.annotations.VisibleForTesting; @@ -22,7 +21,6 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.proto.metadata.Metadata; -import com.slack.kaldb.writer.LogMessageTransformer; import com.slack.kaldb.writer.LogMessageWriterImpl; import com.slack.kaldb.writer.kafka.KaldbKafkaConsumer; import io.micrometer.core.instrument.Counter; @@ -305,10 +303,7 @@ boolean handleRecoveryTask(RecoveryTaskMetadata recoveryTaskMetadata) { kaldbConfig.getS3Config()); // Ingest data in parallel - LogMessageTransformer messageTransformer = - INDEXER_DATA_TRANSFORMER_MAP.get(kaldbConfig.getIndexerConfig().getDataTransformer()); - LogMessageWriterImpl logMessageWriterImpl = - new LogMessageWriterImpl(chunkManager, messageTransformer); + LogMessageWriterImpl logMessageWriterImpl = new LogMessageWriterImpl(chunkManager); KaldbKafkaConsumer kafkaConsumer = new KaldbKafkaConsumer( makeKafkaConfig( diff --git a/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java b/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java index 5da436f389..0cc80fa170 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/KaldbIndexer.java @@ -2,7 +2,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.server.ValidateKaldbConfig.INDEXER_DATA_TRANSFORMER_MAP; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.slack.kaldb.chunkManager.ChunkRollOverException; @@ -12,7 +11,6 @@ import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.util.RuntimeHalterImpl; -import com.slack.kaldb.writer.LogMessageTransformer; import com.slack.kaldb.writer.LogMessageWriterImpl; import com.slack.kaldb.writer.kafka.KaldbKafkaConsumer; import io.micrometer.core.instrument.MeterRegistry; @@ -67,10 +65,7 @@ public KaldbIndexer( // Create a chunk manager this.chunkManager = chunkManager; // set up indexing pipelne - LogMessageTransformer messageTransformer = - INDEXER_DATA_TRANSFORMER_MAP.get(indexerConfig.getDataTransformer()); - LogMessageWriterImpl logMessageWriterImpl = - new LogMessageWriterImpl(chunkManager, messageTransformer); + LogMessageWriterImpl logMessageWriterImpl = new LogMessageWriterImpl(chunkManager); this.kafkaConsumer = new KaldbKafkaConsumer(kafkaConfig, logMessageWriterImpl, meterRegistry); } diff --git a/kaldb/src/main/java/com/slack/kaldb/server/ValidateKaldbConfig.java b/kaldb/src/main/java/com/slack/kaldb/server/ValidateKaldbConfig.java index 7efc85de6a..9bdeb678b0 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/ValidateKaldbConfig.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/ValidateKaldbConfig.java @@ -2,14 +2,9 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import com.slack.kaldb.proto.config.KaldbConfigs; -import com.slack.kaldb.writer.LogMessageTransformer; -import com.slack.kaldb.writer.LogMessageWriterImpl; import java.util.Arrays; import java.util.List; -import java.util.Map; public class ValidateKaldbConfig { @@ -32,7 +27,6 @@ public static void validateConfig(KaldbConfigs.KaldbConfig kaldbConfig) { } private static void validateIndexConfig(KaldbConfigs.IndexerConfig indexerConfig) { - validateDataTransformerConfig(indexerConfig.getDataTransformer()); checkArgument( indexerConfig.getServerConfig().getRequestTimeoutMs() >= 3000, "IndexerConfig requestTimeoutMs cannot less than 3000ms"); @@ -71,23 +65,6 @@ private static void validateCacheConfig(KaldbConfigs.CacheConfig cacheConfig) { "CacheConfig requestTimeoutMs must be higher than defaultQueryTimeoutMs"); } - @VisibleForTesting - public static final Map INDEXER_DATA_TRANSFORMER_MAP = - ImmutableMap.of( - "api_log", - LogMessageWriterImpl.apiLogTransformer, - "trace_span", - LogMessageWriterImpl.traceSpanTransformer); - - public static void validateDataTransformerConfig(String dataTransformerConfig) { - checkArgument( - dataTransformerConfig != null && !dataTransformerConfig.isEmpty(), - "IndexerConfig can't have an empty dataTransformer config."); - checkArgument( - INDEXER_DATA_TRANSFORMER_MAP.containsKey(dataTransformerConfig), - "Invalid data transformer config: " + dataTransformerConfig); - } - public static void validateNodeRoles(List nodeRoleList) { // We don't need further checks for node roles since JSON parsing will throw away roles not part // of the enum diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageTransformer.java b/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageTransformer.java deleted file mode 100644 index 744a60c36d..0000000000 --- a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageTransformer.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.slack.kaldb.writer; - -import com.slack.kaldb.logstore.LogMessage; -import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -/** An interface a ConsumerRecord message from Kafka into a LogMessage. */ -@FunctionalInterface -public interface LogMessageTransformer { - List toLogMessage(ConsumerRecord record) throws Exception; -} diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java b/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java index cae310b7ae..b468bc3aca 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java @@ -2,13 +2,10 @@ import com.slack.kaldb.chunkManager.ChunkManager; import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.preprocessor.KaldbSerdes; -import com.slack.service.murron.Murron; import com.slack.service.murron.trace.Trace; import java.io.IOException; import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,34 +49,10 @@ public class LogMessageWriterImpl implements MessageWriter { private static final Logger LOG = LoggerFactory.getLogger(LogMessageWriterImpl.class); - private static final Deserializer murronMessageDeserializer = - KaldbSerdes.MurronMurronMessage().deserializer(); - - // An apiLog message is a json blob wrapped in a murron message. - @Deprecated - public static final LogMessageTransformer apiLogTransformer = - (ConsumerRecord record) -> { - final Murron.MurronMessage murronMsg = - murronMessageDeserializer.deserialize("", record.value()); - Trace.Span apiSpan = MurronLogFormatter.fromApiLog(murronMsg); - return SpanFormatter.toLogMessage(Trace.ListOfSpans.newBuilder().addSpans(apiSpan).build()); - }; - - // A protobuf Trace.Span - public static final LogMessageTransformer traceSpanTransformer = - (ConsumerRecord record) -> { - final Trace.Span span = Trace.Span.parseFrom(record.value()); - final Trace.ListOfSpans listOfSpans = Trace.ListOfSpans.newBuilder().addSpans(span).build(); - return SpanFormatter.toLogMessage(listOfSpans); - }; - private final ChunkManager chunkManager; - private final LogMessageTransformer dataTransformer; - public LogMessageWriterImpl( - ChunkManager chunkManager, LogMessageTransformer dataTransformer) { + public LogMessageWriterImpl(ChunkManager chunkManager) { this.chunkManager = chunkManager; - this.dataTransformer = dataTransformer; } @Override @@ -88,7 +61,9 @@ public boolean insertRecord(ConsumerRecord record) throws IOExce final List logMessages; try { - logMessages = this.dataTransformer.toLogMessage(record); + final Trace.Span span = Trace.Span.parseFrom(record.value()); + final Trace.ListOfSpans listOfSpans = Trace.ListOfSpans.newBuilder().addSpans(span).build(); + logMessages = SpanFormatter.toLogMessage(listOfSpans); // Ideally, we should return true when logMessages are empty. But, fail the record, since we // don't expect any empty records or we may have a bug in earlier code. if (logMessages.isEmpty()) return false; diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java index 188e25aa60..2db5d61be2 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/SpanFormatter.java @@ -196,6 +196,10 @@ public static LogMessage toLogMessage(Trace.Span span) { jsonMap.put(key, tag.getVFloat64()); } else if (valueType == 4) { jsonMap.put(key, tag.getVBinary().toStringUtf8()); + } else if (valueType == 5) { + jsonMap.put(key, tag.getVInt32()); + } else if (valueType == 6) { + jsonMap.put(key, tag.getVFloat32()); } else { LOG.warn("Skipping field with unknown value type {} with key {}", valueType, key); } diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto index 8784eb3b24..f6fd33b9c8 100644 --- a/kaldb/src/main/proto/kaldb_configs.proto +++ b/kaldb/src/main/proto/kaldb_configs.proto @@ -106,8 +106,6 @@ message IndexerConfig { // check will be executed every time we roll over a chunk. int64 stale_duration_secs = 4; - // Name of the data transformation pipeline to use when ingesting the data. - string data_transformer = 5; // Folder where data is persisted locally on disk. string data_directory = 6; // Indexer server config. diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java index cb4cf97e9d..2b510b7c8c 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/IndexingChunkManagerTest.java @@ -169,7 +169,7 @@ private void initChunkManager( listeningExecutorService, curatorFramework, searchContext, - KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100)); + KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100)); chunkManager.startAsync(); chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); } @@ -203,7 +203,7 @@ public void testDeleteOverMaxThresholdGreaterThanZero() throws IOException, Time new DiskOrMessageCountBasedRolloverStrategy(metricsRegistry, 10 * 1024 * 1024 * 1024L, 10L); KaldbConfigs.IndexerConfig indexerConfig = - KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, 1, 1_000_000_000L); + KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100, 1, 1_000_000_000L); initChunkManager( chunkRollOverStrategy, S3_TEST_BUCKET, @@ -284,7 +284,7 @@ public void testDeleteStaleDataDoesNothingWhenGivenLimitLessThan0() metricsRegistry, 10 * 1024 * 1024 * 1024L, 1000000L); KaldbConfigs.IndexerConfig indexerConfig = - KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100, -1, 10_000); + KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100, -1, 10_000); initChunkManager( chunkRollOverStrategy, S3_TEST_BUCKET, diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java index 7df5dfd325..db4b04cbad 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkManager/RecoveryChunkManagerTest.java @@ -141,7 +141,6 @@ private void initChunkManager(String testS3Bucket) throws Exception { "recoveryZK_", KaldbConfigs.NodeRole.RECOVERY, 10000, - "api_log", 9003, 100); diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java index 3b080ae35f..df7b5b92b2 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java @@ -142,7 +142,7 @@ private void initChunkManager( listeningExecutorService, curatorFramework, searchContext, - KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, "log_message", 100)); + KaldbConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100)); chunkManager.startAsync(); chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java index 1a09bf23f8..8dac14b218 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/KaldbLocalQueryServiceTest.java @@ -70,7 +70,7 @@ public void setUp() throws Exception { metricsRegistry, 10 * 1024 * 1024 * 1024L, 100, - KaldbConfigUtil.makeIndexerConfig(1000, 1000, "log_message", 100)); + KaldbConfigUtil.makeIndexerConfig(1000, 1000, 100)); chunkManagerUtil.chunkManager.startAsync(); chunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); kaldbLocalQueryService = diff --git a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java index d3e23fb654..d3f942230d 100644 --- a/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/recovery/RecoveryServiceTest.java @@ -145,7 +145,6 @@ private KaldbConfigs.KaldbConfig makeKaldbConfig( "recoveryZK_", KaldbConfigs.NodeRole.RECOVERY, 10000, - "api_log", 9003, 100); } diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java index 474a635701..067f2a4583 100644 --- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java @@ -43,21 +43,6 @@ public void testEmptyJsonCfgFile() { .isThrownBy(() -> KaldbConfig.fromJsonConfig("")); } - @Test - public void testMissingDataTransformerConfig() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - ObjectNode indexerConfig = - mapper.createObjectNode().put("maxMessagesPerChunk", 1).put("maxBytesPerChunk", 100); - ObjectNode node = mapper.createObjectNode(); - node.set("nodeRoles", mapper.createArrayNode().add("INDEX")); - node.set("indexerConfig", indexerConfig); - final String missingRequiredField = - mapper.writerWithDefaultPrettyPrinter().writeValueAsString(node); - - assertThatIllegalArgumentException() - .isThrownBy(() -> KaldbConfig.fromJsonConfig(missingRequiredField)); - } - @Test public void testIntToStrTypeConversionForWrongJsonType() throws InvalidProtocolBufferException, JsonProcessingException { @@ -68,7 +53,6 @@ public void testIntToStrTypeConversionForWrongJsonType() .createObjectNode() .put("maxMessagesPerChunk", 1) .put("maxBytesPerChunk", 100) - .put("dataTransformer", "api_log") .put("defaultQueryTimeoutMs", "2500") .set("serverConfig", serverConfig); ObjectNode kafkaConfig = @@ -99,7 +83,6 @@ public void testStrToIntTypeConversionForWrongJsonType() .put("maxMessagesPerChunk", 1) .put("maxBytesPerChunk", 100) .put("defaultQueryTimeoutMs", "2500") - .put("dataTransformer", "api_log") .set("serverConfig", serverConfig); ObjectNode node = mapper.createObjectNode(); node.set("nodeRoles", mapper.createArrayNode().add("INDEX")); @@ -129,10 +112,8 @@ public void testIgnoreExtraConfigField() throws IOException { .createObjectNode() .put("maxMessagesPerChunk", 1) .put("maxBytesPerChunk", 100) - .put("dataTransformer", "api_log") .put("ignoredField", "ignore") .put("defaultQueryTimeoutMs", "2500") - .put("dataTransformer", "api_log") .set("serverConfig", serverConfig); indexerConfig.set("kafkaConfig", kafkaConfig); @@ -208,7 +189,6 @@ public void testParseKaldbJsonConfigFile() throws IOException { assertThat(indexerConfig.getLuceneConfig().getEnableFullTextSearch()).isTrue(); assertThat(indexerConfig.getMaxChunksOnDisk()).isEqualTo(3); assertThat(indexerConfig.getStaleDurationSecs()).isEqualTo(7200); - assertThat(indexerConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(indexerConfig.getDataDirectory()).isEqualTo("/tmp"); assertThat(indexerConfig.getServerConfig().getServerPort()).isEqualTo(8080); assertThat(indexerConfig.getServerConfig().getServerAddress()).isEqualTo("localhost"); @@ -292,7 +272,6 @@ public void testParseKaldbJsonConfigFile() throws IOException { assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isEqualTo(1); assertThat(preprocessorConfig.getUpstreamTopicsList()).isEqualTo(List.of("test-topic")); assertThat(preprocessorConfig.getDownstreamTopic()).isEqualTo("test-topic-out"); - assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2); assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false); assertThat(preprocessorConfig.getRateLimitExceededErrorCode()).isEqualTo(400); @@ -385,7 +364,6 @@ public void testParseKaldbYamlConfigFile() throws IOException { assertThat(indexerConfig.getLuceneConfig().getEnableFullTextSearch()).isTrue(); assertThat(indexerConfig.getMaxChunksOnDisk()).isEqualTo(3); assertThat(indexerConfig.getStaleDurationSecs()).isEqualTo(7200); - assertThat(indexerConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(indexerConfig.getDataDirectory()).isEqualTo("/tmp"); assertThat(indexerConfig.getMaxOffsetDelayMessages()).isEqualTo(10001); assertThat(indexerConfig.getServerConfig().getServerPort()).isEqualTo(8080); @@ -468,7 +446,6 @@ public void testParseKaldbYamlConfigFile() throws IOException { assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isEqualTo(1); assertThat(preprocessorConfig.getUpstreamTopicsList()).isEqualTo(List.of("test-topic")); assertThat(preprocessorConfig.getDownstreamTopic()).isEqualTo("test-topic-out"); - assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2); final KaldbConfigs.KafkaConfig preprocessorKafkaConfig = @@ -515,8 +492,6 @@ public void testMissingDataTransformerConfigForCache() throws InvalidProtocolBuf assertThat(config.getNodeRolesList()).containsOnly(KaldbConfigs.NodeRole.CACHE); assertThat(config.getCacheConfig().getServerConfig().getRequestTimeoutMs()).isEqualTo(3000); assertThat(config.getCacheConfig().getDefaultQueryTimeoutMs()).isEqualTo(2500); - final KaldbConfigs.IndexerConfig indexerConfig = config.getIndexerConfig(); - assertThat(indexerConfig.getDataTransformer()).isEmpty(); } @Test @@ -524,7 +499,7 @@ public void testEmptyJsonStringInit() throws InvalidProtocolBufferException { KaldbConfigs.KaldbConfig config = KaldbConfig.fromJsonConfig( "{nodeRoles: [INDEX], " - + "indexerConfig:{dataTransformer:api_log,defaultQueryTimeoutMs:2500,serverConfig:{requestTimeoutMs:3000}}}"); + + "indexerConfig:{defaultQueryTimeoutMs:2500,serverConfig:{requestTimeoutMs:3000}}}"); assertThat(config.getNodeRolesList().size()).isEqualTo(1); @@ -557,7 +532,6 @@ public void testEmptyJsonStringInit() throws InvalidProtocolBufferException { assertThat(indexerConfig.getStaleDurationSecs()).isZero(); assertThat(indexerConfig.getDataDirectory()).isEmpty(); assertThat(indexerConfig.getDefaultQueryTimeoutMs()).isEqualTo(2500); - assertThat(indexerConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(indexerConfig.getMaxOffsetDelayMessages()).isZero(); assertThat(indexerConfig.getServerConfig().getServerPort()).isZero(); assertThat(indexerConfig.getServerConfig().getServerAddress()).isEmpty(); @@ -640,7 +614,6 @@ public void testEmptyJsonStringInit() throws InvalidProtocolBufferException { assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isZero(); assertThat(preprocessorConfig.getUpstreamTopicsList()).isEmpty(); assertThat(preprocessorConfig.getDownstreamTopic()).isEmpty(); - assertThat(preprocessorConfig.getDataTransformer()).isEmpty(); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isZero(); assertThat(preprocessorConfig.getUseBulkApi()).isFalse(); @@ -662,7 +635,6 @@ public void testEmptyYamlStringInit() String yamlCfgString = "nodeRoles: [INDEX]\n" + "indexerConfig:\n" - + " dataTransformer: api_log\n" + " defaultQueryTimeoutMs: 2500\n" + " serverConfig:\n" + " requestTimeoutMs: 3000\n"; @@ -698,7 +670,6 @@ public void testEmptyYamlStringInit() assertThat(indexerConfig.getMaxChunksOnDisk()).isZero(); assertThat(indexerConfig.getStaleDurationSecs()).isZero(); assertThat(indexerConfig.getDataDirectory()).isEmpty(); - assertThat(indexerConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(indexerConfig.getMaxOffsetDelayMessages()).isZero(); assertThat(indexerConfig.getServerConfig().getServerPort()).isZero(); assertThat(indexerConfig.getServerConfig().getServerAddress()).isEmpty(); @@ -771,7 +742,6 @@ public void testEmptyYamlStringInit() assertThat(preprocessorConfig.getPreprocessorInstanceCount()).isZero(); assertThat(preprocessorConfig.getUpstreamTopicsList()).isEmpty(); assertThat(preprocessorConfig.getDownstreamTopic()).isEmpty(); - assertThat(preprocessorConfig.getDataTransformer()).isEmpty(); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isZero(); final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); @@ -798,7 +768,6 @@ public void testNodeRoleValidation() throws Exception { String yamlCfgString = "nodeRoles: [INDEX]\n" + "indexerConfig:\n" - + " dataTransformer: api_log\n" + " defaultQueryTimeoutMs: 2500\n" + " serverConfig:\n" + " requestTimeoutMs: 3000\n" @@ -816,7 +785,6 @@ public void testBadDefaultQueryTimeoutMs() { final String yamlCfgString = "nodeRoles: [INDEX]\n" + "indexerConfig:\n" - + " dataTransformer: api_log\n" + " defaultQueryTimeoutMs: 3500\n" + " serverConfig:\n" + " requestTimeoutMs: 3000\n" @@ -828,7 +796,6 @@ public void testBadDefaultQueryTimeoutMs() { final String yamlCfgString1 = "nodeRoles: [INDEX]\n" + "indexerConfig:\n" - + " dataTransformer: api_log\n" + " defaultQueryTimeoutMs: 2500\n" + " serverConfig:\n" + " requestTimeoutMs: 2999\n" diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbIndexerTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbIndexerTest.java index 49d2c2008f..84ce7eb975 100644 --- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbIndexerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbIndexerTest.java @@ -172,7 +172,7 @@ public void testIndexFreshConsumerKafkaSearchViaGrpcSearchApi() throws Exception new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(1000, "api_log"), + makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -216,7 +216,7 @@ public void testDeleteStaleSnapshotAndStartConsumerKafkaSearchViaGrpcSearchApi() new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(1000, "api_log"), + makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -276,7 +276,7 @@ public void testExceptionOnIndexerStartup() throws Exception { new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(1000, "api_log"), + makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -325,7 +325,7 @@ public void testWithMultipleLiveSnapshotsOnIndexerStart() throws Exception { new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(1000, "api_log"), + makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -388,7 +388,7 @@ public void testIndexerStartsWithPreviousOffset() throws Exception { new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(1000, "api_log"), + makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -453,7 +453,7 @@ public void testIndexerCreatesRecoveryTask() throws Exception { new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(50, "api_log"), + makeIndexerConfig(50), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -526,7 +526,7 @@ public void testIndexerShutdownTwice() throws Exception { new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(50, "api_log"), + makeIndexerConfig(50), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -603,7 +603,7 @@ public void testIndexerRestart() throws Exception { new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(1000, "api_log"), + makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); @@ -657,7 +657,7 @@ public void testIndexerRestart() throws Exception { new KaldbIndexer( chunkManagerUtil.chunkManager, curatorFramework, - makeIndexerConfig(1000, "api_log"), + makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); kaldbIndexer.startAsync(); diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbTest.java index cfc287b5cc..e54e683f45 100644 --- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbTest.java @@ -181,7 +181,6 @@ private KaldbConfigs.KaldbConfig makeKaldbConfig( zkPathPrefix, nodeRole, maxOffsetDelay, - "api_log", recoveryPort, 100); } diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbConfigUtil.java b/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbConfigUtil.java index 6d412e9685..fe2f50e3e9 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbConfigUtil.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/KaldbConfigUtil.java @@ -16,7 +16,6 @@ public static KaldbConfigs.KaldbConfig makeKaldbConfig( String metadataZkPathPrefix, KaldbConfigs.NodeRole nodeRole, int maxOffsetDelay, - String dataTransformerConfig, int recoveryPort, int maxMessagesPerChunk) { KaldbConfigs.KafkaConfig kafkaConfig = @@ -57,7 +56,6 @@ public static KaldbConfigs.KaldbConfig makeKaldbConfig( .build()) .setMaxChunksOnDisk(3) .setStaleDurationSecs(7200) - .setDataTransformer(dataTransformerConfig) .setMaxOffsetDelayMessages(maxOffsetDelay) .setDefaultQueryTimeoutMs(2500) .setKafkaConfig(kafkaConfig) @@ -122,16 +120,15 @@ public static KaldbConfigs.KafkaConfig makeKafkaConfig( public static int TEST_INDEXER_PORT = 10000; public static KaldbConfigs.IndexerConfig makeIndexerConfig() { - return makeIndexerConfig(TEST_INDEXER_PORT, 1000, "log_message", 100); + return makeIndexerConfig(TEST_INDEXER_PORT, 1000, 100); } - public static KaldbConfigs.IndexerConfig makeIndexerConfig( - int maxOffsetDelay, String dataTransformer) { - return makeIndexerConfig(TEST_INDEXER_PORT, maxOffsetDelay, dataTransformer, 100); + public static KaldbConfigs.IndexerConfig makeIndexerConfig(int maxOffsetDelay) { + return makeIndexerConfig(TEST_INDEXER_PORT, maxOffsetDelay, 100); } public static KaldbConfigs.IndexerConfig makeIndexerConfig( - int indexerPort, int maxOffsetDelay, String dataTransformer, int maxMessagesPerChunk) { + int indexerPort, int maxOffsetDelay, int maxMessagesPerChunk) { return KaldbConfigs.IndexerConfig.newBuilder() .setServerConfig( KaldbConfigs.ServerConfig.newBuilder() @@ -149,14 +146,12 @@ public static KaldbConfigs.IndexerConfig makeIndexerConfig( .setMaxChunksOnDisk(3) .setStaleDurationSecs(7200) .setMaxOffsetDelayMessages(maxOffsetDelay) - .setDataTransformer(dataTransformer) .build(); } public static KaldbConfigs.IndexerConfig makeIndexerConfig( int indexerPort, int maxOffsetDelay, - String dataTransformer, int maxMessagesPerChunk, int maxChunksOnDisk, long staleDuration) { @@ -177,7 +172,6 @@ public static KaldbConfigs.IndexerConfig makeIndexerConfig( .setMaxChunksOnDisk(maxChunksOnDisk) .setStaleDurationSecs(staleDuration) .setMaxOffsetDelayMessages(maxOffsetDelay) - .setDataTransformer(dataTransformer) .build(); } } diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java b/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java index 0c6349ddc2..168cba7016 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/MessageUtil.java @@ -1,16 +1,17 @@ package com.slack.kaldb.testlib; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.protobuf.ByteString; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.logstore.LogWireMessage; import com.slack.kaldb.util.JsonUtil; -import java.io.IOException; -import java.net.ServerSocket; +import com.slack.service.murron.trace.Trace; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -169,9 +170,5 @@ public static List makeMessagesWithTimeDifference( return result; } - // TODO: Move this to TestKafkaServer class. - public int getPort() throws IOException { - ServerSocket socket = new ServerSocket(0); - return socket.getLocalPort(); - } + } diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java b/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java index 47783898b9..296b1a52b5 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/SpanUtil.java @@ -3,8 +3,13 @@ import com.google.protobuf.ByteString; import com.slack.kaldb.logstore.LogMessage; import com.slack.service.murron.trace.Trace; + +import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; + +import static com.slack.kaldb.testlib.MessageUtil.*; public class SpanUtil { @@ -25,7 +30,7 @@ public static Trace.Span makeSpan( return spanBuilder.build(); } - public static Trace.Span.Builder makeSpanBuilder( + private static Trace.Span.Builder makeSpanBuilder( String traceId, String id, String parentId, @@ -106,4 +111,61 @@ public static Trace.Span.Builder makeSpanBuilder( spanBuilder.addAllTags(tags); return spanBuilder; } + + public static List makeSpansWithTimeDifference( + int low, int high, long timeDeltaMills, Instant start) { + List result = new ArrayList<>(); + for (int i = 0; i <= (high - low); i++) { + String id = DEFAULT_MESSAGE_PREFIX + (low + i); + + Instant timeStamp = start.plusNanos(1000 * 1000 * timeDeltaMills * i); + String message = String.format("The identifier in this message is %s", id); + + Trace.Span span = + Trace.Span.newBuilder() + .setTimestamp( + TimeUnit.MICROSECONDS.convert(timeStamp.toEpochMilli(), TimeUnit.MILLISECONDS)) + .setId(ByteString.copyFromUtf8(id)) + .addTags( + Trace.KeyValue.newBuilder() + .setVStr(message) + .setKey("message") + .setVType(Trace.ValueType.STRING) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVInt32((low + i)) + .setKey(TEST_SOURCE_INT_PROPERTY) + .setVType(Trace.ValueType.INT32) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVInt64((low + i)) + .setKey(TEST_SOURCE_LONG_PROPERTY) + .setVType(Trace.ValueType.INT64) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVFloat32((low + i)) + .setKey(TEST_SOURCE_FLOAT_PROPERTY) + .setVType(Trace.ValueType.FLOAT32) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVFloat64((low + i)) + .setKey(TEST_SOURCE_DOUBLE_PROPERTY) + .setVType(Trace.ValueType.FLOAT64) + .build()) + .addTags( + Trace.KeyValue.newBuilder() + .setVStr(String.format("String-%s", (low + i))) + .setKey(TEST_SOURCE_STRING_PROPERTY) + .setVType(Trace.ValueType.STRING) + .build()) + .build(); + + result.add(span); + } + return result; + } } diff --git a/kaldb/src/test/java/com/slack/kaldb/testlib/TestKafkaServer.java b/kaldb/src/test/java/com/slack/kaldb/testlib/TestKafkaServer.java index 748a8bad67..5420740341 100644 --- a/kaldb/src/test/java/com/slack/kaldb/testlib/TestKafkaServer.java +++ b/kaldb/src/test/java/com/slack/kaldb/testlib/TestKafkaServer.java @@ -2,14 +2,10 @@ import static org.assertj.core.api.Assertions.assertThat; -import com.fasterxml.jackson.core.JsonProcessingException; import com.github.charithe.kafka.EphemeralKafkaBroker; import com.google.common.util.concurrent.Futures; -import com.google.protobuf.ByteString; -import com.slack.kaldb.logstore.LogMessage; -import com.slack.kaldb.util.JsonUtil; import com.slack.kaldb.writer.LogMessageWriterImpl; -import com.slack.service.murron.Murron; +import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.MeterRegistry; import java.nio.file.Files; import java.nio.file.Path; @@ -39,34 +35,30 @@ public class TestKafkaServer { public static final String TEST_KAFKA_TOPIC = "test-topic"; - // Create messages, format them into murron protobufs, write them to kafka public static int produceMessagesToKafka( EphemeralKafkaBroker broker, Instant startTime, String kafkaTopic, int partitionId, int count) throws Exception { - List messages = - MessageUtil.makeMessagesWithTimeDifference(1, count, 1000, startTime); + List messages = + SpanUtil.makeSpansWithTimeDifference(1, count, 1000, startTime); return produceMessagesToKafka(broker, kafkaTopic, partitionId, messages); } public static int produceMessagesToKafka( - EphemeralKafkaBroker broker, String kafkaTopic, int partitionId, List messages) + EphemeralKafkaBroker broker, String kafkaTopic, int partitionId, List messages) throws Exception { int indexedCount = 0; // Insert messages into Kafka. try (KafkaProducer producer = broker.createProducer(new StringSerializer(), new ByteArraySerializer(), null)) { - for (LogMessage msg : messages) { + for (Trace.Span msg : messages) { // Kafka producer creates only a partition 0 on first message. So, set the partition to 0 // always. Future result = producer.send( new ProducerRecord<>( - kafkaTopic, - partitionId, - String.valueOf(indexedCount), - fromLogMessage(msg, indexedCount).toByteArray())); + kafkaTopic, partitionId, String.valueOf(indexedCount), msg.toByteArray())); RecordMetadata metadata = result.get(500L, TimeUnit.MILLISECONDS); assertThat(metadata).isNotNull(); @@ -91,19 +83,6 @@ public static int produceMessagesToKafka(EphemeralKafkaBroker broker, Instant st return produceMessagesToKafka(broker, startTime, TEST_KAFKA_TOPIC, 0); } - public static Murron.MurronMessage fromLogMessage(LogMessage message, int offset) - throws JsonProcessingException { - String jsonStr = JsonUtil.writeAsString(message.getSource()); - return Murron.MurronMessage.newBuilder() - .setTimestamp(message.getTimestamp().toEpochMilli() * 1000 * 1000) - .setType(MessageUtil.TEST_DATASET_NAME) - .setHost("localhost") - .setPid(100) - .setOffset(offset) - .setMessage(ByteString.copyFromUtf8(jsonStr)) - .build(); - } - private final EphemeralKafkaBroker broker; private final CompletableFuture brokerStart; private final AdminClient adminClient; diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java index d9e73e139d..84fc9291d4 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/LogMessageWriterImplTest.java @@ -13,7 +13,6 @@ import brave.Tracing; import com.adobe.testing.s3mock.junit5.S3MockExtension; -import com.google.protobuf.ByteString; import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser; import com.slack.kaldb.chunkManager.IndexingChunkManager; import com.slack.kaldb.logstore.LogMessage; @@ -22,7 +21,6 @@ import com.slack.kaldb.logstore.search.aggregations.DateHistogramAggBuilder; import com.slack.kaldb.testlib.ChunkManagerUtil; import com.slack.kaldb.testlib.KaldbConfigUtil; -import com.slack.service.murron.Murron; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.io.IOException; @@ -97,11 +95,6 @@ private SearchResult searchChunkManager(String indexName, String que Duration.ofMillis(3000)); } - private static ConsumerRecord consumerRecordWithMurronMessage( - Murron.MurronMessage testMurronMsg) { - return consumerRecordWithValue(testMurronMsg.toByteString().toByteArray()); - } - private static ConsumerRecord consumerRecordWithValue(byte[] recordValue) { return new ConsumerRecord<>( "testTopic", 1, 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "testKey", recordValue); @@ -109,31 +102,11 @@ private static ConsumerRecord consumerRecordWithValue(byte[] rec @Test public void insertNullRecord() throws IOException { - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(null)).isFalse(); } - @Test - public void testMalformedMurronApiRecord() throws IOException { - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); - - ConsumerRecord apiRecord = - consumerRecordWithMurronMessage( - Murron.MurronMessage.newBuilder() - .setMessage(ByteString.copyFromUtf8("malformedMurronMessage")) - .setType("testIndex") - .setHost("testHost") - .setTimestamp(1612550512340953000L) - .build()); - - assertThat(messageWriter.insertRecord(apiRecord)).isFalse(); - } - // TODO: Add a unit test where message fails to index. Can't do it now since the field conflict // policy is hard-coded. @@ -174,8 +147,7 @@ public void testAvgMessageSizeCalculationOnSpanIngestion() throws Exception { .collect(Collectors.toList()); IndexingChunkManager chunkManager = localChunkManagerUtil.chunkManager; - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl(chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManager); for (Trace.Span span : spans) { ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); @@ -208,45 +180,6 @@ public void testAvgMessageSizeCalculationOnSpanIngestion() throws Exception { .isEqualTo(15); } - @Test - public void testUseIncorrectDataTransformer() throws IOException { - // Data Prep: Span -> ListOfSpans -> MurronMessage -> ConsumerReord - final String traceId = "t1"; - final String id = "i2"; - final String parentId = "p2"; - final long timestampMicros = 1612550512340953L; - final long durationMicros = 500000L; - final String serviceName = "testService"; - final String name = "testSpanName"; - - final Trace.Span span = - makeSpan( - traceId, - id, - parentId, - timestampMicros, - durationMicros, - name, - serviceName, - SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE); - - Murron.MurronMessage testMurronMsg = - Murron.MurronMessage.newBuilder() - .setMessage( - Trace.ListOfSpans.newBuilder().addAllSpans(List.of(span)).build().toByteString()) - .setType("test") - .setHost("testHost") - .setTimestamp(timestampMicros) - .build(); - ConsumerRecord spanRecord = consumerRecordWithMurronMessage(testMurronMsg); - - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); - - assertThat(messageWriter.insertRecord(spanRecord)).isFalse(); - } - @Test public void testIngestTraceSpan() throws IOException { final String traceId = "t1"; @@ -269,9 +202,7 @@ public void testIngestTraceSpan() throws IOException { msgType); ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(1); @@ -324,9 +255,7 @@ public void parseAndIndexBulkApiRequestTest() throws IOException { IngestDocument ingestDocument = convertRequestToDocument(indexRequest); Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument); ConsumerRecord spanRecord = consumerRecordWithValue(span.toByteArray()); - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(spanRecord)).isTrue(); } @@ -359,9 +288,7 @@ public void parseAndIndexBulkApiRequestTest() throws IOException { @Test public void testNullTraceSpan() throws IOException { - LogMessageWriterImpl messageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.traceSpanTransformer); + LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); assertThat(messageWriter.insertRecord(null)).isFalse(); } diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java index 2185beb1bf..ee1656b687 100644 --- a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java @@ -2,7 +2,6 @@ import static com.slack.kaldb.chunkManager.RecoveryChunkManager.LIVE_MESSAGES_INDEXED; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.server.ValidateKaldbConfig.INDEXER_DATA_TRANSFORMER_MAP; import static com.slack.kaldb.testlib.ChunkManagerUtil.makeChunkManagerUtil; import static com.slack.kaldb.testlib.MetricsUtil.getCount; import static com.slack.kaldb.testlib.MetricsUtil.getValue; @@ -93,8 +92,7 @@ public void setUp() throws Exception { chunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); LogMessageWriterImpl logMessageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + new LogMessageWriterImpl(chunkManagerUtil.chunkManager); KaldbConfigs.KafkaConfig kafkaConfig = KaldbConfigs.KafkaConfig.newBuilder() .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC) @@ -302,9 +300,7 @@ public void setUp() throws Exception { chunkManagerUtil.chunkManager.startAsync(); chunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); - logMessageWriter = - new LogMessageWriterImpl( - chunkManagerUtil.chunkManager, INDEXER_DATA_TRANSFORMER_MAP.get("spans")); + logMessageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager); } @AfterEach @@ -406,8 +402,7 @@ public static TestKafkaServer.KafkaComponents getKafkaTestServer(S3MockExtension localChunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); LogMessageWriterImpl logMessageWriter = - new LogMessageWriterImpl( - localChunkManagerUtil.chunkManager, LogMessageWriterImpl.apiLogTransformer); + new LogMessageWriterImpl(localChunkManagerUtil.chunkManager); AdminClient adminClient = AdminClient.create( diff --git a/kaldb/src/test/resources/test_config.json b/kaldb/src/test/resources/test_config.json index 74ec5e6ce7..109a765160 100644 --- a/kaldb/src/test/resources/test_config.json +++ b/kaldb/src/test/resources/test_config.json @@ -28,7 +28,6 @@ "refreshDurationSecs": 11, "enableFullTextSearch": true }, - "dataTransformer": "api_log", "dataDirectory": "/tmp", "maxOffsetDelayMessages" : 10002, "serverConfig": { diff --git a/kaldb/src/test/resources/test_config.yaml b/kaldb/src/test/resources/test_config.yaml index 8c179af084..8d6e74b0c9 100644 --- a/kaldb/src/test/resources/test_config.yaml +++ b/kaldb/src/test/resources/test_config.yaml @@ -8,7 +8,6 @@ indexerConfig: refreshDurationSecs: 11 enableFullTextSearch: true staleDurationSecs: 7200 - dataTransformer: "api_log" dataDirectory: "/tmp" maxOffsetDelayMessages: 10001 defaultQueryTimeoutMs: 1500