Skip to content

Commit

Permalink
Merge branch 'HBASE-26067' into HBASE-26079
Browse files Browse the repository at this point in the history
  • Loading branch information
wchevreuil authored Aug 25, 2021
2 parents a66cecc + 1848381 commit e92f501
Show file tree
Hide file tree
Showing 30 changed files with 556 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
private final RegistryEndpointsRefresher registryEndpointRefresher;

protected AbstractRpcBasedConnectionRegistry(Configuration conf,
String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
String minRefreshIntervalSecsConfigName) throws IOException {
String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
throws IOException {
this.hedgedReadFanOut =
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
Expand All @@ -103,8 +104,9 @@ protected AbstractRpcBasedConnectionRegistry(Configuration conf,
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
populateStubs(getBootstrapNodes(conf));
// could return null here if refresh interval is less than or equal to zero
registryEndpointRefresher = RegistryEndpointsRefresher.create(conf,
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
registryEndpointRefresher =
RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
}

protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
"hbase.client.master_registry.hedged.fanout";

public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS =
"hbase.client.master_registry.initial_refresh_delay_secs";

public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs";

Expand All @@ -85,7 +88,7 @@ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unknow
}

MasterRegistry(Configuration conf) throws IOException {
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ final class RegistryEndpointsRefresher {

private final Thread thread;
private final Refresher refresher;
private final long initialDelayMs;
private final long periodicRefreshMs;
private final long minTimeBetweenRefreshesMs;

Expand All @@ -56,9 +57,20 @@ synchronized void stop() {
notifyAll();
}

// Selects the interval to measure against the last refresh time: minTimeBetweenRefreshesMs while
// the refreshNow flag is set, otherwise initialDelayMs for the first refresh and
// periodicRefreshMs for every later one.
private long getRefreshIntervalMs(boolean firstRefresh) {
  if (!refreshNow) {
    return firstRefresh ? initialDelayMs : periodicRefreshMs;
  }
  return minTimeBetweenRefreshesMs;
}

// The main loop for the refresh thread.
private void mainLoop() {
long lastRefreshTime = EnvironmentEdgeManager.currentTime();
boolean firstRefresh = true;
for (;;) {
synchronized (this) {
for (;;) {
Expand All @@ -68,9 +80,12 @@ private void mainLoop() {
}
// if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
// otherwise wait until initialDelayMs elapsed for the first refresh and periodicRefreshMs
// elapsed for later refreshes
long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
long waitTime = getRefreshIntervalMs(firstRefresh) -
(EnvironmentEdgeManager.currentTime() - lastRefreshTime);
if (waitTime <= 0) {
// we are going to refresh, reset this flag
firstRefresh = false;
refreshNow = false;
break;
}
try {
Expand All @@ -81,8 +96,6 @@ private void mainLoop() {
continue;
}
}
// we are going to refresh, reset this flag
refreshNow = false;
}
LOG.debug("Attempting to refresh registry end points");
try {
Expand All @@ -104,8 +117,9 @@ public interface Refresher {
void refresh() throws IOException;
}

private RegistryEndpointsRefresher(long periodicRefreshMs, long minTimeBetweenRefreshesMs,
Refresher refresher) {
private RegistryEndpointsRefresher(long initialDelayMs, long periodicRefreshMs,
long minTimeBetweenRefreshesMs, Refresher refresher) {
this.initialDelayMs = initialDelayMs;
this.periodicRefreshMs = periodicRefreshMs;
this.minTimeBetweenRefreshesMs = minTimeBetweenRefreshesMs;
this.refresher = refresher;
Expand All @@ -129,16 +143,19 @@ synchronized void refreshNow() {
* {@code intervalSecsConfigName} is less than or equal to zero, will return null here, which
* means disable refreshing of endpoints.
*/
static RegistryEndpointsRefresher create(Configuration conf, String intervalSecsConfigName,
String minIntervalSecsConfigName, Refresher refresher) {
static RegistryEndpointsRefresher create(Configuration conf, String initialDelaySecsConfigName,
String intervalSecsConfigName, String minIntervalSecsConfigName, Refresher refresher) {
long periodicRefreshMs = TimeUnit.SECONDS
.toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
if (periodicRefreshMs <= 0) {
return null;
}
long initialDelayMs = Math.max(1,
TimeUnit.SECONDS.toMillis(conf.getLong(initialDelaySecsConfigName, periodicRefreshMs / 10)));
long minTimeBetweenRefreshesMs = TimeUnit.SECONDS
.toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
return new RegistryEndpointsRefresher(periodicRefreshMs, minTimeBetweenRefreshesMs, refresher);
return new RegistryEndpointsRefresher(initialDelayMs, periodicRefreshMs,
minTimeBetweenRefreshesMs, refresher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
/** Configuration key that controls the fan out of requests **/
public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";

/**
* As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
* possible that different end users will configure the same machine, which would overload that
* machine. So we should have a shorter delay for the initial refresh, to let users quickly switch
* to the bootstrap nodes we want them to connect to.
* <p/>
* The default value for initial refresh delay is 1/10 of periodic refresh interval.
*/
public static final String INITIAL_REFRESH_DELAY_SECS =
"hbase.client.bootstrap.initial_refresh_delay_secs";

public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.bootstrap.refresh_interval_secs";

Expand All @@ -62,7 +73,8 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
private static final char ADDRS_CONF_SEPARATOR = ',';

RpcConnectionRegistry(Configuration conf) throws IOException {
super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
MIN_SECS_BETWEEN_REFRESHES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -46,6 +46,8 @@ public class TestRegistryEndpointsRefresher {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);

private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
"hbase.test.registry.initial.delay.secs";
private static final String INTERVAL_SECS_CONFIG_NAME =
"hbase.test.registry.refresh.interval.secs";
private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
Expand Down Expand Up @@ -75,33 +77,45 @@ private void refresh() {
callTimestamps.add(EnvironmentEdgeManager.currentTime());
}

private void createRefresher(long intervalSecs, long minIntervalSecs) {
private void createRefresher(long initialDelaySecs, long intervalSecs, long minIntervalSecs) {
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, initialDelaySecs);
conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
refresher = RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
refresher = RegistryEndpointsRefresher.create(conf, INITIAL_DELAY_SECS_CONFIG_NAME,
INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
}

@Test
public void testDisableRefresh() {
conf.setLong(INTERVAL_SECS_CONFIG_NAME, -1);
assertNull(RegistryEndpointsRefresher.create(conf, INTERVAL_SECS_CONFIG_NAME,
MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
INTERVAL_SECS_CONFIG_NAME, MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh));
}

@Test
public void testPeriodicEndpointRefresh() throws IOException {
public void testInitialDelay() throws InterruptedException {
createRefresher(1, 10, 0);
// Wait for 2 seconds to see that 1 refresh has been made, since the initial delay is 1
// second
Waiter.waitFor(conf, 2000, () -> refreshCallCounter.get() == 1);
// Sleep 5 more seconds to make sure we have not made new calls, since the interval is
// 10 seconds
Thread.sleep(5000);
assertEquals(1, refreshCallCounter.get());
}

@Test
public void testPeriodicMasterEndPointRefresh() {
// Refresh every 1 second.
createRefresher(1, 0);
createRefresher(1, 1, 0);
// Wait for > 3 seconds to see that more than 3 refreshes have been made.
Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
}

@Test
public void testDurationBetweenRefreshes() throws IOException {
public void testDurationBetweenRefreshes() {
// Disable periodic refresh
// A minimum duration of 1s between refreshes
createRefresher(Integer.MAX_VALUE, 1);
createRefresher(Integer.MAX_VALUE, Integer.MAX_VALUE, 1);
// Issue a ton of manual refreshes.
for (int i = 0; i < 10000; i++) {
refresher.refreshNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class TestRpcBasedRegistryHedgedReads {
private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);

private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
"hbase.test.refresh.initial.delay.secs";
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
"hbase.test.refresh.interval.secs";
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
Expand Down Expand Up @@ -153,7 +155,8 @@ private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOE
Configuration conf = UTIL.getConfiguration();
conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {

@Override
protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
Expand All @@ -173,6 +176,7 @@ public static void setUpBeforeClass() {
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
RpcClient.class);
// disable refresh, we do not need to refresh in this test
conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
BOOTSTRAP_NODES = IntStream.range(0, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,19 @@ java.util.*;
org.apache.hadoop.hbase.ServerName;
org.apache.hadoop.hbase.ClusterMetrics;
org.apache.hadoop.hbase.master.HMaster;
org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
</%import>
<%java>
MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
</%java>

<%if (!master.isActiveMaster()) %>
<%java>
ServerName active_master =
(masterAddressTracker == null) ? null : masterAddressTracker.getMasterAddress();
assert active_master != null : "Failed to retrieve master's ServerName!";
int infoPort = (masterAddressTracker == null) ? 0 : masterAddressTracker.getMasterInfoPort();
ServerName active_master = master.getActiveMaster().orElse(null);
assert active_master != null : "Failed to retrieve active master's ServerName!";
int activeInfoPort = active_master == null ? 0 : master.getActiveMasterInfoPort();
</%java>
<div class="row inner_header">
<div class="page-header">
<h1>Backup Master <small><% master.getServerName().getHostname() %></small></h1>
</div>
</div>
<h4>Current Active Master: <a href="//<% active_master.getHostname() %>:<% infoPort %>/master-status"
<h4>Current Active Master: <a href="//<% active_master.getHostname() %>:<% activeInfoPort %>/master-status"
target="_blank"><% active_master.getHostname() %></a><h4>
<%else>
<h2>Backup Masters</h2>
Expand All @@ -54,13 +48,11 @@ MasterAddressTracker masterAddressTracker = master.getMasterAddressTracker();
<th>Start Time</th>
</tr>
<%java>
Collection<ServerName> backup_masters = master.getClusterMetricsWithoutCoprocessor(
EnumSet.of(ClusterMetrics.Option.BACKUP_MASTERS)).getBackupMasterNames();
Collection<ServerName> backup_masters = master.getBackupMasters();
ServerName [] backupServerNames = backup_masters.toArray(new ServerName[backup_masters.size()]);
Arrays.sort(backupServerNames);
for (ServerName serverName : backupServerNames) {
int infoPort = (masterAddressTracker == null) ? 0 : masterAddressTracker
.getBackupMasterInfoPort(serverName);
int infoPort = master.getBackupMasterInfoPort(serverName);
</%java>
<tr>
<td><a href="//<% serverName.getHostname() %>:<% infoPort %>/master-status"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.hadoop.hbase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
Expand Down Expand Up @@ -203,19 +203,19 @@ private void updateMetaLocation(String path, ZNodeOpType opType) {
* @return list of HRegionLocations for meta replica(s), an empty list if the cache is empty.
*
*/
public Optional<List<HRegionLocation>> getMetaRegionLocations() {
public List<HRegionLocation> getMetaRegionLocations() {
ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
if (snapshot.isEmpty()) {
// This could be possible if the master has not successfully initialized yet or meta region
// is stuck in some weird state.
return Optional.empty();
return Collections.emptyList();
}
List<HRegionLocation> result = new ArrayList<>();
// Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
// ArrayValueCollection does not implement toArray().
snapshot.values().forEach(location -> result.add(location));
return Optional.of(result);
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand Down Expand Up @@ -71,15 +69,13 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration
}

/**
* Create a new {@link AsyncClusterConnection} instance for a region server.
* Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a
* {@link ConnectionRegistryEndpoint}.
*/
public static AsyncClusterConnection createAsyncClusterConnection(HRegionServer regionServer)
public static AsyncClusterConnection createAsyncClusterConnection(
ConnectionRegistryEndpoint endpoint, Configuration conf, SocketAddress localAddress, User user)
throws IOException {
RegionServerRegistry registry = new RegionServerRegistry(regionServer);
Configuration conf = regionServer.getConfiguration();
InetSocketAddress localAddress =
new InetSocketAddress(regionServer.getRSRpcServices().getSocketAddress().getAddress(), 0);
User user = regionServer.getUserProvider().getCurrent();
ShortCircuitConnectionRegistry registry = new ShortCircuitConnectionRegistry(endpoint);
return createAsyncClusterConnection(conf, registry, localAddress, user);
}
}
Loading

0 comments on commit e92f501

Please sign in to comment.