Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27365 Minimise block addition failures due to no space in bucke… #4805

Merged
merged 4 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
*/
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);

/**
* Add block to cache.
* @param cacheKey The block's cache key.
* @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory
* @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache
* is configured.
*/
default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
cacheBlock(cacheKey, buf, inMemory);
}

/**
* Add block to cache (defaults to not in-memory).
* @param cacheKey The block's cache key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ public long heapSize() {

@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
cacheBlock(cacheKey, buf, inMemory, false);
}

@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
boolean metaBlock = isMetaBlock(buf.getBlockType());
if (metaBlock) {
l1Cache.cacheBlock(cacheKey, buf, inMemory);
} else {
l2Cache.cacheBlock(cacheKey, buf, inMemory);
l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(),
isCompaction || cacheOnly);
}
});

Expand All @@ -1341,8 +1342,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
cacheConf.isInMemory(), isCompaction || cacheOnly);
chrajeshbabu marked this conversation as resolved.
Show resolved Hide resolved
}
});
if (unpacked != hfileBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private void doCacheOnWrite(long offset) {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock);
cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,6 @@ public class BucketCache implements BlockCache, HeapSize {

private static final int DEFAULT_CACHE_WAIT_TIME = 50;

/**
* Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
* some blocks when caching. If the flag is true, we will wait until blocks are flushed to
* IOEngine.
*/
boolean wait_when_cache = false;

private final BucketCacheStats cacheStats = new BucketCacheStats();

private final String persistencePath;
Expand Down Expand Up @@ -248,6 +241,10 @@ public class BucketCache implements BlockCache, HeapSize {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

private static final String QUEUE_ADDITION_WAIT_TIME =
"hbase.bucketcache.queue.addition.waittime";
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
* integrity, default algorithm is MD5
Expand Down Expand Up @@ -288,6 +285,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME,
DEFAULT_QUEUE_ADDITION_WAIT_TIME);
wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);

sanityCheckConfigs();
Expand Down Expand Up @@ -432,7 +431,19 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
}

/**
* Cache the block with the specified name and buffer.
* @param cacheKey block's cache key
* @param cachedItem block buffer
* @param inMemory if block is in-memory
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean waitWhenCache) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
}

/**
Expand Down Expand Up @@ -491,7 +502,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
boolean successfulAddition = false;
if (wait) {
try {
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -119,7 +120,6 @@ public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[
int writerThreads, int writerQLen, String persistencePath) throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
persistencePath);
super.wait_when_cache = true;
}

@Override
Expand Down Expand Up @@ -241,16 +241,16 @@ public static void waitUntilAllFlushedToBucket(BucketCache cache) throws Interru
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
Cacheable block) throws InterruptedException {
cache.cacheBlock(cacheKey, block);
Cacheable block, boolean waitWhenCache) throws InterruptedException {
cache.cacheBlock(cacheKey, block, false, waitWhenCache);
waitUntilFlushedToBucket(cache, cacheKey);
}

@Test
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
long lockId = cache.backingMap.get(cacheKey).offset();
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
lock.writeLock().lock();
Expand All @@ -265,7 +265,7 @@ public void run() {
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
assertEquals(0, cache.getBlockCount());
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
Expand Down Expand Up @@ -341,7 +341,7 @@ private void testRetrievalUtils(Path testDir, String ioEngineName)
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), false);
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
Expand Down Expand Up @@ -403,7 +403,7 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), false);
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
Expand Down Expand Up @@ -786,7 +786,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {

for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
hfileBlockPair.getBlock());
hfileBlockPair.getBlock(), false);
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
Expand All @@ -811,4 +811,63 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
}
}

/**
* This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
* could not be freed even if corresponding {@link HFileBlock} is evicted from
* {@link BucketCache}.
*/
@Test
public void testBlockAdditionWaitWhenCache() throws Exception {
try {
final Path dataTestDir = createAndGetTestDir();

String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, persistencePath);
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);

HFileBlockPair[] hfileBlockPairs =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
// Add blocks
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(),
false, true);
}

// Max wait for 10 seconds.
long timeout = 10000;
// Wait for blocks size to match the number of blocks.
while(bucketCache.backingMap.size() != 10 ) {
if (timeout <= 0) break;
Threads.sleep(100);
timeout=-100;
}
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());

// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
bucketCache.evictBlock(blockCacheKey);
}
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
} finally {
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ private void disableWriter() {
// Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
public void testBlockInRAMCache() throws IOException {
cache = create(1, 1000);
// Set this to true;
cache.wait_when_cache = true;
disableWriter();
final String prefix = "testBlockInRamCache";
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public void testPrefetchPersistence() throws Exception {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
bucketCache.wait_when_cache = true;
cacheConf = new CacheConfig(conf, bucketCache);

long usedSize = bucketCache.getAllocator().getUsedSize();
Expand All @@ -137,7 +136,6 @@ public void testPrefetchPersistence() throws Exception {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
bucketCache.wait_when_cache = true;
assertFalse(new File(testDir + "/bucket.persistence").exists());
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertTrue(usedSize != 0);
Expand Down
5 changes: 5 additions & 0 deletions hbase-server/src/test/resources/hbase-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,9 @@
<value>3</value>
<description>Default is unbounded</description>
</property>
<property>
<name>hbase.bucketcache.queue.addition.waittime</name>
<value>100</value>
<description>Default is 0</description>
</property>
</configuration>