Skip to content

Commit

Permalink
simplify, protect against overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Dec 1, 2023
1 parent 8a9f8f4 commit 08991b2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,13 @@ public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]>
this.tableName = tableName;
this.regionNames = regionNames;
this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
this.reopenBatchSize = reopenBatchSizeMax != PROGRESSIVE_BATCH_SIZE_MAX_DISABLED
? 1
: PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE;
this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
this.reopenBatchSize = Integer.MAX_VALUE;
this.reopenBatchSizeMax = Integer.MAX_VALUE;
} else {
this.reopenBatchSize = 1;
this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
}
}

@Override
Expand All @@ -134,6 +137,18 @@ public long getBatchesProcessed() {
return batchesProcessed;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
protected int progressBatchSize() {
int previousBatchSize = reopenBatchSize;
reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
if (reopenBatchSize < previousBatchSize) {
// the batch size should never decrease. this must be overflow, so just use max
reopenBatchSize = reopenBatchSizeMax;
}
return reopenBatchSize;
}

private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
if (loc.getSeqNum() < 0) {
return false;
Expand Down Expand Up @@ -224,7 +239,7 @@ private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> r
retryCounter = null;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
progressBatchSize();
setBackoffState(reopenBatchBackoffMillis);
throw new ProcedureSuspendedException();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void testNegativeBatchSizeDoesNotBreak() throws IOException {
Set<StuckRegion> stuckRegions =
regions.stream().map(r -> stickRegion(am, procExec, r)).collect(Collectors.toSet());
ReopenTableRegionsProcedure proc =
new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, -1);
new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, -100);
procExec.submitProcedure(proc);
UTIL.waitFor(10000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT);

Expand All @@ -149,6 +149,17 @@ public void testNegativeBatchSizeDoesNotBreak() throws IOException {
assertEquals(proc.getRegionsReopened(), regions.size());
}

@Test
public void testBatchSizeDoesNotOverflow() {
ReopenTableRegionsProcedure proc =
new ReopenTableRegionsProcedure(TABLE_NAME, BACKOFF_MILLIS_PER_RS, Integer.MAX_VALUE);
int currentBatchSize = 1;
while (currentBatchSize < Integer.MAX_VALUE) {
currentBatchSize = proc.progressBatchSize();
assertTrue(currentBatchSize > 0);
}
}

private void confirmBatchSize(int expectedBatchSize, Set<StuckRegion> stuckRegions,
ReopenTableRegionsProcedure proc) {
while (true) {
Expand Down

0 comments on commit 08991b2

Please sign in to comment.