Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract HttpClientStatsTracker #76279

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -65,9 +63,6 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private static final Logger logger = LogManager.getLogger(AbstractHttpServerTransport.class);
private static final ActionListener<Void> NO_OP = ActionListener.wrap(() -> {});

private static final long PRUNE_THROTTLE_INTERVAL = TimeUnit.SECONDS.toMillis(60);
private static final long MAX_CLIENT_STATS_AGE = TimeUnit.MINUTES.toMillis(5);

protected final Settings settings;
public final HttpHandlingSettings handlingSettings;
protected final NetworkService networkService;
Expand All @@ -93,13 +88,11 @@ protected void closeInternal() {
}
};
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Map<Integer, HttpStats.ClientStats> httpChannelStats = new ConcurrentHashMap<>();
private final HttpClientStatsTracker httpClientStatsTracker;

private final HttpTracer tracer;

private volatile long slowLogThresholdMs;
protected volatile long lastClientStatsPruneTime;
private volatile boolean clientStatsEnabled;

protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -128,8 +121,7 @@ protected AbstractHttpServerTransport(Settings settings, NetworkService networkS
clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING,
slowLogThreshold -> this.slowLogThresholdMs = slowLogThreshold.getMillis());
slowLogThresholdMs = TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings).getMillis();
clusterSettings.addSettingsUpdateConsumer(HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED, this::enableClientStats);
clientStatsEnabled = HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED.get(settings);
httpClientStatsTracker = new HttpClientStatsTracker(settings, clusterSettings, threadPool);
}

@Override
Expand All @@ -148,43 +140,12 @@ public HttpInfo info() {

@Override
public HttpStats stats() {
pruneClientStats(false);
return new HttpStats(new ArrayList<>(httpChannelStats.values()), httpChannels.size(), totalChannelsAccepted.get());
}

/**
* Prunes client stats of entries that have been disconnected for more than five minutes.
*
* @param throttled When true, executes the prune process only if more than 60 seconds has elapsed since the last execution.
*/
void pruneClientStats(boolean throttled) {
if (clientStatsEnabled && throttled == false ||
(threadPool.relativeTimeInMillis() - lastClientStatsPruneTime > PRUNE_THROTTLE_INTERVAL)) {
long nowMillis = threadPool.absoluteTimeInMillis();
for (var statsEntry : httpChannelStats.entrySet()) {
long closedTimeMillis = statsEntry.getValue().closedTimeMillis;
if (closedTimeMillis > 0 && (nowMillis - closedTimeMillis > MAX_CLIENT_STATS_AGE)) {
httpChannelStats.remove(statsEntry.getKey());
}
}
lastClientStatsPruneTime = threadPool.relativeTimeInMillis();
}
}

/**
* Enables or disables collection of HTTP client stats.
*/
void enableClientStats(boolean enabled) {
this.clientStatsEnabled = enabled;
if (enabled == false) {
// when disabling, immediately clear client stats
httpChannelStats.clear();
}
return new HttpStats(httpClientStatsTracker.getClientStats(), httpChannels.size(), totalChannelsAccepted.get());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getClientStats also does the prune.

}

protected void bindServer() {
// Bind and start to accept incoming connections.
InetAddress hostAddresses[];
final InetAddress[] hostAddresses;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but fixes the only warning left in this file.

try {
hostAddresses = networkService.resolveBindHostAddresses(bindHosts);
} catch (IOException e) {
Expand Down Expand Up @@ -352,47 +313,18 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) {
refCounted.decRef();
}));
totalChannelsAccepted.incrementAndGet();
addClientStats(httpChannel);
httpClientStatsTracker.addClientStats(httpChannel);
logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel));
}

private HttpStats.ClientStats addClientStats(final HttpChannel httpChannel) {
if (clientStatsEnabled) {
final HttpStats.ClientStats clientStats;
if (httpChannel != null) {
clientStats = new HttpStats.ClientStats(threadPool.absoluteTimeInMillis());
httpChannelStats.put(HttpStats.ClientStats.getChannelKey(httpChannel), clientStats);
httpChannel.addCloseListener(ActionListener.wrap(() -> {
try {
HttpStats.ClientStats disconnectedClientStats =
httpChannelStats.get(HttpStats.ClientStats.getChannelKey(httpChannel));
if (disconnectedClientStats != null) {
disconnectedClientStats.closedTimeMillis = threadPool.absoluteTimeInMillis();
}
} catch (Exception e) {
assert false : e;
// the listener code above should never throw
logger.trace("error removing HTTP channel listener", e);
}
}));
} else {
clientStats = null;
}
pruneClientStats(true);
return clientStats;
} else {
return null;
}
}

