Add IndexInput#prefetch.
This adds `IndexInput#prefetch`, which is an optional operation that instructs
the `IndexInput` to start fetching bytes from storage in the background. These
bytes will be picked up by follow-up calls to the `IndexInput#readXXX` methods.
In the future, this will help Lucene move from a maximum of one I/O operation
per search thread to one I/O operation per search thread per `IndexInput`.
Today, when running a query on two terms, the I/O into the terms dictionary is
typically sequential. In the future, we would ideally perform these I/Os in parallel
using this new API. Note that this will require API changes to some classes
including `TermsEnum`.
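
To make the intended usage concrete, here is a minimal caller-side sketch of the pattern this API enables. It is not part of this change; the `PrefetchSketch` class and the two already-positioned inputs are hypothetical:

```java
import java.io.IOException;

import org.apache.lucene.store.IndexInput;

/** Illustrative only: overlap the I/O of two independent lookups by prefetching both first. */
class PrefetchSketch {

  // `termsIn1` and `termsIn2` stand for two already-positioned inputs, e.g. two clones of the
  // terms dictionary input that have each been seek()'d to the block of a different term.
  static void readBoth(IndexInput termsIn1, IndexInput termsIn2) throws IOException {
    // Hint both reads up front; implementations that support prefetching may start the I/O
    // in the background.
    termsIn1.prefetch();
    termsIn2.prefetch();
    // Follow-up reads pick up the prefetched bytes, so the two I/Os may have run concurrently
    // instead of one after the other.
    byte first1 = termsIn1.readByte();
    byte first2 = termsIn2.readByte();
    // ... decode term metadata from the two inputs ...
  }
}
```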

I settled on this API because it's simple and wouldn't require making all
Lucene APIs asynchronous to take advantage of extra I/O concurrency, which I
worry would make the query evaluation logic too complicated.

Currently, only `NIOFSDirectory` implements this new API. I experimented with
`MMapDirectory` as well and found an approach that performed better in the
benchmark I have been using, but I'm not sure it makes sense to implement
this API on this directory as it either requires adding an explicit buffer on
`MMapDirectory`, or forcing data to be loaded into the page cache even though
the OS may have decided that it's not a good idea due to too few cache hits.

This change will require follow-ups to start using this new API when working
with terms dictionaries, postings, etc.

Relates apache#13179
jpountz committed May 2, 2024
1 parent 9af3ef8 commit d613cf2
Showing 4 changed files with 259 additions and 12 deletions.
135 changes: 129 additions & 6 deletions lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java
@@ -20,7 +20,12 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.lucene.util.GroupVIntUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;

/** Base implementation class for buffered {@link IndexInput}. */
public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput {
@@ -45,7 +50,15 @@ public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput {

private final int bufferSize;

private ByteBuffer buffer = EMPTY_BYTEBUFFER;
// Despite the two buffer references below, BufferedIndexInput only tracks a single buffer. Either
// prefetch() has been called last and `buffer` is set to EMPTY_BYTEBUFFER while `prefetchBuffer`
// tracks the actual buffer, or prefetchBuffer is set to EMPTY_BYTEBUFFER and `buffer` tracks the
// actual buffer. This approach makes it possible to only check `buffer.hasRemaining()` to know
// whether to trigger a refill(); refill() will then check for a pending prefetch() before actually
// reading bytes.
private ByteBuffer buffer = EMPTY_BYTEBUFFER; // initialized lazily
private ByteBuffer prefetchBuffer = EMPTY_BYTEBUFFER;
private Future<Integer> pendingPrefetch; // only non-null if there is a pending prefetch()

private long bufferStart = 0; // position in file of buffer

@@ -90,6 +103,13 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException {

@Override
public final void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
// We need to finish pending prefetch operations to use data from the prefetch() instead of
// directly reading bytes into the user's buffer.
// Other readXXX methods don't need to do this since they always call refill() when they don't
// have enough data, which in turn calls finishPendingPrefetch(). But readBytes() may read bytes
// into the user's buffer without refilling the internal buffer.
finishPendingPrefetch();

int available = buffer.remaining();
if (len <= available) {
// the buffer contains enough data to satisfy this request
@@ -297,19 +317,33 @@ public final long readLong(long pos) throws IOException {
return buffer.getLong((int) index);
}

private void maybeInitBuffer() throws IOException {
assert pendingPrefetch == null;
assert prefetchBuffer == EMPTY_BYTEBUFFER;

if (buffer == EMPTY_BYTEBUFFER) {
buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN).limit(0);
seekInternal(bufferStart);
}
}

private void refill() throws IOException {
assert buffer.hasRemaining() == false;

// Wait for pending prefetching to finish.
if (finishPendingPrefetch()) {
return;
}

long start = bufferStart + buffer.position();
long end = start + bufferSize;
if (end > length()) // don't read past EOF
end = length();
int newLength = (int) (end - start);
if (newLength <= 0) throw new EOFException("read past EOF: " + this);

if (buffer == EMPTY_BYTEBUFFER) {
buffer =
ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); // allocate buffer lazily
seekInternal(bufferStart);
}
// allocate buffer lazily
maybeInitBuffer();
buffer.position(0);
buffer.limit(newLength);
bufferStart = start;
@@ -321,25 +355,112 @@ private void refill() throws IOException {
buffer.flip();
}

private boolean finishPendingPrefetch() throws IOException {
if (pendingPrefetch != null) {
final int i;
try {
i = pendingPrefetch.get();
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
} catch (ExecutionException e) {
throw IOUtils.rethrowAlways(e.getCause());
} finally {
// Always clear pendingPrefetch and swap buffers, regardless of success/failure so that
// future read() operations work on the correct buffer.
pendingPrefetch = null;
prefetchBuffer.flip();
buffer = prefetchBuffer;
prefetchBuffer = EMPTY_BYTEBUFFER;
}

if (i < 0) {
// be defensive here; even though we checked beforehand, something could have changed
throw new EOFException(
"read past EOF: " + this + " buffer: " + prefetchBuffer + " length: " + length());
}

return buffer.hasRemaining();
}
return false;
}

@Override
public void prefetch() throws IOException {
final long pos = getFilePointer();
final long length = length();
if (pos >= length) {
throw new EOFException("read past EOF: " + this);
}

// Make sure to never have two concurrent prefetch() calls trying to push bytes to the same
// buffer.
if (pendingPrefetch != null) {
// prefetch() got called twice without reading bytes in-between?
// nocommit should we fail instead?
return;
}

if (buffer.hasRemaining()) {
// the seek() that preceded prefetch() moved within the buffer, so we still have valid bytes
// TODO: should we still prefetch more bytes in this case if there are very few bytes left?
return;
} else {
// The buffer may not have been initialized yet, e.g. if prefetch() was called immediately
// after calling clone() then seek().
maybeInitBuffer();
}

assert buffer.capacity() > 0;
assert prefetchBuffer == EMPTY_BYTEBUFFER;

bufferStart = pos;
final ByteBuffer prefetchBuffer = buffer;
prefetchBuffer.position(0);
final int limit = (int) Math.min(length - bufferStart, prefetchBuffer.capacity());
assert limit > 0;
prefetchBuffer.limit(limit);
// Note: The read operation may read fewer bytes than requested. This is ok.
pendingPrefetch = readInternalAsync(prefetchBuffer);
// Only swap buffers if we successfully scheduled an async read.
this.prefetchBuffer = prefetchBuffer;
this.buffer = EMPTY_BYTEBUFFER; // trigger refill on next read()
}

/**
* Expert: implements buffer refill. Reads bytes from the current position in the input.
*
* @param b the buffer to read bytes into
*/
protected abstract void readInternal(ByteBuffer b) throws IOException;

/**
* Expert: implements asynchronous buffer refill. Unlike {@link #readInternal}, this may read
* fewer than {@link ByteBuffer#remaining()} bytes.
*/
protected Future<Integer> readInternalAsync(ByteBuffer b) throws IOException {
CompletableFuture<Integer> res = new CompletableFuture<>();
res.complete(0);
return res;
}

@Override
public final long getFilePointer() {
return bufferStart + buffer.position();
}

@Override
public final void seek(long pos) throws IOException {
// If there is a pending prefetch(), wait for it to finish before moving the file pointer.
finishPendingPrefetch();
assert prefetchBuffer == EMPTY_BYTEBUFFER;

if (pos >= bufferStart && pos < (bufferStart + buffer.limit()))
buffer.position((int) (pos - bufferStart)); // seek within buffer
else {
bufferStart = pos;
buffer.limit(0); // trigger refill() on read
prefetchBuffer.limit(0);
seekInternal(pos);
}
}
@@ -357,6 +478,8 @@ public BufferedIndexInput clone() {
BufferedIndexInput clone = (BufferedIndexInput) super.clone();

clone.buffer = EMPTY_BYTEBUFFER;
clone.prefetchBuffer = EMPTY_BYTEBUFFER;
clone.pendingPrefetch = null;
clone.bufferStart = getFilePointer();

return clone;
9 changes: 9 additions & 0 deletions lucene/core/src/java/org/apache/lucene/store/IndexInput.java
@@ -191,4 +191,13 @@ public String toString() {
};
}
}

/**
* Optional method: Give a hint to this input that some bytes will be read in the near future.
* IndexInput implementations may take advantage of this hint to start fetching a page of data
* immediately from storage.
*
* <p>The default implementation is a no-op.
*/
public void prefetch() throws IOException {}
}
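
As a usage note, the `BufferedIndexInput` changes above explicitly handle prefetch() being called right after clone() and seek(). A hedged sketch of that sequence follows; `scheduleRead`, `main` and `blockOffset` are illustrative names, not part of the patch:

```java
import java.io.IOException;

import org.apache.lucene.store.IndexInput;

class CloneSeekPrefetchSketch {

  // `main` is an already-open IndexInput and `blockOffset` is a file offset we expect to read
  // soon; both are assumptions made for the sake of the example.
  static IndexInput scheduleRead(IndexInput main, long blockOffset) throws IOException {
    IndexInput in = main.clone(); // the clone starts with its own, initially empty buffer
    in.seek(blockOffset);         // position the clone; typically no bytes are read yet
    in.prefetch();                // optional hint: start fetching this block in the background
    return in;                    // later readXXX() calls on `in` pick up the prefetched bytes
  }
}
```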
66 changes: 60 additions & 6 deletions lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java
@@ -19,10 +19,17 @@
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.ClosedChannelException; // javadoc @link
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; // javadoc
import org.apache.lucene.util.IOUtils;

@@ -47,6 +54,23 @@
*/
public class NIOFSDirectory extends FSDirectory {

private final ExecutorService executorService;

/**
* Create a new NIOFSDirectory for the named location. The directory is created at the named
* location if it does not yet exist.
*
* @param path the path of the directory
* @param lockFactory the lock factory to use
* @param executorService the executor to use for prefetching
* @throws IOException if there is a low-level I/O error
*/
public NIOFSDirectory(Path path, LockFactory lockFactory, ExecutorService executorService)
throws IOException {
super(path, lockFactory);
this.executorService = Objects.requireNonNull(executorService);
}

/**
* Create a new NIOFSDirectory for the named location. The directory is created at the named
* location if it does not yet exist.
@@ -57,6 +81,7 @@ public class NIOFSDirectory extends FSDirectory {
*/
public NIOFSDirectory(Path path, LockFactory lockFactory) throws IOException {
super(path, lockFactory);
this.executorService = Executors.newVirtualThreadPerTaskExecutor();
}

/**
@@ -70,21 +95,32 @@ public NIOFSDirectory(Path path) throws IOException {
this(path, FSLockFactory.getDefault());
}

@Override
public void close() throws IOException {
super.close();
executorService.shutdown();
}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureCanRead(name);
Path path = getDirectory().resolve(name);
FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);
Set<OpenOption> openOptions = Collections.singleton(StandardOpenOption.READ);
// nocommit: does it really make sense to open both a sync and an async channel on the same
// file? or should we do sync reads via the async channel (but this seems to come with
// noticeable overhead when data fits in the cache)?
FileChannel fc = FileChannel.open(path, openOptions);
AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, openOptions, executorService);
boolean success = false;
try {
final NIOFSIndexInput indexInput =
new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context);
new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, afc, context);
success = true;
return indexInput;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(fc);
IOUtils.closeWhileHandlingException(fc, afc);
}
}
}
@@ -97,6 +133,9 @@ static final class NIOFSIndexInput extends BufferedIndexInput {
/** the file channel we will read from */
protected final FileChannel channel;

/** the asynchronous channel to use for prefetching */
protected final AsynchronousFileChannel asynchronousChannel;

/** is this instance a clone and hence does not own the file to close it */
boolean isClone = false;

@@ -106,18 +145,26 @@ static final class NIOFSIndexInput extends BufferedIndexInput {
/** end offset (start+length) */
protected final long end;

public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context)
public NIOFSIndexInput(
String resourceDesc, FileChannel fc, AsynchronousFileChannel afc, IOContext context)
throws IOException {
super(resourceDesc, context);
this.channel = fc;
this.asynchronousChannel = afc;
this.off = 0L;
this.end = fc.size();
}

public NIOFSIndexInput(
String resourceDesc, FileChannel fc, long off, long length, int bufferSize) {
String resourceDesc,
FileChannel fc,
AsynchronousFileChannel afc,
long off,
long length,
int bufferSize) {
super(resourceDesc, bufferSize);
this.channel = fc;
this.asynchronousChannel = afc;
this.off = off;
this.end = off + length;
this.isClone = true;
@@ -126,7 +173,7 @@ public NIOFSIndexInput(
@Override
public void close() throws IOException {
if (!isClone) {
channel.close();
IOUtils.close(channel, asynchronousChannel);
}
}

@@ -155,6 +202,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
return new NIOFSIndexInput(
getFullSliceDescription(sliceDescription),
channel,
asynchronousChannel,
off + offset,
length,
getBufferSize());
@@ -204,6 +252,12 @@ protected void readInternal(ByteBuffer b) throws IOException {
}
}

@Override
protected Future<Integer> readInternalAsync(ByteBuffer b) throws IOException {
long pos = getFilePointer() + off;
return asynchronousChannel.read(b, pos);
}

@Override
protected void seekInternal(long pos) throws IOException {
if (pos > length()) {
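
For reference, the new executor-taking constructor above can be exercised as in the following sketch; the index path is a placeholder and the surrounding class is not part of the patch:

```java
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.lucene.store.FSLockFactory;
import org.apache.lucene.store.NIOFSDirectory;

class OpenNIOFSDirectorySketch {

  static NIOFSDirectory open() throws Exception {
    Path path = Path.of("/tmp/index"); // placeholder index location
    // The executor backs the AsynchronousFileChannel used for prefetch reads; the directory
    // shuts it down when close() is called.
    ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    return new NIOFSDirectory(path, FSLockFactory.getDefault(), executor);
  }
}
```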
