Skip to content

Commit

Permalink
HBASE-23081 Add an option to enable/disable rs group feature (apache#691
Browse files Browse the repository at this point in the history
)

Signed-off-by: Peter Somogyi <[email protected]>
  • Loading branch information
Apache9 authored and thangTang committed Apr 16, 2020
1 parent 477a588 commit 4361b42
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.AccessDeniedException;
Expand Down Expand Up @@ -800,6 +801,17 @@ protected void initializeZKBasedSystemTrackers()
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();

// This is for backwards compatibility. We do not need the CP for rs group now, but if the user
// wants to load it, we need to enable rs group.
String[] cpClasses = conf.getStrings(MasterCoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
if (cpClasses != null) {
for (String cpClass : cpClasses) {
if (RSGroupAdminEndpoint.class.getName().equals(cpClass)) {
conf.setBoolean(RSGroupInfoManager.RS_GROUP_ENABLED, true);
break;
}
}
}
this.rsGroupInfoManager = RSGroupInfoManager.create(this);

this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * An {@link RSGroupInfoManager} that only ever exposes {@link RSGroupInfo#DEFAULT_GROUP}.
 * <p>
 * The members of that group are computed on each call from the servers the {@link ServerManager}
 * currently reports as online. Adding a group, removing a group and moving servers are rejected
 * with a {@link DoNotRetryIOException}; {@link #start()} and {@link #removeServers(Set)} do
 * nothing.
 */
@InterfaceAudience.Private
class DisabledRSGroupInfoManager implements RSGroupInfoManager {

  /** Message carried by the exception thrown from every rejected operation. */
  private static final String DISABLED_MESSAGE = "RSGroup is disabled";

  /** Source of the online servers that make up the default group. */
  private final ServerManager serverManager;

  /**
   * @param serverManager used to look up the online servers whenever group info is requested
   */
  public DisabledRSGroupInfoManager(ServerManager serverManager) {
    this.serverManager = serverManager;
  }

  /** No-op: this implementation has nothing to start. */
  @Override
  public void start() {
  }

  /**
   * Always rejected.
   * @throws DoNotRetryIOException unconditionally
   */
  @Override
  public void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
    throw new DoNotRetryIOException(DISABLED_MESSAGE);
  }

  /**
   * Always rejected.
   * @throws DoNotRetryIOException unconditionally
   */
  @Override
  public void removeRSGroup(String groupName) throws IOException {
    throw new DoNotRetryIOException(DISABLED_MESSAGE);
  }

  /**
   * Always rejected.
   * @throws DoNotRetryIOException unconditionally
   */
  @Override
  public Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
    throws IOException {
    throw new DoNotRetryIOException(DISABLED_MESSAGE);
  }

  /**
   * Builds a fresh sorted set holding the address of every server the server manager currently
   * lists as online.
   */
  private SortedSet<Address> collectOnlineAddresses() {
    SortedSet<Address> addresses = new TreeSet<>();
    for (ServerName serverName : serverManager.getOnlineServers().keySet()) {
      addresses.add(serverName.getAddress());
    }
    return addresses;
  }

  /**
   * @return the default group, populated with all online servers, if {@code serverHostPort} is
   *         one of them; {@code null} otherwise
   */
  @Override
  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
    SortedSet<Address> members = collectOnlineAddresses();
    return members.contains(serverHostPort)
      ? new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, members)
      : null;
  }

  /**
   * @return the default group, populated with all online servers, when {@code groupName} equals
   *         {@link RSGroupInfo#DEFAULT_GROUP}; {@code null} for any other name
   */
  @Override
  public RSGroupInfo getRSGroup(String groupName) throws IOException {
    if (!RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
      return null;
    }
    return new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, collectOnlineAddresses());
  }

  /**
   * @return a one-element list containing the default group populated with all online servers
   */
  @Override
  public List<RSGroupInfo> listRSGroups() throws IOException {
    RSGroupInfo defaultGroup =
      new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, collectOnlineAddresses());
    return Arrays.asList(defaultGroup);
  }

  /** @return always {@code true} */
  @Override
  public boolean isOnline() {
    return true;
  }

  /** No-op: the request is accepted silently and nothing is changed. */
  @Override
  public void removeServers(Set<Address> servers) throws IOException {
  }

  /** @return always {@code null} */
  @Override
  public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException {
    return null;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
@InterfaceAudience.Private
public interface RSGroupInfoManager {

static final String RS_GROUP_ENABLED = "hbase.balancer.rsgroup.enabled";

void start();

/**
Expand Down Expand Up @@ -90,6 +92,10 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException;

static RSGroupInfoManager create(MasterServices master) throws IOException {
return RSGroupInfoManagerImpl.getInstance(master);
if (master.getConfiguration().getBoolean(RS_GROUP_ENABLED, false)) {
return RSGroupInfoManagerImpl.getInstance(master);
} else {
return new DisabledRSGroupInfoManager(master.getServerManager());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ public void verifyReservedNS() throws IOException {
assertEquals(2, admin.listNamespaceDescriptors().length);

//verify existence of system tables
Set<TableName> systemTables = Sets.newHashSet(
TableName.META_TABLE_NAME, TableName.valueOf("hbase:rsgroup"));
Set<TableName> systemTables = Sets.newHashSet(TableName.META_TABLE_NAME);
List<TableDescriptor> descs = admin.listTableDescriptorsByNamespace(
Bytes.toBytes(NamespaceDescriptor.SYSTEM_NAMESPACE.getName()));
assertEquals(systemTables.size(), descs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,14 +504,14 @@ private Table installTable(int nrs, int existingRegions) throws Exception {
for (String oregion : regions)
LOG.debug("Region still online: " + oregion);
}
assertEquals(2 + existingRegions, regions.size());
assertEquals(1 + existingRegions, regions.size());
LOG.debug("Enabling table\n");
TEST_UTIL.getAdmin().enableTable(tableName);
LOG.debug("Waiting for no more RIT\n");
blockUntilNoRIT();
LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
assertEquals(numRegions + 2 + existingRegions, regions.size());
assertEquals(numRegions + 1 + existingRegions, regions.size());
return table;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class TestClusterRestart extends AbstractTestRestartCluster {

private static final Logger LOG = LoggerFactory.getLogger(TestClusterRestart.class);

private static final int NUM_REGIONS = 4;
private static final int NUM_REGIONS = 3;

@Override
protected boolean splitWALCoordinatedByZk() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
Expand Down Expand Up @@ -91,7 +90,6 @@ public static void startCluster() throws Exception {
cluster = TEST_UTIL.getHBaseCluster();
LOG.info("Waiting for active/ready master");
cluster.waitForActiveAndReadyMaster();
TEST_UTIL.waitTableAvailable(TableName.valueOf("hbase:rsgroup"));
master = cluster.getMaster();
}

Expand Down Expand Up @@ -133,7 +131,7 @@ public void testDefaultMasterMetrics() throws Exception {
MetricsMasterSource masterSource = master.getMasterMetrics().getMetricsSource();
boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration());
metricsHelper.assertGauge("numRegionServers", 1 + (tablesOnMaster ? 1 : 0), masterSource);
metricsHelper.assertGauge("averageLoad", 2, masterSource);
metricsHelper.assertGauge("averageLoad", 1, masterSource);
metricsHelper.assertGauge("numDeadRegionServers", 0, masterSource);

metricsHelper.assertGauge("masterStartTime", master.getMasterStartTime(), masterSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void testForCheckingIfEnableAndDisableWorksFineAfterSwitch()

NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
assertEquals("The number of regions for the table tableRestart should be 0 and only" +
"the catalog table should be present.", 2, regions.size());
"the catalog table should be present.", 1, regions.size());

List<MasterThread> masterThreads = cluster.getMasterThreads();
MasterThread activeMaster = null;
Expand Down Expand Up @@ -120,7 +120,7 @@ public void testForCheckingIfEnableAndDisableWorksFineAfterSwitch()
log("Verifying there are " + numRegions + " assigned on cluster\n");
regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
assertEquals("The assigned regions were not onlined after master" +
" switch except for the catalog table.", 6, regions.size());
" switch except for the catalog table.", 5, regions.size());
assertTrue("The table should be in enabled state", cluster.getMaster().getTableStateManager()
.isTableState(TableName.valueOf(name.getMethodName()), TableState.State.ENABLED));
ht.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testBasicRollingRestart() throws Exception {
log("Region still online: " + oregion);
}
}
assertEquals(2, regions.size());
assertEquals(1, regions.size());
log("Enabling table\n");
TEST_UTIL.getAdmin().enableTable(tableName);
log("Waiting for no more RIT\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public void testPriorityRegionIsOpenedWithSeparateThreadPool() throws Exception
final TableName tableName = TableName.valueOf(TestRegionOpen.class.getSimpleName());
ThreadPoolExecutor exec = getRS().getExecutorService()
.getExecutorThreadPool(ExecutorType.RS_OPEN_PRIORITY_REGION);
HTU.waitTableAvailable(TableName.valueOf("hbase:rsgroup"));
long completed = exec.getCompletedTaskCount();

HTableDescriptor htd = new HTableDescriptor(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private void assertReplicaDistributed(Collection<HRegion> onlineRegions) throws
assertFalse(res);
int totalRegions = HTU.getMiniHBaseCluster().getLiveRegionServerThreads().stream().
mapToInt(l -> l.getRegionServer().getOnlineRegions().size()).sum();
assertEquals(62, totalRegions);
assertEquals(61, totalRegions);
}

private boolean checkDuplicates(Collection<HRegion> onlineRegions3) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private void doScan(int n, boolean caching) throws IOException {

@Test
public void testRegionCount() throws Exception {
metricsHelper.assertGauge("regionCount", TABLES_ON_MASTER ? 1 : 3, serverSource);
metricsHelper.assertGauge("regionCount", TABLES_ON_MASTER ? 1 : 2, serverSource);
}

@Test
Expand Down Expand Up @@ -341,7 +341,7 @@ public void testStoreCount() throws Exception {
TEST_UTIL.getAdmin().flush(tableName);

metricsRegionServer.getRegionServerWrapper().forceRecompute();
assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 6);
assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 5);
assertGauge("storeFileCount", 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static void beforeClass() throws Exception {

// Wait for the ACL table to become available
UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);
UTIL.waitTableAvailable(TableName.valueOf("hbase:rsgroup"));
UTIL.waitTableAvailable(TableName.valueOf("hbase:acl"));

ZKW = new ZKWatcher(UTIL.getConfiguration(),
"TestTablePermissions", ABORTABLE);
Expand Down Expand Up @@ -223,7 +223,7 @@ public void testBasicWrite() throws Exception {
// check full load
Map<byte[], ListMultimap<String, UserPermission>> allPerms = PermissionStorage.loadAll(conf);
assertEquals("Full permission map should have entries for both test tables",
3, allPerms.size());
2, allPerms.size());

userPerms = allPerms.get(TEST_TABLE.getName()).get("hubert");
assertNotNull(userPerms);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
Expand Down Expand Up @@ -53,7 +52,6 @@ public class TestHBaseFsckReplication {
public static void setUp() throws Exception {
UTIL.getConfiguration().setBoolean("hbase.write.hbck1.lock.file", false);
UTIL.startMiniCluster(1);
UTIL.waitTableAvailable(TableName.valueOf("hbase:rsgroup"));
}

@AfterClass
Expand Down

0 comments on commit 4361b42

Please sign in to comment.