From f28001c4a91dbeb9e692903a2a43ab6bcb7df395 Mon Sep 17 00:00:00 2001
From: Wellington Ramos Chevreuil
Date: Fri, 5 Apr 2024 10:56:06 +0100
Subject: [PATCH] CDPD-68550: Backport [ADDENDUM] HBASE-28458
 BucketCache.notifyFileCachingCompleted may incorrectly consider a file fully
 cached (#5777) (#5791)

Signed-off-by: Peter Somogyi

(cherry picked from commit d7566abd5de915e8f55a4f1f1939f6be38891657)

Change-Id: I27599d2c82cf2dea074bba8caa1e6ba5b14b764a
---
 .../hbase/io/hfile/bucket/BucketCache.java    | 48 +++++++++-----
 .../io/hfile/TestCombinedBlockCache.java      |  9 ++-
 .../hfile/bucket/TestPrefetchPersistence.java | 64 +++----------------
 3 files changed, 47 insertions(+), 74 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5e544a0007e4..dfae8a91aa5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -734,7 +734,8 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
     } else {
       return bucketEntryToUse.withWriteLock(offsetLock, () -> {
         if (backingMap.remove(cacheKey, bucketEntryToUse)) {
-          LOG.debug("removed key {} from back map in the evict process", cacheKey);
+          LOG.debug("removed key {} from back map with offset lock {} in the evict process",
+            cacheKey, bucketEntryToUse.offset());
           blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
           return true;
         }
@@ -1637,19 +1638,21 @@ protected String getAlgorithm() {
   @Override
   public int evictBlocksByHfileName(String hfileName) {
     fileNotFullyCached(hfileName);
-    Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
-      true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
-
+    Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName);
     int numEvicted = 0;
     for (BlockCacheKey key : keySet) {
       if (evictBlock(key)) {
         ++numEvicted;
       }
     }
-
     return numEvicted;
   }
 
+  private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) {
+    return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
+      new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
+  }
+
   /**
    * Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
    * priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
@@ -2062,26 +2065,33 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
           entry.getKey().getHfileName().equals(fileName.getName())
             && entry.getKey().getBlockType().equals(BlockType.DATA)
         ) {
-          LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
-            fileName.getName(), entry.getKey().getOffset());
-          ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
+          long offsetToLock = entry.getValue().offset();
+          LOG.debug("found block {} in the backing map. Acquiring read lock for offset {}",
Acquiring read lock for offset {}", + entry.getKey(), offsetToLock); + ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock); lock.readLock().lock(); locks.add(lock); // rechecks the given key is still there (no eviction happened before the lock acquired) if (backingMap.containsKey(entry.getKey())) { count.increment(); + } else { + lock.readLock().unlock(); + locks.remove(lock); + LOG.debug("found block {}, but when locked and tried to count, it was gone."); } } }); + int metaCount = totalBlockCount - dataBlockCount; // BucketCache would only have data blocks if (dataBlockCount == count.getValue()) { LOG.debug("File {} has now been fully cached.", fileName); fileCacheCompleted(fileName, size); } else { LOG.debug( - "Prefetch executor completed for {}, but only {} blocks were cached. " - + "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.", - fileName.getName(), count.getValue(), dataBlockCount); + "Prefetch executor completed for {}, but only {} data blocks were cached. " + + "Total data blocks for file: {}. " + + "Checking for blocks pending cache in cache writer queue.", + fileName, count.getValue(), dataBlockCount); if (ramCache.hasBlocksForFile(fileName.getName())) { for (ReentrantReadWriteLock lock : locks) { lock.readLock().unlock(); @@ -2090,11 +2100,17 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d + "and try the verification again.", fileName.getName()); Thread.sleep(100); notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); - } else { - LOG.info("We found only {} blocks cached from a total of {} for file {}, " - + "but no blocks pending caching. Maybe cache is full or evictions " - + "happened concurrently to cache prefetch.", count, totalBlockCount, fileName); - } + } else + if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) == dataBlockCount) { + LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in " + + "the cache write queue but we now found that total cached blocks for file {} " + + "is equal to data block count.", count, dataBlockCount, fileName.getName()); + fileCacheCompleted(fileName, size); + } else { + LOG.info("We found only {} data blocks cached from a total of {} for file {}, " + + "but no blocks pending caching. 
+              + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName);
+          }
       }
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
index 8f78c5fb418d..2fe56b06d2f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
@@ -21,6 +21,9 @@
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import org.apache.hadoop.conf.Configuration;
@@ -154,10 +157,10 @@ public void testCombinedBlockCacheStats(BlockType type, int expectedL1Miss, int
       ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1, 52, -1, meta,
       HEAP);
     blockCache.cacheBlock(key, blk);
-    blockCache.getBlock(key, true, false, true);
+    assertNotNull(blockCache.getBlock(key, true, false, true));
     assertEquals(0, blockCache.getStats().getMissCount());
-    blockCache.evictBlock(key);
-    blockCache.getBlock(key, true, false, true);
+    assertTrue(blockCache.evictBlock(key));
+    assertNull(blockCache.getBlock(key, true, false, true));
     assertEquals(1, blockCache.getStats().getMissCount());
     assertEquals(expectedL1Miss, blockCache.getFirstLevelCache().getStats().getMissCount());
     assertEquals(expectedL2Miss, blockCache.getSecondLevelCache().getStats().getMissCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
index 6210c5ff2605..6d213ac8b40b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
-import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -35,14 +33,10 @@
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
-import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
 import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -121,10 +115,10 @@ public void testPrefetchPersistence() throws Exception {
     assertEquals(0, usedSize);
     assertTrue(new File(testDir + "/bucket.cache").exists());
     // Load Cache
-    Path storeFile = writeStoreFile("Region0", "TestPrefetch0");
-    Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1");
-    readStoreFile(storeFile, 0);
-    readStoreFile(storeFile2, 0);
+    Path storeFile = writeStoreFile("TestPrefetch0");
+    Path storeFile2 = writeStoreFile("TestPrefetch1");
+    readStoreFile(storeFile);
+    readStoreFile(storeFile2);
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
 
@@ -135,50 +129,22 @@ public void testPrefetchPersistence() throws Exception {
       testDir + "/bucket.persistence", 60 * 1000, conf);
     cacheConf = new CacheConfig(conf, bucketCache);
     assertTrue(usedSize != 0);
-    readStoreFile(storeFile, 0);
-    readStoreFile(storeFile2, 0);
-    // Test Close Store File
-    closeStoreFile(storeFile2);
+    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
+    assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
     TEST_UTIL.cleanupTestDir();
   }
 
-  public void closeStoreFile(Path path) throws Exception {
-    HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
-    assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName()));
-    int initialRegionPrefetchInfoSize = bucketCache.getRegionCachedInfo().get().size();
-    assertTrue(initialRegionPrefetchInfoSize > 0);
-    reader.close(true);
-    assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName()));
-    int newRegionPrefetchInfoSize = bucketCache.getRegionCachedInfo().get().size();
-    assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
-  }
-
-  public void readStoreFile(Path storeFilePath, long offset) throws Exception {
+  public void readStoreFile(Path storeFilePath) throws Exception {
     // Open the file
     HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
-
     while (!reader.prefetchComplete()) {
       // Sleep for a bit
       Thread.sleep(1000);
     }
-    HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
-    BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
-    BucketEntry be = bucketCache.backingMap.get(blockCacheKey);
-    boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null;
-
-    if (
-      block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
-        || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
-    ) {
-      assertTrue(isCached);
-    }
   }
 
-  public Path writeStoreFile(String regionName, String fname) throws IOException {
-    // Create store files as per the following directory structure
-    // //
-    Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
-    Path storeFileParentDir = new Path(regionDir, fname);
+  public Path writeStoreFile(String fname) throws IOException {
+    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -194,18 +160,6 @@ public Path writeStoreFile(String regionName, String fname) throws IOException {
     }
     sfw.close();
-
-    // Create a dummy .regioninfo file as the PrefetchExecutor needs it to figure out the region
-    // name to be added to the prefetch file list
-    Path regionInfoFilePath = new Path(storeFileParentDir, REGION_INFO_FILE);
-    File regionInfoFile = new File(regionInfoFilePath.toString());
-    try {
-      if (!regionInfoFile.createNewFile()) {
-        assertFalse("Unable to create .regioninfo file", true);
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
     return sfw.getPath();
   }