Skip to content

Commit

Permalink
HBASE-27686: Recovery of BucketCache and Prefetched data after RS Crash (#5080)

Browse files Browse the repository at this point in the history

Signed-off-by: Wellington Ramos Chevreuil <[email protected]>
(cherry picked from commit 58cb1f4)
  • Loading branch information
Kota-SH authored and wchevreuil committed Mar 16, 2023
1 parent 59434e9 commit d136c6d
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public class CacheConfig {

public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";

/**
* Configuration key to set interval for persisting bucket cache to disk.
*/
public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
"hbase.bucketcache.persist.intervalinmillis";

// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public static void persistToFile(String path) throws IOException {
throw new IOException("Error persisting prefetched HFiles set!");
}
if (!prefetchCompleted.isEmpty()) {
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;

import java.io.File;
Expand Down Expand Up @@ -173,6 +174,7 @@ public class BucketCache implements BlockCache, HeapSize {
private final BucketCacheStats cacheStats = new BucketCacheStats();

private final String persistencePath;
static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
private final long cacheCapacity;
/** Approximate block size */
private final long blockSize;
Expand Down Expand Up @@ -233,6 +235,8 @@ public class BucketCache implements BlockCache, HeapSize {

private String prefetchedFileListPath;

private long bucketcachePersistInterval;

private static final String FILE_VERIFY_ALGORITHM =
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
Expand Down Expand Up @@ -278,6 +282,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);

sanityCheckConfigs();

Expand All @@ -303,6 +308,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);

if (ioEngine.isPersistent() && persistencePath != null) {
startBucketCachePersisterThread();
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
Expand Down Expand Up @@ -359,6 +365,12 @@ protected void startWriterThreads() {
}
}

/**
 * Spawns the background {@link BucketCachePersister} thread that periodically
 * writes the cache state to disk whenever it has been marked inconsistent.
 * Runs on the interval configured via {@code hbase.bucketcache.persist.intervalinmillis}.
 */
void startBucketCachePersisterThread() {
  // NOTE(review): no reference to the started thread is retained, so it cannot
  // be interrupted or joined during shutdown — confirm lifecycle is acceptable.
  new BucketCachePersister(this, bucketcachePersistInterval).start();
}

/** Returns the current value of the {@code cacheEnabled} flag. */
boolean isCacheEnabled() {
  return cacheEnabled;
}
Expand Down Expand Up @@ -586,6 +598,9 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre
if (evictedByEvictionProcess) {
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
}
if (ioEngine.isPersistent()) {
setCacheInconsistent(true);
}
}

/**
Expand Down Expand Up @@ -710,6 +725,14 @@ protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
});
}

/**
 * Returns {@code true} if the in-memory cache state has diverged from the
 * persisted snapshot since the last persist (a block was added to or evicted
 * from the backing map while using a persistent IOEngine).
 */
public boolean isCacheInconsistent() {
  return BucketCache.isCacheInconsistent.get();
}

/**
 * Marks the cache as inconsistent (or consistent) with its on-disk snapshot.
 * Set to {@code true} on block add/evict with a persistent IOEngine; cleared
 * by the persister thread after a successful persist.
 */
public void setCacheInconsistent(boolean setCacheInconsistent) {
  BucketCache.isCacheInconsistent.set(setCacheInconsistent);
}

/*
* Statistics thread. Periodically output cache statistics to the log.
*/
Expand Down Expand Up @@ -1156,6 +1179,9 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
// Only add if non-null entry.
if (bucketEntries[i] != null) {
putIntoBackingMap(key, bucketEntries[i]);
if (ioEngine.isPersistent()) {
setCacheInconsistent(true);
}
}
// Always remove from ramCache even if we failed adding it to the block cache above.
boolean existed = ramCache.remove(key, re -> {
Expand Down Expand Up @@ -1205,8 +1231,7 @@ static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
justification = "false positive, try-with-resources ensures close is called.")
private void persistToFile() throws IOException {
assert !cacheEnabled;
void persistToFile() throws IOException {
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class BucketCachePersister extends Thread {
private final BucketCache cache;
private final long intervalMillis;
private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);

public BucketCachePersister(BucketCache cache, long intervalMillis) {
super("bucket-cache-persister");
this.cache = cache;
this.intervalMillis = intervalMillis;
LOG.info("BucketCachePersister started with interval: " + intervalMillis);
}

public void run() {
while (true) {
try {
Thread.sleep(intervalMillis);
if (cache.isCacheInconsistent()) {
LOG.debug("Cache is inconsistent, persisting to disk");
cache.persistToFile();
cache.setCacheInconsistent(false);
}
} catch (IOException | InterruptedException e) {
LOG.warn("Exception in BucketCachePersister" + e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void setup() throws Exception {
}

@Test
public void testRegionClosePrefetchPersistence() throws Exception {
public void testPrefetchPersistence() throws Exception {

// Write to table and flush
TableName tableName = TableName.valueOf("table1");
byte[] row0 = Bytes.toBytes("row1");
Expand All @@ -106,8 +107,14 @@ public void testRegionClosePrefetchPersistence() throws Exception {
table.put(put1);
TEST_UTIL.flush(tableName);
} finally {
Thread.sleep(1000);
Thread.sleep(1500);
}

// Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
// should exist.
assertTrue(new File(testDir + "/bucket.persistence").exists());
assertTrue(new File(testDir + "/prefetch.persistence").exists());

// Stop the RS
cluster.stopRegionServer(0);
LOG.info("Stopped Region Server 0.");
Expand All @@ -117,27 +124,14 @@ public void testRegionClosePrefetchPersistence() throws Exception {

// Start the RS and validate
cluster.startRegionServer();
Thread.sleep(1000);
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertFalse(new File(testDir + "/bucket.persistence").exists());
}

@Test
public void testPrefetchPersistenceNegative() throws Exception {
cluster.stopRegionServer(0);
LOG.info("Stopped Region Server 0.");
Thread.sleep(1000);
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertTrue(new File(testDir + "/bucket.persistence").exists());
cluster.startRegionServer();
Thread.sleep(1000);
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertFalse(new File(testDir + "/bucket.persistence").exists());
}

@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
if (zkCluster != null) {
zkCluster.shutdown();
}
Expand Down
Loading

0 comments on commit d136c6d

Please sign in to comment.