diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 57f71b31894e..912a3ab524fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -222,13 +222,8 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   transient final IdReadWriteLock<Long> offsetLock;
 
-  final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
-    int nameComparison = a.getHfileName().compareTo(b.getHfileName());
-    if (nameComparison != 0) {
-      return nameComparison;
-    }
-    return Long.compare(a.getOffset(), b.getOffset());
-  });
+  NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
+    Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
 
   /** Statistics thread schedule pool (for heavy debugging, could remove) */
   private transient final ScheduledExecutorService scheduleThreadPool =
@@ -1471,8 +1466,11 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
   }
 
   private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
-    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
-      this::createRecycler);
+    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
+      BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
+        this::createRecycler);
+    backingMap = pair.getFirst();
+    blocksByHFile = pair.getSecond();
     fullyCachedFiles.clear();
     fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
     if (proto.hasChecksum()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index 7cc5050506e4..4b42414fb9c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.Function;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -121,10 +124,12 @@ private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
     }
   }
 
-  static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(Map<Integer, String> deserializers,
-    BucketCacheProtos.BackingMap backingMap, Function<BucketEntry, Recycler> createRecycler)
-    throws IOException {
+  static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB(
+    Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
+    Function<BucketEntry, Recycler> createRecycler) throws IOException {
     ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
+    NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
+      .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
     for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
       BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
       BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
@@ -153,8 +158,9 @@ static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(Map<Integer, String
         throw new IOException("Unknown deserializer class found: " + deserializerClass);
       }
       result.put(key, value);
+      resultSet.add(key);
     }
-    return result;
+    return new Pair<>(result, resultSet);
   }
 
   private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
index ad91d01f8cfd..63ff334826d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -108,6 +108,39 @@ public void testBucketCacheRecovery() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+    // saves the current state of the cache
+    bucketCache.persistToFile();
+
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    Thread.sleep(100);
+    assertEquals(4, newBucketCache.backingMap.size());
+    newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
+    assertEquals(3, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
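Note on why `blocksByHFile` has to be rebuilt during recovery: judging by the new test, eviction by hfile name resolves a file's blocks through this ordered index rather than by scanning `backingMap`, so blocks restored from the persistence file were previously reachable in `backingMap` but invisible to per-file eviction. The sketch below is a minimal, self-contained illustration of the underlying trick, not HBase code: `Key`, `blocksOf`, and `EvictByFileSketch` are hypothetical stand-ins for `BlockCacheKey` and the cache's eviction path. Ordering the skip list by (hfileName, offset), as the patch's comparator does, makes all keys of one file a contiguous run, so "all blocks of file X" becomes a range query between two sentinel keys.

```java
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;

// Simplified stand-in for BlockCacheKey: a (file name, offset) pair.
record Key(String hfileName, long offset) {}

public class EvictByFileSketch {
  // Same ordering the patch installs: file name first, then offset, so all
  // keys belonging to one file are adjacent in the skip list.
  static final NavigableSet<Key> blocksByHFile = new ConcurrentSkipListSet<>(
    Comparator.comparing(Key::hfileName).thenComparingLong(Key::offset));

  // Per-file lookup as a range query between two sentinel keys
  // (MIN_VALUE/MAX_VALUE offsets bracket every real offset of the file).
  static NavigableSet<Key> blocksOf(String hfileName) {
    return blocksByHFile.subSet(new Key(hfileName, Long.MIN_VALUE), true,
      new Key(hfileName, Long.MAX_VALUE), true);
  }

  public static void main(String[] args) {
    blocksByHFile.add(new Key("hfile-a", 0));
    blocksByHFile.add(new Key("hfile-a", 8192));
    blocksByHFile.add(new Key("hfile-b", 0));
    System.out.println(blocksOf("hfile-a").size()); // 2
    // subSet returns a live view: clearing it drops hfile-a's keys from the
    // index, which is what eviction by hfile name needs to do.
    blocksOf("hfile-a").clear();
    System.out.println(blocksByHFile.size()); // 1, only hfile-b remains
  }
}
```

If recovery repopulates only the hash map and not this index (the pre-patch behavior of `parsePB`), `blocksOf` returns an empty range for every recovered file, which is the failure mode `testBucketCacheEvictByHFileAfterRecovery` guards against.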