Skip to content

Commit

Permalink
HBASE-25534 Honor TableDescriptor settings earlier in normalization (#…
Browse files Browse the repository at this point in the history
…2917)

Signed-off-by: Nick Dimiduk <[email protected]>
  • Loading branch information
ZhaoBQ authored May 25, 2021
1 parent 2cb6cc8 commit 563ebc2
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -38,17 +38,17 @@
interface RegionNormalizer extends Configurable {
/**
* Set the master service. Must be called before the first call to
* {@link #computePlansForTable(TableName)} or
* {@link #computePlansForTable(TableDescriptor)}.
* @param masterServices master services to use
*/
void setMasterServices(MasterServices masterServices);

/**
* Computes a list of normalizer actions to perform on the target table. This is the primary
* entry-point from the Master driving a normalization activity.
* @param table table to normalize
* @param tableDescriptor descriptor of the table to normalize
* @return A list of the normalization actions to perform, or an empty list
* if there's nothing to do.
*/
// NOTE(review): this span is rendered-diff text; the TableName signature looks like the
// pre-change declaration shown next to its TableDescriptor replacement - confirm against the
// actual source file.
List<NormalizationPlan> computePlansForTable(TableName table);
List<NormalizationPlan> computePlansForTable(TableDescriptor tableDescriptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final TableDescriptor tblDesc;
try {
final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
tblDesc = masterServices.getTableDescriptors().get(tableName);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
tableName);
Expand All @@ -190,7 +191,7 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.normalizer;

import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
import java.io.IOException;
import java.time.Instant;
import java.time.Period;
import java.util.ArrayList;
Expand All @@ -27,6 +26,7 @@
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.RegionMetrics;
Expand Down Expand Up @@ -184,23 +184,24 @@ public void setMasterServices(final MasterServices masterServices) {
}

@Override
public List<NormalizationPlan> computePlansForTable(final TableName table) {
if (table == null) {
public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableDescriptor) {
if (tableDescriptor == null) {
return Collections.emptyList();
}
TableName table = tableDescriptor.getTableName();
if (table.isSystemTable()) {
LOG.debug("Normalization of system table {} isn't allowed", table);
return Collections.emptyList();
}

final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
final boolean proceedWithMergePlanning = proceedWithMergePlanning();
final boolean proceedWithSplitPlanning = proceedWithSplitPlanning(tableDescriptor);
final boolean proceedWithMergePlanning = proceedWithMergePlanning(tableDescriptor);
if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
return Collections.emptyList();
}

final NormalizeContext ctx = new NormalizeContext(table);
final NormalizeContext ctx = new NormalizeContext(tableDescriptor);
if (isEmpty(ctx.getTableRegions())) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -254,41 +255,38 @@ private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
return masterServices.isSplitOrMergeEnabled(masterSwitchType);
}

/**
* Decides whether split planning should run for a table.
* NOTE(review): this span is rendered-diff text, so the no-argument signature and its one-line
* body below appear to be the pre-change version sitting next to its replacement - confirm
* against the actual source file.
* In the descriptor-taking version a {@code SPLIT_ENABLED_KEY} value set on the table descriptor
* is used instead of {@code isSplitEnabled()}; it is read with
* {@link Boolean#parseBoolean(String)}, so anything other than "true" (ignoring case) counts as
* disabled. When the descriptor has no such value, {@code isSplitEnabled()} decides. In either
* case the master SPLIT switch must also be enabled.
* @param tableDescriptor descriptor whose {@code SPLIT_ENABLED_KEY} value, if present, is used
* @return {@code true} when splits are enabled by the rule above and the master switch is on
*/
private boolean proceedWithSplitPlanning() {
return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
private boolean proceedWithSplitPlanning(TableDescriptor tableDescriptor) {
// null means the table descriptor carries no value for this key.
String value = tableDescriptor.getValue(SPLIT_ENABLED_KEY);
return (value == null ? isSplitEnabled() : Boolean.parseBoolean(value)) &&
isMasterSwitchEnabled(MasterSwitchType.SPLIT);
}

/**
* Decides whether merge planning should run for a table.
* NOTE(review): this span is rendered-diff text, so the no-argument signature and its one-line
* body below appear to be the pre-change version sitting next to its replacement - confirm
* against the actual source file.
* In the descriptor-taking version a {@code MERGE_ENABLED_KEY} value set on the table descriptor
* is used instead of {@code isMergeEnabled()}; it is read with
* {@link Boolean#parseBoolean(String)}, so anything other than "true" (ignoring case) counts as
* disabled. When the descriptor has no such value, {@code isMergeEnabled()} decides. In either
* case the master MERGE switch must also be enabled.
* @param tableDescriptor descriptor whose {@code MERGE_ENABLED_KEY} value, if present, is used
* @return {@code true} when merges are enabled by the rule above and the master switch is on
*/
private boolean proceedWithMergePlanning() {
return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
private boolean proceedWithMergePlanning(TableDescriptor tableDescriptor) {
// null means the table descriptor carries no value for this key.
String value = tableDescriptor.getValue(MERGE_ENABLED_KEY);
return (value == null ? isMergeEnabled() : Boolean.parseBoolean(value)) &&
isMasterSwitchEnabled(MasterSwitchType.MERGE);
}

/**
* @param tableRegions regions of table to normalize
* @param tableDescriptor the TableDescriptor
* @return average region size in MB, depending on the table's normalizer target region size and count
* @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
* Also make sure tableRegions contains regions of the same table
*/
private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions,
final TableDescriptor tableDescriptor) {
if (isEmpty(tableRegions)) {
throw new IllegalStateException(
"Cannot calculate average size of a table without any regions.");
}
TableName table = tableRegions.get(0).getTable();
int targetRegionCount = -1;
long targetRegionSize = -1;
TableName table = tableDescriptor.getTableName();
double avgRegionSize;
try {
TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
if (tableDescriptor != null) {
targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
LOG.debug("Table {} configured with target region count {}, target region size {} MB",
table, targetRegionCount, targetRegionSize);
}
} catch (IOException e) {
LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+ " configurations cannot be considered.", table, e);
}
int targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
long targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
LOG.debug("Table {} configured with target region count {}, target region size {}", table,
targetRegionCount, targetRegionSize);

if (targetRegionSize > 0) {
avgRegionSize = targetRegionSize;
} else {
Expand Down Expand Up @@ -316,10 +314,10 @@ private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
*/
private boolean skipForMerge(
final NormalizerConfiguration normalizerConfiguration,
final RegionStates regionStates,
final NormalizeContext ctx,
final RegionInfo regionInfo
) {
final RegionState state = regionStates.getRegionState(regionInfo);
final RegionState state = ctx.getRegionStates().getRegionState(regionInfo);
final String name = regionInfo.getEncodedName();
return
logTraceReason(
Expand All @@ -329,10 +327,10 @@ private boolean skipForMerge(
() -> !Objects.equals(state.getState(), RegionState.State.OPEN),
"skipping merge of region {} because it is not open.", name)
|| logTraceReason(
() -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo),
() -> !isOldEnoughForMerge(normalizerConfiguration, ctx, regionInfo),
"skipping merge of region {} because it is not old enough.", name)
|| logTraceReason(
() -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo),
() -> !isLargeEnoughForMerge(normalizerConfiguration, ctx, regionInfo),
"skipping merge region {} because it is not large enough.", name);
}

Expand All @@ -342,15 +340,15 @@ private boolean skipForMerge(
*/
private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
final NormalizerConfiguration configuration = normalizerConfiguration;
if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) {
if (ctx.getTableRegions().size() < configuration.getMinRegionCount(ctx)) {
LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
+ " is {}, not computing merge plans.", ctx.getTableName(),
ctx.getTableRegions().size(), configuration.getMinRegionCount());
return Collections.emptyList();
}

final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) {
if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb(ctx)) {
return Collections.emptyList();
}
LOG.debug("Computing normalization plan for table {}. average region size: {} MB, number of"
Expand All @@ -374,7 +372,7 @@ private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeCo
for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
final RegionInfo regionInfo = ctx.getTableRegions().get(current);
final long regionSizeMb = getRegionSizeMB(regionInfo);
if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) {
if (skipForMerge(configuration, ctx, regionInfo)) {
// this region cannot participate in a range. resume the outer loop.
rangeStart = Math.max(current, rangeStart + 1);
break;
Expand Down Expand Up @@ -454,12 +452,13 @@ private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeCo
*/
private static boolean isOldEnoughForMerge(
final NormalizerConfiguration normalizerConfiguration,
final NormalizeContext ctx,
final RegionInfo regionInfo
) {
// Current time as reported by EnvironmentEdgeManager, in epoch milliseconds.
final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
// The region id is interpreted here as the region's creation time in epoch milliseconds.
final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
// Old enough means: creation time plus the minimum merge age lies strictly before "now".
// NOTE(review): rendered-diff text - the two plus(...) lines below look like the old and the new
// argument of the same isAfter call (global minimum age vs. the ctx-aware lookup); confirm
// against the actual source file.
return currentTime.isAfter(
regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge()));
regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge(ctx)));
}

/**
Expand All @@ -471,9 +470,10 @@ private static boolean isOldEnoughForMerge(
*/
private boolean isLargeEnoughForMerge(
final NormalizerConfiguration normalizerConfiguration,
final NormalizeContext ctx,
final RegionInfo regionInfo
) {
// A region qualifies when its size in MB is at least the configured minimum merge size.
// NOTE(review): rendered-diff text - the two return lines below look like the old comparison
// (global minimum) followed by its replacement (ctx-aware minimum); confirm against the actual
// source file.
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb();
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}

private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
Expand Down Expand Up @@ -544,28 +544,56 @@ public int getMinRegionCount() {
return minRegionCount;
}

/**
 * Resolves the minimum region count to apply for the table behind {@code context}.
 * The {@code MIN_REGION_COUNT_KEY} value is looked up through
 * {@code context.getOrDefault} and parsed with {@link Integer#parseInt(String)}; a missing,
 * zero or negative value yields the result of {@link #getMinRegionCount()} instead.
 * @param context per-invocation context giving access to the table-level value
 * @return the table-level minimum region count when it is positive, otherwise the value of
 *   {@link #getMinRegionCount()}
 */
public int getMinRegionCount(NormalizeContext context) {
  final int tableLevelCount = context.getOrDefault(MIN_REGION_COUNT_KEY, Integer::parseInt, 0);
  return tableLevelCount > 0 ? tableLevelCount : getMinRegionCount();
}

/**
 * @return the value of the {@code mergeMinRegionAge} field, without consulting any table-level
 *   setting
 */
public Period getMergeMinRegionAge() {
  return this.mergeMinRegionAge;
}

/**
 * Resolves the minimum region age required for merging, for the table behind {@code context}.
 * The {@code MERGE_MIN_REGION_AGE_DAYS_KEY} value is looked up through
 * {@code context.getOrDefault} and parsed with {@link Integer#parseInt(String)} as a number of
 * days; a missing or negative value yields the result of {@link #getMergeMinRegionAge()}.
 * @param context per-invocation context giving access to the table-level value
 * @return a {@link Period} of the table-level number of days when that number is non-negative,
 *   otherwise the value of {@link #getMergeMinRegionAge()}
 */
public Period getMergeMinRegionAge(NormalizeContext context) {
  final int tableLevelDays =
    context.getOrDefault(MERGE_MIN_REGION_AGE_DAYS_KEY, Integer::parseInt, -1);
  return tableLevelDays < 0 ? getMergeMinRegionAge() : Period.ofDays(tableLevelDays);
}

/**
 * @return the value of the {@code mergeMinRegionSizeMb} field, without consulting any
 *   table-level setting
 */
public long getMergeMinRegionSizeMb() {
  return this.mergeMinRegionSizeMb;
}

/**
 * Resolves the minimum region size (MB) required for merging, for the table behind
 * {@code context}. The {@code MERGE_MIN_REGION_SIZE_MB_KEY} value is looked up through
 * {@code context.getOrDefault} and parsed with {@link Long#parseLong(String)}; a missing or
 * negative value yields the result of {@link #getMergeMinRegionSizeMb()}.
 * @param context per-invocation context giving access to the table-level value
 * @return the table-level size when it is non-negative, otherwise the value of
 *   {@link #getMergeMinRegionSizeMb()}
 */
public long getMergeMinRegionSizeMb(NormalizeContext context) {
  final long tableLevelSizeMb =
    context.getOrDefault(MERGE_MIN_REGION_SIZE_MB_KEY, Long::parseLong, -1L);
  return tableLevelSizeMb >= 0 ? tableLevelSizeMb : getMergeMinRegionSizeMb();
}
}