/**
* This method handles an incoming http request.
*
* @param httpRequest that is incoming
* @param httpChannel that received the http request
*/
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
updateClientStats(httpRequest, httpChannel);
httpClientStatsTracker.updateClientStats(httpRequest, httpChannel);
final long startTime = threadPool.relativeTimeInMillis();
try {
handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
Expand All @@ -406,61 +338,6 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt
}
}

void updateClientStats(final HttpRequest httpRequest, final HttpChannel httpChannel) {
if (clientStatsEnabled && httpChannel != null) {
HttpStats.ClientStats clientStats = httpChannelStats.get(HttpStats.ClientStats.getChannelKey(httpChannel));
if (clientStats == null) {
// will always return a non-null value when httpChannel is non-null
clientStats = addClientStats(httpChannel);
}

if (clientStats.agent == null) {
final String elasticProductOrigin = getFirstValueForHeader(httpRequest, "x-elastic-product-origin");
if (elasticProductOrigin != null) {
clientStats.agent = elasticProductOrigin;
} else {
final String userAgent = getFirstValueForHeader(httpRequest, "User-Agent");
if (userAgent != null) {
clientStats.agent = userAgent;
}
}
}
if (clientStats.localAddress == null) {
clientStats.localAddress =
httpChannel.getLocalAddress() == null ? null : NetworkAddress.format(httpChannel.getLocalAddress());
clientStats.remoteAddress =
httpChannel.getRemoteAddress() == null ? null : NetworkAddress.format(httpChannel.getRemoteAddress());
}
if (clientStats.forwardedFor == null) {
final String forwardedFor = getFirstValueForHeader(httpRequest, "x-forwarded-for");
if (forwardedFor != null) {
clientStats.forwardedFor = forwardedFor;
}
}
if (clientStats.opaqueId == null) {
final String opaqueId = getFirstValueForHeader(httpRequest, "x-opaque-id");
if (opaqueId != null) {
clientStats.opaqueId = opaqueId;
}
}
clientStats.lastRequestTimeMillis = threadPool.absoluteTimeInMillis();
clientStats.lastUri = httpRequest.uri();
clientStats.requestCount.increment();
clientStats.requestSizeBytes.add(httpRequest.content().length());
}
}

private static String getFirstValueForHeader(final HttpRequest request, final String header) {
for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
if (entry.getKey().equalsIgnoreCase(header)) {
if (entry.getValue().size() > 0) {
return entry.getValue().get(0);
}
}
}
return null;
}

// Visible for testing
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
final ThreadContext threadContext = threadPool.getThreadContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Tracks a collection of {@link org.elasticsearch.http.HttpStats.ClientStats} for current and recently-closed HTTP connections.
*/
public class HttpClientStatsTracker {

private static final Logger logger = LogManager.getLogger();

private static final long PRUNE_THROTTLE_INTERVAL = TimeUnit.SECONDS.toMillis(60);
private static final long MAX_CLIENT_STATS_AGE = TimeUnit.MINUTES.toMillis(5);

private final Map<Integer, HttpStats.ClientStats> httpChannelStats = new ConcurrentHashMap<>();
private final ThreadPool threadPool;

private volatile long lastClientStatsPruneTime;
private volatile boolean clientStatsEnabled;

HttpClientStatsTracker(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.threadPool = threadPool;
clientStatsEnabled = HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED, this::enableClientStats);
}

/**
* Prunes client stats of entries that have been disconnected for more than {@link #MAX_CLIENT_STATS_AGE} (i.e. 5 minutes).
*
* @param throttled When true, executes the prune process only if more than {@link #PRUNE_THROTTLE_INTERVAL} (i.e. 60 seconds) has
* elapsed since the last execution.
*/
private void pruneClientStats(boolean throttled) {
if (clientStatsEnabled && throttled == false ||
(threadPool.relativeTimeInMillis() - lastClientStatsPruneTime > PRUNE_THROTTLE_INTERVAL)) {
long nowMillis = threadPool.absoluteTimeInMillis();
for (var statsEntry : httpChannelStats.entrySet()) {
long closedTimeMillis = statsEntry.getValue().closedTimeMillis;
if (closedTimeMillis > 0 && (nowMillis - closedTimeMillis > MAX_CLIENT_STATS_AGE)) {
httpChannelStats.remove(statsEntry.getKey());
}
}
lastClientStatsPruneTime = threadPool.relativeTimeInMillis();
}
}

