diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java index 2ef0a12..7de60da 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java @@ -132,7 +132,7 @@ public ClickHouseCatalog( checkArgument(!isNullOrWhitespaceOnly(username), "username cannot be null or empty"); checkArgument(!isNullOrWhitespaceOnly(password), "password cannot be null or empty"); - this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; + this.baseUrl = baseUrl; this.username = username; this.password = password; this.ignorePrimaryKey = diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java index bb5c8d5..7405bb5 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java @@ -28,8 +28,6 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -38,7 +36,7 @@ import java.util.Properties; import static java.util.stream.Collectors.toList; -import static org.apache.flink.connector.clickhouse.util.ClickHouseJdbcUtil.getActualHttpPort; +import static org.apache.flink.connector.clickhouse.util.ClickHouseJdbcUtil.getClusterSpec; /** ClickHouse connection provider. Use ClickHouseDriver to create a connection. */ public class ClickHouseConnectionProvider implements Serializable { @@ -47,9 +45,6 @@ public class ClickHouseConnectionProvider implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(ClickHouseConnectionProvider.class); - private static final String QUERY_CLUSTER_INFO_SQL = - "SELECT shard_num, host_address, port FROM system.clusters WHERE cluster = ? ORDER BY shard_num, replica_num ASC"; - private final ClickHouseConnectionOptions options; private final Properties connectionProperties; @@ -74,7 +69,7 @@ public boolean isConnectionValid() throws SQLException { public synchronized ClickHouseConnection getOrCreateConnection() throws SQLException { if (connection == null) { - connection = createConnection(options.getUrl(), options.getDatabaseName()); + connection = createConnection(options.getUrl()); } return connection; } @@ -82,9 +77,11 @@ public synchronized ClickHouseConnection getOrCreateConnection() throws SQLExcep public synchronized Map createShardConnections( ClusterSpec clusterSpec, String defaultDatabase) throws SQLException { Map connectionMap = new HashMap<>(); + String urlSuffix = options.getUrlSuffix(); for (ShardSpec shardSpec : clusterSpec.getShards()) { + String shardUrl = shardSpec.getJdbcUrls() + urlSuffix; ClickHouseConnection connection = - createAndStoreShardConnection(shardSpec.getJdbcUrls(), defaultDatabase); + createAndStoreShardConnection(shardUrl, defaultDatabase); connectionMap.put(shardSpec.getNum(), connection); } @@ -97,35 +94,29 @@ public synchronized ClickHouseConnection createAndStoreShardConnection( shardConnections = new ArrayList<>(); } - ClickHouseConnection connection = createConnection(url, database); + ClickHouseConnection connection = createConnection(url); shardConnections.add(connection); return connection; } public List getShardUrls(String remoteCluster) throws SQLException { - Map> shardsMap = new HashMap<>(); + Map shardsMap = new HashMap<>(); ClickHouseConnection conn = getOrCreateConnection(); - try (PreparedStatement stmt = conn.prepareStatement(QUERY_CLUSTER_INFO_SQL)) { - stmt.setString(1, remoteCluster); - try (ResultSet rs = stmt.executeQuery()) { - while (rs.next()) { - String host = rs.getString("host_address"); - int port = getActualHttpPort(host, rs.getInt("port")); - List shardUrls = - shardsMap.computeIfAbsent( - rs.getLong("shard_num"), k -> new ArrayList<>()); - shardUrls.add(host + ":" + port); - } - } + ClusterSpec clusterSpec = getClusterSpec(conn, remoteCluster); + String urlSuffix = options.getUrlSuffix(); + for (ShardSpec shardSpec : clusterSpec.getShards()) { + String shardUrl = shardSpec.getJdbcUrls() + urlSuffix; + shardsMap.put(shardSpec.getNum(), shardUrl); } - return shardsMap.values().stream() - .map(urls -> "jdbc:ch://" + String.join(",", urls)) + return shardsMap.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(Map.Entry::getValue) .collect(toList()); } - private ClickHouseConnection createConnection(String url, String database) throws SQLException { - LOG.info("connecting to {}, database {}", url, database); + private ClickHouseConnection createConnection(String url) throws SQLException { + LOG.info("connecting to {}", url); Properties configuration = new Properties(); configuration.putAll(connectionProperties); if (options.getUsername().isPresent()) { diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/options/ClickHouseConnectionOptions.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/options/ClickHouseConnectionOptions.java index 8053c69..427e053 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/options/ClickHouseConnectionOptions.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/options/ClickHouseConnectionOptions.java @@ -17,16 +17,26 @@ package org.apache.flink.connector.clickhouse.internal.options; +import org.apache.flink.annotation.VisibleForTesting; + import javax.annotation.Nullable; import java.io.Serializable; import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.EMPTY; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** ClickHouse connection options. */ public class ClickHouseConnectionOptions implements Serializable { private static final long serialVersionUID = 1L; + public static final Pattern URL_PATTERN = + Pattern.compile("[^/]+//[^/?]+(/(?[^?]*))?(\\?(?\\S+))?"); + private final String url; private final String username; @@ -37,6 +47,12 @@ public class ClickHouseConnectionOptions implements Serializable { private final String tableName; + // For testing. + @VisibleForTesting + public ClickHouseConnectionOptions(String url) { + this(url, null, null, null, null); + } + protected ClickHouseConnectionOptions( String url, @Nullable String username, @@ -50,6 +66,23 @@ protected ClickHouseConnectionOptions( this.tableName = tableName; } + /** + * The format of the URL suffix is as follows: {@code + * [/][?param1=value1¶m2=value2]}. + */ + public String getUrlSuffix() { + Matcher matcher = URL_PATTERN.matcher(url); + if (!matcher.find()) { + return EMPTY; + } + + String database = matcher.group("database"); + String param = matcher.group("param"); + database = isNullOrWhitespaceOnly(database) ? EMPTY : "/" + database; + param = isNullOrWhitespaceOnly(param) ? EMPTY : "?" + param; + return database + param; + } + public String getUrl() { return this.url; } diff --git a/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java b/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java index edf657c..0f2092b 100644 --- a/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java +++ b/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java @@ -17,6 +17,7 @@ package org.apache.flink.connector.clickhouse; +import org.apache.flink.connector.clickhouse.internal.options.ClickHouseConnectionOptions; import org.apache.flink.connector.clickhouse.internal.partitioner.ValuePartitioner; import org.apache.flink.connector.clickhouse.internal.schema.ClusterSpec; import org.apache.flink.connector.clickhouse.internal.schema.ShardSpec; @@ -218,4 +219,23 @@ public void parseEngineFullTest() { requireNonNull(parseShardingKey(matcher.group("shardingKey"))).explain()); } } + + @Test + public void parseJdbcUriTest() { + String[] urls = { + "jdbc:ch://localhost:8123", + "jdbc:ch://localhost:8123?", + "jdbc:ch://localhost:8123?ssl=true&sslmode=STRICT", + "jdbc:ch://localhost:8123/", + "jdbc:ch://localhost:8123/?ssl=true&sslmode=STRICT", + "jdbc:ch://localhost:8123/default?ssl=true&sslmode=STRICT", + "jdbc:ch://localhost:8123,127.0.0.1:8123/default?ssl=true&sslmode=STRICT" + }; + + for (String url : urls) { + String urlSuffix = new ClickHouseConnectionOptions(url).getUrlSuffix(); + String urlPrefix = url.substring(0, url.lastIndexOf(urlSuffix)); + assertEquals(url, urlPrefix + urlSuffix); + } + } }