From 13e0a8ce0a4d617e42dbbf1ad0f2769b9665265f Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 10 Aug 2021 11:19:37 +0100 Subject: [PATCH] Extract HttpClientStatsTracker Extracts the functionality in `AbstractHttpServerTransport` that keeps track of the HTTP client stats so that it can be more easily tested in isolation. --- .../http/AbstractHttpServerTransport.java | 135 +------------ .../http/HttpClientStatsTracker.java | 185 ++++++++++++++++++ .../org/elasticsearch/http/HttpStats.java | 8 - 3 files changed, 191 insertions(+), 137 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 4ab35b2a80bb0..dc15232b184a5 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -48,10 +48,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -65,9 +63,6 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo private static final Logger logger = LogManager.getLogger(AbstractHttpServerTransport.class); private static final ActionListener NO_OP = ActionListener.wrap(() -> {}); - private static final long PRUNE_THROTTLE_INTERVAL = TimeUnit.SECONDS.toMillis(60); - private static final long MAX_CLIENT_STATS_AGE = TimeUnit.MINUTES.toMillis(5); - protected final Settings settings; public final HttpHandlingSettings handlingSettings; protected final NetworkService networkService; @@ -93,13 +88,11 @@ protected void closeInternal() { } }; private final Set httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Map httpChannelStats = new ConcurrentHashMap<>(); + private final HttpClientStatsTracker httpClientStatsTracker; private final HttpTracer tracer; private volatile long slowLogThresholdMs; - protected volatile long lastClientStatsPruneTime; - private volatile boolean clientStatsEnabled; protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) { @@ -128,8 +121,7 @@ protected AbstractHttpServerTransport(Settings settings, NetworkService networkS clusterSettings.addSettingsUpdateConsumer(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING, slowLogThreshold -> this.slowLogThresholdMs = slowLogThreshold.getMillis()); slowLogThresholdMs = TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings).getMillis(); - clusterSettings.addSettingsUpdateConsumer(HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED, this::enableClientStats); - clientStatsEnabled = HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED.get(settings); + httpClientStatsTracker = new HttpClientStatsTracker(settings, clusterSettings, threadPool); } @Override @@ -148,43 +140,12 @@ public HttpInfo info() { @Override public HttpStats stats() { - pruneClientStats(false); - return new HttpStats(new ArrayList<>(httpChannelStats.values()), httpChannels.size(), totalChannelsAccepted.get()); - } - - /** - * Prunes client stats of entries that have been disconnected for more than five minutes. - * - * @param throttled When true, executes the prune process only if more than 60 seconds has elapsed since the last execution. - */ - void pruneClientStats(boolean throttled) { - if (clientStatsEnabled && throttled == false || - (threadPool.relativeTimeInMillis() - lastClientStatsPruneTime > PRUNE_THROTTLE_INTERVAL)) { - long nowMillis = threadPool.absoluteTimeInMillis(); - for (var statsEntry : httpChannelStats.entrySet()) { - long closedTimeMillis = statsEntry.getValue().closedTimeMillis; - if (closedTimeMillis > 0 && (nowMillis - closedTimeMillis > MAX_CLIENT_STATS_AGE)) { - httpChannelStats.remove(statsEntry.getKey()); - } - } - lastClientStatsPruneTime = threadPool.relativeTimeInMillis(); - } - } - - /** - * Enables or disables collection of HTTP client stats. - */ - void enableClientStats(boolean enabled) { - this.clientStatsEnabled = enabled; - if (enabled == false) { - // when disabling, immediately clear client stats - httpChannelStats.clear(); - } + return new HttpStats(httpClientStatsTracker.getClientStats(), httpChannels.size(), totalChannelsAccepted.get()); } protected void bindServer() { // Bind and start to accept incoming connections. - InetAddress hostAddresses[]; + final InetAddress[] hostAddresses; try { hostAddresses = networkService.resolveBindHostAddresses(bindHosts); } catch (IOException e) { @@ -352,39 +313,10 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { refCounted.decRef(); })); totalChannelsAccepted.incrementAndGet(); - addClientStats(httpChannel); + httpClientStatsTracker.addClientStats(httpChannel); logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel)); } - private HttpStats.ClientStats addClientStats(final HttpChannel httpChannel) { - if (clientStatsEnabled) { - final HttpStats.ClientStats clientStats; - if (httpChannel != null) { - clientStats = new HttpStats.ClientStats(threadPool.absoluteTimeInMillis()); - httpChannelStats.put(HttpStats.ClientStats.getChannelKey(httpChannel), clientStats); - httpChannel.addCloseListener(ActionListener.wrap(() -> { - try { - HttpStats.ClientStats disconnectedClientStats = - httpChannelStats.get(HttpStats.ClientStats.getChannelKey(httpChannel)); - if (disconnectedClientStats != null) { - disconnectedClientStats.closedTimeMillis = threadPool.absoluteTimeInMillis(); - } - } catch (Exception e) { - assert false : e; - // the listener code above should never throw - logger.trace("error removing HTTP channel listener", e); - } - })); - } else { - clientStats = null; - } - pruneClientStats(true); - return clientStats; - } else { - return null; - } - } - /** * This method handles an incoming http request. * @@ -392,7 +324,7 @@ private HttpStats.ClientStats addClientStats(final HttpChannel httpChannel) { * @param httpChannel that received the http request */ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) { - updateClientStats(httpRequest, httpChannel); + httpClientStatsTracker.updateClientStats(httpRequest, httpChannel); final long startTime = threadPool.relativeTimeInMillis(); try { handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException()); @@ -406,61 +338,6 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt } } - void updateClientStats(final HttpRequest httpRequest, final HttpChannel httpChannel) { - if (clientStatsEnabled && httpChannel != null) { - HttpStats.ClientStats clientStats = httpChannelStats.get(HttpStats.ClientStats.getChannelKey(httpChannel)); - if (clientStats == null) { - // will always return a non-null value when httpChannel is non-null - clientStats = addClientStats(httpChannel); - } - - if (clientStats.agent == null) { - final String elasticProductOrigin = getFirstValueForHeader(httpRequest, "x-elastic-product-origin"); - if (elasticProductOrigin != null) { - clientStats.agent = elasticProductOrigin; - } else { - final String userAgent = getFirstValueForHeader(httpRequest, "User-Agent"); - if (userAgent != null) { - clientStats.agent = userAgent; - } - } - } - if (clientStats.localAddress == null) { - clientStats.localAddress = - httpChannel.getLocalAddress() == null ? null : NetworkAddress.format(httpChannel.getLocalAddress()); - clientStats.remoteAddress = - httpChannel.getRemoteAddress() == null ? null : NetworkAddress.format(httpChannel.getRemoteAddress()); - } - if (clientStats.forwardedFor == null) { - final String forwardedFor = getFirstValueForHeader(httpRequest, "x-forwarded-for"); - if (forwardedFor != null) { - clientStats.forwardedFor = forwardedFor; - } - } - if (clientStats.opaqueId == null) { - final String opaqueId = getFirstValueForHeader(httpRequest, "x-opaque-id"); - if (opaqueId != null) { - clientStats.opaqueId = opaqueId; - } - } - clientStats.lastRequestTimeMillis = threadPool.absoluteTimeInMillis(); - clientStats.lastUri = httpRequest.uri(); - clientStats.requestCount.increment(); - clientStats.requestSizeBytes.add(httpRequest.content().length()); - } - } - - private static String getFirstValueForHeader(final HttpRequest request, final String header) { - for (Map.Entry> entry : request.getHeaders().entrySet()) { - if (entry.getKey().equalsIgnoreCase(header)) { - if (entry.getValue().size() > 0) { - return entry.getValue().get(0); - } - } - } - return null; - } - // Visible for testing void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) { final ThreadContext threadContext = threadPool.getThreadContext(); diff --git a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java new file mode 100644 index 0000000000000..1a3c2345e9280 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * Tracks a collection of {@link org.elasticsearch.http.HttpStats.ClientStats} for current and recently-closed HTTP connections. + */ +public class HttpClientStatsTracker { + + private static final Logger logger = LogManager.getLogger(); + + private static final long PRUNE_THROTTLE_INTERVAL = TimeUnit.SECONDS.toMillis(60); + private static final long MAX_CLIENT_STATS_AGE = TimeUnit.MINUTES.toMillis(5); + + private final Map httpChannelStats = new ConcurrentHashMap<>(); + private final ThreadPool threadPool; + + private volatile long lastClientStatsPruneTime; + private volatile boolean clientStatsEnabled; + + HttpClientStatsTracker(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + this.threadPool = threadPool; + clientStatsEnabled = HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED.get(settings); + clusterSettings.addSettingsUpdateConsumer(HttpTransportSettings.SETTING_HTTP_CLIENT_STATS_ENABLED, this::enableClientStats); + } + + /** + * Prunes client stats of entries that have been disconnected for more than {@link #MAX_CLIENT_STATS_AGE} (i.e. 5 minutes). + * + * @param throttled When true, executes the prune process only if more than {@link #PRUNE_THROTTLE_INTERVAL} (i.e. 60 seconds) has + * elapsed since the last execution. + */ + private void pruneClientStats(boolean throttled) { + if (clientStatsEnabled && throttled == false || + (threadPool.relativeTimeInMillis() - lastClientStatsPruneTime > PRUNE_THROTTLE_INTERVAL)) { + long nowMillis = threadPool.absoluteTimeInMillis(); + for (var statsEntry : httpChannelStats.entrySet()) { + long closedTimeMillis = statsEntry.getValue().closedTimeMillis; + if (closedTimeMillis > 0 && (nowMillis - closedTimeMillis > MAX_CLIENT_STATS_AGE)) { + httpChannelStats.remove(statsEntry.getKey()); + } + } + lastClientStatsPruneTime = threadPool.relativeTimeInMillis(); + } + } + + /** + * Enables or disables collection of HTTP client stats. + */ + private void enableClientStats(boolean enabled) { + this.clientStatsEnabled = enabled; + if (enabled == false) { + // when disabling, immediately clear client stats + httpChannelStats.clear(); + } + } + + /** + * Register the given channel with this tracker. + * + * @return the corresponding newly-created stats object, or {@code null} if disabled. + */ + HttpStats.ClientStats addClientStats(final HttpChannel httpChannel) { + if (clientStatsEnabled) { + final HttpStats.ClientStats clientStats; + if (httpChannel != null) { + clientStats = new HttpStats.ClientStats(threadPool.absoluteTimeInMillis()); + httpChannelStats.put(getChannelKey(httpChannel), clientStats); + httpChannel.addCloseListener(ActionListener.wrap(() -> { + try { + HttpStats.ClientStats disconnectedClientStats = + httpChannelStats.get(getChannelKey(httpChannel)); + if (disconnectedClientStats != null) { + disconnectedClientStats.closedTimeMillis = threadPool.absoluteTimeInMillis(); + } + } catch (Exception e) { + assert false : e; // the listener code above should never throw + logger.warn("error removing HTTP channel listener", e); + } + })); + } else { + clientStats = null; + } + pruneClientStats(true); + return clientStats; + } else { + return null; + } + } + + private static String getFirstValueForHeader(final HttpRequest request, final String header) { + for (Map.Entry> entry : request.getHeaders().entrySet()) { + if (entry.getKey().equalsIgnoreCase(header)) { + if (entry.getValue().size() > 0) { + return entry.getValue().get(0); + } + } + } + return null; + } + + /** + * Adjust the stats for the given channel to reflect the latest request received. + */ + void updateClientStats(final HttpRequest httpRequest, final HttpChannel httpChannel) { + if (clientStatsEnabled && httpChannel != null) { + HttpStats.ClientStats clientStats = httpChannelStats.get(getChannelKey(httpChannel)); + if (clientStats == null) { + // will always return a non-null value when httpChannel is non-null + clientStats = addClientStats(httpChannel); + } + + if (clientStats.agent == null) { + final String elasticProductOrigin = getFirstValueForHeader(httpRequest, "x-elastic-product-origin"); + if (elasticProductOrigin != null) { + clientStats.agent = elasticProductOrigin; + } else { + final String userAgent = getFirstValueForHeader(httpRequest, "User-Agent"); + if (userAgent != null) { + clientStats.agent = userAgent; + } + } + } + if (clientStats.localAddress == null) { + clientStats.localAddress = + httpChannel.getLocalAddress() == null ? null : NetworkAddress.format(httpChannel.getLocalAddress()); + clientStats.remoteAddress = + httpChannel.getRemoteAddress() == null ? null : NetworkAddress.format(httpChannel.getRemoteAddress()); + } + if (clientStats.forwardedFor == null) { + final String forwardedFor = getFirstValueForHeader(httpRequest, "x-forwarded-for"); + if (forwardedFor != null) { + clientStats.forwardedFor = forwardedFor; + } + } + if (clientStats.opaqueId == null) { + final String opaqueId = getFirstValueForHeader(httpRequest, "x-opaque-id"); + if (opaqueId != null) { + clientStats.opaqueId = opaqueId; + } + } + clientStats.lastRequestTimeMillis = threadPool.absoluteTimeInMillis(); + clientStats.lastUri = httpRequest.uri(); + clientStats.requestCount.increment(); + clientStats.requestSizeBytes.add(httpRequest.content().length()); + } + } + + /** + * @return a list of the stats for the channels that are currently being tracked. + */ + List getClientStats() { + pruneClientStats(false); + return new ArrayList<>(httpChannelStats.values()); + } + + /** + * Returns a key suitable for use in a hash table for the specified HttpChannel + */ + private static int getChannelKey(HttpChannel channel) { + // always use an identity-based hash code rather than one based on object state + return System.identityHashCode(channel); + } + +} diff --git a/server/src/main/java/org/elasticsearch/http/HttpStats.java b/server/src/main/java/org/elasticsearch/http/HttpStats.java index 6401f5f43d822..ccf47c73dce76 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpStats.java +++ b/server/src/main/java/org/elasticsearch/http/HttpStats.java @@ -190,13 +190,5 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(requestCount.longValue()); out.writeLong(requestSizeBytes.longValue()); } - - /** - * Returns a key suitable for use in a hash table for the specified HttpChannel - */ - public static int getChannelKey(HttpChannel channel) { - // always use an identity-based hash code rather than one based on object state - return System.identityHashCode(channel); - } } }