/**
* Enables or disables collection of HTTP client stats.
*/
private void enableClientStats(boolean enabled) {
this.clientStatsEnabled = enabled;
if (enabled == false) {
// when disabling, immediately clear client stats
httpChannelStats.clear();
}
}

/**
* Register the given channel with this tracker.
*
* @return the corresponding newly-created stats object, or {@code null} if disabled.
*/
HttpStats.ClientStats addClientStats(final HttpChannel httpChannel) {
if (clientStatsEnabled) {
final HttpStats.ClientStats clientStats;
if (httpChannel != null) {
clientStats = new HttpStats.ClientStats(threadPool.absoluteTimeInMillis());
httpChannelStats.put(getChannelKey(httpChannel), clientStats);
httpChannel.addCloseListener(ActionListener.wrap(() -> {
try {
HttpStats.ClientStats disconnectedClientStats =
httpChannelStats.get(getChannelKey(httpChannel));
if (disconnectedClientStats != null) {
disconnectedClientStats.closedTimeMillis = threadPool.absoluteTimeInMillis();
}
} catch (Exception e) {
assert false : e; // the listener code above should never throw
logger.warn("error removing HTTP channel listener", e);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Promoted to WARN, as the comment says this should never happen but if it does then it'd be good to hear about it.

}
}));
} else {
clientStats = null;
}
pruneClientStats(true);
return clientStats;
} else {
return null;
}
}

private static String getFirstValueForHeader(final HttpRequest request, final String header) {
for (Map.Entry<String, List<String>> entry : request.getHeaders().entrySet()) {
if (entry.getKey().equalsIgnoreCase(header)) {
if (entry.getValue().size() > 0) {
return entry.getValue().get(0);
}
}
}
return null;
}

/**
* Adjust the stats for the given channel to reflect the latest request received.
*/
void updateClientStats(final HttpRequest httpRequest, final HttpChannel httpChannel) {
if (clientStatsEnabled && httpChannel != null) {
HttpStats.ClientStats clientStats = httpChannelStats.get(getChannelKey(httpChannel));
if (clientStats == null) {
// will always return a non-null value when httpChannel is non-null
clientStats = addClientStats(httpChannel);
}

if (clientStats.agent == null) {
final String elasticProductOrigin = getFirstValueForHeader(httpRequest, "x-elastic-product-origin");
if (elasticProductOrigin != null) {
clientStats.agent = elasticProductOrigin;
} else {
final String userAgent = getFirstValueForHeader(httpRequest, "User-Agent");
if (userAgent != null) {
clientStats.agent = userAgent;
}
}
}
if (clientStats.localAddress == null) {
clientStats.localAddress =
httpChannel.getLocalAddress() == null ? null : NetworkAddress.format(httpChannel.getLocalAddress());
clientStats.remoteAddress =
httpChannel.getRemoteAddress() == null ? null : NetworkAddress.format(httpChannel.getRemoteAddress());
}
if (clientStats.forwardedFor == null) {
final String forwardedFor = getFirstValueForHeader(httpRequest, "x-forwarded-for");
if (forwardedFor != null) {
clientStats.forwardedFor = forwardedFor;
}
}
if (clientStats.opaqueId == null) {
final String opaqueId = getFirstValueForHeader(httpRequest, "x-opaque-id");
if (opaqueId != null) {
clientStats.opaqueId = opaqueId;
}
}
clientStats.lastRequestTimeMillis = threadPool.absoluteTimeInMillis();
clientStats.lastUri = httpRequest.uri();
clientStats.requestCount.increment();
clientStats.requestSizeBytes.add(httpRequest.content().length());
}
}

/**
* @return a list of the stats for the channels that are currently being tracked.
*/
List<HttpStats.ClientStats> getClientStats() {
pruneClientStats(false);
return new ArrayList<>(httpChannelStats.values());
}

/**
* Returns a key suitable for use in a hash table for the specified HttpChannel
*/
private static int getChannelKey(HttpChannel channel) {
// always use an identity-based hash code rather than one based on object state
return System.identityHashCode(channel);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated comment but isn't this broken? I guess it's approximately ok and that might be good enough for the stats, but shouldn't we have a collision safer key here for the stats map?

}

}
Loading