Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDENDUM] HBASE-28458 BucketCache.notifyFileCachingCompleted may incorrectly consider a file fully cached (#5777) #5791

Merged
merged 2 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,8 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
} else {
return bucketEntryToUse.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, bucketEntryToUse)) {
LOG.debug("removed key {} from back map in the evict process", cacheKey);
LOG.debug("removed key {} from back map with offset lock {} in the evict process",
cacheKey, bucketEntryToUse.offset());
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
return true;
}
Expand Down Expand Up @@ -1658,19 +1659,21 @@ protected String getAlgorithm() {
@Override
public int evictBlocksByHfileName(String hfileName) {
fileNotFullyCached(hfileName);
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);

Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName);
int numEvicted = 0;
for (BlockCacheKey key : keySet) {
if (evictBlock(key)) {
++numEvicted;
}
}

return numEvicted;
}

private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) {
return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
}

/**
* Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
* priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
Expand Down Expand Up @@ -2083,25 +2086,32 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
entry.getKey().getHfileName().equals(fileName.getName())
&& entry.getKey().getBlockType().equals(BlockType.DATA)
) {
LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
fileName, entry.getKey().getOffset());
ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
long offsetToLock = entry.getValue().offset();
LOG.debug("found block {} in the backing map. Acquiring read lock for offset {}",
entry.getKey(), offsetToLock);
ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock);
lock.readLock().lock();
locks.add(lock);
// rechecks the given key is still there (no eviction happened before the lock acquired)
if (backingMap.containsKey(entry.getKey())) {
count.increment();
} else {
lock.readLock().unlock();
locks.remove(lock);
LOG.debug("found block {}, but when locked and tried to count, it was gone.");
}
}
});
int metaCount = totalBlockCount - dataBlockCount;
// BucketCache would only have data blocks
if (dataBlockCount == count.getValue()) {
LOG.debug("File {} has now been fully cached.", fileName);
fileCacheCompleted(fileName, size);
} else {
LOG.debug(
"Prefetch executor completed for {}, but only {} blocks were cached. "
+ "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
"Prefetch executor completed for {}, but only {} data blocks were cached. "
+ "Total data blocks for file: {}. "
+ "Checking for blocks pending cache in cache writer queue.",
fileName, count.getValue(), dataBlockCount);
if (ramCache.hasBlocksForFile(fileName.getName())) {
for (ReentrantReadWriteLock lock : locks) {
Expand All @@ -2111,11 +2121,17 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
+ "and try the verification again.", fileName);
Thread.sleep(100);
notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
} else {
LOG.info("We found only {} blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full or evictions "
+ "happened concurrently to cache prefetch.", count, totalBlockCount, fileName);
}
} else
if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) == dataBlockCount) {
LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in "
+ "the cache write queue but we now found that total cached blocks for file {} "
+ "is equal to data block count.", count, dataBlockCount, fileName.getName());
fileCacheCompleted(fileName, size);
} else {
LOG.info("We found only {} data blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full or evictions "
+ "happened concurrently to cache prefetch.", count, dataBlockCount, fileName);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

Expand All @@ -34,11 +33,8 @@
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
Expand Down Expand Up @@ -121,8 +117,8 @@ public void testPrefetchPersistence() throws Exception {
// Load Cache
Path storeFile = writeStoreFile("TestPrefetch0");
Path storeFile2 = writeStoreFile("TestPrefetch1");
readStoreFile(storeFile, 0);
readStoreFile(storeFile2, 0);
readStoreFile(storeFile);
readStoreFile(storeFile2);
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);

Expand All @@ -133,39 +129,18 @@ public void testPrefetchPersistence() throws Exception {
testDir + "/bucket.persistence", 60 * 1000, conf);
cacheConf = new CacheConfig(conf, bucketCache);
assertTrue(usedSize != 0);
readStoreFile(storeFile, 0);
readStoreFile(storeFile2, 0);
// Test Close Store File
closeStoreFile(storeFile2);
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
TEST_UTIL.cleanupTestDir();
}

public void closeStoreFile(Path path) throws Exception {
HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName()));
reader.close(true);
assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName()));
}

public void readStoreFile(Path storeFilePath, long offset) throws Exception {
public void readStoreFile(Path storeFilePath) throws Exception {
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
BucketEntry be = bucketCache.backingMap.get(blockCacheKey);
boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null;

if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
assertTrue(isCached);
}
}

public Path writeStoreFile(String fname) throws IOException {
Expand Down