Skip to content

Commit

Permalink
HBASE-27365 Minimise block addition failures due to no space in bucke…
Browse files Browse the repository at this point in the history
…t cache writers queue by introducing wait time (#4805)

Co-authored-by: Rajeshbabu Chintaguntla <[email protected]>
  • Loading branch information
chrajeshbabu and chrajeshbabu authored Oct 3, 2022
1 parent 23a5633 commit 14b3899
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
*/
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);

/**
* Add block to cache.
* @param cacheKey The block's cache key.
* @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory
* @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache is
* configured.
*/
default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
cacheBlock(cacheKey, buf, inMemory);
}

/**
* Add block to cache (defaults to not in-memory).
* @param cacheKey The block's cache key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,17 @@ public long heapSize() {

@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
cacheBlock(cacheKey, buf, inMemory, false);
}

@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
boolean metaBlock = isMetaBlock(buf.getBlockType());
if (metaBlock) {
l1Cache.cacheBlock(cacheKey, buf, inMemory);
} else {
l2Cache.cacheBlock(cacheKey, buf, inMemory);
l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
});

Expand All @@ -1341,8 +1341,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheOnly);
}
});
if (unpacked != hfileBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private void doCacheOnWrite(long offset) {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock);
cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,6 @@ public class BucketCache implements BlockCache, HeapSize {

private static final int DEFAULT_CACHE_WAIT_TIME = 50;

/**
* Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
* some blocks when caching. If the flag is true, we will wait until blocks are flushed to
* IOEngine.
*/
boolean wait_when_cache = false;

private final BucketCacheStats cacheStats = new BucketCacheStats();

private final String persistencePath;
Expand Down Expand Up @@ -248,6 +241,10 @@ public class BucketCache implements BlockCache, HeapSize {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

private static final String QUEUE_ADDITION_WAIT_TIME =
"hbase.bucketcache.queue.addition.waittime";
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
* integrity, default algorithm is MD5
Expand Down Expand Up @@ -288,6 +285,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);

sanityCheckConfigs();
Expand Down Expand Up @@ -432,7 +431,19 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
}

/**
* Cache the block with the specified name and buffer.
* @param cacheKey block's cache key
* @param cachedItem block buffer
* @param inMemory if block is in-memory
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean waitWhenCache) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
}

/**
Expand Down Expand Up @@ -491,7 +502,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
boolean successfulAddition = false;
if (wait) {
try {
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -119,7 +120,6 @@ public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[
int writerThreads, int writerQLen, String persistencePath) throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
persistencePath);
super.wait_when_cache = true;
}

@Override
Expand Down Expand Up @@ -241,16 +241,16 @@ public static void waitUntilAllFlushedToBucket(BucketCache cache) throws Interru
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
Cacheable block) throws InterruptedException {
cache.cacheBlock(cacheKey, block);
Cacheable block, boolean waitWhenCache) throws InterruptedException {
cache.cacheBlock(cacheKey, block, false, waitWhenCache);
waitUntilFlushedToBucket(cache, cacheKey);
}

@Test
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
long lockId = cache.backingMap.get(cacheKey).offset();
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
lock.writeLock().lock();
Expand All @@ -265,7 +265,7 @@ public void run() {
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
assertEquals(0, cache.getBlockCount());
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
Expand Down Expand Up @@ -341,7 +341,8 @@ private void testRetrievalUtils(Path testDir, String ioEngineName)
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
false);
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
Expand Down Expand Up @@ -403,7 +404,8 @@ public void testRetrieveFromFileWithoutPersistence() throws Exception {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
false);
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
Expand Down Expand Up @@ -786,7 +788,7 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {

for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
hfileBlockPair.getBlock());
hfileBlockPair.getBlock(), false);
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
Expand All @@ -811,4 +813,63 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
}
}

/**
* This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
* could not be freed even if corresponding {@link HFileBlock} is evicted from
* {@link BucketCache}.
*/
@Test
public void testBlockAdditionWaitWhenCache() throws Exception {
try {
final Path dataTestDir = createAndGetTestDir();

String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, persistencePath);
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);

HFileBlockPair[] hfileBlockPairs =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
// Add blocks
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
true);
}

// Max wait for 10 seconds.
long timeout = 10000;
// Wait for blocks size to match the number of blocks.
while (bucketCache.backingMap.size() != 10) {
if (timeout <= 0) break;
Threads.sleep(100);
timeout = -100;
}
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());

// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
bucketCache.evictBlock(blockCacheKey);
}
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
} finally {
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ private void disableWriter() {
// Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
public void testBlockInRAMCache() throws IOException {
cache = create(1, 1000);
// Set this to true;
cache.wait_when_cache = true;
disableWriter();
final String prefix = "testBlockInRamCache";
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public void testPrefetchPersistence() throws Exception {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
bucketCache.wait_when_cache = true;
cacheConf = new CacheConfig(conf, bucketCache);

long usedSize = bucketCache.getAllocator().getUsedSize();
Expand All @@ -137,7 +136,6 @@ public void testPrefetchPersistence() throws Exception {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
bucketCache.wait_when_cache = true;
assertFalse(new File(testDir + "/bucket.persistence").exists());
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertTrue(usedSize != 0);
Expand Down
5 changes: 5 additions & 0 deletions hbase-server/src/test/resources/hbase-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,9 @@
<value>3</value>
<description>Default is unbounded</description>
</property>
<property>
<name>hbase.bucketcache.queue.addition.waittime</name>
<value>100</value>
<description>Default is 0</description>
</property>
</configuration>

0 comments on commit 14b3899

Please sign in to comment.