diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java index cc163ac199de..e1a216d64d3d 100644 --- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupBasedLoadBalancer.java @@ -20,8 +20,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +79,17 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer { private volatile RSGroupInfoManager rsGroupInfoManager; private LoadBalancer internalBalancer; + /** + * Define the config key of fallback groups + * Enabled only if this property is set + * Please keep balancer switch on at the same time, which is relied on to correct misplaced + * regions + */ + public static final String FALLBACK_GROUPS_KEY = "hbase.rsgroup.fallback.groups"; + + private boolean fallbackEnabled = false; + private Set fallbackGroups; + /** * Used by reflection in {@link org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory}. */ @@ -198,11 +211,14 @@ public Map> retainAssignment(Map currentAssignmentMap = new TreeMap(); - List regionList = groupToRegion.get(key); - RSGroupInfo info = rsGroupInfoManager.getRSGroup(key); + List regionList = groupToRegion.get(group); + RSGroupInfo info = rsGroupInfoManager.getRSGroup(group); List candidateList = filterOfflineServers(info, servers); + if (fallbackEnabled && candidateList.isEmpty()) { + candidateList = getFallBackCandidates(servers); + } for (RegionInfo region : regionList) { currentAssignmentMap.put(region, regions.get(region)); } @@ -211,7 +227,7 @@ public Map> retainAssignment(Map new ArrayList<>()) @@ -249,7 +265,10 @@ private void generateGroupMaps(List regions, List server for (String groupKey : regionMap.keySet()) { RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupKey); serverMap.putAll(groupKey, filterOfflineServers(info, servers)); - if(serverMap.get(groupKey).size() < 1) { + if (fallbackEnabled && serverMap.get(groupKey).isEmpty()) { + serverMap.putAll(groupKey, getFallBackCandidates(servers)); + } + if (serverMap.get(groupKey).isEmpty()) { serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME); } } @@ -369,6 +388,12 @@ public void initialize() throws HBaseIOException { } internalBalancer.setConf(config); internalBalancer.initialize(); + // init fallback groups + Collection groups = config.getTrimmedStringCollection(FALLBACK_GROUPS_KEY); + if (groups != null && !groups.isEmpty()) { + this.fallbackEnabled = true; + this.fallbackGroups = new HashSet<>(groups); + } } public boolean isOnline() { @@ -449,4 +474,17 @@ public List balanceTable(TableName tableName, } return regionPlans; } + + private List getFallBackCandidates(List servers) { + List serverNames = new ArrayList<>(); + for (String fallbackGroup : fallbackGroups) { + try { + RSGroupInfo info = rsGroupInfoManager.getRSGroup(fallbackGroup); + serverNames.addAll(filterOfflineServers(info, servers)); + } catch (IOException e) { + LOG.error("Get group info for {} failed", fallbackGroup, e); + } + } + return serverNames; + } } diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java index a134a832cbb6..0b875f59eb75 100644 --- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java @@ -71,7 +71,7 @@ public abstract class TestRSGroupsBase { protected final static Random rand = new Random(); //shared, cluster type specific - protected static HBaseTestingUtility TEST_UTIL; + protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected static Admin admin; protected static HBaseCluster cluster; protected static RSGroupAdmin rsGroupAdmin; @@ -90,7 +90,6 @@ public abstract class TestRSGroupsBase { protected TableName tableName; public static void setUpTestBeforeClass() throws Exception { - TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL.getConfiguration().setFloat( "hbase.master.balancer.stochastic.tableSkewCost", 6000); TEST_UTIL.getConfiguration().set( diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java new file mode 100644 index 000000000000..2e8c667ab049 --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsFallback.java @@ -0,0 +1,110 @@ +package org.apache.hadoop.hbase.rsgroup; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category(MediumTests.class) +public class TestRSGroupsFallback extends TestRSGroupsBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRSGroupsFallback.class); + + protected static final Logger LOG = LoggerFactory.getLogger(TestRSGroupsFallback.class); + + private static final String FALLBACK_GROUP = "fallback"; + + @BeforeClass + public static void setUp() throws Exception { + Configuration configuration = TEST_UTIL.getConfiguration(); + configuration.set(RSGroupBasedLoadBalancer.FALLBACK_GROUPS_KEY, FALLBACK_GROUP); + setUpTestBeforeClass(); + master.balanceSwitch(true); + } + + @AfterClass + public static void tearDown() throws Exception { + tearDownAfterClass(); + } + + @Before + public void beforeMethod() throws Exception { + setUpBeforeMethod(); + } + + @After + public void afterMethod() throws Exception { + tearDownAfterMethod(); + } + + @Test + public void testGroupFallback() throws Exception { + // add fallback group + addGroup(FALLBACK_GROUP, 1); + // add test group + String groupName = getGroupName(name.getMethodName()); + addGroup(groupName, 1); + TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).build()) + .build(); + admin.createTable(desc); + rsGroupAdmin.moveTables(Collections.singleton(tableName), groupName); + TEST_UTIL.waitUntilAllRegionsAssigned(tableName); + // server of test group crash + for (Address server : rsGroupAdmin.getRSGroupInfo(groupName).getServers()) { + AssignmentTestingUtil.crashRs(TEST_UTIL, getServerName(server), true); + } + TEST_UTIL.waitUntilNoRegionsInTransition(10000); + TEST_UTIL.waitUntilAllRegionsAssigned(tableName); + + // regions move to fallback group + assertRegionsInGroup(FALLBACK_GROUP); + + // move a new server from default group + Address address = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers() + .iterator().next(); + rsGroupAdmin.moveServers(Collections.singleton(address), groupName); + + // correct misplaced regions + master.balance(); + + TEST_UTIL.waitUntilNoRegionsInTransition(10000); + TEST_UTIL.waitUntilAllRegionsAssigned(tableName); + + // regions move back + assertRegionsInGroup(groupName); + + TEST_UTIL.deleteTable(tableName); + } + + private void assertRegionsInGroup(String group) throws IOException { + RSGroupInfo fallbackGroup = rsGroupAdmin.getRSGroupInfo(group); + master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName).forEach(region -> { + Address regionOnServer = master.getAssignmentManager().getRegionStates() + .getRegionAssignments().get(region).getAddress(); + assertTrue(fallbackGroup.getServers().contains(regionOnServer)); + }); + } + +}