Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhance] Append the url's database and parameters to newly created connection #142 #143

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public ClickHouseCatalog(
checkArgument(!isNullOrWhitespaceOnly(username), "username cannot be null or empty");
checkArgument(!isNullOrWhitespaceOnly(password), "password cannot be null or empty");

this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.baseUrl = baseUrl;
this.username = username;
this.password = password;
this.ignorePrimaryKey =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -38,7 +36,7 @@
import java.util.Properties;

import static java.util.stream.Collectors.toList;
import static org.apache.flink.connector.clickhouse.util.ClickHouseJdbcUtil.getActualHttpPort;
import static org.apache.flink.connector.clickhouse.util.ClickHouseJdbcUtil.getClusterSpec;

/** ClickHouse connection provider. Use ClickHouseDriver to create a connection. */
public class ClickHouseConnectionProvider implements Serializable {
Expand All @@ -47,9 +45,6 @@ public class ClickHouseConnectionProvider implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(ClickHouseConnectionProvider.class);

private static final String QUERY_CLUSTER_INFO_SQL =
"SELECT shard_num, host_address, port FROM system.clusters WHERE cluster = ? ORDER BY shard_num, replica_num ASC";

private final ClickHouseConnectionOptions options;

private final Properties connectionProperties;
Expand All @@ -74,17 +69,19 @@ public boolean isConnectionValid() throws SQLException {

public synchronized ClickHouseConnection getOrCreateConnection() throws SQLException {
if (connection == null) {
connection = createConnection(options.getUrl(), options.getDatabaseName());
connection = createConnection(options.getUrl());
}
return connection;
}

public synchronized Map<Integer, ClickHouseConnection> createShardConnections(
ClusterSpec clusterSpec, String defaultDatabase) throws SQLException {
Map<Integer, ClickHouseConnection> connectionMap = new HashMap<>();
String urlSuffix = options.getUrlSuffix();
for (ShardSpec shardSpec : clusterSpec.getShards()) {
String shardUrl = shardSpec.getJdbcUrls() + urlSuffix;
ClickHouseConnection connection =
createAndStoreShardConnection(shardSpec.getJdbcUrls(), defaultDatabase);
createAndStoreShardConnection(shardUrl, defaultDatabase);
connectionMap.put(shardSpec.getNum(), connection);
}

Expand All @@ -97,35 +94,29 @@ public synchronized ClickHouseConnection createAndStoreShardConnection(
shardConnections = new ArrayList<>();
}

ClickHouseConnection connection = createConnection(url, database);
ClickHouseConnection connection = createConnection(url);
shardConnections.add(connection);
return connection;
}

public List<String> getShardUrls(String remoteCluster) throws SQLException {
Map<Long, List<String>> shardsMap = new HashMap<>();
Map<Integer, String> shardsMap = new HashMap<>();
ClickHouseConnection conn = getOrCreateConnection();
try (PreparedStatement stmt = conn.prepareStatement(QUERY_CLUSTER_INFO_SQL)) {
stmt.setString(1, remoteCluster);
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
String host = rs.getString("host_address");
int port = getActualHttpPort(host, rs.getInt("port"));
List<String> shardUrls =
shardsMap.computeIfAbsent(
rs.getLong("shard_num"), k -> new ArrayList<>());
shardUrls.add(host + ":" + port);
}
}
ClusterSpec clusterSpec = getClusterSpec(conn, remoteCluster);
String urlSuffix = options.getUrlSuffix();
for (ShardSpec shardSpec : clusterSpec.getShards()) {
String shardUrl = shardSpec.getJdbcUrls() + urlSuffix;
shardsMap.put(shardSpec.getNum(), shardUrl);
}

return shardsMap.values().stream()
.map(urls -> "jdbc:ch://" + String.join(",", urls))
return shardsMap.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(Map.Entry::getValue)
.collect(toList());
}

private ClickHouseConnection createConnection(String url, String database) throws SQLException {
LOG.info("connecting to {}, database {}", url, database);
private ClickHouseConnection createConnection(String url) throws SQLException {
LOG.info("connecting to {}", url);
Properties configuration = new Properties();
configuration.putAll(connectionProperties);
if (options.getUsername().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@

package org.apache.flink.connector.clickhouse.internal.options;

import org.apache.flink.annotation.VisibleForTesting;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.EMPTY;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

/** ClickHouse connection options. */
public class ClickHouseConnectionOptions implements Serializable {

private static final long serialVersionUID = 1L;

public static final Pattern URL_PATTERN =
Pattern.compile("[^/]+//[^/?]+(/(?<database>[^?]*))?(\\?(?<param>\\S+))?");

private final String url;

private final String username;
Expand All @@ -37,6 +47,12 @@ public class ClickHouseConnectionOptions implements Serializable {

private final String tableName;

// For testing.
@VisibleForTesting
public ClickHouseConnectionOptions(String url) {
this(url, null, null, null, null);
}

protected ClickHouseConnectionOptions(
String url,
@Nullable String username,
Expand All @@ -50,6 +66,23 @@ protected ClickHouseConnectionOptions(
this.tableName = tableName;
}

/**
* The format of the URL suffix is as follows: {@code
* [/<database>][?param1=value1&param2=value2]}.
*/
public String getUrlSuffix() {
Matcher matcher = URL_PATTERN.matcher(url);
if (!matcher.find()) {
return EMPTY;
}

String database = matcher.group("database");
String param = matcher.group("param");
database = isNullOrWhitespaceOnly(database) ? EMPTY : "/" + database;
param = isNullOrWhitespaceOnly(param) ? EMPTY : "?" + param;
return database + param;
}

public String getUrl() {
return this.url;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.connector.clickhouse;

import org.apache.flink.connector.clickhouse.internal.options.ClickHouseConnectionOptions;
import org.apache.flink.connector.clickhouse.internal.partitioner.ValuePartitioner;
import org.apache.flink.connector.clickhouse.internal.schema.ClusterSpec;
import org.apache.flink.connector.clickhouse.internal.schema.ShardSpec;
Expand Down Expand Up @@ -218,4 +219,23 @@ public void parseEngineFullTest() {
requireNonNull(parseShardingKey(matcher.group("shardingKey"))).explain());
}
}

@Test
public void parseJdbcUriTest() {
String[] urls = {
"jdbc:ch://localhost:8123",
"jdbc:ch://localhost:8123?",
"jdbc:ch://localhost:8123?ssl=true&sslmode=STRICT",
"jdbc:ch://localhost:8123/",
"jdbc:ch://localhost:8123/?ssl=true&sslmode=STRICT",
"jdbc:ch://localhost:8123/default?ssl=true&sslmode=STRICT",
"jdbc:ch://localhost:8123,127.0.0.1:8123/default?ssl=true&sslmode=STRICT"
};

for (String url : urls) {
String urlSuffix = new ClickHouseConnectionOptions(url).getUrlSuffix();
String urlPrefix = url.substring(0, url.lastIndexOf(urlSuffix));
assertEquals(url, urlPrefix + urlSuffix);
}
}
}
Loading