Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Nov 29, 2023
1 parent 36f7ee1 commit afa02f9
Showing 1 changed file with 53 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -159,10 +158,12 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
if (!regions.isEmpty()) {
// if we didn't finish reopening the last batch yet, let's keep trying until we do.
// at that point, the batch will be empty and we can generate a new batch
if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
batchesProcessed++;
}
currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
for (HRegionLocation loc : currentRegionBatch) {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
Expand All @@ -187,59 +188,65 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
return Flow.HAS_MORE_STATE;
case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
regions = regions.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
.filter(l -> l != null).collect(Collectors.toList());
// we need to create a set of region names because the HRegionLocation hashcode is only
// based
// on the server name
Set<byte[]> currentRegionBatchNames = currentRegionBatch.stream()
.map(r -> r.getRegion().getRegionName()).collect(Collectors.toSet());
currentRegionBatch = regions.stream()
.filter(r -> currentRegionBatchNames.contains(r.getRegion().getRegionName()))
.collect(Collectors.toList());
if (currentRegionBatch.isEmpty()) {
if (regions.isEmpty()) {
return Flow.NO_MORE_STATE;
} else {
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
reopenBatchSize = Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
if (reopenBatchBackoffMillis > 0) {
setBackoffStateAndSuspend(reopenBatchBackoffMillis);
} else {
return Flow.HAS_MORE_STATE;
}
}
// update region lists based on what's been reopened
regions = filterReopened(env, regions);
currentRegionBatch = filterReopened(env, currentRegionBatch);

// existing batch didn't fully reopen, so try to resolve that first.
// since this is a retry, don't do the batch backoff
if (!currentRegionBatch.isEmpty()) {
return reopenIfSchedulable(env, currentRegionBatch, false);
}
if (currentRegionBatch.stream().anyMatch(loc -> canSchedule(env, loc))) {
retryCounter = null;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
if (reopenBatchBackoffMillis > 0) {
setBackoffStateAndSuspend(reopenBatchBackoffMillis);
} else {
return Flow.HAS_MORE_STATE;
}
}
// We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
// again.
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());

if (regions.isEmpty()) {
return Flow.NO_MORE_STATE;
}
long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.info(
"There are still {} region(s) which need to be reopened for table {}. {} are in "
+ "OPENING state, suspend {}secs and try again later",
regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
setBackoffStateAndSuspend(backoffMillis);

// current batch is finished, schedule more regions
return reopenIfSchedulable(env, regions, true);
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}

private void setBackoffStateAndSuspend(long millis) throws ProcedureSuspendedException {
/**
 * Re-checks a list of region locations and keeps only the ones the check still reports.
 * <p>
 * Each location is passed to {@code checkReopened} on the assignment manager's region states.
 * Results that come back {@code null} are dropped (presumably meaning the region has already
 * been reopened -- confirm against {@code checkReopened}); every non-null result is collected,
 * in encounter order, into a freshly allocated list.
 * @param env            master procedure environment used to reach the region states
 * @param regionsToCheck locations to re-check
 * @return a new list with the non-null results of the check
 */
private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
  List<HRegionLocation> regionsToCheck) {
  // Bind the checker once, up front, so the region states are resolved a single time.
  java.util.function.Function<HRegionLocation, HRegionLocation> recheck =
    env.getAssignmentManager().getRegionStates()::checkReopened;
  List<HRegionLocation> stillPending = new ArrayList<>();
  for (HRegionLocation candidate : regionsToCheck) {
    HRegionLocation refreshed = recheck.apply(candidate);
    if (refreshed != null) {
      stillPending.add(refreshed);
    }
  }
  return stillPending;
}

/**
 * Decides how the procedure continues for the given regions: go back to the reopen state, or
 * wait and retry later.
 * <p>
 * If {@code canSchedule} accepts at least one of the regions, the retry counter is cleared, the
 * next state is set to {@code REOPEN_TABLE_REGIONS_REOPEN_REGIONS} and the batch size is doubled
 * (capped at {@code reopenBatchSizeMax}). The procedure then either suspends for
 * {@code reopenBatchBackoffMillis} (when {@code shouldBatchBackoff} is set and that backoff is
 * positive) or continues immediately.
 * <p>
 * If none of the regions can be scheduled, the procedure suspends using the backoff supplied by
 * the retry counter, which is created on first use.
 * @param env                master procedure environment
 * @param regionsToReopen    regions to test with {@code canSchedule}
 * @param shouldBatchBackoff whether the configured between-batch backoff may be applied
 * @return {@link Flow#HAS_MORE_STATE} when the procedure should continue without waiting
 * @throws ProcedureSuspendedException when the procedure has been put into a timed wait
 */
private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
  boolean shouldBatchBackoff) throws ProcedureSuspendedException {
  if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
    retryCounter = null;
    setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
    // Doubling a batch size above Integer.MAX_VALUE / 2 would wrap around to a negative number,
    // which Math.min would then pick and which is not a valid stream limit. Once doubling can
    // no longer be represented, the cap is the correct answer.
    // NOTE(review): the batch size also grows when this is called for a retry of an unfinished
    // batch (shouldBatchBackoff == false) -- confirm that is intended.
    reopenBatchSize = reopenBatchSize > Integer.MAX_VALUE / 2
      ? reopenBatchSizeMax
      : Math.min(reopenBatchSizeMax, 2 * reopenBatchSize);
    if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
      setBackoffState(reopenBatchBackoffMillis);
      throw new ProcedureSuspendedException();
    } else {
      return Flow.HAS_MORE_STATE;
    }
  }

  // We can not schedule TRSP for any of the regions that need to reopen, so wait for a while and
  // retry again.
  if (retryCounter == null) {
    retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
  }
  long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
  LOG.info(
    "There are still {} region(s) which need to be reopened for table {}. {} are in "
      + "OPENING state, suspend {}secs and try again later",
    regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
  setBackoffState(backoffMillis);
  throw new ProcedureSuspendedException();
}

/**
 * Puts this procedure into a timed wait without suspending it.
 * <p>
 * Sets the timeout to {@code millis}, moves the procedure state to {@code WAITING_TIMEOUT} and
 * calls {@code skipPersistence()}. This method does not throw
 * {@code ProcedureSuspendedException}; callers throw it themselves right after calling this.
 * @param millis how long to wait, in milliseconds
 * @throws ArithmeticException if {@code millis} does not fit in an {@code int}
 */
private void setBackoffState(long millis) {
  setTimeout(Math.toIntExact(millis));
  setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
  skipPersistence();
}

private List<HRegionLocation>
Expand Down

0 comments on commit afa02f9

Please sign in to comment.