diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java
index 4b03f72855d07..e3964b1c200e3 100644
--- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java
@@ -61,7 +61,6 @@
 import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {
 
@@ -258,7 +257,7 @@ public void testBlobStoreCache() throws Exception {
         ensureGreen(restoredAgainIndex);
 
         logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex);
-        checkNoBlobStoreAccess(storage);
+        checkNoBlobStoreAccess();
 
         logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex);
         assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
@@ -295,7 +294,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
         ensureGreen(restoredAgainIndex);
 
         logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex);
-        checkNoBlobStoreAccess(storage);
+        checkNoBlobStoreAccess();
 
         logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX);
         assertHitCount(
@@ -320,19 +319,13 @@ public Settings onNodeStopped(String nodeName) throws Exception {
         // TODO also test when prewarming is enabled
     }
 
-    private void checkNoBlobStoreAccess(Storage storage) {
+    private void checkNoBlobStoreAccess() {
         for (final SearchableSnapshotShardStats shardStats : client().execute(
             SearchableSnapshotsStatsAction.INSTANCE,
             new SearchableSnapshotsStatsRequest()
         ).actionGet().getStats()) {
             for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
-                assertThat(
-                    Strings.toString(indexInputStats),
-                    indexInputStats.getBlobStoreBytesRequested().getCount(),
-                    storage == Storage.SHARED_CACHE ? equalTo(0L)
-                        : indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L)
-                        : equalTo(0L)
-                );
+                assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L));
             }
         }
     }
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java
index 46d7ef60adc60..86243ce6060ae 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java
@@ -44,6 +44,13 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
      */
     protected final ByteRange headerBlobCacheByteRange;
 
+    /**
+     * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be
+     * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method
+     * {@link BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo}).
+     */
+    protected final ByteRange footerBlobCacheByteRange;
+
     // the following are only mutable so they can be adjusted after cloning/slicing
     protected volatile boolean isClone;
     private AtomicBoolean closed;
@@ -57,7 +64,8 @@ public BaseSearchableSnapshotIndexInput(
         IndexInputStats stats,
         long offset,
         long length,
-        ByteRange blobCacheByteRange
+        ByteRange headerBlobCacheByteRange,
+        ByteRange footerBlobCacheByteRange
     ) {
         super(name, context);
         this.name = Objects.requireNonNull(name);
@@ -68,7 +76,8 @@ public BaseSearchableSnapshotIndexInput(
         this.context = Objects.requireNonNull(context);
         assert fileInfo.metadata().hashEqualsContents() == false
             : "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')';
-        this.headerBlobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
+        this.headerBlobCacheByteRange = Objects.requireNonNull(headerBlobCacheByteRange);
+        this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange);
         this.stats = Objects.requireNonNull(stats);
         this.offset = offset;
         this.length = length;
@@ -144,8 +153,11 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
     }
 
     protected ByteRange maybeReadFromBlobCache(long position, int length) {
-        if (headerBlobCacheByteRange.contains(position, position + length)) {
+        final long end = position + length;
+        if (headerBlobCacheByteRange.contains(position, end)) {
             return headerBlobCacheByteRange;
+        } else if (footerBlobCacheByteRange.contains(position, end)) {
+            return footerBlobCacheByteRange;
         }
         return ByteRange.EMPTY;
     }
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java
index 8a59010884a3d..02bab3b0d57d6 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java
@@ -10,6 +10,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -60,6 +62,12 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
     private final int defaultRangeSize;
     private final int recoveryRangeSize;
 
+    /**
+     * If > 0, the current index input is a logical file within a compound (CFS) file, or a slice thereof, and this value is the offset
+     * of that logical file within the physical CFS file
+     */
+    private final long compoundFileOffset;
+
     // last read position is kept around in order to detect (non)contiguous reads for stats
     private long lastReadPosition;
     // last seek position is kept around in order to detect forward/backward seeks for stats
@@ -81,11 +89,13 @@ public CachedBlobContainerIndexInput(
             context,
             stats,
             0L,
+            0L,
             fileInfo.length(),
             new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
             rangeSize,
             recoveryRangeSize,
-            directory.getBlobCacheByteRange(name, fileInfo.length())
+            directory.getBlobCacheByteRange(name, fileInfo.length()),
+            ByteRange.EMPTY
         );
         assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
         stats.incrementOpenCount();
@@ -98,18 +108,21 @@ private CachedBlobContainerIndexInput(
         IOContext context,
         IndexInputStats stats,
         long offset,
+        long compoundFileOffset,
         long length,
         CacheFileReference cacheFileReference,
         int rangeSize,
         int recoveryRangeSize,
-        ByteRange headerBlobCacheByteRange
+        ByteRange headerBlobCacheByteRange,
+        ByteRange footerBlobCacheByteRange
     ) {
-        super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange);
+        super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange, footerBlobCacheByteRange);
         this.cacheFileReference = cacheFileReference;
        this.lastReadPosition = this.offset;
         this.lastSeekPosition = this.offset;
         this.defaultRangeSize = rangeSize;
         this.recoveryRangeSize = recoveryRangeSize;
+        this.compoundFileOffset = compoundFileOffset;
     }
 
     @Override
@@ -161,7 +174,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
             final ByteRange indexCacheMiss; // null if not a miss
 
             final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length);
-            if (blobCacheByteRange != ByteRange.EMPTY) {
+            if (blobCacheByteRange.isEmpty() == false) {
                 final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange);
                 assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position;
                 assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length();
@@ -245,7 +258,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
             final ByteRange startRangeToWrite = computeRange(position);
             final ByteRange endRangeToWrite = computeRange(position + length - 1);
             assert startRangeToWrite.end() <= endRangeToWrite.end() : startRangeToWrite + " vs " + endRangeToWrite;
-            final ByteRange rangeToWrite = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(indexCacheMiss);
+
+            // only request what's needed to fill the cache to make sure the cache is always fully populated during recovery
+            final ByteRange rangeToWrite = indexCacheMiss != null && directory.isRecoveryFinalized() == false
+                ? indexCacheMiss
+                : startRangeToWrite.minEnvelope(endRangeToWrite);
 
             final ByteRange rangeToRead = ByteRange.of(position, position + length);
             assert rangeToRead.isSubRangeOf(rangeToWrite) : rangeToRead + " vs " + rangeToWrite;
@@ -260,39 +277,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
             );
 
             if (indexCacheMiss != null) {
-                final Releasable onCacheFillComplete = stats.addIndexCacheFill();
-                final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
-                    final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
-
-                    // We assume that we only cache small portions of blobs so that we do not need to:
-                    // - use a BigArrays for allocation
-                    // - use an intermediate copy buffer to read the file in sensibly-sized chunks
-                    // - release the buffer once the indexing operation is complete
-
-                    final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
-                    Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer);
-                    // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
-                    byteBuffer.flip();
-                    final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
-                    directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener<>() {
-                        @Override
-                        public void onResponse(Void response) {
-                            onCacheFillComplete.close();
-                        }
-
-                        @Override
-                        public void onFailure(Exception e1) {
-                            onCacheFillComplete.close();
-                        }
-                    });
-                    return indexCacheMissLength;
-                });
-
-                if (readFuture == null) {
-                    // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
-                    // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
-                    // case, simply move on.
-                    onCacheFillComplete.close();
-                }
+                fillIndexCache(cacheFile, indexCacheMiss);
+                if (compoundFileOffset > 0L
+                    && indexCacheMiss.equals(headerBlobCacheByteRange)
+                    && footerBlobCacheByteRange.isEmpty() == false) {
+                    fillIndexCache(cacheFile, footerBlobCacheByteRange);
+                }
             }
 
@@ -311,6 +300,43 @@ public void onFailure(Exception e1) {
         readComplete(position, length);
     }
 
+    private void fillIndexCache(CacheFile cacheFile, ByteRange indexCacheMiss) {
+        final Releasable onCacheFillComplete = stats.addIndexCacheFill();
+        final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
+            final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
+
+            // We assume that we only cache small portions of blobs so that we do not need to:
+            // - use a BigArrays for allocation
+            // - use an intermediate copy buffer to read the file in sensibly-sized chunks
+            // - release the buffer once the indexing operation is complete
+
+            final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
+            Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer);
+            // NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
+            byteBuffer.flip();
+            final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
+            directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener<>() {
+                @Override
+                public void onResponse(Void response) {
+                    onCacheFillComplete.close();
+                }
+
+                @Override
+                public void onFailure(Exception e1) {
+                    onCacheFillComplete.close();
+                }
+            });
+            return indexCacheMissLength;
+        });
+
+        if (readFuture == null) {
+            // Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
+            // possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
+            // case, simply move on.
+            onCacheFillComplete.close();
+        }
+    }
+
     private void readComplete(long position, int length) {
         stats.incrementBytesRead(lastReadPosition, position, length);
         lastReadPosition = position + length;
@@ -579,6 +605,31 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) {
                     + this
             );
         }
+
+        // Are we creating a slice from a CFS file?
+        final boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs")
+            && IndexFileNames.getExtension(sliceName) != null
+            && compoundFileOffset == 0L // not already a compound file
+            && isClone == false; // tests aggressively clone and slice
+
+        final ByteRange sliceHeaderByteRange;
+        final ByteRange sliceFooterByteRange;
+        final long sliceCompoundFileOffset;
+
+        if (sliceCompoundFile) {
+            sliceCompoundFileOffset = this.offset + sliceOffset;
+            sliceHeaderByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength).shift(sliceCompoundFileOffset);
+            if (sliceHeaderByteRange.length() < sliceLength) {
+                sliceFooterByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength).shift(sliceCompoundFileOffset);
+            } else {
+                sliceFooterByteRange = ByteRange.EMPTY;
+            }
+        } else {
+            sliceCompoundFileOffset = this.compoundFileOffset;
+            sliceHeaderByteRange = ByteRange.EMPTY;
+            sliceFooterByteRange = ByteRange.EMPTY;
+        }
+
         final CachedBlobContainerIndexInput slice = new CachedBlobContainerIndexInput(
             sliceName,
             directory,
@@ -586,11 +637,13 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) {
             context,
             stats,
             this.offset + sliceOffset,
+            sliceCompoundFileOffset,
             sliceLength,
             cacheFileReference,
             defaultRangeSize,
             recoveryRangeSize,
-            headerBlobCacheByteRange
+            sliceHeaderByteRange,
+            sliceFooterByteRange
         );
         slice.isClone = true;
         return slice;
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java
index 0c2c45cef99f9..9bd1748d0425e 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java
@@ -37,7 +37,6 @@
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Locale;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
@@ -61,13 +60,6 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
      */
     private final long compoundFileOffset;
 
-    /**
-     * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be
-     * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method
-     * {@link BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo}).
-     */
-    private final ByteRange footerBlobCacheByteRange;
-
     // last read position is kept around in order to detect (non)contiguous reads for stats
     private long lastReadPosition;
     // last seek position is kept around in order to detect forward/backward seeks for stats
@@ -116,13 +108,12 @@ private FrozenIndexInput(
         ByteRange headerBlobCacheByteRange,
         ByteRange footerBlobCacheByteRange
     ) {
-        super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange);
+        super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange, footerBlobCacheByteRange);
         this.frozenCacheFile = frozenCacheFile;
         this.lastReadPosition = this.offset;
         this.lastSeekPosition = this.offset;
         this.defaultRangeSize = rangeSize;
         this.recoveryRangeSize = recoveryRangeSize;
-        this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange);
         this.compoundFileOffset = compoundFileOffset;
         assert offset >= compoundFileOffset;
     }
@@ -454,17 +445,6 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e
         throw new IOException("failed to read data from cache", e);
     }
 
-    @Override
-    protected ByteRange maybeReadFromBlobCache(long position, int length) {
-        final long end = position + length;
-        if (headerBlobCacheByteRange.contains(position, end)) {
-            return headerBlobCacheByteRange;
-        } else if (footerBlobCacheByteRange.contains(position, end)) {
-            return footerBlobCacheByteRange;
-        }
-        return ByteRange.EMPTY;
-    }
-
     private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException {
         assert assertCurrentThreadMayWriteCacheFile();
         byteBuffer.flip();
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java
index 48deca1bc29df..6d308d97dcd3d 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java
@@ -93,7 +93,8 @@ private DirectBlobContainerIndexInput(
         long sequentialReadSize,
         int bufferSize
     ) {
-        super(logger, name, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache
+        super(logger, name, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY, ByteRange.EMPTY); // TODO should use blob
+        // cache
         this.position = position;
         assert sequentialReadSize >= 0;
         this.sequentialReadSize = sequentialReadSize;
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java
index 165634f5e0978..b8df989b03a75 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java
@@ -69,6 +69,13 @@ public boolean contains(long start, long end) {
         return start() <= start && end <= end();
     }
 
+    public ByteRange shift(long delta) {
+        if (this == EMPTY) {
+            return EMPTY;
+        }
+        return ByteRange.of(start + delta, end + delta);
+    }
+
     @Override
     public int hashCode() {
         return 31 * Long.hashCode(start) + Long.hashCode(end);
@@ -95,4 +102,8 @@ public String toString() {
     public int compareTo(ByteRange o) {
         return Long.compare(start, o.start);
     }
+
+    public boolean isEmpty() {
+        return start == end;
+    }
 }
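
Note on the CFS slice handling above: the snippet below is a minimal, self-contained sketch (illustration only, not part of the patch) of how a slice's header and footer byte ranges end up expressed in physical .cfs file coordinates. The Range record is a stand-in for the production ByteRange class, the 4 KB header size is an assumed placeholder for whatever SearchableSnapshotDirectory#getBlobCacheByteRange would return, and CodecUtil.footerLength() is the real Lucene API the patch relies on.

import org.apache.lucene.codecs.CodecUtil;

public class CfsSliceRangeSketch {

    // Stand-in for ByteRange with just the operations the patch relies on.
    record Range(long start, long end) {
        static final Range EMPTY = new Range(0L, 0L);

        boolean isEmpty() {
            return start == end;
        }

        long length() {
            return end - start;
        }

        // Same idea as ByteRange#shift: translate the range, keeping EMPTY as EMPTY.
        Range shift(long delta) {
            return this == EMPTY ? EMPTY : new Range(start + delta, end + delta);
        }
    }

    public static void main(String[] args) {
        // Hypothetical logical file inside a .cfs file: 1 MB long, starting at offset 4096
        // within the physical compound file (sliceOffset/sliceLength in slice() terms).
        final long sliceOffset = 4096L;
        final long sliceLength = 1024L * 1024L;

        // Assume the directory wants to cache the first 4 KB of the logical file's header;
        // in the real code this comes from directory.getBlobCacheByteRange(sliceName, sliceLength).
        final Range header = new Range(0L, Math.min(4096L, sliceLength));

        // Shift both ranges by the slice's compound file offset so they address the physical
        // CFS file, mirroring sliceHeaderByteRange / sliceFooterByteRange in slice().
        final Range sliceHeaderByteRange = header.shift(sliceOffset);
        final Range sliceFooterByteRange = header.length() < sliceLength
            ? new Range(sliceLength - CodecUtil.footerLength(), sliceLength).shift(sliceOffset)
            : Range.EMPTY;

        System.out.println("header range in CFS file: " + sliceHeaderByteRange);
        // the footer range covers the last 16 bytes (Lucene codec footer) of the logical file
        System.out.println("footer range in CFS file: " + sliceFooterByteRange);
    }
}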