Skip to content

Commit

Permalink
Remove LogMessage and use Trace.Span as the carrier object for documents
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Feb 29, 2024
1 parent d670e44 commit 00149e2
Show file tree
Hide file tree
Showing 38 changed files with 11,468 additions and 11,363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class IndexingChunkImpl<T> extends ReadWriteChunk<T> {
private static final Logger LOG = LoggerFactory.getLogger(IndexingChunkImpl.class);

public IndexingChunkImpl(
LogStore<T> logStore,
LogStore logStore,
String chunkDataPrefix,
MeterRegistry meterRegistry,
SearchMetadataStore searchMetadataStore,
Expand Down
22 changes: 11 additions & 11 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.slack.kaldb.blobfs.BlobFs;
import com.slack.kaldb.logstore.LogMessage;
import com.slack.kaldb.logstore.LogStore;
import com.slack.kaldb.logstore.LuceneIndexStoreImpl;
import com.slack.kaldb.logstore.search.LogIndexSearcher;
Expand All @@ -19,6 +18,7 @@
import com.slack.kaldb.metadata.search.SearchMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadata;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
Expand All @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.index.IndexCommit;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,7 +71,7 @@ public abstract class ReadWriteChunk<T> implements Chunk<T> {
public static final String LIVE_SNAPSHOT_PREFIX = SnapshotMetadata.LIVE_SNAPSHOT_PATH + "_";
public static final String SCHEMA_FILE_NAME = "schema.json";

private final LogStore<T> logStore;
private final LogStore logStore;
private final String kafkaPartitionId;
private final Logger logger;
private LogIndexSearcher<T> logSearcher;
Expand All @@ -89,7 +90,7 @@ public abstract class ReadWriteChunk<T> implements Chunk<T> {
private boolean readOnly;

protected ReadWriteChunk(
LogStore<T> logStore,
LogStore logStore,
String chunkDataPrefix,
MeterRegistry meterRegistry,
SearchMetadataStore searchMetadataStore,
Expand Down Expand Up @@ -141,7 +142,7 @@ public static SearchMetadata toSearchMetadata(String snapshotName, SearchContext
}

/** Index the message in the logstore and update the chunk data time range. */
public void addMessage(T message, String kafkaPartitionId, long offset) {
public void addMessage(Trace.Span message, String kafkaPartitionId, long offset) {
if (!this.kafkaPartitionId.equals(kafkaPartitionId)) {
throw new IllegalArgumentException(
"All messages for this chunk should belong to partition: "
Expand All @@ -151,13 +152,12 @@ public void addMessage(T message, String kafkaPartitionId, long offset) {
}
if (!readOnly) {
logStore.addMessage(message);
// Update the chunk with the time range of the data in the chunk.
// TODO: This type conversion is a temporary hack, fix it by adding timestamp field to the
// message.
if (message instanceof LogMessage) {
chunkInfo.updateDataTimeRange(((LogMessage) message).getTimestamp().toEpochMilli());
chunkInfo.updateMaxOffset(offset);
}

chunkInfo.updateDataTimeRange(
TimeUnit.MILLISECONDS.convert(message.getTimestamp(), TimeUnit.MICROSECONDS));
// Also validating the timestamp here (see the commented-out call below) makes tests
// that use dates from 2020 fail, so this logic is left unchanged for now.
// chunkInfo.updateDataTimeRange(SpanFormatter.getTimestampFromSpan(message).toEpochMilli());
} else {
throw new IllegalStateException(String.format("Chunk %s is read only", chunkInfo));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ public ReadWriteChunk<T> makeChunk() throws IOException {
ensureNonNullString(kafkaPartitionId, "kafkaPartitionId can't be null and should be set.");
ensureNonNullString(indexerConfig.getDataDirectory(), "The data directory shouldn't be empty");
final File dataDirectory = new File(indexerConfig.getDataDirectory());
LogStore<T> logStore =
(LogStore<T>)
LuceneIndexStoreImpl.makeLogStore(
dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
LogStore logStore =
LuceneIndexStoreImpl.makeLogStore(
dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);

return new RecoveryChunkImpl<>(
logStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class RecoveryChunkImpl<T> extends ReadWriteChunk<T> {
// Logger is bound to this class so log output is attributed to RecoveryChunkImpl.
private static final Logger LOG = LoggerFactory.getLogger(RecoveryChunkImpl.class);

public RecoveryChunkImpl(
LogStore<T> logStore,
LogStore logStore,
String chunkDataPrefix,
MeterRegistry meterRegistry,
SearchMetadataStore searchMetadataStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.slack.kaldb.metadata.search.SearchMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import org.apache.curator.x.async.AsyncCuratorFramework;
Expand Down Expand Up @@ -120,7 +121,7 @@ public static CachingChunkManager<LogMessage> fromConfig(
}

@Override
public void addMessage(T message, long msgSize, String kafkaPartitionId, long offset)
public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
throws IOException {
throw new UnsupportedOperationException(
"Adding messages is not supported on a caching chunk manager");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import com.slack.kaldb.logstore.search.SearchQuery;
import com.slack.kaldb.logstore.search.SearchResult;
import com.slack.kaldb.metadata.schema.FieldType;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;

public interface ChunkManager<T> {
void addMessage(T message, long msgSize, String kafkaPartitionId, long offset) throws IOException;
void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
throws IOException;

SearchResult<T> query(SearchQuery query, Duration queryTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.slack.kaldb.metadata.search.SearchMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -166,7 +167,8 @@ public IndexingChunkManager(
* <p>TODO: Indexer should stop cleanly if the roll over fails or throws an exception.
*/
@Override
public void addMessage(final T message, long msgSize, String kafkaPartitionId, long offset)
public void addMessage(
final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
throws IOException {
if (stopIngestion) {
// Currently, this flag is set on only a chunkRollOverException.
Expand Down Expand Up @@ -258,10 +260,9 @@ private ReadWriteChunk<T> getOrCreateActiveChunk(
String kafkaPartitionId, KaldbConfigs.IndexerConfig indexerConfig) throws IOException {
if (activeChunk == null) {
@SuppressWarnings("unchecked")
LogStore<T> logStore =
(LogStore<T>)
LuceneIndexStoreImpl.makeLogStore(
dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);
LogStore logStore =
LuceneIndexStoreImpl.makeLogStore(
dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);

chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.slack.kaldb.metadata.search.SearchMetadataStore;
import com.slack.kaldb.metadata.snapshot.SnapshotMetadataStore;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -78,7 +79,8 @@ public RecoveryChunkManager(
}

@Override
public void addMessage(final T message, long msgSize, String kafkaPartitionId, long offset)
public void addMessage(
final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
throws IOException {
if (readOnly) {
LOG.warn("Ingestion is stopped since the chunk is in read only mode.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.slack.kaldb.logstore;

import com.slack.kaldb.metadata.schema.LuceneFieldDef;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.document.Document;
Expand All @@ -9,8 +10,8 @@
* DocumentBuilder defines the interfaces for classes that generate Lucene documents out of
* messages.
*/
public interface DocumentBuilder<T> {
Document fromMessage(T message) throws IOException;
public interface DocumentBuilder {
Document fromMessage(Trace.Span message) throws IOException;

ConcurrentHashMap<String, LuceneFieldDef> getSchema();
}
5 changes: 3 additions & 2 deletions kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.slack.kaldb.logstore;

import com.slack.kaldb.metadata.schema.LuceneFieldDef;
import com.slack.service.murron.trace.Trace;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -10,8 +11,8 @@
import org.apache.lucene.store.FSDirectory;

/* An interface that defines the read and write operations of a LogStore */
public interface LogStore<T> extends Closeable {
void addMessage(T message);
public interface LogStore extends Closeable {
void addMessage(Trace.Span message);

// TODO: Instead of exposing the searcherManager, consider returning an instance of the searcher.
SearcherManager getSearcherManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.slack.kaldb.metadata.schema.LuceneFieldDef;
import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.util.RuntimeHalterImpl;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.File;
Expand Down Expand Up @@ -39,7 +40,7 @@
*
* TODO: Each index store has a unique id that is used as a suffix/prefix in files associated with this store?
*/
public class LuceneIndexStoreImpl implements LogStore<LogMessage> {
public class LuceneIndexStoreImpl implements LogStore {

private final String id = UUID.randomUUID().toString();

Expand All @@ -52,7 +53,7 @@ public class LuceneIndexStoreImpl implements LogStore<LogMessage> {
public static final String FINAL_MERGES_TIMER = "kaldb_index_final_merges";

private final SearcherManager searcherManager;
private final DocumentBuilder<LogMessage> documentBuilder;
private final DocumentBuilder documentBuilder;
private final FSDirectory indexDirectory;
private final Timer timer;
private final SnapshotDeletionPolicy snapshotDeletionPolicy;
Expand Down Expand Up @@ -108,9 +109,7 @@ public static LuceneIndexStoreImpl makeLogStore(
}

public LuceneIndexStoreImpl(
LuceneIndexStoreConfig config,
DocumentBuilder<LogMessage> documentBuilder,
MeterRegistry registry)
LuceneIndexStoreConfig config, DocumentBuilder documentBuilder, MeterRegistry registry)
throws IOException {

this.documentBuilder = documentBuilder;
Expand Down Expand Up @@ -254,7 +253,7 @@ private void handleNonFatal(Throwable ex) {
}

@Override
public void addMessage(LogMessage message) {
public void addMessage(Trace.Span message) {
try {
messagesReceivedCounter.increment();
if (indexWriter.isPresent()) {
Expand Down
Loading

0 comments on commit 00149e2

Please sign in to comment.