HBASE-27370 Avoid decompressing blocks when reading from bucket cache… (apache#4781)

Co-authored-by: Josh Elser <[email protected]>
Signed-off-by: Peter Somogyi <[email protected]>
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
wchevreuil and Josh Elser committed Sep 20, 2022
1 parent 393068c commit e25b2a7
Showing 5 changed files with 118 additions and 23 deletions.
@@ -373,6 +373,10 @@ public interface CachingBlockReader {
HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException;

HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException;
}

/** An interface used by clients to open and iterate an {@link HFile}. */
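For orientation, the new nine-argument overload adds a trailing cacheOnly flag meaning "load this block into the block cache; the caller does not need the unpacked result back". Below is a minimal caller sketch matching the argument pattern the prefetch runnable uses in the next hunk; the class and method names are illustrative and not part of this commit.

import java.io.IOException;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

final class CacheOnlyReadSketch {
  /** Reads one block at the given offset purely to populate the block cache. */
  static HFileBlock cacheOnlyRead(HFile.Reader reader, long offset) throws IOException {
    // Same argument pattern the prefetch path uses: cache the block, use a
    // positional read, not a compaction, don't update cache hit/miss metrics,
    // unknown block type/encoding, and the new cacheOnly flag set to true.
    return reader.readBlock(offset, /* onDiskBlockSize= */ -1,
      /* cacheBlock= */ true, /* pread= */ true, /* isCompaction= */ false,
      /* updateCacheMetrics= */ false, /* expectedBlockType= */ null,
      /* expectedDataBlockEncoding= */ null, /* cacheOnly= */ true);
  }
}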
@@ -57,7 +57,7 @@ public void run() {
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
/* pread= */true, false, false, null, null);
/* pread= */true, false, false, null, null, true);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
@@ -1082,7 +1082,7 @@ public void setConf(Configuration conf) {
* and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
*/
private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
// Check cache for block. If found return.
BlockCache cache = cacheConf.getBlockCache().orElse(null);
@@ -1187,7 +1187,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws

cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
HFileBlock cachedBlock =
getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null);
getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null);
if (cachedBlock != null) {
assert cachedBlock.isUnpacked() : "Packed block leak.";
// Return a distinct 'shallow copy' of the block,
@@ -1234,6 +1234,15 @@ private boolean shouldUseHeap(BlockType expectedBlockType) {
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException {
return readBlock(dataBlockOffset, onDiskBlockSize, cacheBlock, pread, isCompaction,
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding, false);
}

@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly)
throws IOException {
if (dataBlockIndexReader == null) {
throw new IOException(path + " block index not loaded");
}
@@ -1257,17 +1266,18 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
try (TraceScope traceScope = TraceUtil.createTrace("HFileReaderImpl.readBlock")) {
while (true) {
// Check cache for block. If found return.
if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
if (cacheConf.shouldReadBlockFromCache(expectedBlockType) && !cacheOnly) {
if (useLock) {
lockEntry = offsetLock.getLockEntry(dataBlockOffset);
}
// Try and get the block from the block cache. If the useLock variable is true then this
// is the second time through the loop and it should not be counted as a block cache miss.
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics,
expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("From Cache " + cachedBlock);
LOG.trace("Block for file {} is coming from Cache {}",
Bytes.toString(cachedBlock.getHFileContext().getTableName()), cachedBlock);
}
TraceUtil.addTimelineAnnotation("blockCacheHit");
assert cachedBlock.isUnpacked() : "Packed block leak.";
@@ -1304,14 +1314,30 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));
validateBlockType(hfileBlock, expectedBlockType);
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);

// Don't need the unpacked block back and we're storing the block in the cache compressed
if (cacheOnly && cacheCompressed && cacheOnRead) {
LOG.debug("Skipping decompression of block in prefetch");
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
}
});

if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
HFile.DATABLOCK_READ_COUNT.increment();
}
return hfileBlock;
}
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey,
cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
}
});
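Condensed, the branch added above skips decompression only when three conditions line up. The small paraphrase below (the method name is illustrative) captures the rule; the diff above remains the authoritative version.

/**
 * Paraphrase of the new fast path in readBlock: keep the block compressed
 * only if the caller just wants it cached (cacheOnly), the cache is allowed
 * to hold this block category on read, and it is configured to store the
 * category in compressed (packed) form. Otherwise the block is unpacked
 * exactly as before.
 */
static boolean canSkipUnpack(boolean cacheOnly, boolean cacheOnRead, boolean cacheCompressed) {
  return cacheOnly && cacheOnRead && cacheCompressed;
}

When a later read hits the compressed block in the cache, getCachedBlock unpacks it as necessary (see its javadoc in the first hunk of this file), so the change defers the decompression cost from prefetch time to first real use rather than dropping it.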
@@ -176,6 +176,14 @@ public BlockReaderWrapper(HFileBlock.FSReader realReader) {
public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
return readBlock(offset, onDiskSize, cacheBlock, pread, isCompaction, updateCacheMetrics,
expectedBlockType, expectedDataBlockEncoding, false);
}

@Override
public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException {
if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) {
hitCount += 1;
return prevBlock;
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,6 +38,7 @@
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -112,36 +117,88 @@ private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
}

private void readStoreFile(Path storeFilePath) throws Exception {
readStoreFile(storeFilePath, (r, o) -> {
HFileBlock block = null;
try {
block = r.readBlock(o, -1, false, true, false, true, null, null);
} catch (IOException e) {
fail(e.getMessage());
}
return block;
}, (key, block) -> {
boolean isCached = blockCache.getBlock(key, true, false, true) != null;
if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
assertTrue(isCached);
}
});
}

private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
readStoreFile(storeFilePath, (r, o) -> {
HFileBlock block = null;
try {
block = r.readBlock(o, -1, false, true, false, true, null, null, true);
} catch (IOException e) {
fail(e.getMessage());
}
return block;
}, (key, block) -> {
boolean isCached = blockCache.getBlock(key, true, false, true) != null;
if (block.getBlockType() == BlockType.DATA) {
assertFalse(block.isUnpacked());
} else if (
block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
assertTrue(block.isUnpacked());
}
assertTrue(isCached);
});
}

private void readStoreFile(Path storeFilePath,
BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}

// Check that all of the data blocks were preloaded
BlockCache blockCache = cacheConf.getBlockCache().get();
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
HFileBlock block = readFunction.apply(reader, offset);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
assertTrue(isCached);
}
validationFunction.accept(blockCacheKey, block);
offset += block.getOnDiskSizeWithHeader();
}
}

@Test
public void testPrefetchCompressed() throws Exception {
conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
cacheConf = new CacheConfig(conf, blockCache);
HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
readStoreFileCacheOnly(storeFile);
conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);

}

private Path writeStoreFile(String fname) throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
return writeStoreFile(fname, meta);
}

private Path writeStoreFile(String fname, HFileContext context) throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
.withOutputDir(storeFileParentDir).withFileContext(context).build();
Random rand = ThreadLocalRandom.current();
final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) {
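As testPrefetchCompressed above exercises, the fast path only applies when the cache is configured to keep data blocks in their compressed form. A minimal configuration sketch follows, assuming a BlockCache instance is already available; the helper class and method names are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

final class CompressedCacheConfigSketch {
  /** Builds a CacheConfig that keeps data blocks compressed in the block cache. */
  static CacheConfig build(BlockCache blockCache) {
    Configuration conf = HBaseConfiguration.create();
    // Same switch the test flips: cache data blocks in their on-disk (compressed) form.
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
    return new CacheConfig(conf, blockCache);
  }
}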
