Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IndexInput#prefetch. #13337

Merged
merged 16 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lucene/core/src/java/org/apache/lucene/store/IndexInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,15 @@ public String toString() {
};
}
}

/**
 * Optional method: Give a hint to this input that some bytes will be read in the near future.
 * IndexInput implementations may take advantage of this hint to start fetching pages of data
 * immediately from storage.
 *
 * <p>The hinted range is counted from the current file pointer. Calling this method is not
 * expected to move the file pointer.
 *
 * <p>The default implementation is a no-op.
 *
 * @param length the number of bytes to prefetch, counted from the current file pointer
 * @throws IOException if an implementation fails while acting on the hint; the default
 *     implementation never throws
 */
public void prefetch(long length) throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.GroupVIntUtil;

Expand All @@ -50,6 +51,7 @@ abstract class MemorySegmentIndexInput extends IndexInput implements RandomAcces
final int chunkSizePower;
final Arena arena;
final MemorySegment[] segments;
final Optional<NativeAccess> nativeAccess;
Copy link
Contributor

@uschindler uschindler May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is a singleton, we can make it static and initialize it here. There's no need to pass the optional through constructors and have it in every clone.

We may also make it static final on the provider, but that's unrelated.


int curSegmentIndex = -1;
MemorySegment
Expand All @@ -61,12 +63,15 @@ public static MemorySegmentIndexInput newInstance(
Arena arena,
MemorySegment[] segments,
long length,
int chunkSizePower) {
int chunkSizePower,
Optional<NativeAccess> nativeAccess) {
assert Arrays.stream(segments).map(MemorySegment::scope).allMatch(arena.scope()::equals);
if (segments.length == 1) {
return new SingleSegmentImpl(resourceDescription, arena, segments[0], length, chunkSizePower);
return new SingleSegmentImpl(
resourceDescription, arena, segments[0], length, chunkSizePower, nativeAccess);
} else {
return new MultiSegmentImpl(resourceDescription, arena, segments, 0, length, chunkSizePower);
return new MultiSegmentImpl(
resourceDescription, arena, segments, 0, length, chunkSizePower, nativeAccess);
}
}

Expand All @@ -75,14 +80,16 @@ private MemorySegmentIndexInput(
Arena arena,
MemorySegment[] segments,
long length,
int chunkSizePower) {
int chunkSizePower,
Optional<NativeAccess> nativeAccess) {
super(resourceDescription);
this.arena = arena;
this.segments = segments;
this.length = length;
this.chunkSizePower = chunkSizePower;
this.chunkSizeMask = (1L << chunkSizePower) - 1L;
this.curSegment = segments[0];
this.nativeAccess = nativeAccess;
}

void ensureOpen() {
Expand Down Expand Up @@ -310,6 +317,53 @@ public void seek(long pos) throws IOException {
}
}

/**
 * Hints the OS that the next {@code length} bytes, counted from the current file pointer, are
 * about to be read. The range is always validated against the length of this input; the actual
 * hint is only issued when a {@link NativeAccess} instance is available.
 *
 * <p>Only the part of the range that lies within the current segment is prefetched.
 *
 * @param length the number of bytes to prefetch
 * @throws EOFException if slicing the current segment fails with an out-of-bounds error
 * @throws IOException if seeking or the native advise call fails
 */
