diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java index d7a47979c7..2a6a38c57d 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java @@ -220,7 +220,7 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) { IndexCommit indexCommit = null; try { - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); // Create schema file to upload ChunkSchema chunkSchema = diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java index 27cf8c656f..0270979e82 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java @@ -256,7 +256,7 @@ private ReadWriteChunk getOrCreateActiveChunk( LuceneIndexStoreImpl.makeLogStore( dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry); - chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory().toFile()); + chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory()); ReadWriteChunk newChunk = new IndexingChunkImpl<>( diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java index c2ad00ca1f..0a7d71a904 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java @@ -1,13 +1,13 @@ package com.slack.kaldb.chunkrollover; -import java.io.File; +import org.apache.lucene.store.FSDirectory; // TODO: ChunkRollOverStrategy should take a chunk as an input and get statistics // like message count, size etc. from the chunk public interface ChunkRollOverStrategy { boolean shouldRollOver(long currentBytesIndexed, long currentMessagesIndexed); - public void setActiveChunkDirectory(File activeChunkDirectory); + public void setActiveChunkDirectory(FSDirectory directory); public void close(); } diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java index 4ca1c1700f..a89408d7fb 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java @@ -6,13 +6,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.slack.kaldb.proto.config.KaldbConfigs; import io.micrometer.core.instrument.MeterRegistry; -import java.io.File; +import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.io.FileUtils; +import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class DiskOrMessageCountBasedRolloverStrategy implements ChunkRollOverStr Executors.newScheduledThreadPool( 1, new ThreadFactoryBuilder().setNameFormat("directory-size-%d").build()); - private final AtomicReference activeChunkDirectory = new AtomicReference<>(); + private final AtomicReference activeChunkDirectory = new AtomicReference<>(); private final AtomicLong approximateDirectoryBytes = new AtomicLong(0); public static DiskOrMessageCountBasedRolloverStrategy fromConfig( @@ -86,16 +87,31 @@ public long getMaxBytesPerChunk() { } @Override - public void setActiveChunkDirectory(File activeChunkDirectory) { - this.activeChunkDirectory.set(activeChunkDirectory); + public void setActiveChunkDirectory(FSDirectory directory) { + this.activeChunkDirectory.set(directory); this.approximateDirectoryBytes.set(0); } public void calculateDirectorySize() { try { - File activeChunkDir = activeChunkDirectory.get(); - if (activeChunkDir != null && activeChunkDir.exists()) { - approximateDirectoryBytes.set(FileUtils.sizeOf(activeChunkDir)); + FSDirectory activeChunkDir = activeChunkDirectory.get(); + if (activeChunkDir != null && activeChunkDir.listAll().length > 0) { + + long directorySize = + Arrays.stream(activeChunkDir.listAll()) + .mapToLong( + file -> { + try { + return activeChunkDir.fileLength(file); + } catch (IOException e) { + // There can be a race condition b/w the listAll which filters + // pendingDeletes and then fileLength method which will throw + // NoSuchFileException if the file has now become a pending delete + return 0; + } + }) + .sum(); + approximateDirectoryBytes.set(directorySize); } } catch (Exception e) { LOG.error("Error calculating the directory size", e); diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java index 8c9c6bdc91..6699b12716 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java @@ -4,7 +4,7 @@ import com.slack.kaldb.proto.config.KaldbConfigs; import io.micrometer.core.instrument.MeterRegistry; -import java.io.File; +import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,7 @@ public boolean shouldRollOver(long currentBytesIndexed, long currentMessagesInde } @Override - public void setActiveChunkDirectory(File activeChunkDirectory) {} + public void setActiveChunkDirectory(FSDirectory activeChunkDirectory) {} @Override public void close() {} diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java index c42fb76b09..f3c0fcb456 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java @@ -1,6 +1,6 @@ package com.slack.kaldb.chunkrollover; -import java.io.File; +import org.apache.lucene.store.FSDirectory; /** * The NeverRolloverChunkStrategy always responds in the negative for a chunk roll over request. It @@ -13,7 +13,7 @@ public boolean shouldRollOver(long currentBytesIndexed, long currentMessagesInde } @Override - public void setActiveChunkDirectory(File activeChunkDirectory) {} + public void setActiveChunkDirectory(FSDirectory activeChunkDirectory) {} @Override public void close() {} diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java index 20c20cb3db..1fa8cea1c4 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java @@ -3,11 +3,11 @@ import com.slack.kaldb.metadata.schema.LuceneFieldDef; import java.io.Closeable; import java.io.IOException; -import java.nio.file.Path; import java.util.concurrent.ConcurrentHashMap; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.store.FSDirectory; /* An interface that implements a read and write interface for the LogStore */ public interface LogStore extends Closeable { @@ -24,7 +24,7 @@ public interface LogStore extends Closeable { void cleanup() throws IOException; - Path getDirectory(); + FSDirectory getDirectory(); /** * After a commit, lucene may merge multiple segments into one in the background. So, getting a diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index 65512baee8..60335b9e68 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -8,7 +8,6 @@ import io.micrometer.core.instrument.MeterRegistry; import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.time.Duration; import java.util.NoSuchElementException; import java.util.Optional; @@ -220,8 +219,8 @@ private void syncRefresh() throws IOException { } @Override - public Path getDirectory() { - return indexDirectory.getDirectory(); + public FSDirectory getDirectory() { + return indexDirectory; } private void handleNonFatal(Throwable ex) { @@ -289,7 +288,7 @@ public String toString() { + id + '\'' + ", at=" - + getDirectory().toAbsolutePath() + + getDirectory().getDirectory().toAbsolutePath() + '}'; } diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index 66558e1733..56fd673cdf 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -522,7 +522,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception assertThat(getTimerCount(REFRESHES_TIMER, meterRegistry)).isEqualTo(1); assertThat(getTimerCount(COMMITS_TIMER, meterRegistry)).isEqualTo(1); - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); // Create schema file to upload ChunkSchema chunkSchema = diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java index 1a17c19f1f..5cce91c125 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java @@ -359,7 +359,7 @@ private void testS3Snapshot(String bucket, String prefix) throws Exception { assertThat(getTimerCount(REFRESHES_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); assertThat(getTimerCount(COMMITS_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); IndexCommit indexCommit = logStore.getIndexCommit(); Collection activeFiles = indexCommit.getFileNames(); LocalBlobFs localBlobFs = new LocalBlobFs(); @@ -424,7 +424,7 @@ public void testLocalSnapshot() throws IOException { assertThat(getTimerCount(REFRESHES_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); assertThat(getTimerCount(COMMITS_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); IndexCommit indexCommit = logStore.getIndexCommit(); Collection activeFiles = indexCommit.getFileNames(); LocalBlobFs blobFs = new LocalBlobFs(); @@ -472,7 +472,7 @@ public void testCleanup() throws IOException { strictLogStore.logStore.close(); strictLogStore.logSearcher.close(); - File tempFolder = strictLogStore.logStore.getDirectory().toFile(); + File tempFolder = strictLogStore.logStore.getDirectory().getDirectory().toFile(); assertThat(tempFolder.exists()).isTrue(); strictLogStore.logStore.cleanup(); assertThat(tempFolder.exists()).isFalse();