diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java index 584cb8fb0..81924fe80 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java @@ -425,7 +425,16 @@ public enum ClickHouseClientOption implements ClickHouseOption { /** * Query ID to be attached to an operation */ - QUERY_ID("query_id", "", "Query id"); + QUERY_ID("query_id", "", "Query id"), + + + /** + * Connection time to live in milliseconds. 0 or negative number means no limit. + * Can be used to override keep-alive time suggested by a server. + */ + CONNECTION_TTL("connection_ttl", 0L, + "Connection time to live in milliseconds. 0 or negative number means no limit."), + ; private final String key; private final Serializable defaultValue; diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java index 04cc650f1..e108b53d3 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java @@ -40,7 +40,10 @@ import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; +import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; import org.apache.hc.core5.util.VersionInfo; @@ -91,14 +94,31 @@ private CloseableHttpClient newConnection(ClickHouseConfig c) throws IOException r.register("https", socketFactory.create(c, SSLConnectionSocketFactory.class)); } - HttpConnectionManager connManager = new HttpConnectionManager(r.build(), c); + long connectionTTL = config.getLongOption(ClickHouseClientOption.CONNECTION_TTL); + log.info("Connection TTL: %d ms", connectionTTL); + String poolReuseStrategy = c.getStrOption(ClickHouseHttpOption.CONNECTION_REUSE_STRATEGY); + PoolReusePolicy poolReusePolicy = PoolReusePolicy.LIFO; + if (poolReuseStrategy != null && !poolReuseStrategy.isEmpty()) { + try { + poolReusePolicy = PoolReusePolicy.valueOf(poolReuseStrategy); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid connection reuse strategy: " + poolReuseStrategy); + } + } + log.info("Connection reuse strategy: %s", poolReusePolicy.name()); + HttpConnectionManager connManager = new HttpConnectionManager(r.build(), c, PoolConcurrencyPolicy.LAX, + poolReusePolicy, TimeValue.ofMilliseconds(connectionTTL)); int maxConnection = config.getIntOption(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS); - connManager.setMaxTotal(maxConnection); + connManager.setMaxTotal(Integer.MAX_VALUE); // unlimited on global level connManager.setDefaultMaxPerRoute(maxConnection); HttpClientBuilder builder = HttpClientBuilder.create().setConnectionManager(connManager) .disableContentCompression(); + long timeout = c.getLongOption(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT); + if (timeout > 0) { + builder.setKeepAliveStrategy((response, context) -> TimeValue.ofMilliseconds(timeout)); + } if (c.getProxyType() == ClickHouseProxyType.HTTP) { builder.setProxy(new HttpHost(c.getProxyHost(), c.getProxyPort())); } @@ -394,9 +414,10 @@ static class HttpConnectionManager extends PoolingHttpClientConnectionManager { versionInfo != null && !versionInfo.isEmpty() ? versionInfo : PROVIDER); } - public HttpConnectionManager(Registry socketFactory, ClickHouseConfig config) { - super(socketFactory); - + public HttpConnectionManager(Registry socketFactory, ClickHouseConfig config, + PoolConcurrencyPolicy poolConcurrentcyPolicy, PoolReusePolicy poolReusePolicy, + TimeValue ttl) { + super(socketFactory, poolConcurrentcyPolicy, poolReusePolicy, ttl); ConnectionConfig connConfig = ConnectionConfig.custom() .setConnectTimeout(Timeout.of(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)) .setValidateAfterInactivity(config.getLongOption(ClickHouseHttpOption.AHC_VALIDATE_AFTER_INACTIVITY), TimeUnit.MILLISECONDS) diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java index 0149f1053..eb267cf25 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java @@ -87,7 +87,28 @@ public enum ClickHouseHttpOption implements ClickHouseOption { *
  • {@code 503 Service Unavailable}
  • * */ - AHC_RETRY_ON_FAILURE("ahc_retry_on_failure", false, "Whether to retry on failure with AsyncHttpClient.") + AHC_RETRY_ON_FAILURE("ahc_retry_on_failure", false, "Whether to retry on failure with AsyncHttpClient."), + + /** + * Configuration for AsyncHttpClient connection pool. It defines how to reuse connections. + * If {@code "FIFO"} is set, the connections are reused in the order they were created. + * If {@code "LIFO"} is set, the connections are reused as soon they are available. + * Default value is {@code "LIFO"}. + */ + CONNECTION_REUSE_STRATEGY("connection_reuse_strategy", "LIFO", + "Connection reuse strategy for AsyncHttpClient. Valid values: LIFO, FIFO"), + + /** + * Configures client with preferred connection keep alive timeout if keep alive is enabled. + * Usually servers tells a client how long it can keep a connection alive. This option can be used + * when connection should be ended earlier. If value less or equal to 0, the server's timeout is used. + * Default value is -1. + * Time unit is milliseconds. + * + * Supported only for Apache Http Client connection provider currently. + */ + KEEP_ALIVE_TIMEOUT("alive_timeout", -1L, + "Default keep-alive timeout in milliseconds."), ; private final String key; diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java index e02652e52..2b2a54397 100644 --- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java @@ -9,33 +9,42 @@ import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseSocketFactory; import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.config.ClickHouseDefaults; +import com.clickhouse.client.config.ClickHouseProxyType; import com.clickhouse.client.http.config.ClickHouseHttpOption; import com.clickhouse.client.http.config.HttpConnectionProvider; import com.clickhouse.config.ClickHouseOption; import com.clickhouse.data.ClickHouseUtils; - -import java.io.IOException; -import java.io.Serializable; -import java.net.ConnectException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; - import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.common.Slf4jNotifier; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; import com.github.tomakehurst.wiremock.http.Fault; +import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener; import com.github.tomakehurst.wiremock.stubbing.Scenario; import com.github.tomakehurst.wiremock.stubbing.StubMapping; import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.net.URIBuilder; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Ignore; import org.testng.annotations.Test; +import java.io.IOException; +import java.io.Serializable; +import java.net.ConnectException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + public class ApacheHttpConnectionImplTest extends ClickHouseHttpClientTest { public static class CustomSocketFactory implements ClickHouseSocketFactory { private static final AtomicBoolean created = new AtomicBoolean(); @@ -267,4 +276,102 @@ public void testNoHttpResponseExceptionWithValidation(long validationTimeout) { public static Object[] validationTimeoutProvider() { return new Long[] {-1L , 100L }; } + + @Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider") + @SuppressWarnings("java:S2925") + public void testConnectionTTL(Map options, int openSockets) throws Exception { + if (isCloud()) { + // skip for cloud because wiremock proxy need extra configuration. TODO: need to fix it + return; + } + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + int proxyPort = new Random().nextInt(1000) + 10000; + System.out.println("proxyPort: " + proxyPort); + ConnectionCounterListener connectionCounter = new ConnectionCounterListener(); + WireMockServer proxy = new WireMockServer(WireMockConfiguration + .options().port(proxyPort) + .networkTrafficListener(connectionCounter) + .notifier(new Slf4jNotifier(true))); + proxy.start(); + URIBuilder targetURI = new URIBuilder(server.getBaseUri()) + .setPath(""); + proxy.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse().proxiedFrom(targetURI.build().toString())).build()); + + Map baseOptions = new HashMap<>(); + baseOptions.put(ClickHouseClientOption.PROXY_PORT, proxyPort); + baseOptions.put(ClickHouseClientOption.PROXY_HOST, "localhost"); + baseOptions.put(ClickHouseClientOption.PROXY_TYPE, ClickHouseProxyType.HTTP); + baseOptions.put(ClickHouseDefaults.PASSWORD, getPassword()); + baseOptions.put(ClickHouseDefaults.USER, "default"); + baseOptions.putAll(options); + + ClickHouseConfig config = new ClickHouseConfig(baseOptions); + try (ClickHouseClient client = ClickHouseClient.builder().config(config).build()) { + try (ClickHouseResponse resp = client.read(server).query("select 1").executeAndWait()) { + Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1"); + } + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + Assert.fail("Unexpected exception", e); + } + + try (ClickHouseResponse resp = client.read(server).query("select 1").executeAndWait()) { + Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1"); + } + } catch (Exception e) { + Assert.fail("Unexpected exception", e); + } finally { + Assert.assertEquals(connectionCounter.opened.get(), openSockets); + proxy.stop(); + } + } + + @DataProvider(name = "testConnectionTTLProvider") + public static Object[][] testConnectionTTLProvider() { + HashMap disabledKeepAlive = new HashMap<>(); + disabledKeepAlive.put(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT, 1000L); + disabledKeepAlive.put(ClickHouseHttpOption.KEEP_ALIVE, false); + HashMap fifoOption = new HashMap<>(); + fifoOption.put(ClickHouseClientOption.CONNECTION_TTL, 1000L); + fifoOption.put(ClickHouseHttpOption.CONNECTION_REUSE_STRATEGY, "FIFO"); + return new Object[][] { + { Collections.singletonMap(ClickHouseClientOption.CONNECTION_TTL, 1000L), 2 }, + { Collections.singletonMap(ClickHouseClientOption.CONNECTION_TTL, 2000L), 1 }, + { Collections.singletonMap(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT, 2000L), 1 }, + { Collections.singletonMap(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT, 500L), 2 }, + { disabledKeepAlive, 2 }, + { fifoOption, 2 } + }; + } + + private static class ConnectionCounterListener implements WiremockNetworkTrafficListener { + + private AtomicInteger opened = new AtomicInteger(0); + private AtomicInteger closed = new AtomicInteger(0); + + @Override + public void opened(Socket socket) { + opened.incrementAndGet(); + System.out.println("Opened: " + socket); + } + + @Override + public void incoming(Socket socket, ByteBuffer bytes) { + // ignore + } + + @Override + public void outgoing(Socket socket, ByteBuffer bytes) { + // ignore + } + + @Override + public void closed(Socket socket) { + closed.incrementAndGet(); + System.out.println("Closed: " + socket); + } + } }