@Override
public void prefetch(long length) throws IOException {
  ensureOpen();

  // Reject ranges that reach beyond the end of this input.
  Objects.checkFromIndexSize(getFilePointer(), length, length());

  if (this.nativeAccess.isPresent() == false) {
    // Without native access there is nothing to advise.
    return;
  }
  final NativeAccess access = this.nativeAccess.get();

  // Re-seek to the current pointer: if we sit exactly on the boundary between two chunks, this
  // moves us onto the next one.
  seek(getFilePointer());
  try {
    final long position = curPosition;
    // Keep only the bytes that live in the current segment. The range may continue on the next
    // segment, but that case is rare enough that it is deliberately not optimized.
    final long lengthInSegment = Math.min(length, curSegment.byteSize() - position);

    // madvise requires a page-aligned start: compute how far the current position is into its
    // OS page and widen the range backwards by that amount.
    final long offsetInPage = (curSegment.address() + position) % access.getPageSize();
    final long alignedOffset = position - offsetInPage;
    if (alignedOffset < 0) {
      // The start of the page is outside of this segment, ignore.
      return;
    }
    final long alignedLength = lengthInSegment + offsetInPage;

    final MemorySegment pageAlignedSlice = curSegment.asSlice(alignedOffset, alignedLength);
    // Tell the OS we'll need this page. nocommit: do we need to restore the original read advice?
    // Source code for madvise.c suggests we don't since WILL_NEED only triggers read-ahead
    // without updating the state of the virtual mapping?
    // https://github.com/torvalds/linux/blob/master/mm/madvise.c
    access.madviseWillNeed(pageAlignedSlice);
  } catch (
      @SuppressWarnings("unused")
      IndexOutOfBoundsException e) {
    throw new EOFException("Read past EOF: " + this);
  } catch (NullPointerException | IllegalStateException e) {
    throw alreadyClosed(e);
  }
}

@Override
public byte readByte(long pos) throws IOException {
try {
Expand Down Expand Up @@ -491,15 +545,17 @@ MemorySegmentIndexInput buildSlice(String sliceDescription, long offset, long le
null, // clones don't have an Arena, as they can't close)
slices[0].asSlice(offset, length),
length,
chunkSizePower);
chunkSizePower,
nativeAccess);
} else {
return new MultiSegmentImpl(
newResourceDescription,
null, // clones don't have an Arena, as they can't close)
slices,
offset,
length,
chunkSizePower);
chunkSizePower,
nativeAccess);
}
}

Expand Down Expand Up @@ -539,8 +595,15 @@ static final class SingleSegmentImpl extends MemorySegmentIndexInput {
Arena arena,
MemorySegment segment,
long length,
int chunkSizePower) {
super(resourceDescription, arena, new MemorySegment[] {segment}, length, chunkSizePower);
int chunkSizePower,
Optional<NativeAccess> nativeAccess) {
super(
resourceDescription,
arena,
new MemorySegment[] {segment},
length,
chunkSizePower,
nativeAccess);
this.curSegmentIndex = 0;
}

