diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 9161a02c1800..81d16b2861ca 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -811,3 +811,12 @@ message CloseExcessRegionReplicasProcedureStateData { required TableName table_name = 1; required uint32 new_replica_count = 2; } + +enum CloseTableRegionsProcedureState { + CLOSE_TABLE_REGIONS_SCHEDULE = 1; + CLOSE_TABLE_REGIONS_CONFIRM = 2; +} + +message CloseTableRegionsProcedureStateData { + required TableName table_name = 1; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index bcfa50fe66d5..d05b1f9d3d34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; @@ -1084,25 +1085,16 @@ public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableN .toArray(TransitRegionStateProcedure[]::new); } - /** - * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will - * skip submit unassign procedure if the region is in transition, so you may need to call this - * method multiple times. - * @param tableName the table for closing excess region replicas - * @param newReplicaCount the new replica count, should be less than current replica count - * @param submit for submitting procedure - * @return the number of regions in transition that we can not schedule unassign procedures - */ - public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName, - int newReplicaCount, Consumer submit) { + private int submitUnassignProcedure(TableName tableName, + Function shouldSubmit, Consumer logRIT, + Consumer submit) { int inTransitionCount = 0; for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) { regionNode.lock(); try { - if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) { + if (shouldSubmit.apply(regionNode)) { if (regionNode.isInTransition()) { - LOG.debug("skip scheduling unassign procedure for {} when closing excess region " - + "replicas since it is in transition", regionNode); + logRIT.accept(regionNode); inTransitionCount++; continue; } @@ -1119,12 +1111,46 @@ public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName table return inTransitionCount; } - public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) { + /** + * Called by DsiableTableProcedure to unassign all regions for a table. Will skip submit unassign + * procedure if the region is in transition, so you may need to call this method multiple times. + * @param tableName the table for closing excess region replicas + * @param submit for submitting procedure + * @return the number of regions in transition that we can not schedule unassign procedures + */ + public int submitUnassignProcedureForDisablingTable(TableName tableName, + Consumer submit) { + return submitUnassignProcedure(tableName, rn -> true, + rn -> LOG.debug("skip scheduling unassign procedure for {} when closing table regions " + + "for disabling since it is in transition", rn), + submit); + } + + /** + * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will + * skip submit unassign procedure if the region is in transition, so you may need to call this + * method multiple times. + * @param tableName the table for closing excess region replicas + * @param newReplicaCount the new replica count, should be less than current replica count + * @param submit for submitting procedure + * @return the number of regions in transition that we can not schedule unassign procedures + */ + public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName, + int newReplicaCount, Consumer submit) { + return submitUnassignProcedure(tableName, + rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount, + rn -> LOG.debug("skip scheduling unassign procedure for {} when closing excess region " + + "replicas since it is in transition", rn), + submit); + } + + private int numberOfUnclosedRegions(TableName tableName, + Function shouldSubmit) { int unclosed = 0; for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) { regionNode.lock(); try { - if (regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) { + if (shouldSubmit.apply(regionNode)) { if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { unclosed++; } @@ -1136,6 +1162,15 @@ public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newRepl return unclosed; } + public int numberOfUnclosedRegionsForDisabling(TableName tableName) { + return numberOfUnclosedRegions(tableName, rn -> true); + } + + public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) { + return numberOfUnclosedRegions(tableName, + rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount); + } + public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, final byte[] splitKey) throws IOException { return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractCloseTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractCloseTableRegionsProcedure.java new file mode 100644 index 000000000000..bfee7dbf5b07 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractCloseTableRegionsProcedure.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.function.Consumer; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * Base class for unassigning table regions. + */ +@InterfaceAudience.Private +public abstract class AbstractCloseTableRegionsProcedure> + extends AbstractStateMachineTableProcedure { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractCloseTableRegionsProcedure.class); + + protected TableName tableName; + + private RetryCounter retryCounter; + + protected AbstractCloseTableRegionsProcedure() { + } + + protected AbstractCloseTableRegionsProcedure(TableName tableName) { + this.tableName = tableName; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_EDIT; + } + + private Flow schedule(MasterProcedureEnv env) throws ProcedureSuspendedException { + MutableBoolean submitted = new MutableBoolean(false); + int inTransitionCount = submitUnassignProcedure(env, p -> { + submitted.setTrue(); + addChildProcedure(p); + }); + if (inTransitionCount > 0 && submitted.isFalse()) { + // we haven't scheduled any unassign procedures and there are still regions in + // transition, sleep for a while and try again + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.info( + "There are still {} region(s) in transition for closing regions of table {}" + + " when executing {}, suspend {}secs and try again later", + inTransitionCount, tableName, getClass().getSimpleName(), backoffMillis / 1000); + suspend((int) backoffMillis, true); + } + setNextState(getConfirmState()); + return Flow.HAS_MORE_STATE; + } + + private Flow confirm(MasterProcedureEnv env) { + int unclosedCount = numberOfUnclosedRegions(env); + if (unclosedCount > 0) { + LOG.info( + "There are still {} unclosed region(s) for closing regions of table {}" + + " when executing {}, continue...", + unclosedCount, tableName, getClass().getSimpleName()); + setNextState(getInitialState()); + return Flow.HAS_MORE_STATE; + } else { + return Flow.NO_MORE_STATE; + } + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, TState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.trace("{} execute state={}", this, state); + if (state == getInitialState()) { + return schedule(env); + } else if (state == getConfirmState()) { + return confirm(env); + } else { + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, TState state) + throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + /** + * We have two state for this type of procedures, the initial state for scheduling unassign + * procedures, and the confirm state for checking whether we have unassigned all the regions. + * @return the confirm state + */ + protected abstract TState getConfirmState(); + + /** + * Submit TRSP for unassigning regions. Return the number of regions in RIT state that we can not + * schedule TRSP for them. + */ + protected abstract int submitUnassignProcedure(MasterProcedureEnv env, + Consumer submit); + + /** + * Return the number of uncloses regions. Returning {@code 0} means we are done. + */ + protected abstract int numberOfUnclosedRegions(MasterProcedureEnv env); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloseExcessRegionReplicasProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloseExcessRegionReplicasProcedure.java index bb5da2cc48e8..6dd9429aeabf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloseExcessRegionReplicasProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloseExcessRegionReplicasProcedure.java @@ -18,113 +18,33 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; -import org.apache.commons.lang3.mutable.MutableBoolean; +import java.util.function.Consumer; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; -import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseExcessRegionReplicasProcedureState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseExcessRegionReplicasProcedureStateData; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * Procedure for close excess region replicas. */ @InterfaceAudience.Private public class CloseExcessRegionReplicasProcedure - extends AbstractStateMachineTableProcedure { + extends AbstractCloseTableRegionsProcedure { - private static final Logger LOG = - LoggerFactory.getLogger(CloseExcessRegionReplicasProcedure.class); - - private TableName tableName; private int newReplicaCount; - private RetryCounter retryCounter; - public CloseExcessRegionReplicasProcedure() { } public CloseExcessRegionReplicasProcedure(TableName tableName, int newReplicaCount) { - this.tableName = tableName; + super(tableName); this.newReplicaCount = newReplicaCount; } - @Override - public TableName getTableName() { - return tableName; - } - - @Override - public TableOperationType getTableOperationType() { - return TableOperationType.REGION_EDIT; - } - - @Override - protected Flow executeFromState(MasterProcedureEnv env, - CloseExcessRegionReplicasProcedureState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { - LOG.trace("{} execute state={}", this, state); - switch (state) { - case CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE: - MutableBoolean submitted = new MutableBoolean(false); - int inTransitionCount = env.getAssignmentManager() - .submitUnassignProcedureForClosingExcessRegionReplicas(tableName, newReplicaCount, p -> { - submitted.setTrue(); - addChildProcedure(p); - }); - if (inTransitionCount > 0 && submitted.isFalse()) { - // we haven't scheduled any unassign procedures and there are still regions in - // transition, sleep for a while and try again - if (retryCounter == null) { - retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); - } - long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts(); - LOG.info( - "There are still {} region(s) in transition for table {} when closing excess" - + " region replicas, suspend {}secs and try again later", - inTransitionCount, tableName, backoffMillis / 1000); - suspend((int) backoffMillis, true); - } - setNextState(CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_CONFIRM); - return Flow.HAS_MORE_STATE; - case CLOSE_EXCESS_REGION_REPLICAS_CONFIRM: - int unclosedCount = env.getAssignmentManager() - .numberOfUnclosedExcessRegionReplicas(tableName, newReplicaCount); - if (unclosedCount > 0) { - LOG.info("There are still {} unclosed region(s) for table {} when closing excess" - + " region replicas, continue..."); - setNextState( - CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_SCHEDULE); - return Flow.HAS_MORE_STATE; - } else { - return Flow.NO_MORE_STATE; - } - default: - throw new UnsupportedOperationException("unhandled state=" + state); - } - } - - @Override - protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { - setState(ProcedureProtos.ProcedureState.RUNNABLE); - env.getProcedureScheduler().addFront(this); - return false; - } - - @Override - protected void rollbackState(MasterProcedureEnv env, - CloseExcessRegionReplicasProcedureState state) throws IOException, InterruptedException { - throw new UnsupportedOperationException(); - } - @Override protected CloseExcessRegionReplicasProcedureState getState(int stateId) { return CloseExcessRegionReplicasProcedureState.forNumber(stateId); @@ -158,4 +78,22 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws newReplicaCount = data.getNewReplicaCount(); } + @Override + protected CloseExcessRegionReplicasProcedureState getConfirmState() { + return CloseExcessRegionReplicasProcedureState.CLOSE_EXCESS_REGION_REPLICAS_CONFIRM; + } + + @Override + protected int submitUnassignProcedure(MasterProcedureEnv env, + Consumer submit) { + return env.getAssignmentManager() + .submitUnassignProcedureForClosingExcessRegionReplicas(tableName, newReplicaCount, submit); + } + + @Override + protected int numberOfUnclosedRegions(MasterProcedureEnv env) { + return env.getAssignmentManager().numberOfUnclosedExcessRegionReplicas(tableName, + newReplicaCount); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloseTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloseTableRegionsProcedure.java new file mode 100644 index 000000000000..0cfe0b41785c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloseTableRegionsProcedure.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.function.Consumer; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseTableRegionsProcedureState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseTableRegionsProcedureStateData; + +/** + * Procedure for closing all regions for a table. + */ +@InterfaceAudience.Private +public class CloseTableRegionsProcedure + extends AbstractCloseTableRegionsProcedure { + + public CloseTableRegionsProcedure() { + } + + public CloseTableRegionsProcedure(TableName tableName) { + super(tableName); + } + + @Override + protected int submitUnassignProcedure(MasterProcedureEnv env, + Consumer submit) { + return env.getAssignmentManager().submitUnassignProcedureForDisablingTable(tableName, submit); + } + + @Override + protected int numberOfUnclosedRegions(MasterProcedureEnv env) { + return env.getAssignmentManager().numberOfUnclosedRegionsForDisabling(tableName); + } + + @Override + protected CloseTableRegionsProcedureState getState(int stateId) { + return CloseTableRegionsProcedureState.forNumber(stateId); + } + + @Override + protected int getStateId(CloseTableRegionsProcedureState state) { + return state.getNumber(); + } + + @Override + protected CloseTableRegionsProcedureState getInitialState() { + return CloseTableRegionsProcedureState.CLOSE_TABLE_REGIONS_SCHEDULE; + } + + @Override + protected CloseTableRegionsProcedureState getConfirmState() { + return CloseTableRegionsProcedureState.CLOSE_TABLE_REGIONS_CONFIRM; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + CloseTableRegionsProcedureStateData data = CloseTableRegionsProcedureStateData.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); + serializer.serialize(data); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + CloseTableRegionsProcedureStateData data = + serializer.deserialize(CloseTableRegionsProcedureStateData.class); + tableName = ProtobufUtil.toTableName(data.getTableName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index a5c2a4122eae..e8999b886afd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -102,8 +102,7 @@ protected Flow executeFromState(final MasterProcedureEnv env, final DisableTable setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE); break; case DISABLE_TABLE_MARK_REGIONS_OFFLINE: - addChildProcedure( - env.getAssignmentManager().createUnassignProceduresForDisabling(tableName)); + addChildProcedure(new CloseTableRegionsProcedure(tableName)); setNextState(DisableTableState.DISABLE_TABLE_ADD_REPLICATION_BARRIER); break; case DISABLE_TABLE_ADD_REPLICATION_BARRIER: @@ -214,14 +213,6 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws skipTableStateCheck = disableTableMsg.getSkipTableStateCheck(); } - // For disabling a table, we does not care whether a region can be online so hold the table xlock - // for ever. This will simplify the logic as we will not be conflict with procedures other than - // SCP. - @Override - protected boolean holdLock(MasterProcedureEnv env) { - return true; - } - @Override public TableName getTableName() { return tableName; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index cdf13064e24a..3fa5358635ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -614,20 +614,6 @@ private void assignRegions(MasterProcedureEnv env, List regions) }); continue; } - if ( - env.getMasterServices().getTableStateManager().isTableState(regionNode.getTable(), - TableState.State.DISABLING) - ) { - // We need to change the state here otherwise the TRSP scheduled by DTP will try to - // close the region from a dead server and will never succeed. Please see HBASE-23636 - // for more details. - ProcedureFutureUtil.suspendIfNecessary(this, this::setUpdateMetaFuture, - env.getAssignmentManager().regionClosedAbnormally(regionNode), env, () -> { - }); - LOG.info("{} found table disabling for region {}, set it state to ABNORMALLY_CLOSED.", - this, regionNode); - continue; - } if ( env.getMasterServices().getTableStateManager().isTableState(regionNode.getTable(), TableState.State.DISABLED) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndDTP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRaceBetweenSCPAndDTP.java similarity index 57% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndDTP.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRaceBetweenSCPAndDTP.java index e6a40b74e892..075ae8b9506a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndDTP.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestRaceBetweenSCPAndDTP.java @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.master.assignment; +package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; -import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -28,16 +28,13 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; -import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; -import org.apache.hadoop.hbase.master.region.MasterRegion; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -47,58 +44,55 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + /** - * Testcase for HBASE-23636. + * Testcase for HBASE-28522. + *

+ * We used to have test with the same name but in different package for HBASE-23636, where DTP will + * hold the exclusive lock all the time, and it will reset TRSPs which has been attached to + * RegionStateNodes, so we need special logic in SCP to deal with it. + *

+ * After HBASE-28522, DTP will not reset TRSPs any more, so SCP does not need to take care of this + * special case, thues we removed the special logic in SCP and also the UT for HBASE-22636 is not + * valid any more, so we just removed the old one and introduce a new one with the same name here. */ @Category({ MasterTests.class, MediumTests.class }) public class TestRaceBetweenSCPAndDTP { - private static final Logger LOG = LoggerFactory.getLogger(TestRaceBetweenSCPAndDTP.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestRaceBetweenSCPAndDTP.class); + private static final Logger LOG = LoggerFactory.getLogger(TestRaceBetweenSCPAndDTP.class); + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); private static TableName NAME = TableName.valueOf("Race"); private static byte[] CF = Bytes.toBytes("cf"); - private static CountDownLatch ARRIVE_GET_REGIONS_ON_TABLE; + private static CountDownLatch ARRIVE_GET_REPLICATION_PEER_MANAGER; - private static CountDownLatch RESUME_GET_REGIONS_ON_SERVER; + private static CountDownLatch RESUME_GET_REPLICATION_PEER_MANAGER; - private static final class AssignmentManagerForTest extends AssignmentManager { + public static final class HMasterForTest extends HMaster { - public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) { - super(master, masterRegion); + public HMasterForTest(Configuration conf) throws IOException, KeeperException { + super(conf); } @Override - public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) { - if (ARRIVE_GET_REGIONS_ON_TABLE != null) { - ARRIVE_GET_REGIONS_ON_TABLE.countDown(); - ARRIVE_GET_REGIONS_ON_TABLE = null; + public ReplicationPeerManager getReplicationPeerManager() { + if (ARRIVE_GET_REPLICATION_PEER_MANAGER != null) { + ARRIVE_GET_REPLICATION_PEER_MANAGER.countDown(); + ARRIVE_GET_REPLICATION_PEER_MANAGER = null; try { - RESUME_GET_REGIONS_ON_SERVER.await(); + RESUME_GET_REPLICATION_PEER_MANAGER.await(); } catch (InterruptedException e) { } } - TransitRegionStateProcedure[] procs = super.createUnassignProceduresForDisabling(tableName); - return procs; - } - } - - public static final class HMasterForTest extends HMaster { - - public HMasterForTest(Configuration conf) throws IOException, KeeperException { - super(conf); - } - - @Override - protected AssignmentManager createAssignmentManager(MasterServices master, - MasterRegion masterRegion) { - return new AssignmentManagerForTest(master, masterRegion); + return super.getReplicationPeerManager(); } } @@ -116,43 +110,43 @@ public static void tearDown() throws Exception { UTIL.shutdownMiniCluster(); } + private boolean wasExecuted(Procedure proc) { + // RUNNABLE is not enough to make sure that the DTP has acquired the table lock, as we will set + // procedure to RUNNABLE first and then acquire the execution lock + return proc.wasExecuted() || proc.getState() == ProcedureState.WAITING_TIMEOUT + || proc.getState() == ProcedureState.WAITING; + } + @Test - public void test() throws Exception { + public void testRace() throws Exception { RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo(); AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager(); ServerName sn = am.getRegionStates().getRegionState(region).getServerName(); LOG.info("ServerName={}, region={}", sn, region); - ARRIVE_GET_REGIONS_ON_TABLE = new CountDownLatch(1); - RESUME_GET_REGIONS_ON_SERVER = new CountDownLatch(1); + ARRIVE_GET_REPLICATION_PEER_MANAGER = new CountDownLatch(1); + RESUME_GET_REPLICATION_PEER_MANAGER = new CountDownLatch(1); // Assign to local variable because this static gets set to null in above running thread and // so NPE. - CountDownLatch cdl = ARRIVE_GET_REGIONS_ON_TABLE; - UTIL.getAdmin().disableTableAsync(NAME); + CountDownLatch cdl = ARRIVE_GET_REPLICATION_PEER_MANAGER; + UTIL.getMiniHBaseCluster().stopRegionServer(sn); cdl.await(); + Future future = UTIL.getAdmin().disableTableAsync(NAME); ProcedureExecutor procExec = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); - UTIL.getMiniHBaseCluster().stopRegionServer(sn); - long pid = Procedure.NO_PROC_ID; - do { - Threads.sleep(1); - pid = getSCPPID(procExec); - } while (pid != Procedure.NO_PROC_ID); - final long scppid = pid; - UTIL.waitFor(60000, () -> procExec.isFinished(scppid)); - RESUME_GET_REGIONS_ON_SERVER.countDown(); - - long dtpProcId = - procExec.getProcedures().stream().filter(p -> p instanceof DisableTableProcedure) - .map(p -> (DisableTableProcedure) p).findAny().get().getProcId(); - UTIL.waitFor(60000, () -> procExec.isFinished(dtpProcId)); - } - - /** Returns Returns {@link Procedure#NO_PROC_ID} if no SCP found else actual pid. */ - private long getSCPPID(ProcedureExecutor e) { - Optional optional = e.getProcedures().stream() - .filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p).findAny(); - return optional.isPresent() ? optional.get().getProcId() : Procedure.NO_PROC_ID; + // make sure the DTP has been executed + UTIL.waitFor(60000, + () -> procExec.getProcedures().stream().filter(p -> p instanceof DisableTableProcedure) + .map(p -> (DisableTableProcedure) p).filter(p -> p.getTableName().equals(NAME)) + .anyMatch(this::wasExecuted)); + RESUME_GET_REPLICATION_PEER_MANAGER.countDown(); + + // make sure the DTP can finish + future.get(); + + // also make sure all SCPs are finished + UTIL.waitFor(60000, () -> procExec.getProcedures().stream() + .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isFinished)); } }