Skip to content

Commit

Permalink
Merge branch 'main' into fix_stale_connection_issue
Browse files Browse the repository at this point in the history
  • Loading branch information
chernser committed Jul 9, 2024
2 parents 41b3e29 + 66f8bce commit 845e76d
Show file tree
Hide file tree
Showing 22 changed files with 1,778 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class ClickHouseRequest<SelfT extends ClickHouseRequest<SelfT>> implement

static {
Set<String> set = new HashSet<>();
set.add("query_id");
set.add(ClickHouseClientOption.QUERY_ID.getKey());
set.add(ClickHouseClientOption.SESSION_ID.getKey());
set.add(ClickHouseClientOption.SESSION_CHECK.getKey());
set.add(ClickHouseClientOption.SESSION_TIMEOUT.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,12 @@ public enum ClickHouseClientOption implements ClickHouseOption {
* false.
*/
USE_TIME_ZONE("use_time_zone", "", "Time zone of all DateTime* values. "
+ "Only used when use_server_time_zone is false. Empty value means client time zone.");
+ "Only used when use_server_time_zone is false. Empty value means client time zone."),

/**
* Query ID to be attached to an operation
*/
QUERY_ID("query_id", "", "Query id");

private final String key;
private final Serializable defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ public HttpConnectionManager(Registry<ConnectionSocketFactory> socketFactory, Cl

ConnectionConfig connConfig = ConnectionConfig.custom()
.setConnectTimeout(Timeout.of(config.getConnectionTimeout(), TimeUnit.MILLISECONDS))
.setTimeToLive(20, TimeUnit.SECONDS)
.setValidateAfterInactivity(10, TimeUnit.SECONDS)
.setValidateAfterInactivity(config.getLongOption(ClickHouseHttpOption.AHC_VALIDATE_AFTER_INACTIVITY), TimeUnit.MILLISECONDS)
.build();
setDefaultConnectionConfig(connConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public enum ClickHouseHttpOption implements ClickHouseOption {
REMEMBER_LAST_SET_ROLES("remember_last_set_roles", false,
"Whether to remember last set role and send them in every next requests as query parameters."),

/**
* The time in milliseconds after which the connection is validated after inactivity.
* Default value is 5000 ms. If set to negative value, the connection is never validated.
* It is used only for Apache Http Client connection provider.
*/
AHC_VALIDATE_AFTER_INACTIVITY("ahc_validate_after_inactivity", 5000L,
"The time in milliseconds after which the connection is validated after inactivity."),

/**
* Whether to retry on failure with AsyncHttpClient. Failure includes some 'critical' IO exceptions:
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public List<Proxy> select(URI uri) {

private static final String USER_AGENT = ClickHouseClientOption.buildUserAgent(null, "HttpClient");

private final AtomicBoolean busy;
private final HttpClient httpClient;
private final HttpRequest pingRequest;

Expand Down Expand Up @@ -197,15 +196,13 @@ protected HttpClientConnectionImpl(ClickHouseNode server, ClickHouseRequest<?> r
builder.sslContext(ClickHouseSslContextProvider.getProvider().getSslContext(SSLContext.class, config)
.orElse(null));
}

busy = new AtomicBoolean(false);
httpClient = builder.build();
pingRequest = newRequest(getBaseUrl() + "ping");
}

@Override
protected boolean isReusable() {
return busy.get();
return true; // httpClient is stateless and can be reused
}

private CompletableFuture<HttpResponse<InputStream>> postRequest(HttpRequest request) {
Expand All @@ -218,7 +215,7 @@ private CompletableFuture<HttpResponse<InputStream>> postRequest(HttpRequest req
private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.Builder reqBuilder, byte[] boundary,
String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables, ClickHouseOutputStream output,
Runnable postAction) throws IOException {
try {

ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config);
reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInputStream));
Expand All @@ -243,14 +240,11 @@ private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.B
}

return buildResponse(config, r, output, postAction);
} finally {
busy.set(false);
}
}

private ClickHouseHttpResponse postString(ClickHouseConfig config, HttpRequest.Builder reqBuilder, String sql,
ClickHouseOutputStream output, Runnable postAction) throws IOException {
try {

reqBuilder.POST(HttpRequest.BodyPublishers.ofString(sql));
HttpResponse<InputStream> r;
try {
Expand All @@ -267,9 +261,6 @@ private ClickHouseHttpResponse postString(ClickHouseConfig config, HttpRequest.B
}
}
return buildResponse(config, r, output, postAction);
} finally {
busy.set(false);
}
}

@Override
Expand All @@ -281,9 +272,7 @@ protected final String getDefaultUserAgent() {
protected ClickHouseHttpResponse post(ClickHouseConfig config, String sql, ClickHouseInputStream data,
List<ClickHouseExternalTable> tables, ClickHouseOutputStream output, String url,
Map<String, String> headers, Runnable postAction) throws IOException {
if (!busy.compareAndSet(false, true)) {
throw new ConnectException("Connection is busy");
}

ClickHouseConfig c = config == null ? this.config : config;
HttpRequest.Builder reqBuilder = HttpRequest.newBuilder()
.uri(URI.create(ClickHouseChecker.isNullOrEmpty(url) ? this.url : url))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.net.ConnectException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -26,10 +27,13 @@
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.admin.model.ScenarioState;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
import org.apache.hc.core5.http.NoHttpResponseException;
import org.apache.hc.core5.http.HttpStatus;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -207,4 +211,63 @@ private static StubMapping[] retryOnFailureProvider() {
.build()
};
}

@Test(groups = {"unit"}, dataProvider = "validationTimeoutProvider")
public void testNoHttpResponseExceptionWithValidation(long validationTimeout) {

faultyServer = new WireMockServer(9090);
faultyServer.start();

faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl())
.inScenario("validateOnStaleConnection")
.withRequestBody(WireMock.equalTo("SELECT 100"))
.willReturn(WireMock.aResponse()
.withHeader("X-ClickHouse-Summary",
"{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}"))
.build());


ClickHouseHttpClient httpClient = new ClickHouseHttpClient();
Map<ClickHouseOption, Serializable> options = new HashMap<>();
options.put(ClickHouseHttpOption.AHC_VALIDATE_AFTER_INACTIVITY, validationTimeout);
options.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS, 1);
ClickHouseConfig config = new ClickHouseConfig(options);
httpClient.init(config);
ClickHouseRequest request = httpClient.read("http://localhost:9090/").query("SELECT 100");

Runnable powerBlink = () -> {
try {
Thread.sleep(100);
faultyServer.stop();
Thread.sleep(50);
faultyServer.start();
} catch (InterruptedException e) {
Assert.fail("Unexpected exception", e);
}
};
try {
ClickHouseResponse response = httpClient.executeAndWait(request);
Assert.assertEquals(response.getSummary().getReadRows(), 1);
response.close();
new Thread(powerBlink).start();
Thread.sleep(200);
response = httpClient.executeAndWait(request);
Assert.assertEquals(response.getSummary().getReadRows(), 1);
response.close();
} catch (Exception e) {
if (validationTimeout < 0) {
Assert.assertTrue(e instanceof ClickHouseException);
Assert.assertTrue(e.getCause() instanceof ConnectException);
} else {
Assert.fail("Unexpected exception", e);
}
} finally {
faultyServer.stop();
}
}

@DataProvider(name = "validationTimeoutProvider")
public static Object[] validationTimeoutProvider() {
return new Long[] {-1L , 100L };
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
package com.clickhouse.jdbc.internal;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseRequest.Mutation;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.ClickHouseResponseSummary;
import com.clickhouse.client.ClickHouseSimpleResponse;
import com.clickhouse.client.ClickHouseTransaction;
import com.clickhouse.client.ClickHouseRequest.Mutation;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.config.ClickHouseConfigChangeListener;
Expand All @@ -42,16 +27,37 @@
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHouseUtils;
import com.clickhouse.data.ClickHouseValues;
import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseResultSet;
import com.clickhouse.jdbc.ClickHouseStatement;
import com.clickhouse.jdbc.JdbcTypeMapping;
import com.clickhouse.jdbc.SqlExceptionUtils;
import com.clickhouse.jdbc.JdbcWrapper;
import com.clickhouse.jdbc.SqlExceptionUtils;
import com.clickhouse.jdbc.parser.ClickHouseSqlStatement;
import com.clickhouse.jdbc.parser.StatementType;
import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class ClickHouseStatementImpl extends JdbcWrapper
implements ClickHouseConfigChangeListener<ClickHouseRequest<?>>, ClickHouseStatement {
Expand Down Expand Up @@ -130,11 +136,9 @@ private ClickHouseResponse getLastResponse(Map<ClickHouseOption, Serializable> o
request.set("_set_roles_stmt", requestRoles);
}

request.query(stmt.getSQL(), queryId = connection.newQueryId());
// TODO skip useless queries to reduce network calls and server load
try {
response = autoTx ? request.executeWithinTransaction(connection.isImplicitTransactionSupported())
: request.transaction(connection.getTransaction()).executeAndWait();
response = sendRequest(stmt.getSQL(), r -> r);
} catch (Exception e) {
throw SqlExceptionUtils.handle(e);
} finally {
Expand Down Expand Up @@ -272,7 +276,6 @@ protected ClickHouseResponse processSqlStatement(ClickHouseSqlStatement stmt) th

protected ClickHouseResponse executeStatement(String stmt, Map<ClickHouseOption, Serializable> options,
List<ClickHouseExternalTable> tables, Map<String, String> settings) throws SQLException {
boolean autoTx = connection.getAutoCommit() && connection.isTransactionSupported();
try {
if (options != null) {
request.options(options);
Expand Down Expand Up @@ -310,9 +313,8 @@ protected ClickHouseResponse executeStatement(String stmt, Map<ClickHouseOption,
}
request.external(list);
}
request.query(stmt, queryId = connection.newQueryId());
return autoTx ? request.executeWithinTransaction(connection.isImplicitTransactionSupported())
: request.transaction(connection.getTransaction()).executeAndWait();

return sendRequest(stmt, r -> r);
} catch (Exception e) {
throw SqlExceptionUtils.handle(e);
}
Expand All @@ -328,18 +330,61 @@ protected ClickHouseResponse executeStatement(ClickHouseSqlStatement stmt,
return executeStatement(stmt.getSQL(), options, tables, settings);
}

protected int executeInsert(String sql, InputStream input) throws SQLException {
private ClickHouseResponse sendRequest(String sql, Function<ClickHouseRequest<?>, ClickHouseRequest<?>> preSeal) throws SQLException {
boolean autoTx = connection.getAutoCommit() && connection.isTransactionSupported();
Mutation req = request.write().query(sql, queryId = connection.newQueryId()).data(input);
try (ClickHouseResponse resp = autoTx
? req.executeWithinTransaction(connection.isImplicitTransactionSupported())
: req.transaction(connection.getTransaction()).executeAndWait();
ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp)) {
// ignore

ClickHouseRequest<?> req;
ClickHouseTransaction tx = null;
synchronized (request) {
try {
if (autoTx) {
if (connection.isImplicitTransactionSupported()) {
request.set(ClickHouseTransaction.SETTING_IMPLICIT_TRANSACTION, 1).transaction(null);
} else {
tx = request.getManager().createImplicitTransaction(request);
request.transaction(connection.getTransaction());
}
} else {
try {
request.transaction(connection.getTransaction());
} catch (ClickHouseException e) {
throw SqlExceptionUtils.handle(e);
}
}

req = preSeal.apply(request).query(sql, queryId = connection.newQueryId()).seal();
} catch (Exception e) {
throw SqlExceptionUtils.handle(e);
}
}

try {
return req.executeAndWait();
} catch (Exception e) {
if (tx != null) {
try {
tx.rollback();
} catch (Exception ex) {
log.warn("Failed to rollback transaction", ex);
}
}
throw SqlExceptionUtils.handle(e);
} finally {
try {
request.transaction(null);
} catch (Exception e) {
throw SqlExceptionUtils.handle(ClickHouseException.of(e, req.getServer()));
}
}
}

protected int executeInsert(String sql, InputStream input) throws SQLException {
try (ClickHouseResponse response = sendRequest(sql, r -> r.write().data(input));
ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), response)) {
// no more actions needed
} catch (Exception e) {
throw SqlExceptionUtils.handle(e);
}
return (int) currentUpdateCount;
}

Expand Down
Loading

0 comments on commit 845e76d

Please sign in to comment.