
Commit

HBASE-27466: Making metrics instance containing one or more connections. (#4874)

Signed-off-by: David Manning <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
vli02 authored Dec 2, 2022
1 parent 8a35f0a commit 320eca2
Showing 7 changed files with 278 additions and 61 deletions.
@@ -118,6 +118,7 @@ public class AsyncConnectionImpl implements AsyncConnection {

private final AtomicBoolean closed = new AtomicBoolean(false);

private final String metricsScope;
private final Optional<MetricsConnection> metrics;

private final ClusterStatusListener clusterStatusListener;
@@ -128,15 +129,16 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
SocketAddress localAddress, User user) {
this.conf = conf;
this.user = user;
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);

if (user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
String scope = MetricsConnection.getScope(conf, clusterId, this);
this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
this.metrics =
Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
} else {
this.metrics = Optional.empty();
}
@@ -235,7 +237,9 @@ public void close() {
choreService = null;
}
}
metrics.ifPresent(MetricsConnection::shutdown);
if (metrics.isPresent()) {
MetricsConnection.deleteMetricsConnection(metricsScope);
}
ConnectionOverAsyncConnection c = this.conn;
if (c != null) {
c.closePool();
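The metrics branch above runs only when client-side metrics are enabled. As a minimal sketch of opting in (the class name here is hypothetical; the key is the CLIENT_SIDE_METRICS_ENABLED_KEY constant defined in MetricsConnection below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.MetricsConnection;

public class EnableClientMetricsSketch {
  public static void main(String[] args) {
    // Setting "hbase.client.metrics.enable" makes every connection built from this conf
    // register with the shared, per-scope MetricsConnection and deregister it on close().
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
  }
}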
@@ -26,6 +26,8 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -47,12 +49,43 @@
/**
* This class is for maintaining the various connection statistics and publishing them through the
* metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
* as to not conflict with other uses of Yammer Metrics within the client application. Instantiating
* this class implicitly creates and "starts" instances of these classes; be sure to call
* {@link #shutdown()} to terminate the thread pools they allocate.
* as to not conflict with other uses of Yammer Metrics within the client application. Calling
* {@link #getMetricsConnection(String, Supplier, Supplier)} implicitly creates and "starts"
* instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to terminate
* the thread pools they allocate. The metrics reporter is shut down via {@link #shutdown()} once
* all connections registered against this metrics instance are closed.
*/
@InterfaceAudience.Private
public class MetricsConnection implements StatisticTrackable {
public final class MetricsConnection implements StatisticTrackable {

private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
new ConcurrentHashMap<>();

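/**
 * Get or create the shared MetricsConnection for {@code scope} and register one more connection
 * against it. Pair each call with {@link #deleteMetricsConnection(String)}.
 */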
static MetricsConnection getMetricsConnection(final String scope,
Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
if (metricsConnection == null) {
MetricsConnection newMetricsConn = new MetricsConnection(scope, batchPool, metaPool);
newMetricsConn.incrConnectionCount();
return newMetricsConn;
} else {
metricsConnection.addThreadPools(batchPool, metaPool);
metricsConnection.incrConnectionCount();
return metricsConnection;
}
});
}

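/**
 * Deregister one connection from the {@code scope} instance; when the last connection is gone,
 * the instance is shut down and removed.
 */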
static void deleteMetricsConnection(final String scope) {
METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
metricsConnection.decrConnectionCount();
if (metricsConnection.getConnectionCount() == 0) {
metricsConnection.shutdown();
return null;
}
return metricsConnection;
});
}

/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
@@ -231,7 +264,7 @@ public void updateDelayInterval(long interval) {
}
}

protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
new ConcurrentHashMap<>();

public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
@@ -272,7 +305,7 @@ private static interface NewMetric<T> {

private final MetricRegistry registry;
private final JmxReporter reporter;
protected final String scope;
private final String scope;

private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
@Override
@@ -295,66 +328,93 @@ public Counter newMetric(Class<?> clazz, String name, String scope) {
}
};

// Thread pool suppliers, one pair per connection registered against this metrics instance.
private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

// static metrics

protected final Counter metaCacheHits;
protected final Counter metaCacheMisses;
protected final CallTracker getTracker;
protected final CallTracker scanTracker;
protected final CallTracker appendTracker;
protected final CallTracker deleteTracker;
protected final CallTracker incrementTracker;
protected final CallTracker putTracker;
protected final CallTracker multiTracker;
protected final RunnerStats runnerStats;
protected final Counter metaCacheNumClearServer;
protected final Counter metaCacheNumClearRegion;
protected final Counter hedgedReadOps;
protected final Counter hedgedReadWin;
protected final Histogram concurrentCallsPerServerHist;
protected final Histogram numActionsPerServerHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;
protected final Timer overloadedBackoffTimer;
private final Counter connectionCount;
private final Counter metaCacheHits;
private final Counter metaCacheMisses;
private final CallTracker getTracker;
private final CallTracker scanTracker;
private final CallTracker appendTracker;
private final CallTracker deleteTracker;
private final CallTracker incrementTracker;
private final CallTracker putTracker;
private final CallTracker multiTracker;
private final RunnerStats runnerStats;
private final Counter metaCacheNumClearServer;
private final Counter metaCacheNumClearRegion;
private final Counter hedgedReadOps;
private final Counter hedgedReadWin;
private final Histogram concurrentCallsPerServerHist;
private final Histogram numActionsPerServerHist;
private final Counter nsLookups;
private final Counter nsLookupsFailed;
private final Timer overloadedBackoffTimer;

// dynamic metrics

// These maps are used to cache references to the metric instances that are managed by the
// registry. I don't think their use perfectly removes redundant allocations, but it's
// a big improvement over calling registry.newMetric each time.
protected final ConcurrentMap<String, Timer> rpcTimers =
private final ConcurrentMap<String, Timer> rpcTimers =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
protected final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
protected final ConcurrentMap<String, Counter> rpcCounters =
private final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
private MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
this.scope = scope;
addThreadPools(batchPool, metaPool);
this.registry = new MetricRegistry();
this.registry.register(getExecutorPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = batchPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among batch pools of all connections */
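/* Cross-multiplied comparison keeps the math in integers: the stored pair is replaced when this pool's activeCount / maxPoolSize exceeds numerator / denominator. */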
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
return Ratio.of(numerator, denominator);
}
});
this.registry.register(getMetaPoolName(), new RatioGauge() {
@Override
protected Ratio getRatio() {
ThreadPoolExecutor pool = metaPool.get();
if (pool == null) {
return Ratio.of(0, 0);
int numerator = 0;
int denominator = 0;
for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
ThreadPoolExecutor pool = poolSupplier.get();
if (pool != null) {
int activeCount = pool.getActiveCount();
int maxPoolSize = pool.getMaximumPoolSize();
/* The max thread usage ratio among meta lookup pools of all connections */
if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
numerator = activeCount;
denominator = maxPoolSize;
}
}
}
return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
return Ratio.of(numerator, denominator);
}
});
this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
this.metaCacheNumClearServer =
@@ -396,8 +456,84 @@ MetricRegistry getMetricRegistry() {
return registry;
}

public void shutdown() {
this.reporter.stop();
/** scope of the metrics object */
public String getMetricScope() {
return scope;
}

/** serverStats metric */
public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
return serverStats;
}

/** runnerStats metric */
public RunnerStats getRunnerStats() {
return runnerStats;
}

/** metaCacheNumClearServer metric */
public Counter getMetaCacheNumClearServer() {
return metaCacheNumClearServer;
}

/** metaCacheNumClearRegion metric */
public Counter getMetaCacheNumClearRegion() {
return metaCacheNumClearRegion;
}

/** hedgedReadOps metric */
public Counter getHedgedReadOps() {
return hedgedReadOps;
}

/** hedgedReadWin metric */
public Counter getHedgedReadWin() {
return hedgedReadWin;
}

/** numActionsPerServerHist metric */
public Histogram getNumActionsPerServerHist() {
return numActionsPerServerHist;
}

/** rpcCounters metric */
public ConcurrentMap<String, Counter> getRpcCounters() {
return rpcCounters;
}

/** getTracker metric */
public CallTracker getGetTracker() {
return getTracker;
}

/** scanTracker metric */
public CallTracker getScanTracker() {
return scanTracker;
}

/** multiTracker metric */
public CallTracker getMultiTracker() {
return multiTracker;
}

/** appendTracker metric */
public CallTracker getAppendTracker() {
return appendTracker;
}

/** deleteTracker metric */
public CallTracker getDeleteTracker() {
return deleteTracker;
}

/** incrementTracker metric */
public CallTracker getIncrementTracker() {
return incrementTracker;
}

/** putTracker metric */
public CallTracker getPutTracker() {
return putTracker;
}

/** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
Expand Down Expand Up @@ -457,6 +593,28 @@ public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
overloadedBackoffTimer.update(time, timeUnit);
}

/** Return the connection count of the metrics within a scope */
public long getConnectionCount() {
return connectionCount.getCount();
}

/** Increment the connection count of the metrics within a scope */
private void incrConnectionCount() {
connectionCount.inc();
}

/** Decrement the connection count of the metrics within a scope */
private void decrConnectionCount() {
connectionCount.dec();
}

/** Add thread pools of additional connections to the metrics */
private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
batchPools.add(batchPool);
metaPools.add(metaPool);
}

/**
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/
@@ -474,6 +632,10 @@ private void updateRpcGeneric(String methodName, CallStats stats) {
.update(stats.getResponseSizeBytes());
}

private void shutdown() {
this.reporter.stop();
}

/** Report RPC context to metrics system. */
public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
int callsPerServer = stats.getConcurrentCallsPerServer();
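To make the ref-counted lifecycle above concrete, here is a minimal test-style sketch (hypothetical class name; it sits in the org.apache.hadoop.hbase.client package because the factory methods are package-private, and it assumes two connections share one scope):

package org.apache.hadoop.hbase.client;

public class MetricsConnectionLifecycleSketch {
  public static void main(String[] args) {
    // Two connections registering under the same scope share a single MetricsConnection.
    MetricsConnection m1 = MetricsConnection.getMetricsConnection("cluster-1", () -> null, () -> null);
    MetricsConnection m2 = MetricsConnection.getMetricsConnection("cluster-1", () -> null, () -> null);
    assert m1 == m2;                     // one instance per scope
    assert m1.getConnectionCount() == 2; // connectionCount tracks registered connections

    // Each delete decrements the count; the JMX reporter stops only when the count reaches zero.
    MetricsConnection.deleteMetricsConnection("cluster-1");
    MetricsConnection.deleteMetricsConnection("cluster-1");
  }
}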