/**
* Inner class carries the state necessary to perform a single invocation of
* {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
* {@link #computePlansForTable(TableDescriptor)}. Grabbing this data from the assignment manager
* up-front allows any computed values to be realized just once.
*/
private class NormalizeContext {
private final TableName tableName;
private final RegionStates regionStates;
private final List<RegionInfo> tableRegions;
private final double averageRegionSizeMb;
private final TableDescriptor tableDescriptor;

public NormalizeContext(final TableName tableName) {
this.tableName = tableName;
public NormalizeContext(final TableDescriptor tableDescriptor) {
this.tableDescriptor = tableDescriptor;
tableName = tableDescriptor.getTableName();
regionStates = SimpleRegionNormalizer.this.masterServices
.getAssignmentManager()
.getRegionStates();
Expand All @@ -577,7 +605,8 @@ public NormalizeContext(final TableName tableName) {
// In order to avoid that, sort the list by RegionInfo.COMPARATOR.
// See HBASE-24376
tableRegions.sort(RegionInfo.COMPARATOR);
averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions,
this.tableDescriptor);
}

public TableName getTableName() {
Expand All @@ -595,5 +624,14 @@ public List<RegionInfo> getTableRegions() {
/**
 * @return the average region size in MB that was computed once, when this context was
 *   constructed
 */
public double getAverageRegionSizeMb() {
  return this.averageRegionSizeMb;
}

/**
 * Reads a table-level setting from the table descriptor and converts it with {@code function}.
 * <p>
 * A value that cannot be converted (for example a non-numeric string handed to
 * {@link Integer#parseInt(String)}) is logged and treated like a missing value, so that a
 * mistyped table property does not abort plan computation for the table. This matches how the
 * callers already fall back when a value is present but out of range.
 * @param key name of the table descriptor value to read
 * @param function converter applied to the raw string value when one is present
 * @param defaultValue value returned when the key is absent or its value cannot be converted
 * @param <T> type produced by {@code function}
 * @return the converted table-level value, or {@code defaultValue}
 */
public <T> T getOrDefault(String key, Function<String, T> function, T defaultValue) {
  final String value = tableDescriptor.getValue(key);
  if (value == null) {
    return defaultValue;
  }
  try {
    return function.apply(value);
  } catch (IllegalArgumentException e) {
    // NumberFormatException (thrown by Integer::parseInt / Long::parseLong) is a subclass.
    LOG.warn("Ignoring invalid value '{}' of table property {} for table {}, using default {}.",
      value, key, tableName, defaultValue, e);
    return defaultValue;
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testMergeCounter() throws Exception {
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
when(regionNormalizer.computePlansForTable(tnDescriptor))
.thenReturn(singletonList(new MergeNormalizationPlan.Builder()
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
Expand All @@ -160,7 +160,7 @@ public void testSplitCounter() throws Exception {
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
when(regionNormalizer.computePlansForTable(tnDescriptor))
.thenReturn(singletonList(
new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));

Expand Down Expand Up @@ -192,7 +192,7 @@ public void testRateLimit() throws Exception {
.thenReturn(1L);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
when(regionNormalizer.computePlansForTable(tnDescriptor))
.thenReturn(Arrays.asList(
new SplitNormalizationPlan(splitRegionInfo, 2),
new MergeNormalizationPlan.Builder()
Expand Down
Loading

0 comments on commit 563ebc2

Please sign in to comment.