CDPD-68550: Backport [ADDENDUM] HBASE-28458 BucketCache.notifyFileCachingCompleted may incorrectly consider a file fully cached (apache#5777) (apache#5791)

Signed-off-by: Peter Somogyi <[email protected]>
(cherry picked from commit d7566ab)
Change-Id: I27599d2c82cf2dea074bba8caa1e6ba5b14b764a
wchevreuil authored and Wellington Ramos Chevreuil committed May 10, 2024
1 parent b5a9f7c commit f28001c
Showing 3 changed files with 47 additions and 74 deletions.
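
The resulting completion check in notifyFileCachingCompleted works as follows: the file is marked fully cached when the number of cached data blocks equals dataBlockCount (the BucketCache only holds data blocks); if blocks for the file are still queued for the cache writer, the verification is retried; otherwise, as a fallback, the file is still considered fully cached when its total cached keys minus its meta blocks add up to dataBlockCount. The Java sketch below is not part of the patch and only models that decision; CachedFileVerifier, Outcome, and hasPendingWrites are illustrative stand-ins, not HBase APIs.

// Illustrative sketch only: models the post-patch completion check with simplified
// stand-in names; none of these types exist in HBase.
import java.util.Set;

final class CachedFileVerifier {

  enum Outcome { FULLY_CACHED, RETRY_LATER, INCOMPLETE }

  static Outcome verify(long cachedDataBlocks, Set<?> allCachedKeysForFile, int totalBlockCount,
    int dataBlockCount, boolean hasPendingWrites) {
    int metaCount = totalBlockCount - dataBlockCount;
    if (cachedDataBlocks == dataBlockCount) {
      // Every data block was found in the backing map: the file is fully cached.
      return Outcome.FULLY_CACHED;
    }
    if (hasPendingWrites) {
      // Blocks for this file are still queued for the cache writer: verify again later.
      return Outcome.RETRY_LATER;
    }
    if (allCachedKeysForFile.size() - metaCount == dataBlockCount) {
      // Nothing pending, and total cached keys minus meta blocks matches the data block count.
      return Outcome.FULLY_CACHED;
    }
    // Cache may be full, or blocks were evicted concurrently with the prefetch.
    return Outcome.INCOMPLETE;
  }
}
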
BucketCache.java
@@ -734,7 +734,8 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
} else {
return bucketEntryToUse.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, bucketEntryToUse)) {
LOG.debug("removed key {} from back map in the evict process", cacheKey);
LOG.debug("removed key {} from back map with offset lock {} in the evict process",
cacheKey, bucketEntryToUse.offset());
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
return true;
}
@@ -1637,19 +1638,21 @@ protected String getAlgorithm() {
@Override
public int evictBlocksByHfileName(String hfileName) {
fileNotFullyCached(hfileName);
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);

Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName);
int numEvicted = 0;
for (BlockCacheKey key : keySet) {
if (evictBlock(key)) {
++numEvicted;
}
}

return numEvicted;
}

private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) {
return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true,
new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
}

/**
* Used to group bucket entries into priority buckets. There will be a BucketEntryGroup for each
* priority (single, multi, memory). Once bucketed, the eviction algorithm takes the appropriate
@@ -2062,26 +2065,33 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
entry.getKey().getHfileName().equals(fileName.getName())
&& entry.getKey().getBlockType().equals(BlockType.DATA)
) {
LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
fileName.getName(), entry.getKey().getOffset());
ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
long offsetToLock = entry.getValue().offset();
LOG.debug("found block {} in the backing map. Acquiring read lock for offset {}",
entry.getKey(), offsetToLock);
ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock);
lock.readLock().lock();
locks.add(lock);
// rechecks the given key is still there (no eviction happened before the lock acquired)
if (backingMap.containsKey(entry.getKey())) {
count.increment();
} else {
lock.readLock().unlock();
locks.remove(lock);
LOG.debug("found block {}, but when locked and tried to count, it was gone.");
}
}
});
int metaCount = totalBlockCount - dataBlockCount;
// BucketCache would only have data blocks
if (dataBlockCount == count.getValue()) {
LOG.debug("File {} has now been fully cached.", fileName);
fileCacheCompleted(fileName, size);
} else {
LOG.debug(
"Prefetch executor completed for {}, but only {} blocks were cached. "
+ "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
fileName.getName(), count.getValue(), dataBlockCount);
"Prefetch executor completed for {}, but only {} data blocks were cached. "
+ "Total data blocks for file: {}. "
+ "Checking for blocks pending cache in cache writer queue.",
fileName, count.getValue(), dataBlockCount);
if (ramCache.hasBlocksForFile(fileName.getName())) {
for (ReentrantReadWriteLock lock : locks) {
lock.readLock().unlock();
@@ -2090,11 +2100,17 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
+ "and try the verification again.", fileName.getName());
Thread.sleep(100);
notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
} else {
LOG.info("We found only {} blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full or evictions "
+ "happened concurrently to cache prefetch.", count, totalBlockCount, fileName);
}
} else
if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) == dataBlockCount) {
LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in "
+ "the cache write queue but we now found that total cached blocks for file {} "
+ "is equal to data block count.", count, dataBlockCount, fileName.getName());
fileCacheCompleted(fileName, size);
} else {
LOG.info("We found only {} data blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full or evictions "
+ "happened concurrently to cache prefetch.", count, dataBlockCount, fileName);
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
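In the notifyFileCachingCompleted hunks above, each matching block is counted only after its offset read lock is acquired and the key is re-checked in the backing map, so blocks evicted between the lookup and the lock acquisition are not counted; the lock is now taken on the bucket entry's offset (entry.getValue().offset()) rather than the cache key's offset. A minimal sketch of that lock-then-recheck pattern follows; the Map-based backing map, the offset-lock map, and the class name are simplified stand-ins, not the real BucketCache structures.

// Illustrative sketch only: lock the block's offset, then re-check the backing map before
// counting it, releasing the lock if the block was evicted in the meantime.
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class LockedBlockCounter {

  /** Counts keys still present in the backing map while holding their offset read locks. */
  static <K> int countUnderReadLocks(Map<K, Long> backingMap, Iterable<K> candidateKeys,
    Map<Long, ReentrantReadWriteLock> offsetLocks, List<ReentrantReadWriteLock> heldLocks) {
    int count = 0;
    for (K key : candidateKeys) {
      Long bucketOffset = backingMap.get(key);
      if (bucketOffset == null) {
        continue; // already evicted before the lookup, nothing to lock
      }
      ReentrantReadWriteLock lock =
        offsetLocks.computeIfAbsent(bucketOffset, o -> new ReentrantReadWriteLock());
      lock.readLock().lock();
      heldLocks.add(lock);
      if (backingMap.containsKey(key)) {
        count++; // still cached while the lock is held
      } else {
        // Evicted between the lookup and the lock acquisition: release and do not count it.
        lock.readLock().unlock();
        heldLocks.remove(lock);
      }
    }
    return count;
  }
}
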
TestCombinedBlockCache.java
@@ -21,6 +21,9 @@
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
@@ -154,10 +157,10 @@ public void testCombinedBlockCacheStats(BlockType type, int expectedL1Miss, int
ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1, 52, -1, meta,
HEAP);
blockCache.cacheBlock(key, blk);
blockCache.getBlock(key, true, false, true);
assertNotNull(blockCache.getBlock(key, true, false, true));
assertEquals(0, blockCache.getStats().getMissCount());
blockCache.evictBlock(key);
blockCache.getBlock(key, true, false, true);
assertTrue(blockCache.evictBlock(key));
assertNull(blockCache.getBlock(key, true, false, true));
assertEquals(1, blockCache.getStats().getMissCount());
assertEquals(expectedL1Miss, blockCache.getFirstLevelCache().getStats().getMissCount());
assertEquals(expectedL2Miss, blockCache.getSecondLevelCache().getStats().getMissCount());
TestPrefetchPersistence.java
@@ -17,9 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

@@ -35,14 +33,10 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -121,10 +115,10 @@ public void testPrefetchPersistence() throws Exception {
assertEquals(0, usedSize);
assertTrue(new File(testDir + "/bucket.cache").exists());
// Load Cache
Path storeFile = writeStoreFile("Region0", "TestPrefetch0");
Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1");
readStoreFile(storeFile, 0);
readStoreFile(storeFile2, 0);
Path storeFile = writeStoreFile("TestPrefetch0");
Path storeFile2 = writeStoreFile("TestPrefetch1");
readStoreFile(storeFile);
readStoreFile(storeFile2);
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);

@@ -135,50 +129,22 @@ public void testPrefetchPersistence() throws Exception {
testDir + "/bucket.persistence", 60 * 1000, conf);
cacheConf = new CacheConfig(conf, bucketCache);
assertTrue(usedSize != 0);
readStoreFile(storeFile, 0);
readStoreFile(storeFile2, 0);
// Test Close Store File
closeStoreFile(storeFile2);
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile2.getName()));
TEST_UTIL.cleanupTestDir();
}

public void closeStoreFile(Path path) throws Exception {
HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName()));
int initialRegionPrefetchInfoSize = bucketCache.getRegionCachedInfo().get().size();
assertTrue(initialRegionPrefetchInfoSize > 0);
reader.close(true);
assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName()));
int newRegionPrefetchInfoSize = bucketCache.getRegionCachedInfo().get().size();
assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
}

public void readStoreFile(Path storeFilePath, long offset) throws Exception {
public void readStoreFile(Path storeFilePath) throws Exception {
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
BucketEntry be = bucketCache.backingMap.get(blockCacheKey);
boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) != null;

if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
assertTrue(isCached);
}
}

public Path writeStoreFile(String regionName, String fname) throws IOException {
// Create store files as per the following directory structure
// <region name>/<column family>/<hFile>
Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
Path storeFileParentDir = new Path(regionDir, fname);
public Path writeStoreFile(String fname) throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -194,18 +160,6 @@ public Path writeStoreFile(String regionName, String fname) throws IOException {
}

sfw.close();

// Create a dummy .regioninfo file as the PrefetchExecutor needs it to figure out the region
// name to be added to the prefetch file list
Path regionInfoFilePath = new Path(storeFileParentDir, REGION_INFO_FILE);
File regionInfoFile = new File(regionInfoFilePath.toString());
try {
if (!regionInfoFile.createNewFile()) {
assertFalse("Unable to create .regioninfo file", true);
}
} catch (IOException e) {
e.printStackTrace();
}
return sfw.getPath();
}

