Skip to content

Commit

Permalink
HBASE-25592 Improve normalizer code in line with HBASE-23932 (#2972)
Browse files Browse the repository at this point in the history
Signed-off-by: Viraj Jasani <[email protected]>
  • Loading branch information
mnpoonia authored Feb 23, 2021
1 parent 9e9301a commit d37f734
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
Expand Down Expand Up @@ -327,7 +328,7 @@ public void run() {
new ProcedureEvent("server crash processing");

// Maximum time we should run balancer for
private final int maxBlancingTime;
private final int maxBalancingTime;
// Maximum percent of regions in transition when balancing
private final double maxRitPercent;

Expand Down Expand Up @@ -493,7 +494,7 @@ public HMaster(final Configuration conf, CoordinatedStateManager csm)
// preload table descriptor at startup
this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

this.maxBlancingTime = getMaxBalancingTime();
this.maxBalancingTime = getMaxBalancingTime();
this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

Expand Down Expand Up @@ -1552,13 +1553,7 @@ public boolean balance() throws IOException {

public boolean balance(boolean force) throws IOException {
// if master not initialized, don't run balancer.
if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run balancer.");
return false;
}

if (isInMaintenanceMode()) {
LOG.info("Master is in maintenanceMode mode, don't run balancer.");
if (skipRegionManagementAction("balancer")) {
return false;
}

Expand Down Expand Up @@ -1610,10 +1605,10 @@ public boolean balance(boolean force) throws IOException {
}

long balanceStartTime = System.currentTimeMillis();
long cutoffTime = balanceStartTime + this.maxBlancingTime;
long cutoffTime = balanceStartTime + this.maxBalancingTime;
int rpCount = 0; // number of RegionPlans balanced so far
if (plans != null && !plans.isEmpty()) {
int balanceInterval = this.maxBlancingTime / plans.size();
int balanceInterval = this.maxBalancingTime / plans.size();
LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
+ balanceInterval + " ms, and the max number regions in transition is "
+ maxRegionsInTransition);
Expand All @@ -1632,7 +1627,7 @@ public boolean balance(boolean force) throws IOException {
// TODO: After balance, there should not be a cutoff time (keeping it as a security net
// for now)
LOG.debug("No more balancing till next balance run; maxBalanceTime="
+ this.maxBlancingTime);
+ this.maxBalancingTime);
break;
}
}
Expand All @@ -1652,6 +1647,23 @@ public boolean balance(boolean force) throws IOException {
return true;
}

/**
 * Decides whether a region management action (for example the balancer or the
 * normalizer) must be skipped because of the current master/cluster state.
 * The action is skipped when the master is not yet initialized, when the
 * cluster is shutting down, or when the master is in maintenance mode.
 *
 * @param action name of the action; used only to build the log messages
 * @return {@code true} if the action must not run, {@code false} if it may proceed
 * @throws IOException if one of the underlying state checks fails
 */
private boolean skipRegionManagementAction(String action) throws IOException {
  if (!isInitialized()) {
    LOG.debug("Master has not been initialized, don't run " + action);
    return true;
  }

  if (this.getServerManager().isClusterShutdown()) {
    LOG.info("CLuster is shutting down, don't run " + action);
    // Previously this branch only logged and fell through, so the action
    // could still run while the cluster was shutting down.
    return true;
  }

  if (isInMaintenanceMode()) {
    LOG.info("Master is in maintenanceMode mode, don't run " + action);
    return true;
  }
  return false;
}

/**
* Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
*
Expand All @@ -1662,50 +1674,55 @@ public boolean balance(boolean force) throws IOException {
* @throws CoordinatedStateException
*/
public boolean normalizeRegions() throws IOException, CoordinatedStateException {
if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run region normalizer.");
if (skipRegionManagementAction("normalizer")) {
return false;
}

if (isInMaintenanceMode()) {
LOG.info("Master is in maintenance mode, don't run region normalizer.");
return false;
}

if (!this.regionNormalizerTracker.isNormalizerOn()) {
if (isNormalizerOn()) {
LOG.debug("Region normalization is disabled, don't run region normalizer.");
return false;
}

synchronized (this.normalizer) {
// Don't run the normalizer concurrently
List<TableName> allEnabledTables = new ArrayList<>(
final List<TableName> allEnabledTables = new ArrayList<>(
this.assignmentManager.getTableStateManager().getTablesInStates(
TableState.State.ENABLED));

Collections.shuffle(allEnabledTables);

for (TableName table : allEnabledTables) {
if (isInMaintenanceMode()) {
LOG.debug("Master is in maintenance mode, stop running region normalizer.");
final NamespaceAuditor namespaceQuotaManager = quotaManager.getNamespaceQuotaManager();
if(namespaceQuotaManager == null) {
LOG.debug("Skipping normalizing since namespace quota is null");
return false;
}

if (quotaManager.getNamespaceQuotaManager() != null &&
quotaManager.getNamespaceQuotaManager().getState(table.getNamespaceAsString()) != null){
if (namespaceQuotaManager.getState(table.getNamespaceAsString()) != null) {
LOG.debug("Skipping normalizing " + table + " since its namespace has quota");
continue;
}
if (table.isSystemTable() || (getTableDescriptors().get(table) != null &&
!getTableDescriptors().get(table).isNormalizationEnabled())) {
LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
+ " table or doesn't have auto normalization turned on");
if (table.isSystemTable()) {
continue;
}

HTableDescriptor tableDescriptor = getTableDescriptors().get(table);
if (tableDescriptor != null && !tableDescriptor.isNormalizationEnabled()) {
LOG.debug("Skipping normalization for table: " + table
+ ", as it doesn't have auto normalization turned on");
continue;
}
List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
if (plans != null) {
// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}

List<NormalizationPlan> plans = this.normalizer.computePlansForTable(table);
if (plans == null || plans.isEmpty()) {
return true;
}
try (Admin admin = clusterConnection.getAdmin()) {
for (NormalizationPlan plan : plans) {
plan.execute(clusterConnection.getAdmin());
plan.execute(admin);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
public interface RegionNormalizer {
/**
* Set the master service. Must be called before first call to
* {@link #computePlanForTable(TableName)}.
* {@link #computePlansForTable(TableName)}.
* @param masterServices master services to use
*/
void setMasterServices(MasterServices masterServices);

/**
* Set the master RPC service. Must be called before first call to
* {@link #computePlanForTable(TableName)}.
* {@link #computePlansForTable(TableName)}.
* @param masterRpcServices master RPC services to use
*/
void setMasterRpcServices(MasterRpcServices masterRpcServices);
Expand All @@ -57,6 +57,6 @@ public interface RegionNormalizer {
* @param table table to normalize
* @return normalization actions to perform. Null if no action to take
*/
List<NormalizationPlan> computePlanForTable(TableName table)
List<NormalizationPlan> computePlansForTable(TableName table)
throws HBaseIOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,24 +103,15 @@ public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
* @return normalization plan to execute
*/
@Override
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
public List<NormalizationPlan> computePlansForTable(TableName table) throws HBaseIOException {
if (table == null || table.isSystemTable()) {
LOG.debug("Normalization of system table " + table + " isn't allowed");
return null;
}
boolean splitEnabled = true, mergeEnabled = true;
try {
splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
} catch (ServiceException se) {
LOG.debug("Unable to determine whether split is enabled", se);
}
try {
mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
} catch (ServiceException se) {
LOG.debug("Unable to determine whether merge is enabled", se);
}
splitEnabled = isSplitEnabled();
mergeEnabled = isMergeEnabled();

if (!splitEnabled && !mergeEnabled) {
LOG.debug("Both split and merge are disabled for table: " + table);
return null;
Expand All @@ -142,13 +133,13 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
", number of regions: " + tableRegions.size());

long totalSizeMb = 0;
int acutalRegionCnt = 0;
int actualRegionCnt = 0;

for (int i = 0; i < tableRegions.size(); i++) {
HRegionInfo hri = tableRegions.get(i);
long regionSize = getRegionSize(hri);
if (regionSize > 0) {
acutalRegionCnt++;
actualRegionCnt++;
totalSizeMb += regionSize;
}
}
Expand Down Expand Up @@ -176,16 +167,15 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
} else if (targetRegionCount > 0) {
avgRegionSize = totalSizeMb / (double) targetRegionCount;
} else {
avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
avgRegionSize = actualRegionCnt == 0 ? 0 : totalSizeMb / (double) actualRegionCnt;
}

LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
LOG.debug("Table " + table + ", average region size: " + avgRegionSize);

int candidateIdx = 0;
int splitCount = 0;
int mergeCount = 0;
while (candidateIdx < tableRegions.size()) {
for (int candidateIdx = 0; candidateIdx < tableRegions.size(); candidateIdx++) {
HRegionInfo hri = tableRegions.get(candidateIdx);
long regionSize = getRegionSize(hri);
// if the region is > 2 times larger than average, we split it, split
Expand Down Expand Up @@ -214,7 +204,6 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
}
}
}
candidateIdx++;
}
if (plans.isEmpty()) {
LOG.debug("No normalization needed, regions look good for table: " + table);
Expand All @@ -227,7 +216,38 @@ public List<NormalizationPlan> computePlanForTable(TableName table) throws HBase
}
return plans;
}
/**
 * Asks the master RPC services whether the {@code MasterSwitchType.SPLIT}
 * switch is currently enabled.
 *
 * @return the reported switch state, or {@code true} when the state cannot be
 *         determined because the RPC call raised a {@code ServiceException}
 */
private boolean isSplitEnabled() {
  try {
    return masterRpcServices.isSplitOrMergeEnabled(null,
        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
  } catch (ServiceException se) {
    LOG.debug("Unable to determine whether split is enabled", se);
    // Lookup failed: treat split as enabled.
    return true;
  }
}

/**
 * Asks the master RPC services whether the {@code MasterSwitchType.MERGE}
 * switch is currently enabled.
 *
 * @return the reported switch state, or {@code true} when the state cannot be
 *         determined because the RPC call raised a {@code ServiceException}
 */
private boolean isMergeEnabled() {
  try {
    return masterRpcServices.isSplitOrMergeEnabled(null,
        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
  } catch (ServiceException se) {
    LOG.debug("Unable to determine whether merge is enabled", se);
    // Lookup failed: treat merge as enabled.
    return true;
  }
}

/**
* @param hri used to calculate region size
* @return region size in MB
*/
private long getRegionSize(HRegionInfo hri) {
ServerName sn =
masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testNoNormalizationForMetaTable() throws HBaseIOException {
Map<byte[], Integer> regionSizes = new HashMap<>();

setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
assertTrue(plans == null);
}

Expand All @@ -92,7 +92,7 @@ public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
regionSizes.put(hri2.getRegionName(), 15);

setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
assertTrue(plans == null);
}

Expand All @@ -119,7 +119,7 @@ public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
regionSizes.put(hri4.getRegionName(), 10);

setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
assertTrue(plans == null);
}

Expand Down Expand Up @@ -150,7 +150,7 @@ public void testMergeOfSmallRegions() throws HBaseIOException {
regionSizes.put(hri5.getRegionName(), 16);

setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);

NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
Expand Down Expand Up @@ -190,7 +190,7 @@ public void testMergeOfSecondSmallestRegions() throws HBaseIOException {
regionSizes.put(hri6.getRegionName(), 2700);

setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
NormalizationPlan plan = plans.get(0);

assertTrue(plan instanceof MergeNormalizationPlan);
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testMergeOfSmallNonAdjacentRegions() throws HBaseIOException {
regionSizes.put(hri5.getRegionName(), 5);

setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);

assertTrue(plans == null);
}
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testSplitOfLargeRegion() throws HBaseIOException {
regionSizes.put(hri4.getRegionName(), 30);

setupMocksForNormalizer(regionSizes, hris);
List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
NormalizationPlan plan = plans.get(0);

assertTrue(plan instanceof SplitNormalizationPlan);
Expand Down Expand Up @@ -296,7 +296,7 @@ public void testSplitWithTargetRegionCount() throws Exception {
when(
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionSize())
.thenReturn(20L);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertEquals(4, plans.size());

for (NormalizationPlan plan : plans) {
Expand All @@ -307,7 +307,7 @@ public void testSplitWithTargetRegionCount() throws Exception {
when(
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionSize())
.thenReturn(200L);
plans = normalizer.computePlanForTable(tableName);
plans = normalizer.computePlansForTable(tableName);
assertEquals(2, plans.size());
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
Expand Down Expand Up @@ -343,7 +343,7 @@ public void testSplitWithTargetRegionSize() throws Exception {
when(
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionCount())
.thenReturn(8);
List<NormalizationPlan> plans = normalizer.computePlanForTable(tableName);
List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
assertEquals(2, plans.size());

for (NormalizationPlan plan : plans) {
Expand All @@ -354,7 +354,7 @@ public void testSplitWithTargetRegionSize() throws Exception {
when(
masterServices.getTableDescriptors().get((TableName) any()).getNormalizerTargetRegionCount())
.thenReturn(3);
plans = normalizer.computePlanForTable(tableName);
plans = normalizer.computePlansForTable(tableName);
assertEquals(1, plans.size());
NormalizationPlan plan = plans.get(0);
assertTrue(plan instanceof MergeNormalizationPlan);
Expand Down

0 comments on commit d37f734

Please sign in to comment.