Add CFS index caching support for full_copy searchable snapshots (#70646)
Adds support for caching the headers and footers of logical files contained within compound (CFS) files in the snapshot blob index cache.
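The gist of the change, in miniature: a logical file stored inside a Lucene compound (.cfs) file has its own Lucene header and codec footer at known positions, so both can be mapped to absolute byte ranges within the physical .cfs blob and indexed in the snapshot blob cache, letting shards restart without requesting those bytes from the blob store. The following self-contained Java sketch illustrates only that mapping; the class and method names are hypothetical and not part of this commit, and it relies on Lucene's CodecUtil.footerLength() (16 bytes: magic, algorithm id, checksum).

import org.apache.lucene.codecs.CodecUtil;

// Hypothetical illustration: map the header/footer of a logical file inside a
// compound (.cfs) file to absolute byte ranges within the physical blob.
final class CfsCacheRangesSketch {

    // A simple [start, end) range; a stand-in for Elasticsearch's ByteRange.
    record Range(long start, long end) {
        Range shift(long delta) {
            return new Range(start + delta, end + delta);
        }
    }

    // Header of the logical file: its first headerLength bytes, shifted by the
    // offset of the logical file within the physical .cfs file.
    static Range headerRange(long compoundFileOffset, long headerLength) {
        return new Range(0, headerLength).shift(compoundFileOffset);
    }

    // Footer of the logical file: its trailing codec footer, again shifted to
    // absolute offsets within the physical .cfs file.
    static Range footerRange(long compoundFileOffset, long logicalFileLength) {
        return new Range(logicalFileLength - CodecUtil.footerLength(), logicalFileLength).shift(compoundFileOffset);
    }
}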
ywelsch authored Mar 23, 2021
1 parent 7dd96ef commit 093cd95
Showing 6 changed files with 125 additions and 75 deletions.
SearchableSnapshotsBlobStoreCacheIntegTests.java
@@ -61,7 +61,6 @@
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@@ -258,7 +257,7 @@ public void testBlobStoreCache() throws Exception {
ensureGreen(restoredAgainIndex);

logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex);
-checkNoBlobStoreAccess(storage);
+checkNoBlobStoreAccess();

logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex);
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
@@ -295,7 +294,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
ensureGreen(restoredAgainIndex);

logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex);
-checkNoBlobStoreAccess(storage);
+checkNoBlobStoreAccess();

logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX);
assertHitCount(
@@ -320,19 +319,13 @@ public Settings onNodeStopped(String nodeName) throws Exception {
// TODO also test when prewarming is enabled
}

-private void checkNoBlobStoreAccess(Storage storage) {
+private void checkNoBlobStoreAccess() {
for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
-assertThat(
-Strings.toString(indexInputStats),
-indexInputStats.getBlobStoreBytesRequested().getCount(),
-storage == Storage.SHARED_CACHE ? equalTo(0L)
-: indexInputStats.getFileExt().equals("cfs") ? greaterThanOrEqualTo(0L)
-: equalTo(0L)
-);
+assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L));
}
}
}
BaseSearchableSnapshotIndexInput.java
@@ -44,6 +44,13 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput
*/
protected final ByteRange headerBlobCacheByteRange;

+/**
+ * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be
+ * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method
+ * {@link BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo}).
+ */
+protected final ByteRange footerBlobCacheByteRange;

// the following are only mutable so they can be adjusted after cloning/slicing
protected volatile boolean isClone;
private AtomicBoolean closed;
@@ -57,7 +64,8 @@ public BaseSearchableSnapshotIndexInput(
IndexInputStats stats,
long offset,
long length,
-ByteRange blobCacheByteRange
+ByteRange headerBlobCacheByteRange,
+ByteRange footerBlobCacheByteRange
) {
super(name, context);
this.name = Objects.requireNonNull(name);
@@ -68,7 +76,8 @@
this.context = Objects.requireNonNull(context);
assert fileInfo.metadata().hashEqualsContents() == false
: "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')';
-this.headerBlobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
+this.headerBlobCacheByteRange = Objects.requireNonNull(headerBlobCacheByteRange);
+this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange);
this.stats = Objects.requireNonNull(stats);
this.offset = offset;
this.length = length;
@@ -144,8 +153,11 @@ private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
}

protected ByteRange maybeReadFromBlobCache(long position, int length) {
-if (headerBlobCacheByteRange.contains(position, position + length)) {
+final long end = position + length;
+if (headerBlobCacheByteRange.contains(position, end)) {
return headerBlobCacheByteRange;
+} else if (footerBlobCacheByteRange.contains(position, end)) {
+return footerBlobCacheByteRange;
}
return ByteRange.EMPTY;
}
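The hunk above makes maybeReadFromBlobCache consult both cached ranges: a read is answered from the blob index cache only if it falls entirely within the header range or entirely within the footer range. A small, runnable stand-in for that lookup (the Range record below is a hypothetical simplification, not Elasticsearch's actual ByteRange API):

// Hypothetical stand-in for the header/footer lookup in maybeReadFromBlobCache.
final class BlobCacheLookupSketch {

    record Range(long start, long end) { // [start, end)
        static final Range EMPTY = new Range(0, 0);

        boolean isEmpty() {
            return start >= end;
        }

        // True if [position, readEnd) lies entirely within this range.
        boolean contains(long position, long readEnd) {
            return isEmpty() == false && start <= position && readEnd <= end;
        }
    }

    static Range maybeReadFromBlobCache(Range header, Range footer, long position, int length) {
        final long end = position + length;
        if (header.contains(position, end)) {
            return header; // serve the read from the cached header blob
        } else if (footer.contains(position, end)) {
            return footer; // serve the read from the cached footer blob
        }
        return Range.EMPTY; // fall back to the regular cache file / blob store path
    }
}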
CachedBlobContainerIndexInput.java
@@ -10,6 +10,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -60,6 +62,12 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput
private final int defaultRangeSize;
private final int recoveryRangeSize;

+/**
+ * If > 0, this index input represents a logical file within a compound (CFS) file, or a slice thereof, and this value is
+ * the offset of the logical file within the physical CFS file.
+ */
+private final long compoundFileOffset;

// last read position is kept around in order to detect (non)contiguous reads for stats
private long lastReadPosition;
// last seek position is kept around in order to detect forward/backward seeks for stats
@@ -81,11 +89,13 @@ public CachedBlobContainerIndexInput(
context,
stats,
0L,
+0L,
fileInfo.length(),
new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
rangeSize,
recoveryRangeSize,
-directory.getBlobCacheByteRange(name, fileInfo.length())
+directory.getBlobCacheByteRange(name, fileInfo.length()),
+ByteRange.EMPTY
);
assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
stats.incrementOpenCount();
@@ -98,18 +108,21 @@ private CachedBlobContainerIndexInput(
IOContext context,
IndexInputStats stats,
long offset,
+long compoundFileOffset,
long length,
CacheFileReference cacheFileReference,
int rangeSize,
int recoveryRangeSize,
-ByteRange headerBlobCacheByteRange
+ByteRange headerBlobCacheByteRange,
+ByteRange footerBlobCacheByteRange
) {
-super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange);
+super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange, footerBlobCacheByteRange);
this.cacheFileReference = cacheFileReference;
this.lastReadPosition = this.offset;
this.lastSeekPosition = this.offset;
this.defaultRangeSize = rangeSize;
this.recoveryRangeSize = recoveryRangeSize;
+this.compoundFileOffset = compoundFileOffset;
}

@Override
@@ -161,7 +174,7 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
final ByteRange indexCacheMiss; // null if not a miss

final ByteRange blobCacheByteRange = maybeReadFromBlobCache(position, length);
-if (blobCacheByteRange != ByteRange.EMPTY) {
+if (blobCacheByteRange.isEmpty() == false) {
final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange);
assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position;
assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length();
@@ -245,7 +258,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
final ByteRange startRangeToWrite = computeRange(position);
final ByteRange endRangeToWrite = computeRange(position + length - 1);
assert startRangeToWrite.end() <= endRangeToWrite.end() : startRangeToWrite + " vs " + endRangeToWrite;
-final ByteRange rangeToWrite = startRangeToWrite.minEnvelope(endRangeToWrite).minEnvelope(indexCacheMiss);
+
+// only request what's needed to fill the cache to make sure the cache is always fully populated during recovery
+final ByteRange rangeToWrite = indexCacheMiss != null && directory.isRecoveryFinalized() == false
+? indexCacheMiss
+: startRangeToWrite.minEnvelope(endRangeToWrite);

final ByteRange rangeToRead = ByteRange.of(position, position + length);
assert rangeToRead.isSubRangeOf(rangeToWrite) : rangeToRead + " vs " + rangeToWrite;
@@ -260,39 +277,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
);

if (indexCacheMiss != null) {
-final Releasable onCacheFillComplete = stats.addIndexCacheFill();
-final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
-final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
-
-// We assume that we only cache small portions of blobs so that we do not need to:
-// - use a BigArrays for allocation
-// - use an intermediate copy buffer to read the file in sensibly-sized chunks
-// - release the buffer once the indexing operation is complete
-
-final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
-Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer);
-// NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
-byteBuffer.flip();
-final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
-directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener<>() {
-@Override
-public void onResponse(Void response) {
-onCacheFillComplete.close();
-}
-
-@Override
-public void onFailure(Exception e1) {
-onCacheFillComplete.close();
-}
-});
-return indexCacheMissLength;
-});
-
-if (readFuture == null) {
-// Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
-// possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
-// case, simply move on.
-onCacheFillComplete.close();
+fillIndexCache(cacheFile, indexCacheMiss);
+if (compoundFileOffset > 0L
+&& indexCacheMiss.equals(headerBlobCacheByteRange)
+&& footerBlobCacheByteRange.isEmpty() == false) {
+fillIndexCache(cacheFile, footerBlobCacheByteRange);
}
}

@@ -311,6 +300,43 @@ public void onFailure(Exception e1) {
readComplete(position, length);
}

+private void fillIndexCache(CacheFile cacheFile, ByteRange indexCacheMiss) {
+final Releasable onCacheFillComplete = stats.addIndexCacheFill();
+final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
+final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
+
+// We assume that we only cache small portions of blobs so that we do not need to:
+// - use a BigArrays for allocation
+// - use an intermediate copy buffer to read the file in sensibly-sized chunks
+// - release the buffer once the indexing operation is complete
+
+final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
+Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer);
+// NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
+byteBuffer.flip();
+final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
+directory.putCachedBlob(fileInfo.physicalName(), indexCacheMiss.start(), content, new ActionListener<Void>() {
+@Override
+public void onResponse(Void response) {
+onCacheFillComplete.close();
+}
+
+@Override
+public void onFailure(Exception e1) {
+onCacheFillComplete.close();
+}
+});
+return indexCacheMissLength;
+});
+
+if (readFuture == null) {
+// Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
+// possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
+// case, simply move on.
+onCacheFillComplete.close();
+}
+}

private void readComplete(long position, int length) {
stats.incrementBytesRead(lastReadPosition, position, length);
lastReadPosition = position + length;
@@ -579,18 +605,45 @@ public IndexInput slice(String sliceName, long sliceOffset, long sliceLength) {
+ this
);
}

+// Are we creating a slice from a CFS file?
+final boolean sliceCompoundFile = IndexFileNames.matchesExtension(name, "cfs")
+&& IndexFileNames.getExtension(sliceName) != null
+&& compoundFileOffset == 0L // not already a compound file
+&& isClone == false; // tests aggressively clone and slice
+
+final ByteRange sliceHeaderByteRange;
+final ByteRange sliceFooterByteRange;
+final long sliceCompoundFileOffset;
+
+if (sliceCompoundFile) {
+sliceCompoundFileOffset = this.offset + sliceOffset;
+sliceHeaderByteRange = directory.getBlobCacheByteRange(sliceName, sliceLength).shift(sliceCompoundFileOffset);
+if (sliceHeaderByteRange.length() < sliceLength) {
+sliceFooterByteRange = ByteRange.of(sliceLength - CodecUtil.footerLength(), sliceLength).shift(sliceCompoundFileOffset);
+} else {
+sliceFooterByteRange = ByteRange.EMPTY;
+}
+} else {
+sliceCompoundFileOffset = this.compoundFileOffset;
+sliceHeaderByteRange = ByteRange.EMPTY;
+sliceFooterByteRange = ByteRange.EMPTY;
+}

final CachedBlobContainerIndexInput slice = new CachedBlobContainerIndexInput(
sliceName,
directory,
fileInfo,
context,
stats,
this.offset + sliceOffset,
+sliceCompoundFileOffset,
sliceLength,
cacheFileReference,
defaultRangeSize,
recoveryRangeSize,
-headerBlobCacheByteRange
+sliceHeaderByteRange,
+sliceFooterByteRange
);
slice.isClone = true;
return slice;
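As a worked example of the slice() arithmetic above, assume a logical file of 10,000 bytes that starts at offset 4,096 within its .cfs file, and assume the directory elects a 1,024-byte header region for the blob index cache (the concrete numbers here are illustrative, not from the commit):

// Illustrative numbers for the header/footer shift in slice(); all values assumed.
long sliceOffset = 4_096L;   // where the logical file starts inside the .cfs file
long sliceLength = 10_000L;  // length of the logical file
long compoundFileOffset = 0L + sliceOffset; // this.offset == 0 for the top-level .cfs input

// Header: logical bytes [0, 1024) -> physical bytes [4096, 5120) of the .cfs blob.
long headerStart = 0L + compoundFileOffset;   // 4096
long headerEnd = 1_024L + compoundFileOffset; // 5120

// Footer: the trailing CodecUtil.footerLength() == 16 bytes,
// logical bytes [9984, 10000) -> physical bytes [14080, 14096) of the .cfs blob.
long footerStart = sliceLength - 16L + compoundFileOffset; // 14080
long footerEnd = sliceLength + compoundFileOffset;         // 14096

Both tiny ranges can then be served from the snapshot blob index cache, so a recovering node can open and checksum-verify every logical file of a compound file without reading from the blob store.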
FrozenIndexInput.java
@@ -37,7 +37,6 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Locale;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@@ -61,13 +60,6 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput {
*/
private final long compoundFileOffset;

-/**
- * Range of bytes that should be cached in the blob cache for the current index input's footer. This footer byte range should only be
- * required for slices of CFS files; regular files already have their footers extracted from the {@link FileInfo} (see method
- * {@link BaseSearchableSnapshotIndexInput#maybeReadChecksumFromFileInfo}).
- */
-private final ByteRange footerBlobCacheByteRange;

// last read position is kept around in order to detect (non)contiguous reads for stats
private long lastReadPosition;
// last seek position is kept around in order to detect forward/backward seeks for stats
@@ -116,13 +108,12 @@ private FrozenIndexInput(
ByteRange headerBlobCacheByteRange,
ByteRange footerBlobCacheByteRange
) {
-super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange);
+super(logger, name, directory, fileInfo, context, stats, offset, length, headerBlobCacheByteRange, footerBlobCacheByteRange);
this.frozenCacheFile = frozenCacheFile;
this.lastReadPosition = this.offset;
this.lastSeekPosition = this.offset;
this.defaultRangeSize = rangeSize;
this.recoveryRangeSize = recoveryRangeSize;
-this.footerBlobCacheByteRange = Objects.requireNonNull(footerBlobCacheByteRange);
this.compoundFileOffset = compoundFileOffset;
assert offset >= compoundFileOffset;
}
@@ -454,17 +445,6 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e
throw new IOException("failed to read data from cache", e);
}

-@Override
-protected ByteRange maybeReadFromBlobCache(long position, int length) {
-final long end = position + length;
-if (headerBlobCacheByteRange.contains(position, end)) {
-return headerBlobCacheByteRange;
-} else if (footerBlobCacheByteRange.contains(position, end)) {
-return footerBlobCacheByteRange;
-}
-return ByteRange.EMPTY;
-}

private static int positionalWrite(SharedBytes.IO fc, long start, ByteBuffer byteBuffer) throws IOException {
assert assertCurrentThreadMayWriteCacheFile();
byteBuffer.flip();
DirectBlobContainerIndexInput.java
@@ -93,7 +93,8 @@ private DirectBlobContainerIndexInput(
long sequentialReadSize,
int bufferSize
) {
-super(logger, name, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache
+super(logger, name, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY, ByteRange.EMPTY); // TODO should use blob
+// cache
this.position = position;
assert sequentialReadSize >= 0;
this.sequentialReadSize = sequentialReadSize;