support increasing batch size
Ray Mattingly committed Nov 29, 2023
1 parent 32e08b2 commit 36f7ee1
Showing 3 changed files with 91 additions and 30 deletions.
ModifyTableProcedure.java
@@ -19,8 +19,8 @@
 
 import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.REOPEN_BATCH_BACKOFF_MILLIS_DEFAULT;
 import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.REOPEN_BATCH_BACKOFF_MILLIS_KEY;
-import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.REOPEN_BATCH_SIZE_DEFAULT;
-import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.REOPEN_BATCH_SIZE_KEY;
+import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.REOPEN_BATCH_SIZE_MAX_DEFAULT;
+import static org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure.REOPEN_BATCH_SIZE_MAX_KEY;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -156,9 +156,10 @@ protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableState
           Configuration conf = env.getMasterConfiguration();
           long backoffMillis =
             conf.getLong(REOPEN_BATCH_BACKOFF_MILLIS_KEY, REOPEN_BATCH_BACKOFF_MILLIS_DEFAULT);
-          int batchSize = conf.getInt(REOPEN_BATCH_SIZE_KEY, REOPEN_BATCH_SIZE_DEFAULT);
+          int batchSizeMax =
+            conf.getInt(REOPEN_BATCH_SIZE_MAX_KEY, REOPEN_BATCH_SIZE_MAX_DEFAULT);
           addChildProcedure(
-            new ReopenTableRegionsProcedure(getTableName(), backoffMillis, batchSize));
+            new ReopenTableRegionsProcedure(getTableName(), backoffMillis, batchSizeMax));
         }
         setNextState(ModifyTableState.MODIFY_TABLE_ASSIGN_NEW_REPLICAS);
         break;
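Note: resolved against the hunk above, the master-side change amounts to reading two keys from the master configuration and handing the result to the reopen procedure. Below is a minimal, self-contained sketch of that lookup pattern. The key names and defaults come from this commit; the class name ReopenBatchConfigSketch and the tuning values (100 ms, 32) are hypothetical, and on a live cluster these keys would typically be set in the master's hbase-site.xml rather than programmatically.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ReopenBatchConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Hypothetical tuning values; only the key names are taken from the diff.
        conf.setLong("hbase.table.regions.reopen.batch.backoff.ms", 100L);
        conf.setInt("hbase.table.regions.reopen.batch.size.max", 32);

        // Same lookup pattern ModifyTableProcedure performs in the hunk above,
        // with the same defaults (no backoff, effectively unbounded max).
        long backoffMillis = conf.getLong("hbase.table.regions.reopen.batch.backoff.ms", 0L);
        int batchSizeMax = conf.getInt("hbase.table.regions.reopen.batch.size.max", Integer.MAX_VALUE);
        System.out.println("backoffMillis=" + backoffMillis + " batchSizeMax=" + batchSizeMax);
      }
    }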
ReopenTableRegionsProcedure.java
@@ -58,8 +58,12 @@ public class ReopenTableRegionsProcedure
   public static final String REOPEN_BATCH_BACKOFF_MILLIS_KEY =
     "hbase.table.regions.reopen.batch.backoff.ms";
   public static final long REOPEN_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
-  public static final String REOPEN_BATCH_SIZE_KEY = "hbase.table.regions.reopen.batch.size";
-  public static final int REOPEN_BATCH_SIZE_DEFAULT = Integer.MAX_VALUE;
+  public static final String REOPEN_BATCH_SIZE_MAX_KEY =
+    "hbase.table.regions.reopen.batch.size.max";
+  public static final int REOPEN_BATCH_SIZE_MAX_DEFAULT = Integer.MAX_VALUE;
+
+  // this minimum prevents a max which would break this procedure
+  private static final int MINIMUM_BATCH_SIZE_MAX = 1;
 
   private TableName tableName;
 
@@ -75,6 +79,9 @@ public class ReopenTableRegionsProcedure
 
   private long reopenBatchBackoffMillis;
   private int reopenBatchSize;
+  private int reopenBatchSizeMax;
+  private long regionsReopened = 0;
+  private long batchesProcessed = 0;
 
   public ReopenTableRegionsProcedure() {
     this(null);
@@ -85,20 +92,22 @@ public ReopenTableRegionsProcedure(TableName tableName) {
   }
 
   public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
-    this(tableName, regionNames, REOPEN_BATCH_BACKOFF_MILLIS_DEFAULT, REOPEN_BATCH_SIZE_DEFAULT);
+    this(tableName, regionNames, REOPEN_BATCH_BACKOFF_MILLIS_DEFAULT,
+      REOPEN_BATCH_SIZE_MAX_DEFAULT);
   }
 
   public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
-    int reopenBatchSize) {
-    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSize);
+    int reopenBatchSizeMax) {
+    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
   }
 
   public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
-    long reopenBatchBackoffMillis, int reopenBatchSize) {
+    long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
     this.tableName = tableName;
     this.regionNames = regionNames;
     this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
-    this.reopenBatchSize = reopenBatchSize;
+    this.reopenBatchSize = 1;
+    this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
   }
 
   @Override
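Note: the constructor hunk above starts every procedure at a batch size of one and clamps the configured maximum with MINIMUM_BATCH_SIZE_MAX, so a nonsensical value cannot produce an empty batch. A short hedged sketch of that guard, mirroring the negative-max test added further down; the class name ReopenBatchClampSketch and the table name "demo" are hypothetical, and constructing the procedure outside a running master is only for illustration.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;

    public class ReopenBatchClampSketch {
      public static void main(String[] args) {
        // A negative max is clamped to MINIMUM_BATCH_SIZE_MAX (1) inside the constructor,
        // and the first batch always has size 1 regardless of the configured max.
        ReopenTableRegionsProcedure proc =
          new ReopenTableRegionsProcedure(TableName.valueOf("demo"), 0L, -1);
        System.out.println("constructed " + proc);
      }
    }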
@@ -113,8 +122,14 @@ public TableOperationType getTableOperationType() {
 
   @RestrictedApi(explanation = "Should only be called in tests", link = "",
       allowedOnPath = ".*/src/test/.*")
-  public List<HRegionLocation> getCurrentRegionBatch() {
-    return new ArrayList<>(currentRegionBatch);
+  public long getRegionsReopened() {
+    return regionsReopened;
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public long getBatchesProcessed() {
+    return batchesProcessed;
+  }
+
   private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
@@ -144,6 +159,9 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
         setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
         return Flow.HAS_MORE_STATE;
       case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
+        if (!regions.isEmpty()) {
+          batchesProcessed++;
+        }
         currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
         for (HRegionLocation loc : currentRegionBatch) {
           RegionStateNode regionNode =
@@ -164,6 +182,7 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
             regionNode.unlock();
           }
           addChildProcedure(proc);
+          regionsReopened++;
         }
         setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
         return Flow.HAS_MORE_STATE;
@@ -183,19 +202,22 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
             return Flow.NO_MORE_STATE;
           } else {
             setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
+            reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
             if (reopenBatchBackoffMillis > 0) {
-              backoff(reopenBatchBackoffMillis);
+              setBackoffStateAndSuspend(reopenBatchBackoffMillis);
+            } else {
+              return Flow.HAS_MORE_STATE;
             }
-            return Flow.HAS_MORE_STATE;
           }
         }
         if (currentRegionBatch.stream().anyMatch(loc -> canSchedule(env, loc))) {
           retryCounter = null;
           setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
           if (reopenBatchBackoffMillis > 0) {
-            backoff(reopenBatchBackoffMillis);
+            setBackoffStateAndSuspend(reopenBatchBackoffMillis);
+          } else {
+            return Flow.HAS_MORE_STATE;
           }
-          return Flow.HAS_MORE_STATE;
         }
         // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
         // again.
@@ -207,14 +229,13 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
           "There are still {} region(s) which need to be reopened for table {}. {} are in "
             + "OPENING state, suspend {}secs and try again later",
           regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
-        backoff(backoffMillis);
-        throw new ProcedureSuspendedException();
+        setBackoffStateAndSuspend(backoffMillis);
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);
     }
   }
 
-  private void backoff(long millis) throws ProcedureSuspendedException {
+  private void setBackoffStateAndSuspend(long millis) throws ProcedureSuspendedException {
     setTimeout(Math.toIntExact(millis));
     setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
     skipPersistence();
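Note: the REOPEN_REGIONS and CONFIRM_REOPENED hunks above grow the batch geometrically: reopenBatchSize starts at 1 and is doubled after each confirmed batch, capped at reopenBatchSizeMax. The following standalone sketch shows just that arithmetic; it ignores backoff, stuck regions, and retries, and the class name and region counts are hypothetical.

    public class ReopenBatchRampSketch {
      // Count how many batches the doubling schedule needs to cover `regions` regions.
      static int batchesNeeded(int regions, int batchSizeMax) {
        int remaining = regions;
        int batchSize = 1; // every procedure starts with a batch of one region
        int batches = 0;
        while (remaining > 0) {
          remaining -= Math.min(batchSize, remaining);
          batches++;
          batchSize = Math.min(batchSizeMax, 2 * batchSize); // the doubling line from the diff
        }
        return batches;
      }

      public static void main(String[] args) {
        System.out.println(batchesNeeded(100, 1));                 // 100 batches: capped at one region each
        System.out.println(batchesNeeded(100, 8));                 // 15 batches: 1, 2, 4, then 8 per batch
        System.out.println(batchesNeeded(100, Integer.MAX_VALUE)); // 7 batches: 1, 2, 4, 8, 16, 32, 37
      }
    }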
TestReopenTableRegionsProcedureBatching.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.RegionState.State;
@@ -60,7 +59,7 @@ public class TestReopenTableRegionsProcedureBatching {
 
   private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
   private static final int BACKOFF_MILLIS_PER_RS = 0;
-  private static final int REOPEN_BATCH_SIZE = 1;
+  private static final int REOPEN_BATCH_SIZE_MAX = 1;
 
   private static TableName TABLE_NAME = TableName.valueOf("Batching");
 
@@ -80,7 +79,7 @@ public static void tearDown() throws Exception {
   }
 
   @Test
-  public void testRegionBatching() throws IOException {
+  public void testSmallMaxBatchSize() throws IOException {
     AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
     ProcedureExecutor<MasterProcedureEnv> procExec =
       UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
@@ -89,15 +88,23 @@ public void testRegionBatching() throws IOException {
     Set<StuckRegion> stuckRegions =
       regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
     ReopenTableRegionsProcedure proc =
-      new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE);
+      new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, REOPEN_BATCH_SIZE_MAX);
     procExec.submitProcedure(proc);
     UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
-    confirmBatchSize(REOPEN_BATCH_SIZE, stuckRegions, proc);
+
+    // the first batch should be small
+    confirmBatchSize(1, stuckRegions, proc);
     ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+    // other batches should also be small
+    assertTrue(proc.getBatchesProcessed() >= regions.size());
+
+    // all regions should only be opened once
+    assertEquals(proc.getRegionsReopened(), regions.size());
   }
 
   @Test
-  public void testNoRegionBatching() throws IOException {
+  public void testDefaultMaxBatchSize() throws IOException {
     AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
     ProcedureExecutor<MasterProcedureEnv> procExec =
       UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
@@ -108,19 +115,51 @@ public void testNoRegionBatching() throws IOException {
     ReopenTableRegionsProcedure proc = new ReopenTableRegionsProcedure(TABLE_NAME);
     procExec.submitProcedure(proc);
     UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
-    confirmBatchSize(regions.size(), stuckRegions, proc);
+
+    // the first batch should be small
+    confirmBatchSize(1, stuckRegions, proc);
     ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+    // other batches should get larger
+    assertTrue(proc.getBatchesProcessed() < regions.size());
+
+    // all regions should only be opened once
+    assertEquals(proc.getRegionsReopened(), regions.size());
   }
 
+  @Test
+  public void testNegativeBatchSizeDoesNotBreak() throws IOException {
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    List<RegionInfo> regions = UTIL.getAdmin().getRegions(TABLE_NAME);
+    assertTrue(2 <= regions.size());
+    Set<StuckRegion> stuckRegions =
+      regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
+    ReopenTableRegionsProcedure proc =
+      new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, -1);
+    procExec.submitProcedure(proc);
+    UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);
+
+    // the first batch should be small
+    confirmBatchSize(1, stuckRegions, proc);
+    ProcedureSyncWait.waitForProcedureToComplete(procExec, proc, 60_000);
+
+    // other batches should also be small
+    assertTrue(proc.getBatchesProcessed() >= regions.size());
+
+    // all regions should only be opened once
+    assertEquals(proc.getRegionsReopened(), regions.size());
+  }
+
   private void confirmBatchSize(int expectedBatchSize, Set<StuckRegion> stuckRegions,
     ReopenTableRegionsProcedure proc) {
     while (true) {
-      List<HRegionLocation> currentRegionBatch = proc.getCurrentRegionBatch();
-      if (currentRegionBatch.isEmpty()) {
+      if (proc.getBatchesProcessed() == 0) {
         continue;
       }
       stuckRegions.forEach(this::unstickRegion);
-      assertEquals(expectedBatchSize, currentRegionBatch.size());
+      UTIL.waitFor(5000, () -> expectedBatchSize == proc.getRegionsReopened());
       break;
     }
   }
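Note on the assertions above, worked through for a hypothetical ten-region test table (the actual region count is not shown in this diff): with REOPEN_BATCH_SIZE_MAX = 1 every batch reopens at most one region, so at least ten batches run and getBatchesProcessed() >= regions.size() holds; with the default max the batch sizes grow 1, 2, 4, 8, so four batches cover all ten regions (1 + 2 + 4 + 3) and getBatchesProcessed() < regions.size(); in every variant each region is reopened exactly once, so getRegionsReopened() equals regions.size(). This is the same arithmetic that the ReopenBatchRampSketch example above computes.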
