Skip to content

Commit

Permalink
HBASE-27752: Update the list of prefetched files upon region movement (
Browse files Browse the repository at this point in the history
…#5194) (#5222)

Signed-off-by: Wellington Chevreuil <[email protected]>

(cherry picked from commit ece8d01)
  • Loading branch information
Kota-SH authored May 9, 2023
1 parent d89bb1c commit a4eb499
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static void request(Path path, Runnable runnable) {
public static void complete(Path path) {
prefetchFutures.remove(path);
prefetchCompleted.put(path.getName(), true);
LOG.debug("Prefetch completed for {}", path);
LOG.debug("Prefetch completed for {}", path.getName());
}

public static void cancel(Path path) {
Expand All @@ -134,7 +134,8 @@ public static void cancel(Path path) {
prefetchFutures.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
prefetchCompleted.remove(path.getName());
LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName());
removePrefetchedFileWhileEvict(path.getName());
}

public static boolean isCompleted(Path path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String
}
}

public boolean isCachePersistenceEnabled() {
return (prefetchedFileListPath != null) && (persistencePath != null);
}

/**
* Cache the block with the specified name and buffer.
* @param cacheKey block's cache key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
Expand Down Expand Up @@ -1550,11 +1552,35 @@ public Map<byte[], List<HStoreFile>> close() throws IOException {
public static final String CLOSE_WAIT_INTERVAL = "hbase.regionserver.close.wait.interval.ms";
public static final long DEFAULT_CLOSE_WAIT_INTERVAL = 10000; // 10 seconds

public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException {
return close(abort, false);
}

/**
* Close this HRegion.
* @param abort true if server is aborting (only during testing)
* @param ignoreStatus true if ignore the status (won't be showed on task list)
* @return Vector of all the storage files that the HRegion's component HStores make use of. It's
* a list of StoreFile objects. Can be null if we are not to close at this time, or we are
* already closed.
* @throws IOException e
* @throws DroppedSnapshotException Thrown when replay of wal is required because a Snapshot was
* not properly persisted. The region is put in closing mode, and
* the caller MUST abort after this.
*/
public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus)
throws IOException {
return close(abort, ignoreStatus, false);
}

/**
* Close down this HRegion. Flush the cache unless abort parameter is true, Shut down each HStore,
* don't service any more calls. This method could take some time to execute, so don't call it
* from a time-sensitive thread.
* @param abort true if server is aborting (only during testing)
* @param abort true if server is aborting (only during testing)
* @param ignoreStatus true if ignore the status (wont be showed on task list)
* @param isGracefulStop true if region is being closed during graceful stop and the blocks in the
* BucketCache should not be evicted.
* @return Vector of all the storage files that the HRegion's component HStores make use of. It's
* a list of StoreFile objects. Can be null if we are not to close at this time or we are
* already closed.
Expand All @@ -1563,7 +1589,8 @@ public Map<byte[], List<HStoreFile>> close() throws IOException {
* not properly persisted. The region is put in closing mode, and
* the caller MUST abort after this.
*/
public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException {
public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus,
boolean isGracefulStop) throws IOException {
// Only allow one thread to close at a time. Serialize them so dual
// threads attempting to close will run up against each other.
MonitoredTask status = TaskMonitor.get().createStatus(
Expand All @@ -1572,6 +1599,22 @@ public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException {
status.setStatus("Waiting for close lock");
try {
synchronized (closeLock) {
if (isGracefulStop && rsServices != null) {
rsServices.getBlockCache().ifPresent(blockCache -> {
if (blockCache instanceof CombinedBlockCache) {
BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
if (l2 instanceof BucketCache) {
if (((BucketCache) l2).isCachePersistenceEnabled()) {
LOG.info(
"Closing region {} during a graceful stop, and cache persistence is on, "
+ "so setting evict on close to false. ",
this.getRegionInfo().getRegionNameAsString());
this.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(false));
}
}
}
});
}
return doClose(abort, status);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void process() throws IOException {
}

// Close the region
if (region.close(abort) == null) {
if (region.close(abort, false, true) == null) {
// This region has already been closed. Should not happen (A unit test makes this
// happen as a side effect, TestRegionObserverInterface.testPreWALAppendNotCalledOnMetaEdit)
LOG.warn("Can't close {}; already closed", name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, MediumTests.class })
public class TestBlockEvictionOnRegionMovement {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockEvictionOnRegionMovement.class);

private static final Logger LOG =
LoggerFactory.getLogger(TestBlockEvictionOnRegionMovement.class);

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

private Configuration conf;
Path testDir;
MiniZooKeeperCluster zkCluster;
MiniHBaseCluster cluster;
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();

@Before
public void setup() throws Exception {
conf = TEST_UTIL.getConfiguration();
testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);

conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
conf.setInt("hbase.bucketcache.size", 400);
conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence");
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true);
zkCluster = TEST_UTIL.startMiniZKCluster();
cluster = TEST_UTIL.startMiniHBaseCluster(option);
cluster.setConf(conf);
}

@Test
public void testBlockEvictionOnRegionMove() throws Exception {
// Write to table and flush
TableName tableRegionMove = writeDataToTable();

HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1
? cluster.getRegionServer(1)
: cluster.getRegionServer(0);
assertTrue(regionServingRS.getBlockCache().isPresent());
long oldUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());

Admin admin = TEST_UTIL.getAdmin();
RegionInfo regionToMove = regionServingRS.getRegions(tableRegionMove).get(0).getRegionInfo();
admin.move(regionToMove.getEncodedNameAsBytes(),
TEST_UTIL.getOtherRegionServer(regionServingRS).getServerName());
assertEquals(0, regionServingRS.getRegions(tableRegionMove).size());

long newUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertTrue(oldUsedCacheSize > newUsedCacheSize);
assertEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
}

@Test
public void testBlockEvictionOnGracefulStop() throws Exception {
// Write to table and flush
TableName tableRegionClose = writeDataToTable();

HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1
? cluster.getRegionServer(1)
: cluster.getRegionServer(0);

assertTrue(regionServingRS.getBlockCache().isPresent());
long oldUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());

cluster.stopRegionServer(regionServingRS.getServerName());
Thread.sleep(500);
cluster.startRegionServer();
Thread.sleep(500);

long newUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertEquals(oldUsedCacheSize, newUsedCacheSize);
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
}

public TableName writeDataToTable() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf("table1");
byte[] row0 = Bytes.toBytes("row1");
byte[] row1 = Bytes.toBytes("row2");
byte[] family = Bytes.toBytes("family");
byte[] qf1 = Bytes.toBytes("qf1");
byte[] qf2 = Bytes.toBytes("qf2");
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");

TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
Table table = TEST_UTIL.createTable(td, null);
try {
// put data
Put put0 = new Put(row0);
put0.addColumn(family, qf1, 1, value1);
table.put(put0);
Put put1 = new Put(row1);
put1.addColumn(family, qf2, 1, value2);
table.put(put1);
TEST_UTIL.flush(tableName);
} finally {
Thread.sleep(1000);
}
assertEquals(1, cluster.getRegions(tableName).size());
return tableName;
}

@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
if (zkCluster != null) {
zkCluster.shutdown();
}
}
}

0 comments on commit a4eb499

Please sign in to comment.