Skip to content

Commit

Permalink
HBASE-23006 RSGroupBasedLoadBalancer should also try to place replica…
Browse files Browse the repository at this point in the history
…s for the same region to different region servers (apache#605)

Signed-off-by: Guanghao Zhang <[email protected]>
  • Loading branch information
Apache9 committed Sep 20, 2019
1 parent 71f3c5e commit be6b890
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 117 deletions.
6 changes: 6 additions & 0 deletions hbase-rsgroup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-procedure</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-procedure</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,21 @@ public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> cluster
}

@Override
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
List<RegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> regions,
List<ServerName> servers) throws HBaseIOException {
Map<ServerName, List<RegionInfo>> assignments = Maps.newHashMap();
ListMultimap<String,RegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
ListMultimap<String, RegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String, ServerName> serverMap = ArrayListMultimap.create();
generateGroupMaps(regions, servers, regionMap, serverMap);
for(String groupKey : regionMap.keySet()) {
for (String groupKey : regionMap.keySet()) {
if (regionMap.get(groupKey).size() > 0) {
Map<ServerName, List<RegionInfo>> result =
this.internalBalancer.roundRobinAssignment(
regionMap.get(groupKey),
serverMap.get(groupKey));
if(result != null) {
if(result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)){
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).addAll(
result.get(LoadBalancer.BOGUS_SERVER_NAME));
Map<ServerName, List<RegionInfo>> result = this.internalBalancer
.roundRobinAssignment(regionMap.get(groupKey), serverMap.get(groupKey));
if (result != null) {
if (result.containsKey(LoadBalancer.BOGUS_SERVER_NAME) &&
assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
assignments.get(LoadBalancer.BOGUS_SERVER_NAME)
.addAll(result.get(LoadBalancer.BOGUS_SERVER_NAME));
} else {
assignments.putAll(result);
}
Expand All @@ -193,24 +191,19 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(
}

@Override
public Map<ServerName, List<RegionInfo>> retainAssignment(
Map<RegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, ServerName> regions,
List<ServerName> servers) throws HBaseIOException {
try {
Map<ServerName, List<RegionInfo>> assignments = new TreeMap<>();
ListMultimap<String, RegionInfo> groupToRegion = ArrayListMultimap.create();
Set<RegionInfo> misplacedRegions = getMisplacedRegions(regions);
for (RegionInfo region : regions.keySet()) {
if (!misplacedRegions.contains(region)) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + region.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
groupToRegion.put(groupName, region);
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + region.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
groupToRegion.put(groupName, region);
}
// Now the "groupToRegion" map has only the regions which have correct
// assignments.
for (String key : groupToRegion.keySet()) {
Map<RegionInfo, ServerName> currentAssignmentMap = new TreeMap<RegionInfo, ServerName>();
List<RegionInfo> regionList = groupToRegion.get(key);
Expand All @@ -219,34 +212,16 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(
for (RegionInfo region : regionList) {
currentAssignmentMap.put(region, regions.get(region));
}
if(candidateList.size() > 0) {
assignments.putAll(this.internalBalancer.retainAssignment(
currentAssignmentMap, candidateList));
if (candidateList.size() > 0) {
assignments
.putAll(this.internalBalancer.retainAssignment(currentAssignmentMap, candidateList));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No available servers to assign regions: {}",
RegionInfo.getShortNameToLog(regionList));
RegionInfo.getShortNameToLog(regionList));
}
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
.addAll(regionList);
}
}

for (RegionInfo region : misplacedRegions) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + region.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
List<ServerName> candidateList = filterOfflineServers(info, servers);
ServerName server = this.internalBalancer.randomAssignment(region,
candidateList);
if (server != null) {
assignments.computeIfAbsent(server, s -> new ArrayList<>()).add(region);
} else {
assignments.computeIfAbsent(LoadBalancer.BOGUS_SERVER_NAME, s -> new ArrayList<>())
.add(region);
.addAll(regionList);
}
}
return assignments;
Expand All @@ -265,11 +240,9 @@ public ServerName randomAssignment(RegionInfo region,
return this.internalBalancer.randomAssignment(region, filteredServers);
}

private void generateGroupMaps(
List<RegionInfo> regions,
List<ServerName> servers,
ListMultimap<String, RegionInfo> regionMap,
ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
private void generateGroupMaps(List<RegionInfo> regions, List<ServerName> servers,
ListMultimap<String, RegionInfo> regionMap, ListMultimap<String, ServerName> serverMap)
throws HBaseIOException {
try {
for (RegionInfo region : regions) {
String groupName = rsGroupInfoManager.getRSGroupOfTable(region.getTable());
Expand Down Expand Up @@ -303,63 +276,26 @@ private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,

/**
* Filter servers based on the online servers.
*
* @param servers
* the servers
* @param onlineServers
* List of servers which are online.
* <p/>
* servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}), having
* its contains()'s time complexity as O(logn), which is good enough.
* <p/>
* TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain if
* needed.
* @param servers the servers
* @param onlineServers List of servers which are online.
* @return the list
*/
private List<ServerName> filterServers(Set<Address> servers,
List<ServerName> onlineServers) {
/**
* servers is actually a TreeSet (see {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo}),
* having its contains()'s time complexity as O(logn), which is good enough.
* TODO: consider using HashSet to pursue O(1) for contains() throughout the calling chain
* if needed. */
private List<ServerName> filterServers(Set<Address> servers, List<ServerName> onlineServers) {
ArrayList<ServerName> finalList = new ArrayList<>();
for (ServerName onlineServer : onlineServers) {
if (servers.contains(onlineServer.getAddress())) {
finalList.add(onlineServer);
}
}

return finalList;
}

@VisibleForTesting
public Set<RegionInfo> getMisplacedRegions(
Map<RegionInfo, ServerName> regions) throws IOException {
Set<RegionInfo> misplacedRegions = new HashSet<>();
for(Map.Entry<RegionInfo, ServerName> region : regions.entrySet()) {
RegionInfo regionInfo = region.getKey();
ServerName assignedServer = region.getValue();
String groupName = rsGroupInfoManager.getRSGroupOfTable(regionInfo.getTable());
if (groupName == null) {
LOG.debug("Group not found for table " + regionInfo.getTable() + ", using default");
groupName = RSGroupInfo.DEFAULT_GROUP;
}
RSGroupInfo info = rsGroupInfoManager.getRSGroup(groupName);
if (assignedServer == null) {
LOG.debug("There is no assigned server for {}", region);
continue;
}
RSGroupInfo otherInfo = rsGroupInfoManager.getRSGroupOfServer(assignedServer.getAddress());
if (info == null && otherInfo == null) {
LOG.warn("Couldn't obtain rs group information for {} on {}", region, assignedServer);
continue;
}
if ((info == null || !info.containsServer(assignedServer.getAddress()))) {
LOG.debug("Found misplaced region: " + regionInfo.getRegionNameAsString() +
" on server: " + assignedServer +
" found in group: " + otherInfo +
" outside of group: " + (info == null ? "UNKNOWN" : info.getName()));
misplacedRegions.add(regionInfo);
}
}
return misplacedRegions;
}

private ServerName findServerForRegion(
Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region) {
for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.balancer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
Expand All @@ -36,7 +35,6 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
Expand Down Expand Up @@ -131,21 +129,6 @@ public void testBulkAssignment() throws Exception {
assertClusterAsBalanced(loadMap);
}

@Test
public void testGetMisplacedRegions() throws Exception {
// Test case where region is not considered misplaced if RSGroupInfo cannot be determined
Map<RegionInfo, ServerName> inputForTest = new HashMap<>();
RegionInfo ri = RegionInfoBuilder.newBuilder(table0)
.setStartKey(new byte[16])
.setEndKey(new byte[16])
.setSplit(false)
.setRegionId(regionId++)
.build();
inputForTest.put(ri, servers.iterator().next());
Set<RegionInfo> misplacedRegions = loadBalancer.getMisplacedRegions(inputForTest);
assertFalse(misplacedRegions.contains(ri));
}

/**
* Test the cluster startup bulk assignment which attempts to retain assignment info.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ MediumTests.class })
public class TestSCPWithReplicasWithRSGroup extends TestSCPBase {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSCPWithReplicasWithRSGroup.class);

@Override
protected void setupConf(Configuration conf) {
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class,
LoadBalancer.class);
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
}

@Override
protected void startMiniCluster() throws Exception {
this.util.startMiniCluster(4);
}

@Override
protected int getRegionReplication() {
return 3;
}

@Test
public void testCrashTargetRs() throws Exception {
testRecoveryAndDoubleExecution(false, false);
}
}

0 comments on commit be6b890

Please sign in to comment.