HBASE-28458 BucketCache.notifyFileCachingCompleted may incorrectly consider a file fully cached #5777

Merged · 4 commits · Apr 2, 2024
BucketCache.java

@@ -2071,25 +2071,29 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
     // so we need to count all blocks for this file in the backing map under
     // a read lock for the block offset
     final List<ReentrantReadWriteLock> locks = new ArrayList<>();
-    LOG.debug("Notifying caching completed for file {}, with total blocks {}", fileName,
-      dataBlockCount);
+    LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
+      fileName, totalBlockCount, dataBlockCount);
     try {
       final MutableInt count = new MutableInt();
       LOG.debug("iterating over {} entries in the backing map", backingMap.size());
       backingMap.entrySet().stream().forEach(entry -> {
-        if (entry.getKey().getHfileName().equals(fileName.getName())) {
+        if (
+          entry.getKey().getHfileName().equals(fileName.getName())
+            && entry.getKey().getBlockType().equals(BlockType.DATA)
+        ) {
           LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
             fileName, entry.getKey().getOffset());
           ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
           lock.readLock().lock();
           locks.add(lock);
           // rechecks the given key is still there (no eviction happened before the lock acquired)
           if (backingMap.containsKey(entry.getKey())) {
             count.increment();
           }
         }
       });
-      // We may either place only data blocks on the BucketCache or all type of blocks
-      if (dataBlockCount == count.getValue() || totalBlockCount == count.getValue()) {
+      // BucketCache would only have data blocks
+      if (dataBlockCount == count.getValue()) {
        LOG.debug("File {} has now been fully cached.", fileName);
         fileCacheCompleted(fileName, size);
       } else {
@@ -2098,15 +2102,17 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
           + "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
           fileName, count.getValue(), dataBlockCount);
         if (ramCache.hasBlocksForFile(fileName.getName())) {
+          for (ReentrantReadWriteLock lock : locks) {
+            lock.readLock().unlock();
+          }
           LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
             + "and try the verification again.", fileName);
           Thread.sleep(100);
           notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
         } else {
-          LOG.info(
-            "We found only {} blocks cached from a total of {} for file {}, "
-              + "but no blocks pending caching. Maybe cache is full?",
-            count, dataBlockCount, fileName);
+          LOG.info("We found only {} blocks cached from a total of {} for file {}, "
+            + "but no blocks pending caching. Maybe cache is full or evictions "
+            + "happened concurrently to cache prefetch.", count, totalBlockCount, fileName);
         }
       }
     } catch (InterruptedException e) {
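
The substance of the fix is the completion predicate. Prefetch only writes DATA blocks into the BucketCache, so counting every block type for the file and accepting a match on either dataBlockCount or totalBlockCount could mark a file fully cached even though some of its data blocks had already been evicted. A minimal sketch of the corrected check, distilled from the hunk above (isFullyCached is a hypothetical helper; the real method also takes a per-offset read lock before counting each entry):

    // Hypothetical condensation of the new check, not the actual BucketCache code:
    // count only DATA blocks for this file and require an exact match with the
    // number of data blocks the prefetch expected to write.
    private boolean isFullyCached(Map<BlockCacheKey, BucketEntry> backingMap,
      Path fileName, int dataBlockCount) {
      long cachedDataBlocks = backingMap.keySet().stream()
        .filter(key -> key.getHfileName().equals(fileName.getName())
          && key.getBlockType().equals(BlockType.DATA))
        .count();
      // The old predicate also accepted a match on totalBlockCount, which could
      // pass spuriously after concurrent evictions.
      return cachedDataBlocks == dataBlockCount;
    }

The second hunk also releases the accumulated read locks before sleeping and retrying, so the recursive re-verification no longer holds offset locks across the 100ms wait.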
TestBlockEvictionOnRegionMovement.java

@@ -41,6 +41,7 @@
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.After;
 import org.junit.Before;
@@ -89,7 +90,7 @@ public void setup() throws Exception {
   @Test
   public void testBlockEvictionOnRegionMove() throws Exception {
     // Write to table and flush
-    TableName tableRegionMove = writeDataToTable();
+    TableName tableRegionMove = writeDataToTable("testBlockEvictionOnRegionMove");

     HRegionServer regionServingRS =
       cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1
@@ -115,7 +116,7 @@ public void testBlockEvictionOnGracefulStop() throws Exception {
   @Test
   public void testBlockEvictionOnGracefulStop() throws Exception {
     // Write to table and flush
-    TableName tableRegionClose = writeDataToTable();
+    TableName tableRegionClose = writeDataToTable("testBlockEvictionOnGracefulStop");

     HRegionServer regionServingRS =
       cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1
@@ -138,8 +139,8 @@ public void testBlockEvictionOnGracefulStop() throws Exception {
     assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
   }

-  public TableName writeDataToTable() throws IOException, InterruptedException {
-    TableName tableName = TableName.valueOf("table1");
+  public TableName writeDataToTable(String testName) throws IOException, InterruptedException {
+    TableName tableName = TableName.valueOf(testName + EnvironmentEdgeManager.currentTime());
     byte[] row0 = Bytes.toBytes("row1");
     byte[] row1 = Bytes.toBytes("row2");
     byte[] family = Bytes.toBytes("family");
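
This test change is mechanical: writeDataToTable previously hard-coded the table name "table1", so two tests sharing the mini cluster would collide on it. A small sketch of the uniqueness pattern the patch adopts (uniqueTableName is a hypothetical helper name):

    // Suffix the test name with the current time from EnvironmentEdgeManager so
    // each test creates a distinct table instead of reusing a hard-coded name.
    private TableName uniqueTableName(String testName) {
      return TableName.valueOf(testName + EnvironmentEdgeManager.currentTime());
    }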
TestBucketCachePersister.java

@@ -164,20 +164,21 @@ public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception {
     // Load Blocks in cache
     Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
     HFile.createReader(fs, storeFile, cacheConf, true, conf);
-    while (bucketCache.backingMap.size() == 0) {
+    boolean evicted = false;
+    while (!PrefetchExecutor.isCompleted(storeFile)) {
+      if (bucketCache.backingMap.size() > 0 && !evicted) {
+        Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
+          bucketCache.backingMap.entrySet().iterator();
+        // Evict a data block from cache
+        Map.Entry<BlockCacheKey, BucketEntry> entry = it.next();
+        while (it.hasNext() && !evicted) {
+          if (entry.getKey().getBlockType().equals(BlockType.DATA)) {
+            evicted = bucketCache.evictBlock(it.next().getKey());
+          }
+        }
+      }
       Thread.sleep(10);
     }
-    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
-      bucketCache.backingMap.entrySet().iterator();
-    // Evict Blocks from cache
-    bucketCache.evictBlock(it.next().getKey());
-    bucketCache.evictBlock(it.next().getKey());
-    int retries = 0;
-    while (!PrefetchExecutor.isCompleted(storeFile) && retries < 5) {
-      Thread.sleep(500);
-      retries++;
-    }
-    assertTrue(retries < 5);
     assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
     cleanupBucketCache(bucketCache);
   }
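
The reworked test forces exactly the race the fix targets: it evicts a single DATA block while the prefetch for the store file is still running, waits for the prefetch to complete, and then asserts the file was not recorded as fully cached. A condensed sketch of that flow (assertNotFullyCachedAfterConcurrentEviction is a hypothetical helper; bucketCache.backingMap, fullyCachedFiles, and PrefetchExecutor come from the hunk above):

    // Hypothetical condensation of the new test logic: evict one DATA block while
    // prefetch is in flight, wait for prefetch to finish, then verify the file is
    // not marked fully cached.
    static void assertNotFullyCachedAfterConcurrentEviction(BucketCache cache, Path file)
      throws InterruptedException {
      boolean evicted = false;
      while (!PrefetchExecutor.isCompleted(file)) {
        if (!evicted) {
          // Evict the first DATA block already written to the cache, if any.
          for (Map.Entry<BlockCacheKey, BucketEntry> e : cache.backingMap.entrySet()) {
            if (e.getKey().getBlockType().equals(BlockType.DATA)) {
              evicted = cache.evictBlock(e.getKey());
              break;
            }
          }
        }
        Thread.sleep(10);
      }
      assertFalse(cache.fullyCachedFiles.containsKey(file.getName()));
    }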