diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index d7377adf3a10..91c237b74f4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hbase.master.procedure; +import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT; +import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY; +import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_DISABLED; +import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.PROGRESSIVE_BATCH_SIZE_MAX_KEY; + import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -25,6 +30,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ConcurrentTableModificationException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -147,7 +153,13 @@ protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableS break; case MODIFY_TABLE_REOPEN_ALL_REGIONS: if (isTableEnabled(env)) { - addChildProcedure(new ReopenTableRegionsProcedure(getTableName())); + Configuration conf = env.getMasterConfiguration(); + long backoffMillis = conf.getLong(PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY, + PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT); + int batchSizeMax = + conf.getInt(PROGRESSIVE_BATCH_SIZE_MAX_KEY, PROGRESSIVE_BATCH_SIZE_MAX_DISABLED); + addChildProcedure( + new ReopenTableRegionsProcedure(getTableName(), backoffMillis, batchSizeMax)); } setNextState(ModifyTableState.MODIFY_TABLE_ASSIGN_NEW_REPLICAS); break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java index 4efb1768b0ce..353636e6ddd6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ReopenTableRegionsProcedure.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.procedure; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -53,6 +54,17 @@ public class ReopenTableRegionsProcedure private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class); + public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY = + "hbase.reopen.table.regions.progressive.batch.backoff.ms"; + public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L; + public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY = + "hbase.reopen.table.regions.progressive.batch.size.max"; + public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1; + private static final int PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE = Integer.MAX_VALUE; + + // this minimum prevents a max which would break this procedure + private static final int MINIMUM_BATCH_SIZE_MAX = 1; + private TableName tableName; // Specify specific regions of a table to reopen. @@ -61,20 +73,46 @@ public class ReopenTableRegionsProcedure private List regions = Collections.emptyList(); + private List currentRegionBatch = Collections.emptyList(); + private RetryCounter retryCounter; + private long reopenBatchBackoffMillis; + private int reopenBatchSize; + private int reopenBatchSizeMax; + private long regionsReopened = 0; + private long batchesProcessed = 0; + public ReopenTableRegionsProcedure() { - regionNames = Collections.emptyList(); + this(null); } public ReopenTableRegionsProcedure(TableName tableName) { - this.tableName = tableName; - this.regionNames = Collections.emptyList(); + this(tableName, Collections.emptyList()); } public ReopenTableRegionsProcedure(final TableName tableName, final List regionNames) { + this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT, + PROGRESSIVE_BATCH_SIZE_MAX_DISABLED); + } + + public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis, + int reopenBatchSizeMax) { + this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax); + } + + public ReopenTableRegionsProcedure(final TableName tableName, final List regionNames, + long reopenBatchBackoffMillis, int reopenBatchSizeMax) { this.tableName = tableName; this.regionNames = regionNames; + this.reopenBatchBackoffMillis = reopenBatchBackoffMillis; + if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) { + this.reopenBatchSize = Integer.MAX_VALUE; + this.reopenBatchSizeMax = Integer.MAX_VALUE; + } else { + this.reopenBatchSize = 1; + this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX); + } } @Override @@ -87,6 +125,30 @@ public TableOperationType getTableOperationType() { return TableOperationType.REGION_EDIT; } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public long getRegionsReopened() { + return regionsReopened; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public long getBatchesProcessed() { + return batchesProcessed; + } + + @RestrictedApi(explanation = "Should only be called internally or in tests", link = "", + allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java") + protected int progressBatchSize() { + int previousBatchSize = reopenBatchSize; + reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize); + if (reopenBatchSize < previousBatchSize) { + // the batch size should never decrease. this must be overflow, so just use max + reopenBatchSize = reopenBatchSizeMax; + } + return reopenBatchSize; + } + private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) { if (loc.getSeqNum() < 0) { return false; @@ -114,7 +176,13 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); return Flow.HAS_MORE_STATE; case REOPEN_TABLE_REGIONS_REOPEN_REGIONS: - for (HRegionLocation loc : regions) { + // if we didn't finish reopening the last batch yet, let's keep trying until we do. + // at that point, the batch will be empty and we can generate a new batch + if (!regions.isEmpty() && currentRegionBatch.isEmpty()) { + currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList()); + batchesProcessed++; + } + for (HRegionLocation loc : currentRegionBatch) { RegionStateNode regionNode = env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion()); // this possible, maybe the region has already been merged or split, see HBASE-20921 @@ -133,39 +201,72 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState regionNode.unlock(); } addChildProcedure(proc); + regionsReopened++; } setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED); return Flow.HAS_MORE_STATE; case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED: - regions = regions.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened) - .filter(l -> l != null).collect(Collectors.toList()); + // update region lists based on what's been reopened + regions = filterReopened(env, regions); + currentRegionBatch = filterReopened(env, currentRegionBatch); + + // existing batch didn't fully reopen, so try to resolve that first. + // since this is a retry, don't do the batch backoff + if (!currentRegionBatch.isEmpty()) { + return reopenIfSchedulable(env, currentRegionBatch, false); + } + if (regions.isEmpty()) { return Flow.NO_MORE_STATE; } - if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) { - retryCounter = null; - setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); - return Flow.HAS_MORE_STATE; - } - // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry - // again. - if (retryCounter == null) { - retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); - } - long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); - LOG.info( - "There are still {} region(s) which need to be reopened for table {} are in " - + "OPENING state, suspend {}secs and try again later", - regions.size(), tableName, backoff / 1000); - setTimeout(Math.toIntExact(backoff)); - setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); - skipPersistence(); - throw new ProcedureSuspendedException(); + + // current batch is finished, schedule more regions + return reopenIfSchedulable(env, regions, true); default: throw new UnsupportedOperationException("unhandled state=" + state); } } + private List filterReopened(MasterProcedureEnv env, + List regionsToCheck) { + return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened) + .filter(l -> l != null).collect(Collectors.toList()); + } + + private Flow reopenIfSchedulable(MasterProcedureEnv env, List regionsToReopen, + boolean shouldBatchBackoff) throws ProcedureSuspendedException { + if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) { + retryCounter = null; + setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS); + if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) { + progressBatchSize(); + setBackoffState(reopenBatchBackoffMillis); + throw new ProcedureSuspendedException(); + } else { + return Flow.HAS_MORE_STATE; + } + } + + // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry + // again. + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.info( + "There are still {} region(s) which need to be reopened for table {}. {} are in " + + "OPENING state, suspend {}secs and try again later", + regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000); + setBackoffState(backoffMillis); + throw new ProcedureSuspendedException(); + } + + private void setBackoffState(long millis) { + setTimeout(Math.toIntExact(millis)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + } + private List getRegionLocationsForReopen(List tableRegionsForReopen) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java new file mode 100644 index 000000000000..fbabb1fa22cc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatchBackoff.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Confirm that we will rate limit reopen batches when reopening all table regions. This can avoid + * the pain associated with reopening too many regions at once. + */ +@Category({ MasterTests.class, MediumTests.class }) +public class TestReopenTableRegionsProcedureBatchBackoff { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBatchBackoff.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private static TableName TABLE_NAME = TableName.valueOf("BatchBackoff"); + private static final int BACKOFF_MILLIS_PER_RS = 3_000; + private static final int REOPEN_BATCH_SIZE = 1; + + private static byte[] CF = Bytes.toBytes("cf"); + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); + UTIL.startMiniCluster(1); + UTIL.createMultiRegionTable(TABLE_NAME, CF, 10); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testRegionBatchBackoff() throws IOException { + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + List regions = UTIL.getAdmin().getRegions(TABLE_NAME); + assertTrue(10 <= regions.size()); + ReopenTableRegionsProcedure proc = + new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE); + procExec.submitProcedure(proc); + Instant startedAt = Instant.now(); + ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000); + Instant stoppedAt = Instant.now(); + assertTrue(Duration.between(startedAt, stoppedAt).toMillis() + > (long) regions.size() * BACKOFF_MILLIS_PER_RS); + } + + @Test + public void testRegionBatchNoBackoff() throws IOException { + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + List regions = UTIL.getAdmin().getRegions(TABLE_NAME); + assertTrue(10 <= regions.size()); + int noBackoffMillis = 0; + ReopenTableRegionsProcedure proc = + new ReopenTableRegionsProcedure(TABLE_NAME, noBackoffMillis, REOPEN_BATCH_SIZE); + procExec.submitProcedure(proc); + ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, + (long) regions.size() * BACKOFF_MILLIS_PER_RS); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java new file mode 100644 index 000000000000..8ea9b3c6a309 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestReopenTableRegionsProcedureBatching.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +/** + * Confirm that we will batch region reopens when reopening all table regions. This can avoid the + * pain associated with reopening too many regions at once. + */ +@Category({ MasterTests.class, MediumTests.class }) +public class TestReopenTableRegionsProcedureBatching { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReopenTableRegionsProcedureBatching.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + private static final int BACKOFF_MILLIS_PER_RS = 0; + private static final int REOPEN_BATCH_SIZE_MAX = 1; + + private static TableName TABLE_NAME = TableName.valueOf("Batching"); + + private static byte[] CF = Bytes.toBytes("cf"); + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); + UTIL.startMiniCluster(1); + UTIL.createMultiRegionTable(TABLE_NAME, CF); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testSmallMaxBatchSize() throws IOException { + AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(); + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + List regions = UTIL.getAdmin().getRegions(TABLE_NAME); + assertTrue(2 <= regions.size()); + Set stuckRegions = + regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet()); + ReopenTableRegionsProcedure proc = + new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE_MAX); + procExec.submitProcedure(proc); + UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT); + + // the first batch should be small + confirmBatchSize(1, stuckRegions, proc); + ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000); + + // other batches should also be small + assertTrue(proc.getBatchesProcessed() >= regions.size()); + + // all regions should only be opened once + assertEquals(proc.getRegionsReopened(), regions.size()); + } + + @Test + public void testDefaultMaxBatchSize() throws IOException { + AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(); + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + List regions = UTIL.getAdmin().getRegions(TABLE_NAME); + assertTrue(2 <= regions.size()); + Set stuckRegions = + regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet()); + ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME); + procExec.submitProcedure(proc); + UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT); + + // the first batch should be large + confirmBatchSize(regions.size(), stuckRegions, proc); + ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000); + + // all regions should only be opened once + assertEquals(proc.getRegionsReopened(), regions.size()); + } + + @Test + public void testNegativeBatchSizeDoesNotBreak() throws IOException { + AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(); + ProcedureExecutor procExec = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + List regions = UTIL.getAdmin().getRegions(TABLE_NAME); + assertTrue(2 <= regions.size()); + Set stuckRegions = + regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet()); + ReopenTableRegionsProcedure proc = + new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, -100); + procExec.submitProcedure(proc); + UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT); + + // the first batch should be small + confirmBatchSize(1, stuckRegions, proc); + ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000); + + // other batches should also be small + assertTrue(proc.getBatchesProcessed() >= regions.size()); + + // all regions should only be opened once + assertEquals(proc.getRegionsReopened(), regions.size()); + } + + @Test + public void testBatchSizeDoesNotOverflow() { + ReopenTableRegionsProcedure proc = + new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, Integer.MAX_VALUE); + int currentBatchSize = 1; + while (currentBatchSize < Integer.MAX_VALUE) { + currentBatchSize = proc.progressBatchSize(); + assertTrue(currentBatchSize > 0); + } + } + + private void confirmBatchSize(int expectedBatchSize, Set stuckRegions, + ReopenTableRegionsProcedure proc) { + while (true) { + if (proc.getBatchesProcessed() == 0) { + continue; + } + stuckRegions.forEach(this::unstickRegion); + UTIL.waitFor(5000, () -> expectedBatchSize == proc.getRegionsReopened()); + break; + } + } + + static class StuckRegion { + final TransitRegionStateProcedure trsp; + final RegionStateNode regionNode; + final long openSeqNum; + + public StuckRegion(TransitRegionStateProcedure trsp, RegionStateNode regionNode, + long openSeqNum) { + this.trsp = trsp; + this.regionNode = regionNode; + this.openSeqNum = openSeqNum; + } + } + + private StuckRegion stickRegion(AssignmentManager am, + ProcedureExecutor procExec, RegionInfo regionInfo) { + RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(regionInfo); + TransitRegionStateProcedure trsp = + TransitRegionStateProcedure.unassign(procExec.getEnvironment(), regionInfo); + regionNode.lock(); + long openSeqNum; + try { + openSeqNum = regionNode.getOpenSeqNum(); + regionNode.setState(State.OPENING); + regionNode.setOpenSeqNum(-1L); + regionNode.setProcedure(trsp); + } finally { + regionNode.unlock(); + } + return new StuckRegion(trsp, regionNode, openSeqNum); + } + + private void unstickRegion(StuckRegion stuckRegion) { + stuckRegion.regionNode.lock(); + try { + stuckRegion.regionNode.setState(State.OPEN); + stuckRegion.regionNode.setOpenSeqNum(stuckRegion.openSeqNum); + stuckRegion.regionNode.unsetProcedure(stuckRegion.trsp); + } finally { + stuckRegion.regionNode.unlock(); + } + } +}