From 0f6fd0fe03aa158d97c4994b0d012dd2d7693b6e Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 9 Aug 2023 14:21:54 +0200 Subject: [PATCH] Document Frozen and Cold Indexinputs better and refactor slightly (#98312) Move things into a couple more methods and add some documentation. Also, some minor cleanups around logger usage and inlining some single-use, never-skipped methods to make things easier to follow. --- .../blobcache/BlobCacheUtils.java | 9 +- .../blobcache/common/SparseFileTracker.java | 42 +-- .../blobcache/shared/SharedBytes.java | 12 +- .../blobcache/BlobCacheUtilsTests.java | 2 +- .../store/SearchableSnapshotDirectory.java | 46 ++- .../input/CachedBlobContainerIndexInput.java | 6 +- .../store/input/FrozenIndexInput.java | 5 +- .../input/MetadataCachingIndexInput.java | 299 ++++++++++-------- 8 files changed, 226 insertions(+), 195 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java index 7d3b1eef5afd4..c4dff2cb4457b 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheUtils.java @@ -31,8 +31,8 @@ public static int toIntBytes(long l) { return ByteSizeUnit.BYTES.toIntBytes(l); } - public static void throwEOF(long channelPos, long len, Object file) throws EOFException { - throw new EOFException(format("unexpected EOF reading [%d-%d] from %s", channelPos, channelPos + len, file)); + public static void throwEOF(long channelPos, long len) throws EOFException { + throw new EOFException(format("unexpected EOF reading [%d-%d]", channelPos, channelPos + len)); } public static void ensureSeek(long pos, IndexInput input) throws IOException { @@ -74,12 +74,11 @@ public static void ensureSlice(String sliceName, long sliceOffset, long sliceLen * * Most of its arguments are there simply to make the message of the {@link EOFException} more informative. */ - public static int readSafe(InputStream inputStream, ByteBuffer copyBuffer, long rangeStart, long remaining, Object cacheFileReference) - throws IOException { + public static int readSafe(InputStream inputStream, ByteBuffer copyBuffer, long rangeStart, long remaining) throws IOException { final int len = (remaining < copyBuffer.remaining()) ? toIntBytes(remaining) : copyBuffer.remaining(); final int bytesRead = Streams.read(inputStream, copyBuffer, len); if (bytesRead <= 0) { - throwEOF(rangeStart, remaining, cacheFileReference); + throwEOF(rangeStart, remaining); } return bytesRead; } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java index 45352ed716428..345b498262129 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java @@ -74,30 +74,32 @@ public SparseFileTracker(String description, long length, SortedSet r if (length < 0) { throw new IllegalArgumentException("Length [" + length + "] must be equal to or greater than 0 for [" + description + "]"); } + this.initialLength = ranges.isEmpty() ? 0 : addInitialRanges(length, ranges); + } + + private long addInitialRanges(long length, SortedSet ranges) { long initialLength = 0; - if (ranges.isEmpty() == false) { - synchronized (mutex) { - Range previous = null; - for (ByteRange next : ranges) { - if (next.length() == 0L) { - throw new IllegalArgumentException("Range " + next + " cannot be empty"); - } - if (length < next.end()) { - throw new IllegalArgumentException("Range " + next + " is exceeding maximum length [" + length + ']'); - } - final Range range = new Range(next); - if (previous != null && range.start <= previous.end) { - throw new IllegalArgumentException("Range " + range + " is overlapping a previous range " + previous); - } - final boolean added = this.ranges.add(range); - assert added : range + " already exist in " + this.ranges; - previous = range; - initialLength += range.end - range.start; + synchronized (mutex) { + Range previous = null; + for (ByteRange next : ranges) { + if (next.isEmpty()) { + throw new IllegalArgumentException("Range " + next + " cannot be empty"); } - assert invariant(); + if (length < next.end()) { + throw new IllegalArgumentException("Range " + next + " is exceeding maximum length [" + length + ']'); + } + final Range range = new Range(next); + if (previous != null && range.start <= previous.end) { + throw new IllegalArgumentException("Range " + range + " is overlapping a previous range " + previous); + } + final boolean added = this.ranges.add(range); + assert added : range + " already exist in " + this.ranges; + previous = range; + initialLength += range.end - range.start; } + assert invariant(); } - this.initialLength = initialLength; + return initialLength; } public long getLength() { diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java index df329f4cb81ea..4e07e26299d0e 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java @@ -153,7 +153,6 @@ public static Path findCacheSnapshotCacheFilePath(NodeEnvironment environment, l * @param length number of bytes to copy * @param progressUpdater callback to invoke with the number of copied bytes as they are copied * @param buf bytebuffer to use for writing - * @param cacheFile object that describes the cached file, only used in logging and exception throwing as context information * @throws IOException on failure */ public static void copyToCacheFileAligned( @@ -163,13 +162,12 @@ public static void copyToCacheFileAligned( long relativePos, long length, LongConsumer progressUpdater, - ByteBuffer buf, - final Object cacheFile + ByteBuffer buf ) throws IOException { long bytesCopied = 0L; long remaining = length; while (remaining > 0L) { - final int bytesRead = BlobCacheUtils.readSafe(input, buf, relativePos, remaining, cacheFile); + final int bytesRead = BlobCacheUtils.readSafe(input, buf, relativePos, remaining); if (buf.hasRemaining()) { break; } @@ -207,7 +205,6 @@ private static int positionalWrite(IO fc, long start, ByteBuffer byteBuffer) thr * @param relativePos position in {@code byteBufferReference} * @param length number of bytes to read * @param byteBufferReference buffer reference - * @param cacheFile cache file reference used for exception messages only * @return number of bytes read * @throws IOException on failure */ @@ -216,8 +213,7 @@ public static int readCacheFile( long channelPos, long relativePos, long length, - final ByteBufferReference byteBufferReference, - Object cacheFile + final ByteBufferReference byteBufferReference ) throws IOException { if (length == 0L) { return 0; @@ -228,7 +224,7 @@ public static int readCacheFile( try { bytesRead = fc.read(dup, channelPos); if (bytesRead == -1) { - BlobCacheUtils.throwEOF(channelPos, dup.remaining(), cacheFile); + BlobCacheUtils.throwEOF(channelPos, dup.remaining()); } } finally { byteBufferReference.release(); diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/BlobCacheUtilsTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/BlobCacheUtilsTests.java index 7625a213b4591..2f78797e556ac 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/BlobCacheUtilsTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/BlobCacheUtilsTests.java @@ -17,6 +17,6 @@ public class BlobCacheUtilsTests extends ESTestCase { public void testReadSafeThrows() { final ByteBuffer buffer = ByteBuffer.allocate(randomIntBetween(1, 1025)); final int remaining = randomIntBetween(1, 1025); - expectThrows(EOFException.class, () -> BlobCacheUtils.readSafe(BytesArray.EMPTY.streamInput(), buffer, 0, remaining, null)); + expectThrows(EOFException.class, () -> BlobCacheUtils.readSafe(BytesArray.EMPTY.streamInput(), buffer, 0, remaining)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java index 61089ca9a1028..dc1b28aa6098d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java @@ -502,6 +502,7 @@ private void prewarmCache(ActionListener listener, Supplier cance try { final IndexInput input = openInput(file.physicalName(), CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT); assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass(); + CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput) input; final AtomicBoolean alreadyCached = new AtomicBoolean(); try (var fileListener = new RefCountingListener(ActionListener.runBefore(completionListener.acquire().map(v -> { @@ -511,21 +512,16 @@ private void prewarmCache(ActionListener listener, Supplier cance recoveryState.getIndex().addRecoveredFromSnapshotBytesToFile(file.physicalName(), file.length()); } return v; - }), () -> IOUtils.closeWhileHandlingException(input)))) { - - if (input instanceof CachedBlobContainerIndexInput cachedIndexInput) { - if (cachedIndexInput.getPersistentCacheInitialLength() == file.length()) { - alreadyCached.set(true); - logger.trace( - () -> format( - "%s file [%s] is already available in cache (%d bytes)", - shardId, - file.physicalName(), - file.length() - ) - ); - continue; - } + }), () -> IOUtils.closeWhileHandlingException(cachedIndexInput)))) { + if (cachedIndexInput.getPersistentCacheInitialLength() == file.length()) { + alreadyCached.set(true); + logger.trace( + "{} file [{}] is already available in cache ({} bytes)", + shardId, + file.physicalName(), + file.length() + ); + continue; } for (int p = 0; p < file.numberOfParts(); p++) { @@ -534,18 +530,16 @@ private void prewarmCache(ActionListener listener, Supplier cance try (releasable) { var fileName = file.physicalName(); final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong(); - var prefetchedPartBytes = ((CachedBlobContainerIndexInput) input).prefetchPart(part, cancelPreWarming); - if (prefetchedPartBytes > -1L) { + var prefetchedPartBytes = cachedIndexInput.prefetchPart(part, cancelPreWarming); + if (prefetchedPartBytes > -1L && logger.isTraceEnabled()) { logger.trace( - () -> format( - "%s part [%s/%s] of [%s] warmed in [%s] ms (%d bytes)", - shardId, - part + 1, - file.numberOfParts(), - fileName, - timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis(), - prefetchedPartBytes - ) + "{} part [{}/{}] of [{}] warmed in [{}] ms ({} bytes)", + shardId, + part + 1, + file.numberOfParts(), + fileName, + timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis(), + prefetchedPartBytes ); } return null; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/CachedBlobContainerIndexInput.java index 55926225bd46a..81cf205c13dd2 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/CachedBlobContainerIndexInput.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; import java.util.function.Supplier; @@ -118,8 +117,7 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception { assert rangeToRead.isSubRangeOf(rangeToWrite) : rangeToRead + " vs " + rangeToWrite; assert rangeToRead.length() == b.remaining() : b.remaining() + " vs " + rangeToRead; - final Future populateCacheFuture = populateAndRead(b, position, length, cacheFile, rangeToWrite); - final int bytesRead = populateCacheFuture.get(); + final int bytesRead = populateAndRead(b, position, cacheFile, rangeToWrite).get(); assert bytesRead == length : bytesRead + " vs " + length; } @@ -191,7 +189,7 @@ public long prefetchPart(final int part, Supplier isCancelled) throws I if (isCancelled.get()) { return -1L; } - final int bytesRead = readSafe(input, copyBuffer, range.start(), remainingBytes, cacheFileReference); + final int bytesRead = readSafe(input, copyBuffer, range.start(), remainingBytes); // The range to prewarm in cache final long readStart = range.start() + totalBytesRead; final ByteRange rangeToWrite = ByteRange.of(readStart, readStart + bytesRead); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java index d8dfd45b61522..ea9ed833dfe64 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java @@ -134,7 +134,7 @@ private void readWithoutBlobCacheSlow(ByteBuffer b, long position, int length) t length, cacheFile ); - final int read = SharedBytes.readCacheFile(channel, pos, relativePos, len, byteBufferReference, cacheFile); + final int read = SharedBytes.readCacheFile(channel, pos, relativePos, len, byteBufferReference); stats.addCachedBytesRead(read); return read; }, (channel, channelPos, relativePos, len, progressUpdater) -> { @@ -156,8 +156,7 @@ private void readWithoutBlobCacheSlow(ByteBuffer b, long position, int length) t relativePos, len, progressUpdater, - writeBuffer.get().clear(), - cacheFile + writeBuffer.get().clear() ); final long endTimeNanos = stats.currentTimeNanos(); stats.addCachedBytesWritten(len, endTimeNanos - startTimeNanos); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java index 1a9dd5bf651a7..061930ebaced1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java @@ -27,6 +27,7 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService; @@ -44,7 +45,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.LongConsumer; import static org.elasticsearch.blobcache.BlobCacheUtils.throwEOF; import static org.elasticsearch.blobcache.BlobCacheUtils.toIntBytes; @@ -231,7 +231,6 @@ private ByteRange rangeToReadFromBlobCache(long position, int readLength) { private void readWithBlobCache(ByteBuffer b, ByteRange blobCacheByteRange) throws Exception { final long position = getAbsolutePosition(); final int length = b.remaining(); - final CacheFile cacheFile = cacheFileReference.get(); // Can we serve the read directly from disk? If so, do so and don't worry about anything else. @@ -240,103 +239,167 @@ private void readWithBlobCache(ByteBuffer b, ByteRange blobCacheByteRange) throw assert read == length : read + " vs " + length; return read; }); - if (waitingForRead != null) { + // yes, serving directly from disk because we had the bytes on disk already or had an ongoing download running for them final Integer read = waitingForRead.get(); assert read == length; return; } + // we did not find the bytes on disk, try the cache index before falling back to the blob store final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); - assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; - assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); - if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { - // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested - // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of - // {start, end} where positions are relative to the whole file. - - // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. - // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. - final Future populateCacheFuture = populateAndRead(b, position, length, cacheFile, blobCacheByteRange); - - fillIndexCache(cacheFile, blobCacheByteRange); - if (compoundFileOffset > 0L - && blobCacheByteRange.equals(headerBlobCacheByteRange) - && footerBlobCacheByteRange.isEmpty() == false) { - fillIndexCache(cacheFile, footerBlobCacheByteRange); - } - - final int bytesRead = populateCacheFuture.get(); - assert bytesRead == length : bytesRead + " vs " + length; + fillBlobCacheIndex(b, blobCacheByteRange, position, cacheFile); } else { - final int sliceOffset = toIntBytes(position - cachedBlob.from()); - assert sliceOffset + length <= cachedBlob.to() - : "reading " + length + " bytes from " + sliceOffset + " exceed cached blob max position " + cachedBlob.to(); - - logger.trace("reading [{}] bytes of file [{}] at position [{}] using cache index", length, fileInfo.physicalName(), position); - final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(sliceOffset, length).iterator(); - BytesRef bytesRef; - int copiedBytes = 0; - while ((bytesRef = cachedBytesIterator.next()) != null) { - b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); - copiedBytes += bytesRef.length; - } - assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length; - stats.addIndexCacheBytesRead(cachedBlob.length()); + readAndCopyFromBlobCacheIndex(b, position, cacheFile, cachedBlob); + } + } - try { - final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); - cacheFile.populateAndRead( - cachedRange, - cachedRange, - channel -> cachedBlob.length(), - (channel, from, to, progressUpdater) -> { - final long startTimeNanos = stats.currentTimeNanos(); - final BytesRefIterator iterator = cachedBlob.bytes() - .slice(toIntBytes(from - cachedBlob.from()), toIntBytes(to - from)) - .iterator(); - long writePosition = from; - BytesRef current; - while ((current = iterator.next()) != null) { - final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length); - while (byteBuffer.remaining() > 0) { - writePosition += positionalWrite(channel, writePosition, byteBuffer); - progressUpdater.accept(writePosition); - } - } - assert writePosition == to : writePosition + " vs " + to; - final long endTimeNanos = stats.currentTimeNanos(); - stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos); - logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo); - }, - directory.cacheFetchAsyncExecutor() - ); - } catch (Exception e) { - logger.debug( - () -> format( - "failed to store bytes [%s-%s] of file [%s] obtained from index cache", - cachedBlob.from(), - cachedBlob.to(), - fileInfo - ), - e - ); - // oh well, no big deal, at least we can return them to the caller. - } + /** + * Fills {@code b} starting at {@code position} the same way {@link #readAndCopyFromBlobCacheIndex} does but reads directly from the + * blob store. + * Caches the requested {@code blobCacheByteRange} containing {@code position} from the blob-store and stores it in the internal cache + * index as well as writes it to local disk into a {@link CacheFile}. This is used for ranges that should be stored in the internal + * cache index but haven't been found during a read attempt. + * + * @param b buffer to fill with the request byte range + * @param blobCacheByteRange byte range to cache in the internal index + * @param position position in the file to start reading from + * @param cacheFile cache file for this operation + * @throws Exception on failure + */ + private void fillBlobCacheIndex(ByteBuffer b, ByteRange blobCacheByteRange, long position, CacheFile cacheFile) throws Exception { + // We would have liked to find a cached entry, but we did not find anything: the cache on the disk will be requested, + // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of + // {start, end} where positions are relative to the whole file. + final int length = b.remaining(); + // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. + // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. + final Future populateCacheFuture = populateAndRead(b, position, cacheFile, blobCacheByteRange); + + fillIndexCache(cacheFile, blobCacheByteRange); + if (compoundFileOffset > 0L && blobCacheByteRange.equals(headerBlobCacheByteRange) && footerBlobCacheByteRange.isEmpty() == false) { + fillIndexCache(cacheFile, footerBlobCacheByteRange); + } + + final int bytesRead = populateCacheFuture.get(); + assert bytesRead == length : bytesRead + " vs " + length; + } + + /** + * Copies a byte range found in the internal cache index to a local {@link CacheFile} and reads the section starting from + * {@code position} into {@code b}, reading as many bytes as are remaining in {code b}. This is used whenever a range that should be + * stored in a local cache file wasn't found in the cache file but could be read from the cache index without having to reach out to + * the blob store itself. + * + * @param b buffer to fill with the request byte range + * @param position position in the file to start reading from + * @param cacheFile cache file for this operation + * @param cachedBlob cached blob found in the internal cache index + * @throws IOException on failure + */ + private void readAndCopyFromBlobCacheIndex(ByteBuffer b, long position, CacheFile cacheFile, CachedBlob cachedBlob) throws IOException { + final int length = b.remaining(); + final int sliceOffset = toIntBytes(position - cachedBlob.from()); + assert sliceOffset + length <= cachedBlob.to() + : "reading " + length + " bytes from " + sliceOffset + " exceed cached blob max position " + cachedBlob.to(); + + logger.trace("reading [{}] bytes of file [{}] at position [{}] using cache index", length, fileInfo.physicalName(), position); + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(sliceOffset, length).iterator(); + BytesRef bytesRef; + int copiedBytes = 0; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + copiedBytes += bytesRef.length; + } + assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length; + stats.addIndexCacheBytesRead(cachedBlob.length()); + + copyToCacheFile(cacheFile, cachedBlob); + } + + /** + * Copy a {@code cachedBlob} from internal cache index to the given {@code cacheFile}. + * @param cacheFile file to copy to + * @param cachedBlob cache blob to copy from + */ + private void copyToCacheFile(CacheFile cacheFile, CachedBlob cachedBlob) { + try { + final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); + cacheFile.populateAndRead(cachedRange, cachedRange, channel -> cachedBlob.length(), (channel, from, to, progressUpdater) -> { + final long startTimeNanos = stats.currentTimeNanos(); + final BytesRefIterator iterator = cachedBlob.bytes() + .slice(toIntBytes(from - cachedBlob.from()), toIntBytes(to - from)) + .iterator(); + long writePosition = from; + BytesRef current; + while ((current = iterator.next()) != null) { + final ByteBuffer byteBuffer = ByteBuffer.wrap(current.bytes, current.offset, current.length); + while (byteBuffer.remaining() > 0) { + writePosition += positionalWrite(channel, writePosition, byteBuffer); + progressUpdater.accept(writePosition); + } + } + assert writePosition == to : writePosition + " vs " + to; + final long endTimeNanos = stats.currentTimeNanos(); + stats.addCachedBytesWritten(to - from, endTimeNanos - startTimeNanos); + logger.trace("copied bytes [{}-{}] of file [{}] from cache index to disk", from, to, fileInfo); + }, directory.cacheFetchAsyncExecutor()); + } catch (Exception e) { + // ignore exceptions during copying, we already read the bytes on another thread so failing to copy to local disk does not + // break anything functionally + logger.debug( + () -> format( + "failed to store bytes [%s-%s] of file [%s] obtained from index cache", + cachedBlob.from(), + cachedBlob.to(), + fileInfo + ), + e + ); } } - protected Future populateAndRead(ByteBuffer b, long position, int length, CacheFile cacheFile, ByteRange rangeToWrite) { - final ByteRange rangeToRead = ByteRange.of(position, position + length); + /** + * Read from given {@code cacheFile} into {@code b}, starting at the given {@code position} and reading as many bytes as are remaining + * {@code b} while storing {@code rangeToWrite} in the cache file. + * + * @param b buffer to read into + * @param position position in the file to start reading from + * @param cacheFile file to read from + * @param rangeToWrite range to read from the blob store and store in the cache file + * @return future that resolves to the number of bytes read + */ + protected Future populateAndRead(ByteBuffer b, long position, CacheFile cacheFile, ByteRange rangeToWrite) { + final ByteRange rangeToRead = ByteRange.of(position, position + b.remaining()); assert rangeToRead.isSubRangeOf(rangeToWrite) : rangeToRead + " vs " + rangeToWrite; - assert rangeToRead.length() == b.remaining() : b.remaining() + " vs " + rangeToRead; return cacheFile.populateAndRead( rangeToWrite, rangeToRead, channel -> readCacheFile(channel, position, b), - this::writeCacheFile, + (fc, start, end, progressUpdater) -> { + assert assertFileChannelOpen(fc); + assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + final ByteBuffer copyBuffer = writeBuffer.get().clear(); + logger.trace("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference); + + long bytesCopied = 0L; + long remaining = end - start; + final long startTimeNanos = stats.currentTimeNanos(); + try (InputStream input = openInputStreamFromBlobStore(start, end - start)) { + while (remaining > 0L) { + final int bytesRead = BlobCacheUtils.readSafe(input, copyBuffer, start, remaining); + positionalWrite(fc, start + bytesCopied, copyBuffer.flip()); + copyBuffer.clear(); + bytesCopied += bytesRead; + remaining -= bytesRead; + progressUpdater.accept(start + bytesCopied); + } + final long endTimeNanos = stats.currentTimeNanos(); + stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos); + } + }, directory.cacheFetchAsyncExecutor() ); } @@ -351,37 +414,12 @@ private int readCacheFile(final FileChannel fc, final long position, final ByteB assert assertFileChannelOpen(fc); final int bytesRead = Channels.readFromFileChannel(fc, position, buffer); if (bytesRead == -1) { - throwEOF(position, buffer.remaining(), cacheFileReference); + throwEOF(position, buffer.remaining()); } stats.addCachedBytesRead(bytesRead); return bytesRead; } - private void writeCacheFile(final FileChannel fc, final long start, final long end, final LongConsumer progressUpdater) - throws IOException { - assert assertFileChannelOpen(fc); - assert ThreadPool.assertCurrentThreadPool(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); - final long length = end - start; - final ByteBuffer copyBuffer = writeBuffer.get().clear(); - logger.trace("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference); - - long bytesCopied = 0L; - long remaining = end - start; - final long startTimeNanos = stats.currentTimeNanos(); - try (InputStream input = openInputStreamFromBlobStore(start, length)) { - while (remaining > 0L) { - final int bytesRead = BlobCacheUtils.readSafe(input, copyBuffer, start, remaining, cacheFileReference); - positionalWrite(fc, start + bytesCopied, copyBuffer.flip()); - copyBuffer.clear(); - bytesCopied += bytesRead; - remaining -= bytesRead; - progressUpdater.accept(start + bytesCopied); - } - final long endTimeNanos = stats.currentTimeNanos(); - stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos); - } - } - private void fillIndexCache(CacheFile cacheFile, ByteRange indexCacheMiss) { final Releasable onCacheFillComplete = stats.addIndexCacheFill(); final Future readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> { @@ -432,7 +470,7 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e try (InputStream input = openInputStreamFromBlobStore(position, length)) { final int bytesRead = Streams.read(input, b, length); if (bytesRead < length) { - throwEOF(position, length - bytesRead, cacheFileReference); + throwEOF(position, length - bytesRead); } final long endTimeNanos = stats.currentTimeNanos(); stats.addDirectBytesRead(bytesRead, endTimeNanos - startTimeNanos); @@ -442,12 +480,12 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e e.addSuppressed(inner); } } - throw new IOException("failed to read data from cache", e); + throw new IOException("failed to read data from cache for [" + cacheFileReference + "]", e); } /** * Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range - * spans multiple blobs then this stream will request them in turn. + * spans multiple blobs then this stream will request them in turn using. * * @param position The start of the range of bytes to read, relative to the start of the corresponding Lucene file. * @param readLength The number of bytes to read @@ -459,32 +497,37 @@ protected InputStream openInputStreamFromBlobStore(final long position, final lo : "cannot read [" + position + "-" + (position + readLength) + "] from [" + fileInfo + "]"; stats.addBlobStoreBytesRequested(readLength); return directory.blobContainer().readBlob(fileInfo.name(), position, readLength); - } else { - final int startPart = getPartNumberForPosition(position); - final int endPart = getPartNumberForPosition(position + readLength - 1); + } + return openInputStreamMultipleParts(position, readLength); + } - for (int currentPart = startPart; currentPart <= endPart; currentPart++) { + /** + * Used by {@link #openInputStreamFromBlobStore} when reading a range that is split across multiple file parts/blobs. + * See {@link BlobStoreRepository#chunkSize()}. + */ + private SlicedInputStream openInputStreamMultipleParts(long position, long readLength) { + final int startPart = getPartNumberForPosition(position); + final int endPart = getPartNumberForPosition(position + readLength - 1); + + for (int currentPart = startPart; currentPart <= endPart; currentPart++) { + final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; + final long endInPart; + endInPart = currentPart == endPart ? getRelativePositionInPart(position + readLength - 1) + 1 : fileInfo.partBytes(currentPart); + stats.addBlobStoreBytesRequested(endInPart - startInPart); + } + + return new SlicedInputStream(endPart - startPart + 1) { + @Override + protected InputStream openSlice(int slice) throws IOException { + final int currentPart = startPart + slice; final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; final long endInPart; endInPart = currentPart == endPart ? getRelativePositionInPart(position + readLength - 1) + 1 : fileInfo.partBytes(currentPart); - stats.addBlobStoreBytesRequested(endInPart - startInPart); + return directory.blobContainer().readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart); } - - return new SlicedInputStream(endPart - startPart + 1) { - @Override - protected InputStream openSlice(int slice) throws IOException { - final int currentPart = startPart + slice; - final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L; - final long endInPart; - endInPart = currentPart == endPart - ? getRelativePositionInPart(position + readLength - 1) + 1 - : fileInfo.partBytes(currentPart); - return directory.blobContainer().readBlob(fileInfo.partName(currentPart), startInPart, endInPart - startInPart); - } - }; - } + }; } /**