diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 89a407b3389d..73346e8ae4ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -373,6 +373,10 @@ public interface CachingBlockReader { HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread, final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException; + + HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread, + final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException; } /** An interface used by clients to open and iterate an {@link HFile}. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 98401c46bee7..0eb2aa7db000 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -57,7 +57,7 @@ public void run() { // next header, will not have happened...so, pass in the onDiskSize gotten from the // cached block. This 'optimization' triggers extremely rarely I'd say. HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true, - /* pread= */true, false, false, null, null); + /* pread= */true, false, false, null, null, true); try { onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize(); offset += block.getOnDiskSizeWithHeader(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 63d1cee2b13f..2cf1c3df6773 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1084,7 +1084,7 @@ public void setConf(Configuration conf) { * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary. */ private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, - boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, + boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { // Check cache for block. If found return. BlockCache cache = cacheConf.getBlockCache().orElse(null); @@ -1189,7 +1189,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory()); HFileBlock cachedBlock = - getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null); + getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null); if (cachedBlock != null) { assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, @@ -1236,6 +1236,15 @@ private boolean shouldUseHeap(BlockType expectedBlockType) { public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock, boolean pread, final boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { + return readBlock(dataBlockOffset, onDiskBlockSize, cacheBlock, pread, isCompaction, + updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding, false); + } + + @Override + public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock, + boolean pread, final boolean isCompaction, boolean updateCacheMetrics, + BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) + throws IOException { if (dataBlockIndexReader == null) { throw new IOException(path + " block index not loaded"); } @@ -1261,17 +1270,18 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo try { while (true) { // Check cache for block. If found return. - if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) { + if (cacheConf.shouldReadBlockFromCache(expectedBlockType) && !cacheOnly) { if (useLock) { lockEntry = offsetLock.getLockEntry(dataBlockOffset); } // Try and get the block from the block cache. If the useLock variable is true then this // is the second time through the loop and it should not be counted as a block cache miss. - HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, - updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); + HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics, + expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { if (LOG.isTraceEnabled()) { - LOG.trace("From Cache {}", cachedBlock); + LOG.trace("Block for file {} is coming from Cache {}", + Bytes.toString(cachedBlock.getHFileContext().getTableName()), cachedBlock); } span.addEvent("block cache hit", attributes); assert cachedBlock.isUnpacked() : "Packed block leak."; @@ -1308,14 +1318,30 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction, shouldUseHeap(expectedBlockType)); validateBlockType(hfileBlock, expectedBlockType); - HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); + final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category); + final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category); + + // Don't need the unpacked block back and we're storing the block in the cache compressed + if (cacheOnly && cacheCompressed && cacheOnRead) { + LOG.debug("Skipping decompression of block in prefetch"); + // Cache the block if necessary + cacheConf.getBlockCache().ifPresent(cache -> { + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()); + } + }); + if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { + HFile.DATABLOCK_READ_COUNT.increment(); + } + return hfileBlock; + } + HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cache.cacheBlock(cacheKey, - cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked, + cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheConf.isInMemory()); } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 4aedfb959a90..2009c97ab55f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -175,6 +175,14 @@ public BlockReaderWrapper(HFileBlock.FSReader realReader) { public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { + return readBlock(offset, onDiskSize, cacheBlock, pread, isCompaction, updateCacheMetrics, + expectedBlockType, expectedDataBlockEncoding, false); + } + + @Override + public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread, + boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, + DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException { if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) { hitCount += 1; return prevBlock; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 1e4f675b2382..9844ebbf42fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.hasItem; @@ -26,6 +27,7 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.trace.data.SpanData; @@ -34,6 +36,8 @@ import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -47,6 +51,7 @@ import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -148,6 +153,51 @@ private void readStoreFileLikeScanner(Path storeFilePath) throws Exception { } private void readStoreFile(Path storeFilePath) throws Exception { + readStoreFile(storeFilePath, (r, o) -> { + HFileBlock block = null; + try { + block = r.readBlock(o, -1, false, true, false, true, null, null); + } catch (IOException e) { + fail(e.getMessage()); + } + return block; + }, (key, block) -> { + boolean isCached = blockCache.getBlock(key, true, false, true) != null; + if ( + block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(isCached); + } + }); + } + + private void readStoreFileCacheOnly(Path storeFilePath) throws Exception { + readStoreFile(storeFilePath, (r, o) -> { + HFileBlock block = null; + try { + block = r.readBlock(o, -1, false, true, false, true, null, null, true); + } catch (IOException e) { + fail(e.getMessage()); + } + return block; + }, (key, block) -> { + boolean isCached = blockCache.getBlock(key, true, false, true) != null; + if (block.getBlockType() == BlockType.DATA) { + assertFalse(block.isUnpacked()); + } else if ( + block.getBlockType() == BlockType.ROOT_INDEX + || block.getBlockType() == BlockType.INTERMEDIATE_INDEX + ) { + assertTrue(block.isUnpacked()); + } + assertTrue(isCached); + }); + } + + private void readStoreFile(Path storeFilePath, + BiFunction readFunction, + BiConsumer validationFunction) throws Exception { // Open the file HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf); @@ -155,29 +205,36 @@ private void readStoreFile(Path storeFilePath) throws Exception { // Sleep for a bit Thread.sleep(1000); } - - // Check that all of the data blocks were preloaded - BlockCache blockCache = cacheConf.getBlockCache().get(); long offset = 0; while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { - HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null); + HFileBlock block = readFunction.apply(reader, offset); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); - boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; - if ( - block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX - || block.getBlockType() == BlockType.INTERMEDIATE_INDEX - ) { - assertTrue(isCached); - } + validationFunction.accept(blockCacheKey, block); offset += block.getOnDiskSizeWithHeader(); } } + @Test + public void testPrefetchCompressed() throws Exception { + conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true); + cacheConf = new CacheConfig(conf, blockCache); + HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ) + .withBlockSize(DATA_BLOCK_SIZE).build(); + Path storeFile = writeStoreFile("TestPrefetchCompressed", context); + readStoreFileCacheOnly(storeFile); + conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); + + } + private Path writeStoreFile(String fname) throws IOException { - Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + return writeStoreFile(fname, meta); + } + + private Path writeStoreFile(String fname, HFileContext context) throws IOException { + Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) - .withOutputDir(storeFileParentDir).withFileContext(meta).build(); + .withOutputDir(storeFileParentDir).withFileContext(context).build(); Random rand = ThreadLocalRandom.current(); final int rowLen = 32; for (int i = 0; i < NUM_KV; ++i) {