From d613cf20c047a9952cfb26b2aaed7b250f218ae3 Mon Sep 17 00:00:00 2001
From: Adrien Grand
Date: Tue, 31 Jan 2023 15:39:50 +0100
Subject: [PATCH] Add IndexInput#prefetch.

This adds `IndexInput#prefetch`, which is an optional operation that
instructs the `IndexInput` to start fetching bytes from storage in the
background. These bytes will be picked up by follow-up calls to the
`IndexInput#readXXX` methods. In the future, this will help Lucene move
from a maximum of one I/O operation per search thread to one I/O
operation per search thread per `IndexInput`. For example, when running
a query on two terms today, the I/O into the terms dictionary is
sequential. In the future, we would ideally perform these I/Os in
parallel using this new API. Note that this will require API changes to
some classes, including `TermsEnum`.

I settled on this API because it's simple and wouldn't require making
all Lucene APIs asynchronous to take advantage of extra I/O
concurrency, which I worry would make the query evaluation logic too
complicated.

Currently, only `NIOFSDirectory` implements this new API. I played with
`MMapDirectory` as well and found an approach that worked better in the
benchmark I've been playing with, but I'm not sure it makes sense to
implement this API on that directory: it would require either adding an
explicit buffer to `MMapDirectory`, or forcing data to be loaded into
the page cache even though the OS may have decided that it's not a good
idea due to too few cache hits.

This change will require follow-ups to start using this new API when
working with terms dictionaries, postings, etc.

Relates #13179
---
 .../lucene/store/BufferedIndexInput.java      | 135 +++++++++++++++++-
 .../org/apache/lucene/store/IndexInput.java   |   9 ++
 .../apache/lucene/store/NIOFSDirectory.java   |  66 ++++++++-
 .../tests/store/BaseDirectoryTestCase.java    |  61 ++++++++
 4 files changed, 259 insertions(+), 12 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java
index 13151692bc06..36f33f64bf64 100644
--- a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java
@@ -20,7 +20,12 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.lucene.util.GroupVIntUtil;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;
 
 /** Base implementation class for buffered {@link IndexInput}. */
 public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput {
@@ -45,7 +50,15 @@ public abstract class BufferedIndexInput extends IndexInput implements RandomAcc
 
   private final int bufferSize;
 
-  private ByteBuffer buffer = EMPTY_BYTEBUFFER;
+  // Despite the two buffer references below, BufferedIndexInput only tracks a single buffer:
+  // either prefetch() has been called last and `buffer` is set to EMPTY_BYTEBUFFER while
+  // `prefetchBuffer` tracks the actual buffer, or `prefetchBuffer` is set to EMPTY_BYTEBUFFER and
+  // `buffer` tracks the actual buffer. This approach makes it possible to only check
+  // `buffer.hasRemaining()` to know whether to trigger a refill(), and refill() checks whether
+  // there is a pending prefetch() before actually reading bytes.
+  private ByteBuffer buffer = EMPTY_BYTEBUFFER; // initialized lazily
+  private ByteBuffer prefetchBuffer = EMPTY_BYTEBUFFER;
+  private Future<Integer> pendingPrefetch; // only non-null if there is a pending prefetch()
 
   private long bufferStart = 0; // position in file of buffer
 
@@ -90,6 +103,13 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException {
   @Override
   public final void readBytes(byte[] b, int offset, int len, boolean useBuffer)
       throws IOException {
+    // We need to finish pending prefetch operations to use data from the prefetch() instead of
+    // reading bytes directly into the user's buffer.
+    // Other readXXX methods don't need to do this since they always call refill() when they don't
+    // have enough data, which in turn calls finishPendingPrefetch(). But readBytes() may read
+    // bytes into the user's buffer without refilling the internal buffer.
+    finishPendingPrefetch();
+
     int available = buffer.remaining();
     if (len <= available) {
       // the buffer contains enough data to satisfy this request
@@ -297,7 +317,24 @@ public final long readLong(long pos) throws IOException {
     return buffer.getLong((int) index);
   }
 
+  private void maybeInitBuffer() throws IOException {
+    assert pendingPrefetch == null;
+    assert prefetchBuffer == EMPTY_BYTEBUFFER;
+
+    if (buffer == EMPTY_BYTEBUFFER) {
+      buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN).limit(0);
+      seekInternal(bufferStart);
+    }
+  }
+
   private void refill() throws IOException {
+    assert buffer.hasRemaining() == false;
+
+    // Wait for any pending prefetch to finish.
+    if (finishPendingPrefetch()) {
+      return;
+    }
+
     long start = bufferStart + buffer.position();
     long end = start + bufferSize;
     if (end > length()) // don't read past EOF
@@ -305,11 +342,8 @@ private void refill() throws IOException {
     int newLength = (int) (end - start);
     if (newLength <= 0) throw new EOFException("read past EOF: " + this);
 
-    if (buffer == EMPTY_BYTEBUFFER) {
-      buffer =
-          ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); // allocate buffer lazily
-      seekInternal(bufferStart);
-    }
+    // allocate buffer lazily
+    maybeInitBuffer();
     buffer.position(0);
     buffer.limit(newLength);
     bufferStart = start;
@@ -321,6 +355,78 @@ private void refill() throws IOException {
     buffer.flip();
   }
 
+  private boolean finishPendingPrefetch() throws IOException {
+    if (pendingPrefetch != null) {
+      final int i;
+      try {
+        i = pendingPrefetch.get();
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException e) {
+        throw IOUtils.rethrowAlways(e.getCause());
+      } finally {
+        // Always clear pendingPrefetch and swap buffers, regardless of success/failure, so that
+        // future read() operations work on the correct buffer.
+        pendingPrefetch = null;
+        prefetchBuffer.flip();
+        buffer = prefetchBuffer;
+        prefetchBuffer = EMPTY_BYTEBUFFER;
+      }
+
+      if (i < 0) {
+        // Be defensive here: even though we checked beforehand, something could have changed.
+        throw new EOFException(
+            "read past EOF: " + this + " buffer: " + buffer + " length: " + length());
+      }
+
+      return buffer.hasRemaining();
+    }
+    return false;
+  }
+
+  @Override
+  public void prefetch() throws IOException {
+    final long pos = getFilePointer();
+    final long length = length();
+    if (pos >= length) {
+      throw new EOFException("read past EOF: " + this);
+    }
+
+    // Make sure to never have two concurrent prefetch() calls trying to push bytes to the same
+    // buffer.
+    if (pendingPrefetch != null) {
+      // prefetch() got called twice without reading bytes in-between?
+      // nocommit should we fail instead?
+      return;
+    }
+
+    if (buffer.hasRemaining()) {
+      // The seek() that preceded prefetch() moved within the buffer, so we still have valid bytes.
+      // TODO: should we still prefetch more bytes in this case if there are very few bytes left?
+      return;
+    } else {
+      // The buffer may not have been initialized yet, e.g. if prefetch() was called immediately
+      // after calling clone() then seek().
+      maybeInitBuffer();
+    }
+
+    assert buffer.capacity() > 0;
+    assert prefetchBuffer == EMPTY_BYTEBUFFER;
+
+    bufferStart = pos;
+    final ByteBuffer prefetchBuffer = buffer;
+    prefetchBuffer.position(0);
+    final int limit = (int) Math.min(length - bufferStart, prefetchBuffer.capacity());
+    assert limit > 0;
+    prefetchBuffer.limit(limit);
+    // Note: the read operation may read fewer bytes than requested. This is ok.
+    pendingPrefetch = readInternalAsync(prefetchBuffer);
+    // Only swap buffers if we successfully scheduled an async read.
+    this.prefetchBuffer = prefetchBuffer;
+    this.buffer = EMPTY_BYTEBUFFER; // trigger refill() on next read
+  }
+
   /**
    * Expert: implements buffer refill. Reads bytes from the current position in the input.
    *
    */
   protected abstract void readInternal(ByteBuffer b) throws IOException;
 
+  /**
+   * Expert: implements asynchronous buffer refill. Unlike {@link #readInternal}, this may read
+   * fewer than {@link ByteBuffer#remaining()} bytes.
+   */
+  protected Future<Integer> readInternalAsync(ByteBuffer b) throws IOException {
+    CompletableFuture<Integer> res = new CompletableFuture<>();
+    res.complete(0);
+    return res;
+  }
+
   @Override
   public final long getFilePointer() {
     return bufferStart + buffer.position();
@@ -335,11 +451,16 @@ public final long getFilePointer() {
 
   @Override
   public final void seek(long pos) throws IOException {
+    // If there is a pending prefetch(), wait for it to finish before moving the file pointer.
+    finishPendingPrefetch();
+    assert prefetchBuffer == EMPTY_BYTEBUFFER;
+
     if (pos >= bufferStart && pos < (bufferStart + buffer.limit()))
       buffer.position((int) (pos - bufferStart)); // seek within buffer
     else {
       bufferStart = pos;
       buffer.limit(0); // trigger refill() on read
+      prefetchBuffer.limit(0);
       seekInternal(pos);
     }
   }
@@ -357,6 +478,8 @@ public BufferedIndexInput clone() {
     BufferedIndexInput clone = (BufferedIndexInput) super.clone();
 
     clone.buffer = EMPTY_BYTEBUFFER;
+    clone.prefetchBuffer = EMPTY_BYTEBUFFER;
+    clone.pendingPrefetch = null;
     clone.bufferStart = getFilePointer();
 
     return clone;
diff --git a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java
index 3f703bc54b26..1c0780a2127e 100644
--- a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java
@@ -191,4 +191,13 @@ public String toString() {
       };
     }
   }
+
+  /**
+   * Optional method: Give a hint to this input that some bytes will be read in the near future.
+   * IndexInput implementations may take advantage of this hint to start fetching a page of data
+   * immediately from storage.
+   *
+   * <p>The default implementation is a no-op.
+   */
+  public void prefetch() throws IOException {}
 }
diff --git a/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java b/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java
index 246f48082cfe..15cfd1ce5197 100644
--- a/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java
+++ b/lucene/core/src/java/org/apache/lucene/store/NIOFSDirectory.java
@@ -19,10 +19,17 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
 import java.nio.channels.ClosedChannelException; // javadoc @link
 import java.nio.channels.FileChannel;
+import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future; // javadoc
 import org.apache.lucene.util.IOUtils;
 
@@ -47,6 +54,23 @@
  */
 public class NIOFSDirectory extends FSDirectory {
 
+  private final ExecutorService executorService;
+
+  /**
+   * Create a new NIOFSDirectory for the named location. The directory is created at the named
+   * location if it does not yet exist.
+   *
+   * @param path the path of the directory
+   * @param lockFactory the lock factory to use
+   * @param executorService the executor to use for prefetching
+   * @throws IOException if there is a low-level I/O error
+   */
+  public NIOFSDirectory(Path path, LockFactory lockFactory, ExecutorService executorService)
+      throws IOException {
+    super(path, lockFactory);
+    this.executorService = Objects.requireNonNull(executorService);
+  }
+
   /**
    * Create a new NIOFSDirectory for the named location. The directory is created at the named
    * location if it does not yet exist.
   *
@@ -57,6 +81,7 @@ public class NIOFSDirectory extends FSDirectory {
    */
   public NIOFSDirectory(Path path, LockFactory lockFactory) throws IOException {
     super(path, lockFactory);
+    this.executorService = Executors.newVirtualThreadPerTaskExecutor();
   }
 
   /**
@@ -70,21 +95,32 @@ public NIOFSDirectory(Path path) throws IOException {
     this(path, FSLockFactory.getDefault());
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+    executorService.shutdown();
+  }
+
   @Override
   public IndexInput openInput(String name, IOContext context) throws IOException {
     ensureOpen();
     ensureCanRead(name);
     Path path = getDirectory().resolve(name);
-    FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);
+    Set<OpenOption> openOptions = Collections.singleton(StandardOpenOption.READ);
+    // nocommit: does it really make sense to open both a sync and an async channel on the same
+    // file? or should we do sync reads via the async channel (but this seems to come with
+    // noticeable overhead when data fits in the cache)?
+    FileChannel fc = FileChannel.open(path, openOptions);
+    AsynchronousFileChannel afc = AsynchronousFileChannel.open(path, openOptions, executorService);
     boolean success = false;
     try {
       final NIOFSIndexInput indexInput =
-          new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context);
+          new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, afc, context);
       success = true;
       return indexInput;
     } finally {
       if (success == false) {
-        IOUtils.closeWhileHandlingException(fc);
+        IOUtils.closeWhileHandlingException(fc, afc);
       }
     }
   }
@@ -97,6 +133,9 @@ static final class NIOFSIndexInput extends BufferedIndexInput {
     /** the file channel we will read from */
     protected final FileChannel channel;
 
+    /** the asynchronous channel to use for prefetching */
+    protected final AsynchronousFileChannel asynchronousChannel;
+
     /** is this instance a clone and hence does not own the file to close it */
     boolean isClone = false;
 
@@ -106,18 +145,26 @@ static final class NIOFSIndexInput extends BufferedIndexInput {
     /** end offset (start+length) */
     protected final long end;
 
-    public NIOFSIndexInput(String resourceDesc, FileChannel fc, IOContext context)
+    public NIOFSIndexInput(
+        String resourceDesc, FileChannel fc, AsynchronousFileChannel afc, IOContext context)
         throws IOException {
       super(resourceDesc, context);
       this.channel = fc;
+      this.asynchronousChannel = afc;
      this.off = 0L;
      this.end = fc.size();
    }

    public NIOFSIndexInput(
-        String resourceDesc, FileChannel fc, long off, long length, int bufferSize) {
+        String resourceDesc,
+        FileChannel fc,
+        AsynchronousFileChannel afc,
+        long off,
+        long length,
+        int bufferSize) {
      super(resourceDesc, bufferSize);
      this.channel = fc;
+      this.asynchronousChannel = afc;
      this.off = off;
      this.end = off + length;
      this.isClone = true;
@@ -126,7 +173,7 @@ public NIOFSIndexInput(
    @Override
    public void close() throws IOException {
      if (!isClone) {
-        channel.close();
+        IOUtils.close(channel, asynchronousChannel);
      }
    }

@@ -155,6 +202,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw
      return new NIOFSIndexInput(
          getFullSliceDescription(sliceDescription),
          channel,
+          asynchronousChannel,
          off + offset,
          length,
          getBufferSize());
@@ -204,6 +252,12 @@ protected void readInternal(ByteBuffer b) throws IOException {
      }
    }

+    @Override
+    protected Future<Integer> readInternalAsync(ByteBuffer b) throws IOException {
+      long pos = getFilePointer() + off;
+      return asynchronousChannel.read(b, pos);
+    }
+
    @Override
    protected void seekInternal(long pos) throws IOException {
      if (pos > length()) {
diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java
index 24d8db0b02f6..f8b65281a0e0 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java
@@ -58,6 +58,7 @@
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BitUtil;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.packed.PackedInts;
 import org.junit.Assert;
@@ -1512,4 +1513,64 @@ protected void doTestGroupVInt(
     dir.deleteFile("group-varint");
     dir.deleteFile("vint");
   }
+
+  public void testPrefetch() throws IOException {
+    doTestPrefetch(0);
+  }
+
+  public void testPrefetchOnSlice() throws IOException {
+    doTestPrefetch(TestUtil.nextInt(random(), 1, 1024));
+  }
+
+  private void doTestPrefetch(int startOffset) throws IOException {
+    try (Directory dir = getDirectory(createTempDir())) {
+      final int totalLength = startOffset + TestUtil.nextInt(random(), 16384, 65536);
+      byte[] arr = new byte[totalLength];
+      random().nextBytes(arr);
+      try (IndexOutput out = dir.createOutput("temp.bin", IOContext.DEFAULT)) {
+        out.writeBytes(arr, arr.length);
+      }
+      byte[] temp = new byte[2048];
+
+      try (IndexInput orig = dir.openInput("temp.bin", IOContext.DEFAULT)) {
+        IndexInput in;
+        if (startOffset == 0) {
+          in = orig.clone();
+        } else {
+          in = orig.slice("slice", startOffset, totalLength - startOffset);
+        }
+        for (int i = 0; i < 10_000; ++i) {
+          final int startPointer = (int) in.getFilePointer();
+          assertTrue(startPointer < in.length());
+          if (random().nextBoolean()) {
+            in.prefetch();
+          }
+          assertEquals(startPointer, in.getFilePointer());
+          switch (random().nextInt(100)) {
+            case 0:
+              assertEquals(arr[startOffset + startPointer], in.readByte());
+              break;
+            case 1:
+              if (in.length() - startPointer >= Long.BYTES) {
+                assertEquals(
+                    (long) BitUtil.VH_LE_LONG.get(arr, startOffset + startPointer), in.readLong());
+              }
+              break;
+            default:
+              final int readLength =
+                  TestUtil.nextInt(
+                      random(), 1, (int) Math.min(temp.length, in.length() - startPointer));
+              in.readBytes(temp, 0, readLength);
+              assertArrayEquals(
+                  ArrayUtil.copyOfSubArray(
+                      arr, startOffset + startPointer, startOffset + startPointer + readLength),
+                  ArrayUtil.copyOfSubArray(temp, 0, readLength));
+          }
+          if (in.getFilePointer() == in.length() || random().nextBoolean()) {
+            in.seek(TestUtil.nextInt(random(), 0, (int) in.length() - 1));
+          }
+        }
+      }
+    }
+  }
 }
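
Usage note (not part of the patch): the sketch below illustrates how a caller could take advantage of the new prefetch() hint to overlap the I/O of two independent reads against the same file, e.g. the two terms dictionary lookups of a two-term query. The class name, helper method, file name, offsets, and block sizes are hypothetical; the Directory/IndexInput calls used (openInput, clone, seek, prefetch, readBytes) are the real APIs, with prefetch() being the method added by this patch.

import java.io.IOException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

class PrefetchUsageSketch {
  // Hypothetical helper: read one block from each of two offsets of the same file,
  // hinting both reads up front so that a directory like NIOFSDirectory can start
  // the two I/Os in the background instead of performing them sequentially.
  static void readTwoBlocks(Directory dir, String fileName, long offsetA, long offsetB)
      throws IOException {
    try (IndexInput in = dir.openInput(fileName, IOContext.DEFAULT)) {
      IndexInput a = in.clone();
      IndexInput b = in.clone();

      // Position both clones, then hint that bytes will be needed soon.
      a.seek(offsetA);
      a.prefetch();
      b.seek(offsetB);
      b.prefetch();

      // Follow-up reads pick up the prefetched bytes.
      byte[] blockA = new byte[128];
      byte[] blockB = new byte[128];
      a.readBytes(blockA, 0, blockA.length);
      b.readBytes(blockB, 0, blockB.length);
    }
  }
}

On directories that do not override the hint, prefetch() is a no-op and the code above behaves exactly like two sequential reads; with the NIOFSDirectory implementation from this patch, each prefetch() schedules an asynchronous read on the shared AsynchronousFileChannel, so the two reads can proceed concurrently.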