Skip to content

Commit

Permalink
Merge pull request #1766 from ClickHouse/clientv2_connection_pooling
Browse files Browse the repository at this point in the history
[client-v2] Connection pool configuration
  • Loading branch information
chernser authored Aug 15, 2024
2 parents 2a41352 + d4ad2cc commit 82f8b21
Show file tree
Hide file tree
Showing 5 changed files with 399 additions and 61 deletions.
119 changes: 109 additions & 10 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.http.ClickHouseHttpProto;
import com.clickhouse.client.http.config.ClickHouseHttpOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.format.BinaryStreamUtils;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.NoHttpResponseException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -220,17 +223,17 @@ public Builder() {
public Builder addEndpoint(String endpoint) {
try {
URL endpointURL = new java.net.URL(endpoint);
if (!(endpointURL.getProtocol().equalsIgnoreCase("https") ||
endpointURL.getProtocol().equalsIgnoreCase("http"))) {

if (endpointURL.getProtocol().equalsIgnoreCase("https")) {
addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), true);
} else if (endpointURL.getProtocol().equalsIgnoreCase("http")) {
addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), false);
} else {
throw new IllegalArgumentException("Only HTTP and HTTPS protocols are supported");
}
} catch (java.net.MalformedURLException e) {
throw new IllegalArgumentException("Endpoint should be a valid URL string", e);
}
if (endpoint.startsWith("https://")) {
this.configuration.put(ClickHouseClientOption.SSL.getKey(), "true");
}
this.endpoints.add(endpoint);
return this;
}

Expand All @@ -252,7 +255,7 @@ public Builder addEndpoint(Protocol protocol, String host, int port, boolean sec
this.configuration.put(ClickHouseClientOption.SSL.getKey(), "true");
}
String endpoint = String.format("%s%s://%s:%d", protocol.toString().toLowerCase(), secure ? "s": "", host, port);
this.addEndpoint(endpoint);
this.endpoints.add(endpoint);
return this;
}

Expand Down Expand Up @@ -302,7 +305,15 @@ public Builder setAccessToken(String accessToken) {
return this;
}

// SOCKET SETTINGS
/**
* Configures client to use build-in connection pool
* @param enable - if connection pool should be enabled
* @return
*/
public Builder enableConnectionPool(boolean enable) {
this.configuration.put("connection_pool_enabled", String.valueOf(enable));
return this;
}

/**
* Default connection timeout in milliseconds. Timeout is applied to establish a connection.
Expand All @@ -324,6 +335,72 @@ public Builder setConnectTimeout(long timeout, ChronoUnit unit) {
return this.setConnectTimeout(Duration.of(timeout, unit).toMillis());
}

/**
* Set timeout for waiting a free connection from a pool when all connections are leased.
* This configuration is important when need to fail fast in high concurrent scenarios.
* Default is 10 s.
* @param timeout - connection timeout in milliseconds
* @param unit - time unit
*/
public Builder setConnectionRequestTimeout(long timeout, ChronoUnit unit) {
this.configuration.put("connection_request_timeout", String.valueOf(Duration.of(timeout, unit).toMillis()));
return this;
}

/**
* Sets the maximum number of connections that can be opened at the same time to a single server. Limit is not
* a hard stop. It is done to prevent threads stuck inside a connection pool waiting for a connection.
* Default is 10. It is recommended to set a higher value for a high concurrent applications. It will let
* more threads to get a connection and execute a query.
*
* @param maxConnections - maximum number of connections
*/
public Builder setMaxConnections(int maxConnections) {
this.configuration.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), String.valueOf(maxConnections));
return this;
}

/**
* Sets how long any connection would be considered as active and able for a lease.
* After this time connection will be marked for sweep and will not be returned from a pool.
* Has more effect than keep-alive timeout.
* @param timeout - time in unit
* @param unit - time unit
* @return
*/
public Builder setConnectionTTL(long timeout, ChronoUnit unit) {
this.configuration.put(ClickHouseClientOption.CONNECTION_TTL.getKey(), String.valueOf(Duration.of(timeout, unit).toMillis()));
return this;
}

/**
* Sets keep alive timeout for a connection to override server value. If set to -1 then server value will be used.
* Default is -1.
* Doesn't override connection TTL value.
* {@see Client#setConnectionTTL}
* @param timeout - time in unit
* @param unit - time unit
* @return
*/
public Builder setKeepAliveTimeout(long timeout, ChronoUnit unit) {
this.configuration.put(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT.getKey(), String.valueOf(Duration.of(timeout, unit).toMillis()));
return this;
}

/**
* Sets strategy of how connections are reuse.
* Default is {@link ConnectionReuseStrategy#FIFO} to evenly distribute load between them.
*
* @param strategy - strategy for connection reuse
* @return
*/
public Builder setConnectionReuseStrategy(ConnectionReuseStrategy strategy) {
this.configuration.put("connection_reuse_strategy", strategy.name());
return this;
}

// SOCKET SETTINGS

/**
* Default socket timeout in milliseconds. Timeout is applied to read and write operations.
*
Expand Down Expand Up @@ -485,8 +562,8 @@ public Builder setProxyCredentials(String user, String pass) {
* @param timeUnit
* @return
*/
public Builder setExecutionTimeout(long timeout, TimeUnit timeUnit) {
this.configuration.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), String.valueOf(timeUnit.toMillis(timeout)));
public Builder setExecutionTimeout(long timeout, ChronoUnit timeUnit) {
this.configuration.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), String.valueOf(Duration.of(timeout, timeUnit).toMillis()));
return this;
}

Expand Down Expand Up @@ -719,6 +796,26 @@ private Map<String, String> setDefaults(Map<String, String> userConfig) {
userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false");
}

if (!userConfig.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) {
userConfig.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), "10");
}

if (!userConfig.containsKey("connection_request_timeout")) {
userConfig.put("connection_request_timeout", "10000");
}

if (!userConfig.containsKey("connection_reuse_strategy")) {
userConfig.put("connection_reuse_strategy", ConnectionReuseStrategy.FIFO.name());
}

if (!userConfig.containsKey("connection_pool_enabled")) {
userConfig.put("connection_pool_enabled", "true");
}

if (!userConfig.containsKey("connection_ttl")) {
userConfig.put("connection_ttl", "-1");
}

return userConfig;
}
}
Expand Down Expand Up @@ -1212,6 +1309,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
return new QueryResponse(httpResponse, finalSettings.getFormat(), finalSettings, metrics);
} catch (ClientException e) {
throw e;
} catch (ConnectionRequestTimeoutException | ConnectTimeoutException e) {
throw new ConnectionInitiationException("Failed to get connection", e);
} catch (Exception e) {
throw new ClientException("Failed to execute query", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.clickhouse.client.api;

public enum ConnectionReuseStrategy {

/**
* Reuse recently freed connection and returned to a pool
*/
LIFO,

/**
* Reuse mostly all connections
*/
FIFO
;
}
Loading

0 comments on commit 82f8b21

Please sign in to comment.