diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/IndexingChunkImpl.java b/kaldb/src/main/java/com/slack/kaldb/chunk/IndexingChunkImpl.java
index edf12af0ec..70a7f1ba0a 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunk/IndexingChunkImpl.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunk/IndexingChunkImpl.java
@@ -41,7 +41,7 @@ public class IndexingChunkImpl<T> extends ReadWriteChunk<T> {
   private static final Logger LOG = LoggerFactory.getLogger(IndexingChunkImpl.class);
 
   public IndexingChunkImpl(
-      LogStore<T> logStore,
+      LogStore logStore,
       String chunkDataPrefix,
       MeterRegistry meterRegistry,
       SearchMetadataStore searchMetadataStore,
diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java
index 79adfe31be..edbfb997b1 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java
@@ -6,7 +6,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.slack.kaldb.blobfs.BlobFs;
-import com.slack.kaldb.logstore.LogMessage;
 import com.slack.kaldb.logstore.LogStore;
 import com.slack.kaldb.logstore.LuceneIndexStoreImpl;
 import com.slack.kaldb.logstore.search.LogIndexSearcher;
@@ -19,6 +18,7 @@
 import com.slack.kaldb.metadata.search.SearchMetadataStore;
 import com.slack.kaldb.metadata.snapshot.SnapshotMetadata;
 import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
+import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.Timer;
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.lucene.index.IndexCommit;
 import org.slf4j.Logger;
@@ -70,7 +71,7 @@ public abstract class ReadWriteChunk<T> implements Chunk<T> {
   public static final String LIVE_SNAPSHOT_PREFIX = SnapshotMetadata.LIVE_SNAPSHOT_PATH + "_";
   public static final String SCHEMA_FILE_NAME = "schema.json";
 
-  private final LogStore<T> logStore;
+  private final LogStore logStore;
   private final String kafkaPartitionId;
   private final Logger logger;
   private LogIndexSearcher<T> logSearcher;
@@ -89,7 +90,7 @@ public abstract class ReadWriteChunk<T> implements Chunk<T> {
   private boolean readOnly;
 
   protected ReadWriteChunk(
-      LogStore<T> logStore,
+      LogStore logStore,
       String chunkDataPrefix,
       MeterRegistry meterRegistry,
       SearchMetadataStore searchMetadataStore,
@@ -141,7 +142,7 @@ public static SearchMetadata toSearchMetadata(String snapshotName, SearchContext
   }
 
   /** Index the message in the logstore and update the chunk data time range. */
-  public void addMessage(T message, String kafkaPartitionId, long offset) {
+  public void addMessage(Trace.Span message, String kafkaPartitionId, long offset) {
     if (!this.kafkaPartitionId.equals(kafkaPartitionId)) {
       throw new IllegalArgumentException(
           "All messages for this chunk should belong to partition: "
@@ -151,13 +152,12 @@ public void addMessage(T message, String kafkaPartitionId, long offset) {
     }
     if (!readOnly) {
       logStore.addMessage(message);
-      // Update the chunk with the time range of the data in the chunk.
-      // TODO: This type conversion is a temporary hack, fix it by adding timestamp field to the
-      // message.
-      if (message instanceof LogMessage) {
-        chunkInfo.updateDataTimeRange(((LogMessage) message).getTimestamp().toEpochMilli());
-        chunkInfo.updateMaxOffset(offset);
-      }
+
+      chunkInfo.updateDataTimeRange(
+          TimeUnit.MILLISECONDS.convert(message.getTimestamp(), TimeUnit.MICROSECONDS));
+      // If we also validate the timestamp here (i.e. use the line below), tests that use dates
+      // from 2020 start failing, so not touching this logic for now.
+      // chunkInfo.updateDataTimeRange(SpanFormatter.getTimestampFromSpan(message).toEpochMilli());
     } else {
       throw new IllegalStateException(String.format("Chunk %s is read only", chunkInfo));
     }
diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkFactoryImpl.java b/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkFactoryImpl.java
index c495f256c8..3718a3eb20 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkFactoryImpl.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkFactoryImpl.java
@@ -49,10 +49,9 @@ public ReadWriteChunk<T> makeChunk() throws IOException {
     ensureNonNullString(kafkaPartitionId, "kafkaPartitionId can't be null and should be set.");
     ensureNonNullString(indexerConfig.getDataDirectory(), "The data directory shouldn't be empty");
     final File dataDirectory = new File(indexerConfig.getDataDirectory());
-    LogStore<T> logStore =
-        (LogStore<T>)
-            LuceneIndexStoreImpl.makeLogStore(
-                dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
+    LogStore logStore =
+        LuceneIndexStoreImpl.makeLogStore(
+            dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
 
     return new RecoveryChunkImpl<>(
         logStore,
diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkImpl.java b/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkImpl.java
index 6a525a07df..544ef41b74 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkImpl.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunk/RecoveryChunkImpl.java
@@ -23,7 +23,7 @@ public class RecoveryChunkImpl<T> extends ReadWriteChunk<T> {
   private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyChunkImpl.class);
 
   public RecoveryChunkImpl(
-      LogStore<T> logStore,
+      LogStore logStore,
       String chunkDataPrefix,
       MeterRegistry meterRegistry,
       SearchMetadataStore searchMetadataStore,
diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java
index edb60ce143..d898dbee8f 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/CachingChunkManager.java
@@ -9,6 +9,7 @@
 import com.slack.kaldb.metadata.search.SearchMetadataStore;
 import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
 import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.MeterRegistry;
 import java.io.IOException;
 import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -120,7 +121,7 @@ public static CachingChunkManager<LogMessage> fromConfig(
   }
 
   @Override
-  public void addMessage(T message, long msgSize, String kafkaPartitionId, long offset)
+  public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
       throws IOException {
     throw new UnsupportedOperationException(
         "Adding messages is not supported on a caching chunk manager");
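Note on the timestamp handling in ReadWriteChunk.addMessage above: Trace.Span carries its timestamp in microseconds since the epoch, while ChunkInfo tracks milliseconds, hence the TimeUnit conversion. A minimal, self-contained sketch of that conversion; the values are illustrative and not taken from the PR:

    import java.time.Duration;
    import java.time.temporal.ChronoUnit;
    import java.util.concurrent.TimeUnit;

    public class TimestampConversionCheck {
      public static void main(String[] args) {
        long micros = 1_700_000_000_123_456L; // span timestamp in microseconds
        // Conversion used in ReadWriteChunk.addMessage; sub-millisecond precision is truncated.
        long millis = TimeUnit.MILLISECONDS.convert(micros, TimeUnit.MICROSECONDS);
        System.out.println(millis); // 1700000000123
        // The Duration/ChronoUnit form used for duration_ms elsewhere in this PR gives the same result.
        System.out.println(Duration.of(micros, ChronoUnit.MICROS).toMillis()); // 1700000000123
      }
    }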
diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java
index 68f8019bce..75751ad375 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/ChunkManager.java
@@ -3,12 +3,14 @@
 import com.slack.kaldb.logstore.search.SearchQuery;
 import com.slack.kaldb.logstore.search.SearchResult;
 import com.slack.kaldb.metadata.schema.FieldType;
+import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Map;
 
 public interface ChunkManager<T> {
-  void addMessage(T message, long msgSize, String kafkaPartitionId, long offset) throws IOException;
+  void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+      throws IOException;
 
   SearchResult<T> query(SearchQuery query, Duration queryTimeout);
 
diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java
index 7756cd5829..fee5686722 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java
@@ -25,6 +25,7 @@
 import com.slack.kaldb.metadata.search.SearchMetadataStore;
 import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
 import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.MeterRegistry;
 import java.io.File;
 import java.io.IOException;
@@ -166,7 +167,8 @@ public IndexingChunkManager(
    * <p>TODO: Indexer should stop cleanly if the roll over fails or an exception.
    */
   @Override
-  public void addMessage(final T message, long msgSize, String kafkaPartitionId, long offset)
+  public void addMessage(
+      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
       throws IOException {
     if (stopIngestion) {
       // Currently, this flag is set on only a chunkRollOverException.
@@ -258,10 +260,9 @@ private ReadWriteChunk<T> getOrCreateActiveChunk(
       String kafkaPartitionId, KaldbConfigs.IndexerConfig indexerConfig) throws IOException {
     if (activeChunk == null) {
       @SuppressWarnings("unchecked")
-      LogStore<T> logStore =
-          (LogStore<T>)
-              LuceneIndexStoreImpl.makeLogStore(
-                  dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
+      LogStore logStore =
+          LuceneIndexStoreImpl.makeLogStore(
+              dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
 
       chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory());
 
diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java
index f7fc03fc30..e8819fe8c1 100644
--- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java
+++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/RecoveryChunkManager.java
@@ -19,6 +19,7 @@
 import com.slack.kaldb.metadata.search.SearchMetadataStore;
 import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
 import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.MeterRegistry;
 import java.io.IOException;
 import java.time.Instant;
@@ -78,7 +79,8 @@ public RecoveryChunkManager(
   }
 
   @Override
-  public void addMessage(final T message, long msgSize, String kafkaPartitionId, long offset)
+  public void addMessage(
+      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
       throws IOException {
     if (readOnly) {
       LOG.warn("Ingestion is stopped since the chunk is in read only mode.");
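With the ChunkManager.addMessage signature above, callers hand a Trace.Span to the indexer directly instead of a pre-converted LogMessage. A rough sketch of what a caller now looks like; the builder calls assume the standard protobuf-generated API for Trace.Span, and the chunkManager variable, partition id, and offset are illustrative only:

    import com.google.protobuf.ByteString;
    import com.slack.service.murron.trace.Trace;
    import java.time.Instant;

    // chunkManager is an IndexingChunkManager obtained elsewhere; shown only to illustrate
    // the new signature.
    Trace.Span span =
        Trace.Span.newBuilder()
            .setId(ByteString.copyFromUtf8("span-1"))
            .setName("example-operation")
            .setTimestamp(Instant.now().toEpochMilli() * 1000L) // microseconds since epoch
            .build();
    chunkManager.addMessage(span, span.toByteArray().length, "0", 100L);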
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java b/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java
index 8b1d262598..621a43a794 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/DocumentBuilder.java
@@ -1,6 +1,7 @@
 package com.slack.kaldb.logstore;
 
 import com.slack.kaldb.metadata.schema.LuceneFieldDef;
+import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.lucene.document.Document;
@@ -9,8 +10,8 @@
  * DocumentBuilder defines the interfaces for classes that generate Lucene documents out of
  * messages.
  */
-public interface DocumentBuilder<T> {
-  Document fromMessage(T message) throws IOException;
+public interface DocumentBuilder {
+  Document fromMessage(Trace.Span message) throws IOException;
 
   ConcurrentHashMap<String, LuceneFieldDef> getSchema();
 }
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java
index af86bcfc10..5bbf3644f7 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java
@@ -1,6 +1,7 @@
 package com.slack.kaldb.logstore;
 
 import com.slack.kaldb.metadata.schema.LuceneFieldDef;
+import com.slack.service.murron.trace.Trace;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -10,8 +11,8 @@
 import org.apache.lucene.store.FSDirectory;
 
 /* An interface that implements a read and write interface for the LogStore */
-public interface LogStore<T> extends Closeable {
-  void addMessage(T message);
+public interface LogStore extends Closeable {
+  void addMessage(Trace.Span message);
 
   // TODO: Instead of exposing the searcherManager, consider returning an instance of the searcher.
   SearcherManager getSearcherManager();
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java
index f60d8a93b6..68e53743bb 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java
@@ -4,6 +4,7 @@
 import com.slack.kaldb.metadata.schema.LuceneFieldDef;
 import com.slack.kaldb.proto.config.KaldbConfigs;
 import com.slack.kaldb.util.RuntimeHalterImpl;
+import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.MeterRegistry;
 import java.io.File;
@@ -39,7 +40,7 @@
  *
 * TODO: Each index store has a unique id that is used to as a suffix/prefix in files associated
 * with this store?
  */
-public class LuceneIndexStoreImpl implements LogStore<LogMessage> {
+public class LuceneIndexStoreImpl implements LogStore {
 
   private final String id = UUID.randomUUID().toString();
@@ -52,7 +53,7 @@ public class LuceneIndexStoreImpl implements LogStore<LogMessage> {
   public static final String FINAL_MERGES_TIMER = "kaldb_index_final_merges";
 
   private final SearcherManager searcherManager;
-  private final DocumentBuilder<LogMessage> documentBuilder;
+  private final DocumentBuilder documentBuilder;
   private final FSDirectory indexDirectory;
   private final Timer timer;
   private final SnapshotDeletionPolicy snapshotDeletionPolicy;
@@ -108,9 +109,7 @@ public static LuceneIndexStoreImpl makeLogStore(
   }
 
   public LuceneIndexStoreImpl(
-      LuceneIndexStoreConfig config,
-      DocumentBuilder<LogMessage> documentBuilder,
-      MeterRegistry registry)
+      LuceneIndexStoreConfig config, DocumentBuilder documentBuilder, MeterRegistry registry)
       throws IOException {
 
     this.documentBuilder = documentBuilder;
@@ -254,7 +253,7 @@ private void handleNonFatal(Throwable ex) {
   }
 
   @Override
-  public void addMessage(LogMessage message) {
+  public void addMessage(Trace.Span message) {
     try {
       messagesReceivedCounter.increment();
       if (indexWriter.isPresent()) {
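With LogStore and DocumentBuilder no longer parameterized, the write path is typed against Trace.Span end to end. A minimal sketch of exercising the new interface; it assumes an existing indexerConfig (KaldbConfigs.IndexerConfig), a meterRegistry, and a span built as in the earlier snippet, and simply mirrors the wiring done by RecoveryChunkFactoryImpl and IndexingChunkManager above:

    import com.slack.kaldb.logstore.LogStore;
    import com.slack.kaldb.logstore.LuceneIndexStoreImpl;
    import java.io.File;

    // Hypothetical wiring for illustration, not part of the PR.
    File dataDirectory = new File(indexerConfig.getDataDirectory());
    LogStore logStore =
        LuceneIndexStoreImpl.makeLogStore(
            dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
    logStore.addMessage(span); // span is a Trace.Span
    logStore.close();          // LogStore extends Closeable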
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java
index d98dd18b2e..e26a7289dc 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java
@@ -1,19 +1,30 @@
 package com.slack.kaldb.logstore.schema;
 
+import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_INDEX_NAME;
+import static com.slack.kaldb.writer.SpanFormatter.DEFAULT_LOG_MESSAGE_TYPE;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.slack.kaldb.logstore.DocumentBuilder;
 import com.slack.kaldb.logstore.FieldDefMismatchException;
 import com.slack.kaldb.logstore.LogMessage;
+import com.slack.kaldb.logstore.LogWireMessage;
 import com.slack.kaldb.metadata.schema.FieldType;
 import com.slack.kaldb.metadata.schema.LuceneFieldDef;
 import com.slack.kaldb.util.JsonUtil;
+import com.slack.service.murron.trace.Trace;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.MeterRegistry;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.logging.log4j.util.Strings;
 import org.apache.lucene.document.Document;
 import org.slf4j.Logger;
@@ -31,7 +42,7 @@
  * rarely an issue and helps with performance. If this is an issue, we need to scan the json twice
  * to ensure document is good to index.
  */
-public class SchemaAwareLogDocumentBuilderImpl implements DocumentBuilder<LogMessage> {
+public class SchemaAwareLogDocumentBuilderImpl implements DocumentBuilder {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(SchemaAwareLogDocumentBuilderImpl.class);
@@ -347,31 +358,148 @@ public static SchemaAwareLogDocumentBuilderImpl build(
   }
 
   @Override
-  public Document fromMessage(LogMessage message) throws JsonProcessingException {
+  public Document fromMessage(Trace.Span message) throws JsonProcessingException {
     Document doc = new Document();
-    addField(doc, LogMessage.SystemField.INDEX.fieldName, message.getIndex(), "", 0);
+
+    // today we rely on source to construct the document at search time so need to keep it
+    // consistent for now
+    Map<String, Object> jsonMap = new HashMap<>();
+    if (!message.getParentId().isEmpty()) {
+      jsonMap.put(
+          LogMessage.ReservedField.PARENT_ID.fieldName, message.getParentId().toStringUtf8());
+      addField(
+          doc,
+          LogMessage.ReservedField.PARENT_ID.fieldName,
+          message.getParentId().toStringUtf8(),
+          "",
+          0);
+    }
+    if (!message.getTraceId().isEmpty()) {
+      jsonMap.put(LogMessage.ReservedField.TRACE_ID.fieldName, message.getTraceId().toStringUtf8());
+      addField(
+          doc,
+          LogMessage.ReservedField.TRACE_ID.fieldName,
+          message.getTraceId().toStringUtf8(),
+          "",
+          0);
+    }
+    if (!message.getName().isEmpty()) {
+      jsonMap.put(LogMessage.ReservedField.NAME.fieldName, message.getName());
+      addField(doc, LogMessage.ReservedField.NAME.fieldName, message.getName(), "", 0);
+    }
+    if (message.getDuration() != 0) {
+      jsonMap.put(
+          LogMessage.ReservedField.DURATION_MS.fieldName,
+          Duration.of(message.getDuration(), ChronoUnit.MICROS).toMillis());
+      addField(
+          doc,
+          LogMessage.ReservedField.DURATION_MS.fieldName,
+          Duration.of(message.getDuration(), ChronoUnit.MICROS).toMillis(),
+          "",
+          0);
+    }
+    if (!message.getId().isEmpty()) {
+      jsonMap.put(LogMessage.SystemField.ID.fieldName, message.getId().toStringUtf8());
+      addField(doc, LogMessage.SystemField.ID.fieldName, message.getId().toStringUtf8(), "", 0);
+    } else {
+      throw new IllegalArgumentException("Span id is empty");
+    }
+
+    // TODO: this interferes in tests
+    // Instant timestamp = SpanFormatter.getTimestampFromSpan(message);
+    Instant timestamp =
+        Instant.ofEpochMilli(
+            TimeUnit.MILLISECONDS.convert(message.getTimestamp(), TimeUnit.MICROSECONDS));
     addField(
-        doc,
-        LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
-        message.getTimestamp().toEpochMilli(),
-        "",
-        0);
-    addField(doc, LogMessage.ReservedField.TYPE.fieldName, message.getType(), "", 0);
-    addField(doc, LogMessage.SystemField.ID.fieldName, message.getId(), "", 0);
-
-    final String msgString = JsonUtil.writeAsString(message.toWireMessage());
+        doc, LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toEpochMilli(), "", 0);
+
+    Map<String, Trace.KeyValue> tags =
+        message.getTagsList().stream()
+            .map(keyValue -> Map.entry(keyValue.getKey(), keyValue))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    // TODO: this should just be top level Trace.Span fields. This is error prone - what if type is
+    // not a string?
+    String indexName =
+        tags.containsKey(LogMessage.ReservedField.SERVICE_NAME.fieldName)
+            ? tags.get(LogMessage.ReservedField.SERVICE_NAME.fieldName).getVStr()
+            : DEFAULT_INDEX_NAME;
+    String msgType =
+        tags.containsKey(LogMessage.ReservedField.TYPE.fieldName)
+            ? tags.get(LogMessage.ReservedField.TYPE.fieldName).getVStr()
+            : DEFAULT_LOG_MESSAGE_TYPE;
+
+    jsonMap.put(LogMessage.ReservedField.TYPE.fieldName, msgType);
+    addField(doc, LogMessage.ReservedField.TYPE.fieldName, msgType, "", 0);
+
+    jsonMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName);
+    addField(doc, LogMessage.SystemField.INDEX.fieldName, indexName, "", 0);
+    addField(doc, LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName, "", 0);
+
+    tags.remove(LogMessage.ReservedField.SERVICE_NAME.fieldName);
+    tags.remove(LogMessage.ReservedField.TYPE.fieldName);
+
+    for (Trace.KeyValue keyValue : tags.values()) {
+      if (keyValue.getVType() == Trace.ValueType.STRING) {
+        addField(doc, keyValue.getKey(), keyValue.getVStr(), "", 0);
+        jsonMap.put(keyValue.getKey(), keyValue.getVStr());
+      } else if (keyValue.getVType() == Trace.ValueType.BOOL) {
+        addField(doc, keyValue.getKey(), keyValue.getVBool(), "", 0);
+        jsonMap.put(keyValue.getKey(), keyValue.getVBool());
+      } else if (keyValue.getVType() == Trace.ValueType.INT64) {
+        addField(doc, keyValue.getKey(), keyValue.getVInt64(), "", 0);
+        jsonMap.put(keyValue.getKey(), keyValue.getVInt64());
+      } else if (keyValue.getVType() == Trace.ValueType.FLOAT64) {
+        addField(doc, keyValue.getKey(), keyValue.getVFloat64(), "", 0);
+        jsonMap.put(keyValue.getKey(), keyValue.getVFloat64());
+      } else if (keyValue.getVType() == Trace.ValueType.BINARY) {
+        addField(doc, keyValue.getKey(), keyValue.getVBinary().toStringUtf8(), "", 0);
+        jsonMap.put(keyValue.getKey(), keyValue.getVBinary().toStringUtf8());
+      } else {
+        LOG.warn(
+            "Skipping field with unknown value type {} with key {}",
+            keyValue.getVType(),
+            keyValue.getKey());
+      }
+    }
+
+    LogWireMessage logWireMessage =
+        new LogWireMessage(indexName, msgType, message.getId().toStringUtf8(), timestamp, jsonMap);
+    final String msgString = JsonUtil.writeAsString(logWireMessage);
     addField(doc, LogMessage.SystemField.SOURCE.fieldName, msgString, "", 0);
     if (enableFullTextSearch) {
       addField(doc, LogMessage.SystemField.ALL.fieldName, msgString, "", 0);
     }
 
-    for (String key : message.getSource().keySet()) {
-      addField(doc, key, message.getSource().get(key), "", 0);
-    }
-
     LOG.trace("Lucene document {} for message {}", doc, message);
     return doc;
   }
 
+  // @Override
+  // public Document fromMessage(LogMessage message) throws JsonProcessingException {
+  //   Document doc = new Document();
+  //   addField(doc, LogMessage.SystemField.INDEX.fieldName, message.getIndex(), "", 0);
+  //   addField(
+  //       doc,
+  //       LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
+  //       message.getTimestamp().toEpochMilli(),
+  //       "",
+  //       0);
+  //   addField(doc, LogMessage.ReservedField.TYPE.fieldName, message.getType(), "", 0);
+  //   addField(doc, LogMessage.SystemField.ID.fieldName, message.getId(), "", 0);
+  //
+  //   final String msgString = JsonUtil.writeAsString(message.toWireMessage());
+  //   addField(doc, LogMessage.SystemField.SOURCE.fieldName, msgString, "", 0);
+  //   if (enableFullTextSearch) {
+  //     addField(doc, LogMessage.SystemField.ALL.fieldName, msgString, "", 0);
+  //   }
+  //
+  //   for (String key : message.getSource().keySet()) {
+  //     addField(doc, key, message.getSource().get(key), "", 0);
+  //   }
+  //   LOG.trace("Lucene document {} for message {}", doc, message);
+  //   return doc;
+  // }
+
   @Override
   public ConcurrentHashMap<String, LuceneFieldDef> getSchema() {
     return fieldDefMap;
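The new fromMessage resolves the index (service) name and message type from the span's tags rather than from LogMessage fields, falling back to the SpanFormatter defaults. A hypothetical helper, not part of the PR, that mirrors that lookup and can be handy when reasoning about which index a span lands in:

    import com.slack.kaldb.logstore.LogMessage;
    import com.slack.kaldb.writer.SpanFormatter;
    import com.slack.service.murron.trace.Trace;

    // Returns the index name fromMessage would derive for a span; assumes the service_name
    // tag, when present, carries a string value, as the PR's own TODO notes.
    static String indexNameFor(Trace.Span span) {
      return span.getTagsList().stream()
          .filter(kv -> LogMessage.ReservedField.SERVICE_NAME.fieldName.equals(kv.getKey()))
          .map(Trace.KeyValue::getVStr)
          .findFirst()
          .orElse(SpanFormatter.DEFAULT_INDEX_NAME);
    }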
diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java b/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java
index b468bc3aca..a87940d3ed 100644
--- a/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java
+++ b/kaldb/src/main/java/com/slack/kaldb/writer/LogMessageWriterImpl.java
@@ -4,10 +4,7 @@
 import com.slack.kaldb.logstore.LogMessage;
 import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
-import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A LogMessageWriter ingests ConsumerRecords into a ChunkManager.
@@ -47,7 +44,6 @@
  * <p>TODO: In future, implement MessageWriter interfacce on ChunkManager.
  */
 public class LogMessageWriterImpl implements MessageWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(LogMessageWriterImpl.class);
 
   private final ChunkManager<LogMessage> chunkManager;
 
@@ -59,28 +55,15 @@ public LogMessageWriterImpl(ChunkManager<LogMessage> chunkManager) {
   public boolean insertRecord(ConsumerRecord<String, byte[]> record) throws IOException {
     if (record == null) return false;
 
-    final List<LogMessage> logMessages;
-    try {
-      final Trace.Span span = Trace.Span.parseFrom(record.value());
-      final Trace.ListOfSpans listOfSpans = Trace.ListOfSpans.newBuilder().addSpans(span).build();
-      logMessages = SpanFormatter.toLogMessage(listOfSpans);
-      // Ideally, we should return true when logMessages are empty. But, fail the record, since we
-      // don't expect any empty records or we may have a bug in earlier code.
-      if (logMessages.isEmpty()) return false;
-    } catch (Exception e) {
-      LOG.warn("Parsing consumer record: {} failed with an exception.", record, e);
-      return false;
-    }
-
-    final int avgMsgSize = record.serializedValueSize() / logMessages.size();
-    for (LogMessage logMessage : logMessages) {
-      // Currently, ChunkManager.addMessage increments a failure counter to indicate an ingestion
-      // error. We decided to throw the exception to a higher level since in a batch ingestion
-      // the upper layers of the stack can't take any further action. If this becomes an issue
-      // in future, propagate the exception upwards here or return a value.
-      chunkManager.addMessage(
-          logMessage, avgMsgSize, String.valueOf(record.partition()), record.offset());
-    }
+    // Currently, ChunkManager.addMessage increments a failure counter to indicate an ingestion
+    // error. We decided to throw the exception to a higher level since in a batch ingestion
+    // the upper layers of the stack can't take any further action. If this becomes an issue
+    // in future, propagate the exception upwards here or return a value.
+    chunkManager.addMessage(
+        Trace.Span.parseFrom(record.value()),
+        record.serializedValueSize(),
+        String.valueOf(record.partition()),
+        record.offset());
     return true;
   }
 }
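With the ListOfSpans batching path removed, insertRecord assumes each Kafka record value is exactly one serialized Trace.Span, and a malformed payload now surfaces as an IOException from parseFrom instead of being logged and skipped. A small round-trip sketch of that assumption; the id and timestamp values are illustrative:

    import com.google.protobuf.ByteString;
    import com.slack.service.murron.trace.Trace;

    public class SingleSpanRecordCheck {
      public static void main(String[] args) throws Exception {
        Trace.Span original =
            Trace.Span.newBuilder()
                .setId(ByteString.copyFromUtf8("span-1"))
                .setTimestamp(1_700_000_000_000_000L) // microseconds since epoch
                .build();
        byte[] recordValue = original.toByteArray(); // what a producer would put on the topic
        Trace.Span parsed = Trace.Span.parseFrom(recordValue); // what insertRecord now does
        System.out.println(parsed.equals(original)); // true
      }
    }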