diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 7660a42ba5e2..8419ccb6c1cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
    */
   void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);
 
+  /**
+   * Add block to cache.
+   * @param cacheKey      The block's cache key.
+   * @param buf           The block contents wrapped in a ByteBuffer.
+   * @param inMemory      Whether block should be treated as in-memory
+   * @param waitWhenCache Whether to wait for the block to be flushed to the cache; mainly
+   *                      relevant when a BucketCache is configured.
+   */
+  default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+    boolean waitWhenCache) {
+    cacheBlock(cacheKey, buf, inMemory);
+  }
+
   /**
    * Add block to cache (defaults to not in-memory).
    * @param cacheKey The block's cache key.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 6cd40b0858f5..d616d6f40d9f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -53,11 +53,17 @@ public long heapSize() {
 
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+    cacheBlock(cacheKey, buf, inMemory, false);
+  }
+
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+    boolean waitWhenCache) {
     boolean metaBlock = isMetaBlock(buf.getBlockType());
     if (metaBlock) {
       l1Cache.cacheBlock(cacheKey, buf, inMemory);
     } else {
-      l2Cache.cacheBlock(cacheKey, buf, inMemory);
+      l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index d47db18bad9d..1caca6abf4e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1328,7 +1328,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
           if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-            cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+            cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
           }
         });
 
@@ -1341,8 +1341,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
           if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-            cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
-              cacheConf.isInMemory());
+            // Wait on the cache write during compaction and prefetching.
+            cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
+              cacheConf.isInMemory(), cacheOnly);
           }
         });
         if (unpacked != hfileBlock) {
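The four-argument cacheBlock above is a Java default method, so BlockCache implementations written before this change keep compiling and simply fall back to the three-argument overload, ignoring the new flag. A minimal, self-contained sketch of that compatibility pattern (Cache, SimpleCache, and put are illustrative names, not HBase API):

// DefaultMethodDemo.java -- illustrative only; these are not HBase types.
interface Cache {
  void put(String key, byte[] value);

  // Overload added after the fact, mirroring BlockCache#cacheBlock(..., waitWhenCache):
  // the default body delegates, so pre-existing implementations need no changes.
  default void put(String key, byte[] value, boolean waitForFlush) {
    put(key, value); // older implementations silently ignore the new flag
  }
}

class SimpleCache implements Cache {
  private final java.util.Map<String, byte[]> map = new java.util.HashMap<>();

  @Override
  public void put(String key, byte[] value) {
    map.put(key, value);
  }
}

public class DefaultMethodDemo {
  public static void main(String[] args) {
    Cache cache = new SimpleCache();
    // Compiles and runs even though SimpleCache never declares the 3-arg put.
    cache.put("block-1", new byte[] { 1, 2, 3 }, true);
    System.out.println("cached via default method");
  }
}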
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 6b7cf3caaa39..b33d471ae499 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -550,7 +550,7 @@ private void doCacheOnWrite(long offset) {
     HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
     try {
       cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
-        cacheFormatBlock);
+        cacheFormatBlock, cacheConf.isInMemory(), true);
     } finally {
       // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
       cacheFormatBlock.release();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 2e2ae4865653..18295f285c49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -175,13 +175,6 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
 
-  /**
-   * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
-   * some blocks when caching. If the flag is true, we will wait until blocks are flushed to
-   * IOEngine.
-   */
-  boolean wait_when_cache = false;
-
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
@@ -248,6 +241,10 @@ public class BucketCache implements BlockCache, HeapSize {
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
 
+  private static final String QUEUE_ADDITION_WAIT_TIME =
+    "hbase.bucketcache.queue.addition.waittime";
+  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
+  private long queueAdditionWaitTime;
   /**
    * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
    * integrity, default algorithm is MD5
@@ -288,6 +285,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
     this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
     this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
     this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
+    this.queueAdditionWaitTime =
+      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
     this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
     sanityCheckConfigs();
 
@@ -432,7 +431,20 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    */
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
-    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
+    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
+  }
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * @param cacheKey      block's cache key
+   * @param cachedItem    block buffer
+   * @param inMemory      if block is in-memory
+   * @param waitWhenCache if true, wait up to the configured queue addition wait time
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+    boolean waitWhenCache) {
+    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
   }
 
   /**
@@ -491,7 +503,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
     boolean successfulAddition = false;
     if (wait) {
       try {
-        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
+        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
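The knob introduced above only changes which BlockingQueue.offer variant the write path uses: the non-blocking offer drops the block when the writer queue is full, while the timed offer waits up to queueAdditionWaitTime before giving up. A standalone sketch of that difference (plain strings stand in for BucketCache's RAMQueueEntry; the 100 ms value mirrors the test hbase-site.xml below):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedOfferDemo {
  public static void main(String[] args) throws InterruptedException {
    // A full writer queue, as when BucketCache writer threads fall behind.
    BlockingQueue<String> writeQueue = new ArrayBlockingQueue<>(1);
    writeQueue.offer("block-0");

    // waitWhenCache == false: non-blocking offer returns false at once and
    // the block is simply not cached.
    boolean cached = writeQueue.offer("block-1");

    // waitWhenCache == true: timed offer waits up to queueAdditionWaitTime
    // (hbase.bucketcache.queue.addition.waittime) for space to free up.
    boolean cachedWithWait = writeQueue.offer("block-1", 100, TimeUnit.MILLISECONDS);

    System.out.println("non-blocking: " + cached + ", timed: " + cachedWithWait);
  }
}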
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 908d6ce73267..a132673e8464 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -119,7 +120,6 @@ public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[
       int writerThreads, int writerQLen, String persistencePath) throws IOException {
       super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
         persistencePath);
-      super.wait_when_cache = true;
     }
 
     @Override
@@ -241,8 +241,8 @@ public static void waitUntilAllFlushedToBucket(BucketCache cache) throws Interru
   // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
   // threads will flush it to the bucket and put reference entry in backingMap.
   private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
-    Cacheable block) throws InterruptedException {
-    cache.cacheBlock(cacheKey, block);
+    Cacheable block, boolean waitWhenCache) throws InterruptedException {
+    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
     waitUntilFlushedToBucket(cache, cacheKey);
   }
 
@@ -250,7 +250,7 @@ private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey c
   public void testMemoryLeak() throws Exception {
     final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
-      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
     long lockId = cache.backingMap.get(cacheKey).offset();
     ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
     lock.writeLock().lock();
@@ -265,7 +265,7 @@ public void run() {
     cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
     assertEquals(0, cache.getBlockCount());
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
-      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
     assertEquals(1, cache.getBlockCount());
     lock.writeLock().unlock();
     evictThread.join();
@@ -341,7 +341,8 @@ private void testRetrievalUtils(Path testDir, String ioEngineName)
       bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
     }
     for (HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
+        false);
     }
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
@@ -403,7 +404,8 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception {
       bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
     }
     for (HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
+        false);
     }
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
@@ -786,7 +788,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
 
     for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
       cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
-        hfileBlockPair.getBlock());
+        hfileBlockPair.getBlock(), false);
     }
     usedByteSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedByteSize);
@@ -811,4 +813,63 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
     }
   }
 
+  /**
+   * Test that when {@code waitWhenCache} is enabled, blocks cached via
+   * {@link BucketCache#cacheBlock(BlockCacheKey, Cacheable, boolean, boolean)} all reach the
+   * backing map and survive a shutdown/restore cycle.
+   */
+  @Test
+  public void testBlockAdditionWaitWhenCache() throws Exception {
+    try {
+      final Path dataTestDir = createAndGetTestDir();
+
+      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
+      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
+
+      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+        constructedBlockSizes, 1, 1, persistencePath);
+      long usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedByteSize);
+
+      HFileBlockPair[] hfileBlockPairs =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
+      // Add blocks
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
+          true);
+      }
+
+      // Max wait for 10 seconds.
+      long timeout = 10000;
+      // Wait for blocks size to match the number of blocks.
+      while (bucketCache.backingMap.size() != 10) {
+        if (timeout <= 0) break;
+        Threads.sleep(100);
+        timeout -= 100;
+      }
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
+      }
+      usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedByteSize);
+      // persist cache to file
+      bucketCache.shutdown();
+      assertTrue(new File(persistencePath).exists());
+
+      // restore cache from file
+      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
+
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
+        bucketCache.evictBlock(blockCacheKey);
+      }
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index 44a398bda44b..a3f291b7949c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -110,8 +110,6 @@ private void disableWriter() {
   // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
   public void testBlockInRAMCache() throws IOException {
     cache = create(1, 1000);
-    // Set this to true;
-    cache.wait_when_cache = true;
     disableWriter();
     final String prefix = "testBlockInRamCache";
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
index 6a5c9dcf2db3..771ab0158f61 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
@@ -117,7 +117,6 @@ public void testPrefetchPersistence() throws Exception {
     bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
       constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
       testDir + "/bucket.persistence", 60 * 1000, conf);
-    bucketCache.wait_when_cache = true;
     cacheConf = new CacheConfig(conf, bucketCache);
 
     long usedSize = bucketCache.getAllocator().getUsedSize();
@@ -137,7 +136,6 @@ public void testPrefetchPersistence() throws Exception {
     bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
       constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
       testDir + "/bucket.persistence", 60 * 1000, conf);
-    bucketCache.wait_when_cache = true;
     assertFalse(new File(testDir + "/bucket.persistence").exists());
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertTrue(usedSize != 0);
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 0b6f1d59a0e9..5544ae787b33 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -277,4 +277,9 @@
     <value>3</value>
     <description>Default is unbounded</description>
   </property>
+  <property>
+    <name>hbase.bucketcache.queue.addition.waittime</name>
+    <value>100</value>
+    <description>Default is 0</description>
+  </property>
 </configuration>
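Outside the test resources, the same setting could presumably be applied through the ordinary Hadoop Configuration API before constructing the cache; a small sketch (the 100 ms value is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class QueueWaitTimeConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // 0 (the default) disables waiting entirely: cacheBlock(..., true) then
    // degrades to the non-blocking offer, matching the pre-patch behavior.
    conf.setLong("hbase.bucketcache.queue.addition.waittime", 100);
    System.out.println(conf.getLong("hbase.bucketcache.queue.addition.waittime", 0));
  }
}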