From ece8d014afa940f382d4c7247d9ccfbf7f01965c Mon Sep 17 00:00:00 2001 From: Kota-SH Date: Thu, 4 May 2023 10:00:32 -0500 Subject: [PATCH] HBASE-27752: Update the list of prefetched files upon region movement (#5194) Co-authored-by: Shanmukha Kota --- .../hbase/io/hfile/PrefetchExecutor.java | 5 +- .../hbase/io/hfile/bucket/BucketCache.java | 4 + .../hadoop/hbase/regionserver/HRegion.java | 45 ++++- .../handler/CloseRegionHandler.java | 2 +- .../TestBlockEvictionOnRegionMovement.java | 179 ++++++++++++++++++ 5 files changed, 228 insertions(+), 7 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java index b30150fcb6d6..d3064e066a12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java @@ -123,7 +123,7 @@ public static void request(Path path, Runnable runnable) { public static void complete(Path path) { prefetchFutures.remove(path); prefetchCompleted.put(path.getName(), true); - LOG.debug("Prefetch completed for {}", path); + LOG.debug("Prefetch completed for {}", path.getName()); } public static void cancel(Path path) { @@ -134,7 +134,8 @@ public static void cancel(Path path) { prefetchFutures.remove(path); LOG.debug("Prefetch cancelled for {}", path); } - prefetchCompleted.remove(path.getName()); + LOG.debug("Removing filename from the prefetched persistence list: {}", path.getName()); + removePrefetchedFileWhileEvict(path.getName()); } public static boolean isCompleted(Path path) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 
f0028e556d25..8c9a24b79b7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -427,6 +427,10 @@ private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String } } + public boolean isCachePersistenceEnabled() { + return (prefetchedFileListPath != null) && (persistencePath != null); + } + /** * Cache the block with the specified name and buffer. * @param cacheKey block's cache key diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c3e6bfdc2fff..809f2bc5029e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -135,7 +135,9 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -1596,12 +1598,31 @@ public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException { return close(abort, false); } + /** + * Close this HRegion. + * @param abort true if server is aborting (only during testing) + * @param ignoreStatus true if ignore the status (won't be showed on task list) + * @return Vector of all the storage files that the HRegion's component HStores make use of. It's + * a list of StoreFile objects. Can be null if we are not to close at this time, or we are + * already closed.
+ * @throws IOException e + * @throws DroppedSnapshotException Thrown when replay of wal is required because a Snapshot was + * not properly persisted. The region is put in closing mode, and + * the caller MUST abort after this. + */ + public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus) + throws IOException { + return close(abort, ignoreStatus, false); + } + /** * Close down this HRegion. Flush the cache unless abort parameter is true, Shut down each HStore, * don't service any more calls. This method could take some time to execute, so don't call it * from a time-sensitive thread. - * @param abort true if server is aborting (only during testing) - * @param ignoreStatus true if ignore the status (wont be showed on task list) + * @param abort true if server is aborting (only during testing) + * @param ignoreStatus true if ignore the status (wont be showed on task list) + * @param isGracefulStop true if region is being closed during graceful stop and the blocks in the + * BucketCache should not be evicted. * @return Vector of all the storage files that the HRegion's component HStores make use of. It's * a list of StoreFile objects. Can be null if we are not to close at this time or we are * already closed. @@ -1610,8 +1631,8 @@ public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException { * not properly persisted. The region is put in closing mode, and * the caller MUST abort after this. */ - public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus) - throws IOException { + public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus, + boolean isGracefulStop) throws IOException { // Only allow one thread to close at a time. Serialize them so dual // threads attempting to close will run up against each other.
MonitoredTask status = TaskMonitor.get().createStatus( @@ -1620,6 +1641,22 @@ public Map<byte[], List<HStoreFile>> close(boolean abort, boolean ignoreStatus) status.setStatus("Waiting for close lock"); try { synchronized (closeLock) { + if (isGracefulStop && rsServices != null) { + rsServices.getBlockCache().ifPresent(blockCache -> { + if (blockCache instanceof CombinedBlockCache) { + BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache(); + if (l2 instanceof BucketCache) { + if (((BucketCache) l2).isCachePersistenceEnabled()) { + LOG.info( + "Closing region {} during a graceful stop, and cache persistence is on, " + + "so setting evict on close to false. ", + this.getRegionInfo().getRegionNameAsString()); + this.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(false)); + } + } + } + }); + } return doClose(abort, status); } } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java index 2301b9b8b494..e184cb42fb91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java @@ -102,7 +102,7 @@ public void process() throws IOException { } // Close the region - if (region.close(abort) == null) { + if (region.close(abort, false, true) == null) { // This region has already been closed.
Should not happen (A unit test makes this // happen as a side effect, TestRegionObserverInterface.testPreWALAppendNotCalledOnMetaEdit) LOG.warn("Can't close {}; already closed", name); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java new file mode 100644 index 000000000000..66b2ca73ded8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.StartTestingClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ IOTests.class, MediumTests.class }) +public class TestBlockEvictionOnRegionMovement { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBlockEvictionOnRegionMovement.class); + + private static final Logger LOG = + LoggerFactory.getLogger(TestBlockEvictionOnRegionMovement.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private Configuration conf; + Path testDir; + 
MiniZooKeeperCluster zkCluster; + SingleProcessHBaseCluster cluster; + StartTestingClusterOption option = + StartTestingClusterOption.builder().numRegionServers(2).build(); + + @Before + public void setup() throws Exception { + conf = TEST_UTIL.getConfiguration(); + testDir = TEST_UTIL.getDataTestDir(); + TEST_UTIL.getTestFileSystem().mkdirs(testDir); + + conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache"); + conf.setInt("hbase.bucketcache.size", 400); + conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence"); + conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence"); + conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100); + conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true); + zkCluster = TEST_UTIL.startMiniZKCluster(); + cluster = TEST_UTIL.startMiniHBaseCluster(option); + cluster.setConf(conf); + } + + @Test + public void testBlockEvictionOnRegionMove() throws Exception { + // Write to table and flush + TableName tableRegionMove = writeDataToTable(); + + HRegionServer regionServingRS = + cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1 + ? 
cluster.getRegionServer(1) + : cluster.getRegionServer(0); + assertTrue(regionServingRS.getBlockCache().isPresent()); + long oldUsedCacheSize = + regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize(); + assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount()); + + Admin admin = TEST_UTIL.getAdmin(); + RegionInfo regionToMove = regionServingRS.getRegions(tableRegionMove).get(0).getRegionInfo(); + admin.move(regionToMove.getEncodedNameAsBytes(), + TEST_UTIL.getOtherRegionServer(regionServingRS).getServerName()); + assertEquals(0, regionServingRS.getRegions(tableRegionMove).size()); + + long newUsedCacheSize = + regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize(); + assertTrue(oldUsedCacheSize > newUsedCacheSize); + assertEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount()); + } + + @Test + public void testBlockEvictionOnGracefulStop() throws Exception { + // Write to table and flush + TableName tableRegionClose = writeDataToTable(); + + HRegionServer regionServingRS = + cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1 + ? 
cluster.getRegionServer(1) + : cluster.getRegionServer(0); + + assertTrue(regionServingRS.getBlockCache().isPresent()); + long oldUsedCacheSize = + regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize(); + assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount()); + + cluster.stopRegionServer(regionServingRS.getServerName()); + Thread.sleep(500); + cluster.startRegionServer(); + Thread.sleep(500); + + long newUsedCacheSize = + regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize(); + assertEquals(oldUsedCacheSize, newUsedCacheSize); + assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount()); + } + + public TableName writeDataToTable() throws IOException, InterruptedException { + TableName tableName = TableName.valueOf("table1"); + byte[] row0 = Bytes.toBytes("row1"); + byte[] row1 = Bytes.toBytes("row2"); + byte[] family = Bytes.toBytes("family"); + byte[] qf1 = Bytes.toBytes("qf1"); + byte[] qf2 = Bytes.toBytes("qf2"); + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + Table table = TEST_UTIL.createTable(td, null); + try { + // put data + Put put0 = new Put(row0); + put0.addColumn(family, qf1, 1, value1); + table.put(put0); + Put put1 = new Put(row1); + put1.addColumn(family, qf2, 1, value2); + table.put(put1); + TEST_UTIL.flush(tableName); + } finally { + Thread.sleep(1000); + } + assertEquals(1, cluster.getRegions(tableName).size()); + return tableName; + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir)); + if (zkCluster != null) { + zkCluster.shutdown(); + } + } +}