Skip to content

Commit

Permalink
use lucene file directory abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Sep 8, 2023
1 parent 469c00f commit 8988fc1
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) {

IndexCommit indexCommit = null;
try {
Path dirPath = logStore.getDirectory().toAbsolutePath();
Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath();

// Create schema file to upload
ChunkSchema chunkSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private ReadWriteChunk<T> getOrCreateActiveChunk(
LuceneIndexStoreImpl.makeLogStore(
dataDirectory, indexerConfig.getLuceneConfig(), meterRegistry);

chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory().toFile());
chunkRollOverStrategy.setActiveChunkDirectory(logStore.getDirectory());

ReadWriteChunk<T> newChunk =
new IndexingChunkImpl<>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.slack.kaldb.chunkrollover;

import java.io.File;
import org.apache.lucene.store.FSDirectory;

// TODO: ChunkRollOverStrategy should take a chunk as an input and get statistics
// like message count, size etc. from the chunk
public interface ChunkRollOverStrategy {
boolean shouldRollOver(long currentBytesIndexed, long currentMessagesIndexed);

public void setActiveChunkDirectory(File activeChunkDirectory);
public void setActiveChunkDirectory(FSDirectory directory);

public void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.kaldb.proto.config.KaldbConfigs;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -40,7 +41,7 @@ public class DiskOrMessageCountBasedRolloverStrategy implements ChunkRollOverStr
Executors.newScheduledThreadPool(
1, new ThreadFactoryBuilder().setNameFormat("directory-size-%d").build());

private final AtomicReference<File> activeChunkDirectory = new AtomicReference<>();
private final AtomicReference<FSDirectory> activeChunkDirectory = new AtomicReference<>();
private final AtomicLong approximateDirectoryBytes = new AtomicLong(0);

public static DiskOrMessageCountBasedRolloverStrategy fromConfig(
Expand Down Expand Up @@ -86,16 +87,31 @@ public long getMaxBytesPerChunk() {
}

@Override
public void setActiveChunkDirectory(File activeChunkDirectory) {
this.activeChunkDirectory.set(activeChunkDirectory);
public void setActiveChunkDirectory(FSDirectory directory) {
this.activeChunkDirectory.set(directory);
this.approximateDirectoryBytes.set(0);
}

public void calculateDirectorySize() {
try {
File activeChunkDir = activeChunkDirectory.get();
if (activeChunkDir != null && activeChunkDir.exists()) {
approximateDirectoryBytes.set(FileUtils.sizeOf(activeChunkDir));
FSDirectory activeChunkDir = activeChunkDirectory.get();
if (activeChunkDir != null && activeChunkDir.listAll().length > 0) {

long directorySize =
Arrays.stream(activeChunkDir.listAll())
.mapToLong(
file -> {
try {
return activeChunkDir.fileLength(file);
} catch (IOException e) {
// There can be a race condition b/w the listAll which filters
// pendingDeletes and then fileLength method which will throw
// NoSuchFileException if the file has now become a pending delete
return 0;
}
})
.sum();
approximateDirectoryBytes.set(directorySize);
}
} catch (Exception e) {
LOG.error("Error calculating the directory size", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import com.slack.kaldb.proto.config.KaldbConfigs;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.File;
import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,7 +52,7 @@ public boolean shouldRollOver(long currentBytesIndexed, long currentMessagesInde
}

@Override
public void setActiveChunkDirectory(File activeChunkDirectory) {}
public void setActiveChunkDirectory(FSDirectory activeChunkDirectory) {}

@Override
public void close() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.slack.kaldb.chunkrollover;

import java.io.File;
import org.apache.lucene.store.FSDirectory;

/**
* The NeverRolloverChunkStrategy always responds in the negative for a chunk roll over request. It
Expand All @@ -13,7 +13,7 @@ public boolean shouldRollOver(long currentBytesIndexed, long currentMessagesInde
}

@Override
public void setActiveChunkDirectory(File activeChunkDirectory) {}
public void setActiveChunkDirectory(FSDirectory activeChunkDirectory) {}

@Override
public void close() {}
Expand Down
4 changes: 2 additions & 2 deletions kaldb/src/main/java/com/slack/kaldb/logstore/LogStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import com.slack.kaldb.metadata.schema.LuceneFieldDef;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.FSDirectory;

/* An interface that implements a read and write interface for the LogStore */
public interface LogStore<T> extends Closeable {
Expand All @@ -24,7 +24,7 @@ public interface LogStore<T> extends Closeable {

void cleanup() throws IOException;

Path getDirectory();
FSDirectory getDirectory();

/**
* After a commit, lucene may merge multiple segments into one in the background. So, getting a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.micrometer.core.instrument.MeterRegistry;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Optional;
Expand Down Expand Up @@ -220,8 +219,8 @@ private void syncRefresh() throws IOException {
}

@Override
public Path getDirectory() {
return indexDirectory.getDirectory();
public FSDirectory getDirectory() {
return indexDirectory;
}

private void handleNonFatal(Throwable ex) {
Expand Down Expand Up @@ -289,7 +288,7 @@ public String toString() {
+ id
+ '\''
+ ", at="
+ getDirectory().toAbsolutePath()
+ getDirectory().getDirectory().toAbsolutePath()
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ private void initializeBlobStorageWithIndex(String snapshotId) throws Exception
assertThat(getTimerCount(REFRESHES_TIMER, meterRegistry)).isEqualTo(1);
assertThat(getTimerCount(COMMITS_TIMER, meterRegistry)).isEqualTo(1);

Path dirPath = logStore.getDirectory().toAbsolutePath();
Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath();

// Create schema file to upload
ChunkSchema chunkSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ private void testS3Snapshot(String bucket, String prefix) throws Exception {
assertThat(getTimerCount(REFRESHES_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1);
assertThat(getTimerCount(COMMITS_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1);

Path dirPath = logStore.getDirectory().toAbsolutePath();
Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath();
IndexCommit indexCommit = logStore.getIndexCommit();
Collection<String> activeFiles = indexCommit.getFileNames();
LocalBlobFs localBlobFs = new LocalBlobFs();
Expand Down Expand Up @@ -424,7 +424,7 @@ public void testLocalSnapshot() throws IOException {
assertThat(getTimerCount(REFRESHES_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1);
assertThat(getTimerCount(COMMITS_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1);

Path dirPath = logStore.getDirectory().toAbsolutePath();
Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath();
IndexCommit indexCommit = logStore.getIndexCommit();
Collection<String> activeFiles = indexCommit.getFileNames();
LocalBlobFs blobFs = new LocalBlobFs();
Expand Down Expand Up @@ -472,7 +472,7 @@ public void testCleanup() throws IOException {
strictLogStore.logStore.close();
strictLogStore.logSearcher.close();

File tempFolder = strictLogStore.logStore.getDirectory().toFile();
File tempFolder = strictLogStore.logStore.getDirectory().getDirectory().toFile();
assertThat(tempFolder.exists()).isTrue();
strictLogStore.logStore.cleanup();
assertThat(tempFolder.exists()).isFalse();
Expand Down

0 comments on commit 8988fc1

Please sign in to comment.