Skip to content

Commit

Permalink
fix proxy configuration globally for the client
Browse files Browse the repository at this point in the history
  • Loading branch information
chernser committed Jun 18, 2024
1 parent 5be856e commit 4fd0f5f
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 38 deletions.
52 changes: 14 additions & 38 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.clickhouse.client.api.insert.POJOSerializer;
import com.clickhouse.client.api.insert.SerializerNotFoundException;
import com.clickhouse.client.api.internal.ClientStatisticsHolder;
import com.clickhouse.client.api.internal.ClientV1AdaptorHelper;
import com.clickhouse.client.api.internal.SerializerUtils;
import com.clickhouse.client.api.internal.SettingsConverter;
import com.clickhouse.client.api.internal.TableSchemaParser;
Expand All @@ -30,6 +31,7 @@
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseFormat;
Expand All @@ -42,6 +44,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
Expand Down Expand Up @@ -430,10 +433,11 @@ public boolean ping() {
public boolean ping(long timeout) {
ValidationUtils.checkRange(timeout, TimeUnit.SECONDS.toMillis(1), TimeUnit.MINUTES.toMillis(10),
"timeout");
ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
return clientPing.ping(getServerNode(), Math.toIntExact(timeout));
}

try (ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration)) {
return client.ping(getServerNode(), Math.toIntExact(timeout));
}
}

/**
* <p>Registers a POJO class and maps its fields to a table schema</p>
Expand Down Expand Up @@ -615,8 +619,9 @@ public CompletableFuture<InsertResponse> insert(String tableName,

CompletableFuture<InsertResponse> responseFuture = new CompletableFuture<>();

try (ClickHouseClient client = createClient()) {
ClickHouseRequest.Mutation request = createMutationRequest(client.write(getServerNode()), tableName, settings).format(format);
try (ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration)) {
ClickHouseRequest.Mutation request = ClientV1AdaptorHelper
.createMutationRequest(client.write(getServerNode()), tableName, settings, configuration).format(format);
CompletableFuture<ClickHouseResponse> future = null;
try(ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(request.getConfig())) {
future = request.data(stream.getInputStream()).execute();
Expand Down Expand Up @@ -708,10 +713,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
}
ClientStatisticsHolder clientStats = new ClientStatisticsHolder();
clientStats.start(ClientMetrics.OP_DURATION);
ClickHouseClient client = createClient();
ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration);
ClickHouseRequest<?> request = client.read(getServerNode());


request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
request.settings(SettingsConverter.toRequestSettings(settings.getAllSettings(), queryParams));
request.query(sqlQuery, settings.getQueryId());
Expand Down Expand Up @@ -762,10 +765,8 @@ public CompletableFuture<Records> queryRecords(String sqlQuery, QuerySettings se
settings.setFormat(ClickHouseFormat.RowBinaryWithNamesAndTypes);
ClientStatisticsHolder clientStats = new ClientStatisticsHolder();
clientStats.start("query");
ClickHouseClient client = createClient();
ClickHouseClient client = ClientV1AdaptorHelper.createClient(configuration);
ClickHouseRequest<?> request = client.read(getServerNode());


request.options(SettingsConverter.toRequestOptions(settings.getAllSettings()));
request.settings(SettingsConverter.toRequestSettings(settings.getAllSettings(), null));
request.query(sqlQuery, settings.getQueryId());
Expand Down Expand Up @@ -836,8 +837,8 @@ public TableSchema getTableSchema(String table) {
* @return {@code TableSchema} - Schema of the table
*/
public TableSchema getTableSchema(String table, String database) {
try (ClickHouseClient clientQuery = createClient()) {
ClickHouseRequest request = clientQuery.read(getServerNode());
try (ClickHouseClient clientQuery = ClientV1AdaptorHelper.createClient(configuration)) {
ClickHouseRequest<?> request = clientQuery.read(getServerNode());
// XML - because java has a built-in XML parser. Will consider CSV later.
request.query("DESCRIBE TABLE " + table + " FORMAT " + ClickHouseFormat.TSKV.name());
try {
Expand All @@ -848,31 +849,6 @@ public TableSchema getTableSchema(String table, String database) {
}
}

private ClickHouseClient createClient() {
ClickHouseConfig clientConfig = new ClickHouseConfig();
ClickHouseClientBuilder clientV1 = ClickHouseClient.builder()
.config(clientConfig)
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP));
return clientV1.build();
}

private ClickHouseRequest.Mutation createMutationRequest(ClickHouseRequest.Mutation request, String tableName, InsertSettings settings) {
if (settings == null) return request.table(tableName);

if (settings.getQueryId() != null) {//This has to be handled separately
request.table(tableName, settings.getQueryId());
} else {
request.table(tableName);
}

//For each setting, set the value in the request
for (Map.Entry<String, Object> entry : settings.getAllSettings().entrySet()) {
request.set(entry.getKey(), String.valueOf(entry.getValue()));
}

return request;
}

private String startOperation() {
String operationId = UUID.randomUUID().toString();
globalClientStats.put(operationId, new ClientStatisticsHolder());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.clickhouse.client.api.internal;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseClientBuilder;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.config.ClickHouseOption;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ClientV1AdaptorHelper {

private static void copyProxySettings(Map<ClickHouseOption, Serializable> target, Map<String, String> config) {
ClickHouseClientOption opt = ClickHouseClientOption.PROXY_HOST;
String value = config.get(opt.getKey());
if (value != null) {
target.put(opt, value);
}
opt = ClickHouseClientOption.PROXY_PORT;
value = config.get(opt.getKey());
if (value != null) {
target.put(opt, Integer.parseInt(value));
}
opt = ClickHouseClientOption.PROXY_TYPE;
value = config.get(opt.getKey());
if (value != null) {
target.put(opt, ClickHouseProxyType.valueOf(value));
}
}

public static ClickHouseClient createClient(Map<String, String> configuration) {
Map<ClickHouseOption, Serializable> config = new HashMap<>();
copyProxySettings(config, configuration);

ClickHouseConfig clientConfig = new ClickHouseConfig(config);

ClickHouseClientBuilder clientV1 = ClickHouseClient.builder()
.config(clientConfig)
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP));
return clientV1.build();
}

public static ClickHouseRequest.Mutation createMutationRequest(ClickHouseRequest.Mutation request,
String tableName,
InsertSettings settings,
Map<String, String> configuration) {
if (settings.getQueryId() != null) {//This has to be handled separately
request.table(tableName, settings.getQueryId());
} else {
request.table(tableName);
}

Map<String, Object> opSettings = settings == null ? Collections.emptyMap() : settings.getAllSettings();
//For each setting, set the value in the request
for (Map.Entry<String, Object> entry : opSettings.entrySet()) {
request.set(entry.getKey(), String.valueOf(entry.getValue()));
}

return request;
}
}

0 comments on commit 4fd0f5f

Please sign in to comment.