diff --git a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java index d7a47979c7..2a6a38c57d 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunk/ReadWriteChunk.java @@ -220,7 +220,7 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) { IndexCommit indexCommit = null; try { - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); // Create schema file to upload ChunkSchema chunkSchema = diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java index 27cf8c656f..0270979e82 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkManager/IndexingChunkManager.java @@ -256,7 +256,7 @@ private ReadWriteChunk getOrCreateActiveChunk( LuceneIndexStoreImpl.makeLogStore( dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry); - chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory().toFile()); + chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory()); ReadWriteChunk newChunk = new IndexingChunkImpl<>( diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java index c2ad00ca1f..0a7d71a904 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/ChunkRollOverStrategy.java @@ -1,13 +1,13 @@ package com.slack.kaldb.chunkrollover; -import java.io.File; +import org.apache.lucene.store.FSDirectory; // TODO: ChunkRollOverStrategy should take a chunk as an input and get statistics // like message count, size etc. from the chunk public interface ChunkRollOverStrategy { boolean shouldRollOver(long currentBytesIndexed, long currentMessagesIndexed); - public void setActiveChunkDirectory(File activeChunkDirectory); + public void setActiveChunkDirectory(FSDirectory directory); public void close(); } diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java index 4ca1c1700f..a240da63f1 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategy.java @@ -6,13 +6,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.slack.kaldb.proto.config.KaldbConfigs; import io.micrometer.core.instrument.MeterRegistry; -import java.io.File; +import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.io.FileUtils; +import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class DiskOrMessageCountBasedRolloverStrategy implements ChunkRollOverStr Executors.newScheduledThreadPool( 1, new ThreadFactoryBuilder().setNameFormat("directory-size-%d").build()); - private final AtomicReference activeChunkDirectory = new AtomicReference<>(); + private final AtomicReference activeChunkDirectory = new AtomicReference<>(); private final AtomicLong approximateDirectoryBytes = new AtomicLong(0); public static DiskOrMessageCountBasedRolloverStrategy fromConfig( @@ -59,7 +60,13 @@ public DiskOrMessageCountBasedRolloverStrategy( this.liveBytesDirGauge = this.registry.gauge(LIVE_BYTES_DIR, new AtomicLong(0)); directorySizeExecutorService.scheduleAtFixedRate( - this::calculateDirectorySize, + () -> { + long dirSize = calculateDirectorySize(activeChunkDirectory); + // in case the method fails to calculate we return -1 so don't update the old value + if (dirSize > 0) { + approximateDirectoryBytes.set(dirSize); + } + }, DIRECTORY_SIZE_EXECUTOR_PERIOD_MS, DIRECTORY_SIZE_EXECUTOR_PERIOD_MS, TimeUnit.MILLISECONDS); @@ -86,20 +93,40 @@ public long getMaxBytesPerChunk() { } @Override - public void setActiveChunkDirectory(File activeChunkDirectory) { - this.activeChunkDirectory.set(activeChunkDirectory); - this.approximateDirectoryBytes.set(0); + public void setActiveChunkDirectory(FSDirectory directory) { + this.activeChunkDirectory.set(directory); + approximateDirectoryBytes.set(0); } - public void calculateDirectorySize() { + public static long calculateDirectorySize(AtomicReference activeChunkDirectoryRef) { + FSDirectory activeChunkDirectory = activeChunkDirectoryRef.get(); + return calculateDirectorySize(activeChunkDirectory); + } + + public static long calculateDirectorySize(FSDirectory activeChunkDirectory) { try { - File activeChunkDir = activeChunkDirectory.get(); - if (activeChunkDir != null && activeChunkDir.exists()) { - approximateDirectoryBytes.set(FileUtils.sizeOf(activeChunkDir)); + if (activeChunkDirectory != null && activeChunkDirectory.listAll().length > 0) { + + long directorySize = + Arrays.stream(activeChunkDirectory.listAll()) + .mapToLong( + file -> { + try { + return activeChunkDirectory.fileLength(file); + } catch (IOException e) { + // There can be a race condition b/w the listAll which filters + // pendingDeletes and then fileLength method which will throw + // NoSuchFileException if the file has now become a pending delete + return 0; + } + }) + .sum(); + return directorySize; } } catch (Exception e) { LOG.error("Error calculating the directory size", e); } + return -1; } @Override diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java index 8c9c6bdc91..6699b12716 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/MessageSizeOrCountBasedRolloverStrategy.java @@ -4,7 +4,7 @@ import com.slack.kaldb.proto.config.KaldbConfigs; import io.micrometer.core.instrument.MeterRegistry; -import java.io.File; +import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,7 @@ public boolean shouldRollOver(long currentBytesIndexed, long currentMessagesInde } @Override - public void setActiveChunkDirectory(File activeChunkDirectory) {} + public void setActiveChunkDirectory(FSDirectory activeChunkDirectory) {} @Override public void close() {} diff --git a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java index c42fb76b09..f3c0fcb456 100644 --- a/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java +++ b/kaldb/src/main/java/com/slack/kaldb/chunkrollover/NeverRolloverChunkStrategy.java @@ -1,6 +1,6 @@ package com.slack.kaldb.chunkrollover; -import java.io.File; +import org.apache.lucene.store.FSDirectory; /** * The NeverRolloverChunkStrategy always responds in the negative for a chunk roll over request. It @@ -13,7 +13,7 @@ public boolean shouldRollOver(long currentBytesIndexed, long currentMessagesInde } @Override - public void setActiveChunkDirectory(File activeChunkDirectory) {} + public void setActiveChunkDirectory(FSDirectory activeChunkDirectory) {} @Override public void close() {} diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java index 20c20cb3db..1fa8cea1c4 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java @@ -3,11 +3,11 @@ import com.slack.kaldb.metadata.schema.LuceneFieldDef; import java.io.Closeable; import java.io.IOException; -import java.nio.file.Path; import java.util.concurrent.ConcurrentHashMap; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.store.FSDirectory; /* An interface that implements a read and write interface for the LogStore */ public interface LogStore extends Closeable { @@ -24,7 +24,7 @@ public interface LogStore extends Closeable { void cleanup() throws IOException; - Path getDirectory(); + FSDirectory getDirectory(); /** * After a commit, lucene may merge multiple segments into one in the background. So, getting a diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java index 65512baee8..60335b9e68 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java @@ -8,7 +8,6 @@ import io.micrometer.core.instrument.MeterRegistry; import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.time.Duration; import java.util.NoSuchElementException; import java.util.Optional; @@ -220,8 +219,8 @@ private void syncRefresh() throws IOException { } @Override - public Path getDirectory() { - return indexDirectory.getDirectory(); + public FSDirectory getDirectory() { + return indexDirectory; } private void handleNonFatal(Throwable ex) { @@ -289,7 +288,7 @@ public String toString() { + id + '\'' + ", at=" - + getDirectory().toAbsolutePath() + + getDirectory().getDirectory().toAbsolutePath() + '}'; } diff --git a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java index 66558e1733..56fd673cdf 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunk/ReadOnlyChunkImplTest.java @@ -522,7 +522,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception assertThat(getTimerCount(REFRESHES_TIMER, meterRegistry)).isEqualTo(1); assertThat(getTimerCount(COMMITS_TIMER, meterRegistry)).isEqualTo(1); - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); // Create schema file to upload ChunkSchema chunkSchema = diff --git a/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java b/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java index 2b159e1ebf..df711ef078 100644 --- a/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java @@ -9,6 +9,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import brave.Tracing; import com.adobe.testing.s3mock.junit5.S3MockExtension; @@ -39,6 +41,7 @@ import java.util.concurrent.TimeoutException; import org.apache.curator.test.TestingServer; import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.lucene.store.FSDirectory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -329,4 +332,26 @@ public void testChunkRollOver() { assertThat(chunkRollOverStrategy.shouldRollOver(100, 2001)).isTrue(); assertThat(chunkRollOverStrategy.shouldRollOver(1001, 2001)).isTrue(); } + + @Test + public void testCalculateDirectorySize() throws IOException { + FSDirectory directory = mock(FSDirectory.class); + when(directory.listAll()).thenThrow(IOException.class); + assertThat(DiskOrMessageCountBasedRolloverStrategy.calculateDirectorySize(directory)) + .isEqualTo(-1); + + directory = mock(FSDirectory.class); + when(directory.listAll()).thenReturn(new String[] {"file1", "file2"}); + when(directory.fileLength("file1")).thenReturn(1L); + when(directory.fileLength("file2")).thenReturn(2L); + assertThat(DiskOrMessageCountBasedRolloverStrategy.calculateDirectorySize(directory)) + .isEqualTo(3); + + directory = mock(FSDirectory.class); + when(directory.listAll()).thenReturn(new String[] {"file1", "file2"}); + when(directory.fileLength("file1")).thenReturn(1L); + when(directory.fileLength("file2")).thenThrow(IOException.class); + assertThat(DiskOrMessageCountBasedRolloverStrategy.calculateDirectorySize(directory)) + .isEqualTo(1); + } } diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java index 1a17c19f1f..5cce91c125 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/LuceneIndexStoreImplTest.java @@ -359,7 +359,7 @@ private void testS3Snapshot(String bucket, String prefix) throws Exception { assertThat(getTimerCount(REFRESHES_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); assertThat(getTimerCount(COMMITS_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); IndexCommit indexCommit = logStore.getIndexCommit(); Collection activeFiles = indexCommit.getFileNames(); LocalBlobFs localBlobFs = new LocalBlobFs(); @@ -424,7 +424,7 @@ public void testLocalSnapshot() throws IOException { assertThat(getTimerCount(REFRESHES_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); assertThat(getTimerCount(COMMITS_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1); - Path dirPath = logStore.getDirectory().toAbsolutePath(); + Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath(); IndexCommit indexCommit = logStore.getIndexCommit(); Collection activeFiles = indexCommit.getFileNames(); LocalBlobFs blobFs = new LocalBlobFs(); @@ -472,7 +472,7 @@ public void testCleanup() throws IOException { strictLogStore.logStore.close(); strictLogStore.logSearcher.close(); - File tempFolder = strictLogStore.logStore.getDirectory().toFile(); + File tempFolder = strictLogStore.logStore.getDirectory().getDirectory().toFile(); assertThat(tempFolder.exists()).isTrue(); strictLogStore.logStore.cleanup(); assertThat(tempFolder.exists()).isFalse();