Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watcher: Configure HttpClient parallel sent requests #30130

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions x-pack/docs/en/settings/notification-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ request is aborted.
Specifies the maximum size a HTTP response is allowed to have, defaults to
`10mb`, the maximum configurable value is `50mb`.

`xpack.http.apache.evict_idle_connections`::
A setting to configure if the internal HTTP client used in watcher should
evict connections with a background thread. Defaults to `true`.

`xpack.http.apache.evict_idle_connections_timeout`::
If connections should be evicted, this specifies the possible timeout.
Defaults to `2m`.

`xpack.http.apache.max_conn_total`::
The number of total open connections in parallel. Defaults to `100`.

`xpack.http.apache.max_conn_total_per_route`::
The number of open connections per the same route in parallel. Defaults
to the `xpack.http.apache.max_conn_total` setting.

[[ssl-notification-settings]]
:ssl-prefix: xpack.http
:component: {watcher}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
Expand All @@ -57,9 +58,9 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.xpack.core.ssl.SSLService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -391,6 +392,13 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
.collect(toList());
}

@Override
public void close() throws IOException {
for (Plugin plugin : plugins) {
plugin.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be IOUtils.close(plugins);?

}
}

private <T> List<T> filterPlugins(Class<T> type) {
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
Expand Down Expand Up @@ -216,6 +217,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {

private static final Logger logger = Loggers.getLogger(Watcher.class);
private WatcherIndexingListener listener;
private HttpClient httpClient;

protected final Settings settings;
protected final boolean transportClient;
Expand Down Expand Up @@ -266,7 +268,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
// TODO: add more auth types, or remove this indirection
HttpAuthRegistry httpAuthRegistry = new HttpAuthRegistry(httpAuthFactories);
HttpRequestTemplate.Parser httpTemplateParser = new HttpRequestTemplate.Parser(httpAuthRegistry);
final HttpClient httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());

// notification
EmailService emailService = new EmailService(settings, cryptoService, clusterService.getClusterSettings());
Expand Down Expand Up @@ -608,4 +610,9 @@ public List<BootstrapCheck> getBootstrapChecks() {
public List<ScriptContext> getContexts() {
return Arrays.asList(Watcher.SCRIPT_SEARCH_CONTEXT, Watcher.SCRIPT_EXECUTABLE_CONTEXT, Watcher.SCRIPT_TEMPLATE_CONTEXT);
}

@Override
public void close() throws IOException {
IOUtils.closeWhileHandlingException(httpClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

import javax.net.ssl.HostnameVerifier;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -55,8 +56,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class HttpClient extends AbstractComponent {
public class HttpClient extends AbstractComponent implements Closeable {

private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl.";

Expand Down Expand Up @@ -84,6 +86,16 @@ public HttpClient(Settings settings, HttpAuthRegistry httpAuthRegistry, SSLServi
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier);
clientBuilder.setSSLSocketFactory(factory);

if (HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS.get(settings)) {
clientBuilder.evictExpiredConnections();
TimeValue timeout = HttpSettings.APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT.get(settings);
clientBuilder.evictIdleConnections(timeout.millis(), TimeUnit.MILLISECONDS);
}
int maxConnectionsPerRoute = HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE.get(settings);
clientBuilder.setMaxConnPerRoute(maxConnectionsPerRoute );
int maxConnectionsTotal = HttpSettings.APACHE_HTTP_CLIENT_MAX_CONN_TOTAL.get(settings);
clientBuilder.setMaxConnTotal(maxConnectionsTotal);

client = clientBuilder.build();
}

Expand Down Expand Up @@ -251,6 +263,11 @@ private URI createURI(HttpRequest request) {
}
}

@Override
public void close() throws IOException {
client.close();
}

/**
* Helper class to have all HTTP methods except HEAD allow for an body, including GET
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,29 @@ public class HttpSettings {
static final Setting<TimeValue> CONNECTION_TIMEOUT = Setting.timeSetting("xpack.http.default_connection_timeout",
DEFAULT_CONNECTION_TIMEOUT, Property.NodeScope);


// these are very apache http client specific settings, which only apply to how the apache http client is working
// keep them in their own namespace, so that we could for example switch to the new java http client to get rid
// of another dependency in the future
// more information https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html

// should idle connections be evicted? This will start an additional thread doing this
static final Setting<Boolean> APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS =
Setting.boolSetting("xpack.http.apache.evict_idle_connections", false, Property.NodeScope);
// what is the timeout for evicting idle connections
// this prevents form many connections being open due to the pooled client
// this value resembles the default set in org.apache.http.impl.client.HttpClientBuilder.build()
static final Setting<TimeValue> APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT =
Setting.timeSetting("xpack.http.apache.evict_idle_connections_timeout", TimeValue.timeValueSeconds(10), Property.NodeScope);
// how many total connections should the http client be able to keep open at once
static final Setting<Integer> APACHE_HTTP_CLIENT_MAX_CONN_TOTAL =
Setting.intSetting("xpack.http.apache.max_conn_total", 100, 1, Property.NodeScope);
// how many total connections per route should the http client be able to keep open at once
// this for example defines how often a user is able to poll the same _search endpoint of a remote cluster, which is
// also the reason why this is set to the same value than the total connections
static final Setting<Integer> APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE =
Setting.intSetting("xpack.http.apache.max_conn_total_per_route", APACHE_HTTP_CLIENT_MAX_CONN_TOTAL, 1, Property.NodeScope);

private static final String PROXY_HOST_KEY = "xpack.http.proxy.host";
private static final String PROXY_PORT_KEY = "xpack.http.proxy.port";
private static final String PROXY_SCHEME_KEY = "xpack.http.proxy.scheme";
Expand All @@ -54,6 +77,10 @@ public static List<? extends Setting<?>> getSettings() {
settings.add(PROXY_PORT);
settings.add(PROXY_SCHEME);
settings.add(MAX_HTTP_RESPONSE_SIZE);
settings.add(APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS);
settings.add(APACHE_HTTP_CLIENT_EVICT_IDLE_CONNECTIONS_TIMEOUT);
settings.add(APACHE_HTTP_CLIENT_MAX_CONN_TOTAL);
settings.add(APACHE_HTTP_CLIENT_MAX_CONN_PER_ROUTE);
return settings;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.junit.Before;

import javax.mail.internet.AddressException;

import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -219,10 +218,9 @@ private WebhookActionFactory webhookFactory(HttpClient client) {

public void testThatSelectingProxyWorks() throws Exception {
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
new SSLService(environment.settings(), environment));

try (MockWebServer proxyServer = new MockWebServer()) {
try (HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
new SSLService(environment.settings(), environment)); MockWebServer proxyServer = new MockWebServer()) {
proxyServer.start();
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));

Expand Down
Loading