Skip to content

Commit

Permalink
HBASE-23647: Make MasterRegistry the default impl. (apache#1039)
Browse files Browse the repository at this point in the history
Signed-off-by: Stack <[email protected]>
Signed-off-by: Nick Dimiduk <[email protected]>
Signed-off-by: Andrew Purtell <[email protected]>
(cherry picked from commit 229b8aa)
  • Loading branch information
bharathv committed Feb 26, 2020
1 parent 81c246b commit 60998ea
Show file tree
Hide file tree
Showing 54 changed files with 559 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private ConnectionRegistryFactory() {
*/
static ConnectionRegistry getRegistry(Configuration conf) {
Class<? extends ConnectionRegistry> clazz = conf.getClass(
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class,
ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class MasterRegistry implements ConnectionRegistry {
} else {
finalConf = conf;
}
finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT));
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
masterServers = new HashSet<>();
Expand Down Expand Up @@ -146,12 +147,13 @@ private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
if (rpcResult == null) {
future.completeExceptionally(
new MasterRegistryFetchException(masterServers, hrc.getFailed()));
return;
}
if (!isValidResp.test(rpcResult)) {
// Rpc returned ok, but result was malformed.
future.completeExceptionally(new IOException(
String.format("Invalid result for request %s. Will be retried", debug)));

return;
}
future.complete(transformResult.apply(rpcResult));
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,6 @@ public static Configuration getPeerClusterConfiguration(Configuration conf,
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}

return otherConf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.HbckService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public static Configuration createClusterConf(Configuration baseConf, String clu
* @return the merged configuration with override properties and cluster key applied
*/
public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
String overridePrefix) throws IOException {
String overridePrefix) throws IOException {
Configuration clusterConf = HBaseConfiguration.create(baseConf);
if (clusterKey != null && !clusterKey.isEmpty()) {
applyClusterKeyToConf(clusterConf, clusterKey);
Expand All @@ -294,14 +294,21 @@ public static Configuration createClusterConf(Configuration baseConf, String clu
* used to communicate with distant clusters
* @param conf configuration object to configure
* @param key string that contains the 3 required configuratins
* @throws IOException
*/
private static void applyClusterKeyToConf(Configuration conf, String key)
throws IOException{
throws IOException {
ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
// Without the right registry, the above configs are useless. Also, we don't use setClass()
// here because the ConnectionRegistry* classes are not resolvable from this module.
// This will be broken if ZkConnectionRegistry class gets renamed or moved. Is there a better
// way?
LOG.info("Overriding client registry implementation to {}",
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public enum OperationStatusCode {

public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;

/** Full class name of the Zookeeper based connection registry implementation */
public static final String ZK_CONNECTION_REGISTRY_CLASS =
"org.apache.hadoop.hbase.client.ZKConnectionRegistry";

/** Configuration to enable hedged reads on master registry **/
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
"hbase.client.master_registry.enable_hedged_reads";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testSyncUpTool() throws Exception {
private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
throws Exception {
LOG.debug("mimicSyncUpAfterBulkLoad");
UTIL2.shutdownMiniHBaseCluster();
shutDownTargetHBaseCluster();

loadAndReplicateHFiles(false, randomHFileRangeListIterator);

Expand All @@ -126,8 +126,8 @@ private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListItera
assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
rowCount_ht2Source);

UTIL1.shutdownMiniHBaseCluster();
UTIL2.restartHBaseCluster(1);
shutDownSourceHBaseCluster();
restartTargetHBaseCluster(1);

Thread.sleep(SLEEP_TIME);

Expand All @@ -148,7 +148,7 @@ private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListItera
if (i == NB_RETRIES - 1) {
if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
// syncUP still failed. Let's look at the source in case anything wrong there
UTIL1.restartHBaseCluster(1);
restartSourceHBaseCluster(1);
rowCount_ht1Source = UTIL1.countRows(ht1Source);
LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = UTIL1.countRows(ht2Source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public LocalHBaseCluster(final Configuration conf)
*/
public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
throws IOException {
this(conf, 1, noRegionServers, getMasterImplementation(conf),
this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf));
}

Expand All @@ -106,7 +106,7 @@ public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers)
throws IOException {
this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
getRegionServerImplementation(conf));
}

Expand All @@ -122,6 +122,12 @@ private static Class<? extends HMaster> getMasterImplementation(final Configurat
HMaster.class);
}

public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass) throws IOException {
this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
}

/**
* Constructor.
* @param conf Configuration to use. Post construction has the master's
Expand All @@ -134,9 +140,9 @@ private static Class<? extends HMaster> getMasterImplementation(final Configurat
*/
@SuppressWarnings("unchecked")
public LocalHBaseCluster(final Configuration conf, final int noMasters,
final int noRegionServers, final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass)
throws IOException {
final int noAlwaysStandByMasters, final int noRegionServers,
final Class<? extends HMaster> masterClass,
final Class<? extends HRegionServer> regionServerClass) throws IOException {
this.conf = conf;

// When active, if a port selection is default then we switch to random
Expand Down Expand Up @@ -170,24 +176,22 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters,
this.masterClass = (Class<? extends HMaster>)
conf.getClass(HConstants.MASTER_IMPL, masterClass);
// Start the HMasters.
for (int i = 0; i < noMasters; i++) {
int i;
for (i = 0; i < noMasters; i++) {
addMaster(new Configuration(conf), i);
}

// Populate the master address host ports in the config. This is needed if a master based
// registry is configured for client metadata services (HBASE-18095)
List<String> masterHostPorts = new ArrayList<>();
getMasters().forEach(masterThread ->
masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));

for (int j = 0; j < noAlwaysStandByMasters; j++) {
Configuration c = new Configuration(conf);
c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
addMaster(c, i + j);
}
// Start the HRegionServers.
this.regionServerClass =
(Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
regionServerClass);

for (int i = 0; i < noRegionServers; i++) {
addRegionServer(new Configuration(conf), i);
for (int j = 0; j < noRegionServers; j++) {
addRegionServer(new Configuration(conf), j);
}
}

Expand Down Expand Up @@ -233,8 +237,13 @@ public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
// its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager.
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
(Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
this.masterThreads.add(mt);
// Refresh the master address config.
List<String> masterHostPorts = new ArrayList<>();
getMasters().forEach(masterThread ->
masterHostPorts.add(masterThread.getMaster().getServerName().getAddress().toString()));
conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
return mt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public class ActiveMasterManager extends ZKListener {
final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
final AtomicBoolean clusterShutDown = new AtomicBoolean(false);

// This server's information.
private final ServerName sn;
private int infoPort;
private final Server master;
// This server's information. Package-private for child implementations.
int infoPort;
final ServerName sn;
final Server master;

// Active master's server name. Invalidated anytime active master changes (based on ZK
// notifications) and lazily fetched on-demand.
// ServerName is immutable, so we don't need heavy synchronization around it.
private volatile ServerName activeMasterServerName;
volatile ServerName activeMasterServerName;

/**
* @param watcher ZK watcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ public HMaster(final Configuration conf) throws IOException {
// Some unit tests don't need a cluster, so no zookeeper at all
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
} else {
this.metaRegionLocationCache = null;
this.activeMasterManager = null;
Expand All @@ -582,6 +582,15 @@ public HMaster(final Configuration conf) throws IOException {
}
}

/**
* Protected to have custom implementations in tests override the default ActiveMaster
* implementation.
*/
protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
return new ActiveMasterManager(zk, sn, server);
}

@Override
protected String getUseThisHostnameInstead(Configuration conf) {
return conf.get(MASTER_HOSTNAME_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,15 +799,17 @@ public boolean registerService(com.google.protobuf.Service instance) {
return true;
}

/**
* Create a 'smarter' Connection, one that is capable of by-passing RPC if the request is to
* the local server; i.e. a short-circuit Connection. Safe to use going to local or remote
* server. Create this instance in a method can be intercepted and mocked in tests.
* @throws IOException
*/
@VisibleForTesting
protected ClusterConnection createClusterConnection() throws IOException {
Configuration conf = this.conf;
// We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
// - Decouples RS and master life cycles. RegionServers can continue be up independent of
// masters' availability.
// - Configuration management for region servers (cluster internal) is much simpler when adding
// new masters or removing existing masters, since only clients' config needs to be updated.
// - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
// other internal connections too.
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
// Use server ZK cluster for server-issued connections, so we clone
// the conf and unset the client ZK related properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class HBasePolicyProvider extends PolicyProvider {
new Service("security.client.protocol.acl", AdminService.BlockingInterface.class),
new Service("security.client.protocol.acl",
MasterProtos.HbckService.BlockingInterface.class),
new Service("security.client.protocol.acl",
MasterProtos.ClientMetaService.BlockingInterface.class),
new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class),
new Service("security.masterregion.protocol.acl",
RegionServerStatusService.BlockingInterface.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.slf4j.impl.Log4jLoggerAdapter;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
Expand Down Expand Up @@ -1142,8 +1143,9 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option)
Configuration c = new Configuration(this.conf);
TraceUtil.initTracer(c);
this.hbaseCluster =
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY,
c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
Expand Down Expand Up @@ -1259,6 +1261,7 @@ public void restartHBaseCluster(int servers, List<Integer> ports)
StartMiniClusterOption option =
StartMiniClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
restartHBaseCluster(option);
invalidateConnection();
}

public void restartHBaseCluster(StartMiniClusterOption option)
Expand All @@ -1272,8 +1275,9 @@ public void restartHBaseCluster(StartMiniClusterOption option)
this.connection = null;
}
this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
option.getRsClass());
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);
Expand Down Expand Up @@ -3096,9 +3100,34 @@ public HBaseCluster getHBaseClusterInterface() {
return hbaseCluster;
}

public void closeConnection() throws IOException {
Closeables.close(hbaseAdmin, true);
Closeables.close(connection, true);
this.hbaseAdmin = null;
this.connection = null;
}

/**
* Resets the connections so that the next time getConnection() is called, a new connection is
* created. This is needed in cases where the entire cluster / all the masters are shutdown and
* the connection is not valid anymore.
* TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
* written, not all start() stop() calls go through this class. Most tests directly operate on
* the underlying mini/local hbase cluster. That makes it difficult for this wrapper class to
* maintain the connection state automatically. Cleaning this is a much bigger refactor.
*/
public void invalidateConnection() throws IOException {
closeConnection();
// Update the master addresses if they changed.
final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
final String masterConfAfter = getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY);
LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
masterConfigBefore, masterConfAfter);
conf.set(HConstants.MASTER_ADDRS_KEY,
getMiniHBaseCluster().conf.get(HConstants.MASTER_ADDRS_KEY));
}

/**
* Get a Connection to the cluster.
* Not thread-safe (This class needs a lot of work to make it thread-safe).
* @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
* @throws IOException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public MiniClusterRule(final StartMiniClusterOption miniClusterOptions) {
this.miniClusterOptions = miniClusterOptions;
}

/**
* @return the underlying instance of {@link HBaseTestingUtility}
*/
public HBaseTestingUtility getTestingUtility() {
return testingUtility;
}

/**
* Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller
* to {@link AsyncConnection#close() close()} the connection when finished.
Expand Down
Loading

0 comments on commit 60998ea

Please sign in to comment.