Expand Down Expand Up @@ -626,8 +689,9 @@ static final class MultiSegmentImpl extends MemorySegmentIndexInput {
MemorySegment[] segments,
long offset,
long length,
int chunkSizePower) {
super(resourceDescription, arena, segments, length, chunkSizePower);
int chunkSizePower,
Optional<NativeAccess> nativeAccess) {
super(resourceDescription, arena, segments, length, chunkSizePower, nativeAccess);
this.offset = offset;
try {
seek(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ public IndexInput openInput(Path path, IOContext context, int chunkSizePower, bo
context.readAdvice(),
chunkSizePower,
preload,
fileSize),
fileSize,
nativeAccess),
fileSize,
chunkSizePower);
chunkSizePower,
nativeAccess);
success = true;
return in;
} finally {
Expand All @@ -88,7 +90,8 @@ private final MemorySegment[] map(
ReadAdvice readAdvice,
int chunkSizePower,
boolean preload,
long length)
long length,
Optional<NativeAccess> nativeAccess)
throws IOException {
if ((length >>> chunkSizePower) >= Integer.MAX_VALUE)
throw new IllegalArgumentException("File too big for chunk size: " + resourceDescription);
Expand All @@ -111,10 +114,11 @@ private final MemorySegment[] map(
throw convertMapFailedIOException(ioe, resourceDescription, segSize);
}
// if preload apply it without madvise.
// if chunk size is too small (2 MiB), disable madvise support (incorrect alignment)
// skip madvise if the address of our segment is not page-aligned (small segments due to
// internal FileChannel logic)
if (preload) {
segment.load();
} else if (nativeAccess.isPresent() && chunkSizePower >= 21) {
} else if (nativeAccess.filter(na -> segment.address() % na.getPageSize() == 0).isPresent()) {
nativeAccess.get().madvise(segment, readAdvice);
}
segments[segNr] = segment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ abstract class NativeAccess {
/** Invoke the {@code madvise} call for the given {@link MemorySegment}. */
public abstract void madvise(MemorySegment segment, ReadAdvice readAdvice) throws IOException;

/**
 * Invoke the {@code madvise} call for the given {@link MemorySegment} with {@code MADV_WILLNEED}.
 *
 * @param segment the memory region the caller expects to read soon
 * @throws IOException if the implementation fails to apply the advice
 */
public abstract void madviseWillNeed(MemorySegment segment) throws IOException;

/**
 * Returns native page size.
 *
 * <p>Callers use this value to align memory addresses to page boundaries before issuing {@code
 * madvise} calls.
 */
public abstract int getPageSize();

/**
* Return the NativeAccess instance for this platform. At the moment we only support Linux and macOS.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ final class PosixNativeAccess extends NativeAccess {
public static final int POSIX_MADV_DONTNEED = 4;

private static final MethodHandle MH$posix_madvise;
private static final int PAGE_SIZE;

private static final Optional<NativeAccess> INSTANCE;

Expand All @@ -60,10 +61,14 @@ static Optional<NativeAccess> getInstance() {
}

static {
final Linker linker = Linker.nativeLinker();
final SymbolLookup stdlib = linker.defaultLookup();
MethodHandle adviseHandle = null;
int pagesize = -1;
PosixNativeAccess instance = null;
try {
adviseHandle = lookupMadvise();
adviseHandle = lookupMadvise(linker, stdlib);
pagesize = (int) lookupGetPageSize(linker, stdlib).invokeExact();
instance = new PosixNativeAccess();
} catch (UnsupportedOperationException uoe) {
LOG.warning(uoe.getMessage());
Expand All @@ -77,14 +82,17 @@ static Optional<NativeAccess> getInstance() {
+ "pass the following on command line: --enable-native-access=%s",
Optional.ofNullable(PosixNativeAccess.class.getModule().getName())
.orElse("ALL-UNNAMED")));
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable e) {
throw new AssertionError(e);
}
MH$posix_madvise = adviseHandle;
PAGE_SIZE = pagesize;
INSTANCE = Optional.ofNullable(instance);
}

private static MethodHandle lookupMadvise() {
final Linker linker = Linker.nativeLinker();
final SymbolLookup stdlib = linker.defaultLookup();
private static MethodHandle lookupMadvise(Linker linker, SymbolLookup stdlib) {
return findFunction(
linker,
stdlib,
Expand All @@ -96,6 +104,10 @@ private static MethodHandle lookupMadvise() {
ValueLayout.JAVA_INT));
}

/**
 * Resolves a handle for the native {@code getpagesize} function, described as taking no arguments
 * and returning a Java {@code int}.
 */
private static MethodHandle lookupGetPageSize(Linker linker, SymbolLookup stdlib) {
  final FunctionDescriptor noArgsReturningInt = FunctionDescriptor.of(ValueLayout.JAVA_INT);
  return findFunction(linker, stdlib, "getpagesize", noArgsReturningInt);
}

private static MethodHandle findFunction(
Linker linker, SymbolLookup lookup, String name, FunctionDescriptor desc) {
final MemorySegment symbol =
Expand All @@ -110,17 +122,26 @@ private static MethodHandle findFunction(

@Override
public void madvise(MemorySegment segment, ReadAdvice readAdvice) throws IOException {
// Note: madvise is bypassed if the segment should be preloaded via MemorySegment#load.
if (segment.byteSize() == 0L) {
return; // empty segments should be excluded, because they may have no address at all
}
final Integer advice = mapReadAdvice(readAdvice);
if (advice == null) {
return; // do nothing
}
madvise(segment, advice);
}

/**
 * Advises the OS that {@code segment} will be needed soon by delegating to the private {@code
 * madvise(MemorySegment, int)} helper with {@link #POSIX_MADV_WILLNEED}.
 */
@Override
public void madviseWillNeed(MemorySegment segment) throws IOException {
madvise(segment, POSIX_MADV_WILLNEED);
}

private void madvise(MemorySegment segment, int advice) throws IOException {
// Note: madvise is bypassed if the segment should be preloaded via MemorySegment#load.
if (segment.byteSize() == 0L) {
return; // empty segments should be excluded, because they may have no address at all
}
final int ret;
try {
ret = (int) MH$posix_madvise.invokeExact(segment, segment.byteSize(), advice.intValue());
ret = (int) MH$posix_madvise.invokeExact(segment, segment.byteSize(), advice);
} catch (Throwable th) {
throw new AssertionError(th);
}
Expand All @@ -143,4 +164,9 @@ private Integer mapReadAdvice(ReadAdvice readAdvice) {
case RANDOM_PRELOAD -> null;
};
}

/**
 * Returns the page size that the static initializer of this class obtained once from the native
 * {@code getpagesize} function.
 */
@Override
public int getPageSize() {
return PAGE_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.packed.PackedInts;
import org.junit.Assert;
Expand Down Expand Up @@ -1512,4 +1513,65 @@ protected void doTestGroupVInt(
dir.deleteFile("group-varint");
dir.deleteFile("vint");
}

/** Runs the randomized prefetch test with start offset 0, i.e. against a clone of the input. */
public void testPrefetch() throws IOException {
doTestPrefetch(0);
}

/**
 * Runs the randomized prefetch test against a slice that starts at a random non-zero offset
 * (between 1 and 1024) into the file.
 */
public void testPrefetchOnSlice() throws IOException {
  final int sliceStart = TestUtil.nextInt(random(), 1, 1024);
  doTestPrefetch(sliceStart);
}

/**
 * Writes a file of random bytes, then randomly interleaves {@code prefetch} calls with reads and
 * seeks, checking that prefetching never moves the file pointer and that every read still returns
 * the bytes that were written.
 *
 * @param startOffset offset into the file at which the tested input starts; {@code 0} tests a
 *     clone of the whole file, any other value tests a slice starting at that offset
 */
private void doTestPrefetch(int startOffset) throws IOException {
  try (Directory dir = getDirectory(createTempDir())) {
    final int fileLength = startOffset + TestUtil.nextInt(random(), 16384, 65536);
    final byte[] expected = new byte[fileLength];
    random().nextBytes(expected);
    try (IndexOutput out = dir.createOutput("temp.bin", IOContext.DEFAULT)) {
      out.writeBytes(expected, expected.length);
    }
    final byte[] scratch = new byte[2048];

    try (IndexInput orig = dir.openInput("temp.bin", IOContext.DEFAULT)) {
      final IndexInput in =
          startOffset == 0
              ? orig.clone()
              : orig.slice("slice", startOffset, fileLength - startOffset);
      for (int iter = 0; iter < 10_000; ++iter) {
        final int startPointer = (int) in.getFilePointer();
        assertTrue(startPointer < in.length());
        final long remaining = in.length() - startPointer;

        // Prefetch a random non-empty range on roughly half of the iterations.
        if (random().nextBoolean()) {
          in.prefetch(TestUtil.nextLong(random(), 1, remaining));
        }
        // Prefetching is only a hint: the file pointer must not have moved.
        assertEquals(startPointer, in.getFilePointer());

        // Position of the current pointer within the bytes that were written to the file.
        final int expectedIndex = startOffset + startPointer;
        final int action = random().nextInt(100);
        if (action == 0) {
          assertEquals(expected[expectedIndex], in.readByte());
        } else if (action == 1) {
          if (remaining >= Long.BYTES) {
            assertEquals((long) BitUtil.VH_LE_LONG.get(expected, expectedIndex), in.readLong());
          }
        } else {
          final int readLength =
              TestUtil.nextInt(random(), 1, (int) Math.min(scratch.length, remaining));
          in.readBytes(scratch, 0, readLength);
          assertArrayEquals(
              ArrayUtil.copyOfSubArray(expected, expectedIndex, expectedIndex + readLength),
              ArrayUtil.copyOfSubArray(scratch, 0, readLength));
        }

        // Always move away from EOF so the next iteration has bytes to read; otherwise seek to a
        // random position half of the time.
        if (in.getFilePointer() == in.length() || random().nextBoolean()) {
          in.seek(TestUtil.nextInt(random(), 0, (int) in.length() - 1));
        }
      }
    }
  }
}
}
Loading