Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27365 Minimise block addition failures due to no space in bucke… #4805

Merged
merged 4 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory)
cacheBlock(cacheKey, buf);
}

@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
cacheBlock(cacheKey, buf, inMemory);
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public interface BlockCache extends Iterable<CachedBlock> {
*/
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);

/**
* Add block to cache.
* @param cacheKey The block's cache key.
* @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory
* @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache
* is configured.
*/
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, boolean waitWhenCache);

/**
* Add block to cache (defaults to not in-memory).
* @param cacheKey The block's cache key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,16 @@ public long heapSize() {

@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
cacheBlock(cacheKey, buf, inMemory, false);
}

@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
boolean metaBlock = isMetaBlock(buf.getBlockType());
if (metaBlock) {
l1Cache.cacheBlock(cacheKey, buf, inMemory);
l1Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
} else {
l2Cache.cacheBlock(cacheKey, buf, inMemory);
l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1341,8 +1341,10 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
cacheConf.isInMemory(), isCompaction ||
(pread && expectedBlockType == null && expectedDataBlockEncoding == null));
chrajeshbabu marked this conversation as resolved.
Show resolved Hide resolved
}
});
if (unpacked != hfileBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private void doCacheOnWrite(long offset) {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock);
cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory)
}
}

@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Make this the default impl in the interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good idea and done in new changes.

boolean waitWhenCache) {
cacheBlock(cacheKey, buf, inMemory);
}

/**
* Sanity-checking for parity between actual block cache content and metrics. Intended only for
* use with TRACE level logging and -ea JVM.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
cacheBlock(cacheKey, buf, false);
}

@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
cacheBlock(cacheKey, buf, inMemory);
}

/**
* Helper function that updates the local size counter and also updates any per-cf or
* per-blocktype metrics it can discern from given {@link LruCachedBlock}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory
cacheBlock(cacheKey, value);
}

@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory,
boolean waitWhenCache) {
cacheBlock(cacheKey, value);
}

@Override
public void cacheBlock(BlockCacheKey key, Cacheable value) {
if (value.heapSize() > maxBlockSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ public class BucketCache implements BlockCache, HeapSize {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

private static final String QUEUE_ADDITION_WAIT_TIME =
"hbase.bucketcache.queue.addition.waittime";
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
* integrity, default algorithm is MD5
Expand Down Expand Up @@ -288,6 +292,8 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime = conf.getLong(QUEUE_ADDITION_WAIT_TIME,
DEFAULT_QUEUE_ADDITION_WAIT_TIME);
wchevreuil marked this conversation as resolved.
Show resolved Hide resolved
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);

sanityCheckConfigs();
Expand Down Expand Up @@ -435,6 +441,18 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inM
cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
}

/**
* Cache the block with the specified name and buffer.
* @param cacheKey block's cache key
* @param cachedItem block buffer
* @param inMemory if block is in-memory
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean waitWhenCache) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
}

/**
* Cache the block to ramCache
* @param cacheKey block's cache key
Expand Down Expand Up @@ -491,7 +509,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
boolean successfulAddition = false;
if (wait) {
try {
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -811,4 +812,56 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
}
}

/**
* This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
* could not be freed even if corresponding {@link HFileBlock} is evicted from
* {@link BucketCache}.
*/
@Test
public void testBlockAdditionWaitWhenCache() throws Exception {
try {
final Path dataTestDir = createAndGetTestDir();

String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";

BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, persistencePath);
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);

HFileBlockPair[] hfileBlockPairs =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
// Add blocks
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(),
false, true);
}

// Wait for blocks size to match the number of blocks.
while(bucketCache.backingMap.size() != 10) {
Threads.sleep(100);
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());

// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());

for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
bucketCache.evictBlock(blockCacheKey);
}
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
} finally {
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,11 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {

}

@Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {

}

@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
Expand Down
5 changes: 5 additions & 0 deletions hbase-server/src/test/resources/hbase-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,9 @@
<value>3</value>
<description>Default is unbounded</description>
</property>
<property>
<name>hbase.bucketcache.queue.addition.waittime</name>
<value>100</value>
<description>Default is 0</description>
</property>
</configuration>