Skip to content

Commit

Permalink
Document Frozen and Cold Indexinputs better and refactor slightly (#9…
Browse files Browse the repository at this point in the history
…8312)

Move things into a couple more methods and add some documentation.
Also, some minor cleanups around logger usage and inlining some
single-use, never-skipped methods to make things easier to follow.
  • Loading branch information
original-brownbear authored Aug 9, 2023
1 parent e87483f commit 0f6fd0f
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public static int toIntBytes(long l) {
return ByteSizeUnit.BYTES.toIntBytes(l);
}

public static void throwEOF(long channelPos, long len, Object file) throws EOFException {
throw new EOFException(format("unexpected EOF reading [%d-%d] from %s", channelPos, channelPos + len, file));
public static void throwEOF(long channelPos, long len) throws EOFException {
throw new EOFException(format("unexpected EOF reading [%d-%d]", channelPos, channelPos + len));
}

public static void ensureSeek(long pos, IndexInput input) throws IOException {
Expand Down Expand Up @@ -74,12 +74,11 @@ public static void ensureSlice(String sliceName, long sliceOffset, long sliceLen
*
* Most of its arguments are there simply to make the message of the {@link EOFException} more informative.
*/
public static int readSafe(InputStream inputStream, ByteBuffer copyBuffer, long rangeStart, long remaining, Object cacheFileReference)
throws IOException {
public static int readSafe(InputStream inputStream, ByteBuffer copyBuffer, long rangeStart, long remaining) throws IOException {
final int len = (remaining < copyBuffer.remaining()) ? toIntBytes(remaining) : copyBuffer.remaining();
final int bytesRead = Streams.read(inputStream, copyBuffer, len);
if (bytesRead <= 0) {
throwEOF(rangeStart, remaining, cacheFileReference);
throwEOF(rangeStart, remaining);
}
return bytesRead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,32 @@ public SparseFileTracker(String description, long length, SortedSet<ByteRange> r
if (length < 0) {
throw new IllegalArgumentException("Length [" + length + "] must be equal to or greater than 0 for [" + description + "]");
}
this.initialLength = ranges.isEmpty() ? 0 : addInitialRanges(length, ranges);
}

private long addInitialRanges(long length, SortedSet<ByteRange> ranges) {
long initialLength = 0;
if (ranges.isEmpty() == false) {
synchronized (mutex) {
Range previous = null;
for (ByteRange next : ranges) {
if (next.length() == 0L) {
throw new IllegalArgumentException("Range " + next + " cannot be empty");
}
if (length < next.end()) {
throw new IllegalArgumentException("Range " + next + " is exceeding maximum length [" + length + ']');
}
final Range range = new Range(next);
if (previous != null && range.start <= previous.end) {
throw new IllegalArgumentException("Range " + range + " is overlapping a previous range " + previous);
}
final boolean added = this.ranges.add(range);
assert added : range + " already exist in " + this.ranges;
previous = range;
initialLength += range.end - range.start;
synchronized (mutex) {
Range previous = null;
for (ByteRange next : ranges) {
if (next.isEmpty()) {
throw new IllegalArgumentException("Range " + next + " cannot be empty");
}
assert invariant();
if (length < next.end()) {
throw new IllegalArgumentException("Range " + next + " is exceeding maximum length [" + length + ']');
}
final Range range = new Range(next);
if (previous != null && range.start <= previous.end) {
throw new IllegalArgumentException("Range " + range + " is overlapping a previous range " + previous);
}
final boolean added = this.ranges.add(range);
assert added : range + " already exist in " + this.ranges;
previous = range;
initialLength += range.end - range.start;
}
assert invariant();
}
this.initialLength = initialLength;
return initialLength;
}

public long getLength() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ public static Path findCacheSnapshotCacheFilePath(NodeEnvironment environment, l
* @param length number of bytes to copy
* @param progressUpdater callback to invoke with the number of copied bytes as they are copied
* @param buf bytebuffer to use for writing
* @param cacheFile object that describes the cached file, only used in logging and exception throwing as context information
* @throws IOException on failure
*/
public static void copyToCacheFileAligned(
Expand All @@ -163,13 +162,12 @@ public static void copyToCacheFileAligned(
long relativePos,
long length,
LongConsumer progressUpdater,
ByteBuffer buf,
final Object cacheFile
ByteBuffer buf
) throws IOException {
long bytesCopied = 0L;
long remaining = length;
while (remaining > 0L) {
final int bytesRead = BlobCacheUtils.readSafe(input, buf, relativePos, remaining, cacheFile);
final int bytesRead = BlobCacheUtils.readSafe(input, buf, relativePos, remaining);
if (buf.hasRemaining()) {
break;
}
Expand Down Expand Up @@ -207,7 +205,6 @@ private static int positionalWrite(IO fc, long start, ByteBuffer byteBuffer) thr
* @param relativePos position in {@code byteBufferReference}
* @param length number of bytes to read
* @param byteBufferReference buffer reference
* @param cacheFile cache file reference used for exception messages only
* @return number of bytes read
* @throws IOException on failure
*/
Expand All @@ -216,8 +213,7 @@ public static int readCacheFile(
long channelPos,
long relativePos,
long length,
final ByteBufferReference byteBufferReference,
Object cacheFile
final ByteBufferReference byteBufferReference
) throws IOException {
if (length == 0L) {
return 0;
Expand All @@ -228,7 +224,7 @@ public static int readCacheFile(
try {
bytesRead = fc.read(dup, channelPos);
if (bytesRead == -1) {
BlobCacheUtils.throwEOF(channelPos, dup.remaining(), cacheFile);
BlobCacheUtils.throwEOF(channelPos, dup.remaining());
}
} finally {
byteBufferReference.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ public class BlobCacheUtilsTests extends ESTestCase {
public void testReadSafeThrows() {
final ByteBuffer buffer = ByteBuffer.allocate(randomIntBetween(1, 1025));
final int remaining = randomIntBetween(1, 1025);
expectThrows(EOFException.class, () -> BlobCacheUtils.readSafe(BytesArray.EMPTY.streamInput(), buffer, 0, remaining, null));
expectThrows(EOFException.class, () -> BlobCacheUtils.readSafe(BytesArray.EMPTY.streamInput(), buffer, 0, remaining));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ private void prewarmCache(ActionListener<Void> listener, Supplier<Boolean> cance
try {
final IndexInput input = openInput(file.physicalName(), CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass();
CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput) input;

final AtomicBoolean alreadyCached = new AtomicBoolean();
try (var fileListener = new RefCountingListener(ActionListener.runBefore(completionListener.acquire().map(v -> {
Expand All @@ -511,21 +512,16 @@ private void prewarmCache(ActionListener<Void> listener, Supplier<Boolean> cance
recoveryState.getIndex().addRecoveredFromSnapshotBytesToFile(file.physicalName(), file.length());
}
return v;
}), () -> IOUtils.closeWhileHandlingException(input)))) {

if (input instanceof CachedBlobContainerIndexInput cachedIndexInput) {
if (cachedIndexInput.getPersistentCacheInitialLength() == file.length()) {
alreadyCached.set(true);
logger.trace(
() -> format(
"%s file [%s] is already available in cache (%d bytes)",
shardId,
file.physicalName(),
file.length()
)
);
continue;
}
}), () -> IOUtils.closeWhileHandlingException(cachedIndexInput)))) {
if (cachedIndexInput.getPersistentCacheInitialLength() == file.length()) {
alreadyCached.set(true);
logger.trace(
"{} file [{}] is already available in cache ({} bytes)",
shardId,
file.physicalName(),
file.length()
);
continue;
}

for (int p = 0; p < file.numberOfParts(); p++) {
Expand All @@ -534,18 +530,16 @@ private void prewarmCache(ActionListener<Void> listener, Supplier<Boolean> cance
try (releasable) {
var fileName = file.physicalName();
final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();
var prefetchedPartBytes = ((CachedBlobContainerIndexInput) input).prefetchPart(part, cancelPreWarming);
if (prefetchedPartBytes > -1L) {
var prefetchedPartBytes = cachedIndexInput.prefetchPart(part, cancelPreWarming);
if (prefetchedPartBytes > -1L && logger.isTraceEnabled()) {
logger.trace(
() -> format(
"%s part [%s/%s] of [%s] warmed in [%s] ms (%d bytes)",
shardId,
part + 1,
file.numberOfParts(),
fileName,
timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis(),
prefetchedPartBytes
)
"{} part [{}/{}] of [{}] warmed in [{}] ms ({} bytes)",
shardId,
part + 1,
file.numberOfParts(),
fileName,
timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis(),
prefetchedPartBytes
);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -118,8 +117,7 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
assert rangeToRead.isSubRangeOf(rangeToWrite) : rangeToRead + " vs " + rangeToWrite;
assert rangeToRead.length() == b.remaining() : b.remaining() + " vs " + rangeToRead;

final Future<Integer> populateCacheFuture = populateAndRead(b, position, length, cacheFile, rangeToWrite);
final int bytesRead = populateCacheFuture.get();
final int bytesRead = populateAndRead(b, position, cacheFile, rangeToWrite).get();
assert bytesRead == length : bytesRead + " vs " + length;
}

Expand Down Expand Up @@ -191,7 +189,7 @@ public long prefetchPart(final int part, Supplier<Boolean> isCancelled) throws I
if (isCancelled.get()) {
return -1L;
}
final int bytesRead = readSafe(input, copyBuffer, range.start(), remainingBytes, cacheFileReference);
final int bytesRead = readSafe(input, copyBuffer, range.start(), remainingBytes);
// The range to prewarm in cache
final long readStart = range.start() + totalBytesRead;
final ByteRange rangeToWrite = ByteRange.of(readStart, readStart + bytesRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void readWithoutBlobCacheSlow(ByteBuffer b, long position, int length) t
length,
cacheFile
);
final int read = SharedBytes.readCacheFile(channel, pos, relativePos, len, byteBufferReference, cacheFile);
final int read = SharedBytes.readCacheFile(channel, pos, relativePos, len, byteBufferReference);
stats.addCachedBytesRead(read);
return read;
}, (channel, channelPos, relativePos, len, progressUpdater) -> {
Expand All @@ -156,8 +156,7 @@ private void readWithoutBlobCacheSlow(ByteBuffer b, long position, int length) t
relativePos,
len,
progressUpdater,
writeBuffer.get().clear(),
cacheFile
writeBuffer.get().clear()
);
final long endTimeNanos = stats.currentTimeNanos();
stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos);
Expand Down
Loading

0 comments on commit 0f6fd0f

Please sign in to comment.