Skip to content

Commit

Permalink
[Feature] Upgrade Clickhouse JDBC Version to 0.6.0-patch3 #105
Browse files Browse the repository at this point in the history
  • Loading branch information
czy006 authored and itinycheng committed Jun 12, 2024
1 parent 24e024b commit f8c8d48
Show file tree
Hide file tree
Showing 22 changed files with 144 additions and 158 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");

## Roadmap

- [x] Implement the Flink SQL Sink function.
- [x] Support array and Map types.
- [x] Support ClickHouseCatalog.
- [x] Implement the Flink SQL Source function.
- [x] Implement the Flink SQL Lookup function.
The main branch is currently unstable and should not be used in production

- [ ] Flink Clickhouse Connector donated to Apache Flink #102 @czy006
- [ ] Perfect Junit Tests for Connector
2 changes: 1 addition & 1 deletion flink-connector-clickhouse-e2e-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse-parent</artifactId>
<version>1.17.1-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
</parent>

<groupId>org.apache.flink</groupId>
Expand Down
22 changes: 10 additions & 12 deletions flink-connector-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse-parent</artifactId>
<version>1.17.1-SNAPSHOT</version>
<version>1.0.0-SNAPSHOT</version>
</parent>

<artifactId>flink-connector-clickhouse</artifactId>
Expand Down Expand Up @@ -46,19 +46,9 @@
<!-- Clickhouse -->

<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse-jdbc.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand All @@ -71,6 +61,14 @@
<version>${commons-lang3.version}</version>
</dependency>

<!-- Http -->

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.2.1</version>
</dependency>

<!-- Tests -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableFactory;
import org.apache.flink.connector.clickhouse.internal.schema.DistributedEngineFull;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
import org.apache.flink.connector.clickhouse.util.DataTypeUtil;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
Expand Down Expand Up @@ -33,13 +32,13 @@
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;

import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDriver;
import com.clickhouse.jdbc.ClickHouseResultSetMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.response.ClickHouseColumnInfo;
import ru.yandex.clickhouse.response.ClickHouseResultSetMetaData;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -129,14 +128,11 @@ public void open() throws CatalogException {
try {
Properties configuration = new Properties();
configuration.putAll(properties);
configuration.setProperty(ClickHouseQueryParam.USER.getKey(), username);
configuration.setProperty(ClickHouseQueryParam.PASSWORD.getKey(), password);
String jdbcUrl = ClickHouseUtil.getJdbcUrl(baseUrl, getDefaultDatabase());
BalancedClickhouseDataSource dataSource =
new BalancedClickhouseDataSource(jdbcUrl, configuration);
dataSource.actualize();
connection = dataSource.getConnection();
LOG.info("Created catalog {}, established connection to {}", getName(), jdbcUrl);
configuration.setProperty(ClickHouseDefaults.USER.getKey(), username);
configuration.setProperty(ClickHouseDefaults.PASSWORD.getKey(), password);
ClickHouseDriver driver = new ClickHouseDriver();
connection = driver.connect(baseUrl, configuration);
LOG.info("Created catalog {}, established connection to {}", getName(), baseUrl);
} catch (Exception e) {
throw new CatalogException(String.format("Opening catalog %s failed.", getName()), e);
}
Expand Down Expand Up @@ -307,8 +303,7 @@ private synchronized TableSchema createTableSchema(String databaseName, String t
List<String> primaryKeys = getPrimaryKeys(databaseName, tableName);
TableSchema.Builder builder = TableSchema.builder();
for (int idx = 1; idx <= metaData.getColumnCount(); idx++) {
ClickHouseColumnInfo columnInfo =
(ClickHouseColumnInfo) getColMethod.invoke(metaData, idx);
ClickHouseColumn columnInfo = (ClickHouseColumn) getColMethod.invoke(metaData, idx);
String columnName = columnInfo.getColumnName();
DataType columnType = DataTypeUtil.toFlinkType(columnInfo);
if (primaryKeys.contains(columnName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.clickhouse.jdbc.ClickHouseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;

import java.io.Flushable;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;

import com.clickhouse.jdbc.ClickHouseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;

import java.io.IOException;
import java.sql.PreparedStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import com.clickhouse.jdbc.ClickHousePreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.sql.Connection;
import java.sql.ResultSet;
Expand Down Expand Up @@ -146,7 +146,7 @@ private void establishConnectionAndStatement() throws SQLException {
Connection dbConn = connectionProvider.getOrCreateConnection();
statement =
new ClickHouseStatementWrapper(
(ClickHousePreparedStatement) dbConn.prepareStatement(query));
dbConn, (ClickHousePreparedStatement) dbConn.prepareStatement(query));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;

import com.clickhouse.jdbc.ClickHouseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;

import java.io.IOException;
import java.sql.PreparedStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

import ru.yandex.clickhouse.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseConnection;

import javax.annotation.Nonnull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseConnectionOptions;
import org.apache.flink.connector.clickhouse.internal.schema.ClusterSpec;
import org.apache.flink.connector.clickhouse.internal.schema.ShardSpec;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;

import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.settings.ClickHouseProperties;

import java.io.Serializable;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -110,17 +109,13 @@ public List<String> getShardUrls(String remoteCluster) throws SQLException {

private ClickHouseConnection createConnection(String url, String database) throws SQLException {
LOG.info("connecting to {}, database {}", url, database);

String jdbcUrl = ClickHouseUtil.getJdbcUrl(url, database);
ClickHouseProperties properties = new ClickHouseProperties(connectionProperties);
properties.setUser(options.getUsername().orElse(null));
properties.setPassword(options.getPassword().orElse(null));
BalancedClickhouseDataSource dataSource =
new BalancedClickhouseDataSource(jdbcUrl, properties);
if (dataSource.getAllClickhouseUrls().size() > 1) {
dataSource.actualize();
}
return dataSource.getConnection();
Properties configuration = new Properties();
configuration.setProperty(
ClickHouseDefaults.USER.getKey(), options.getUsername().orElse(null));
configuration.setProperty(
ClickHouseDefaults.PASSWORD.getKey(), options.getPassword().orElse(null));
ClickHouseDriver driver = new ClickHouseDriver();
return driver.connect(url, configuration);
}

public void closeConnections() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.apache.flink.connector.clickhouse.internal.connection;

import ru.yandex.clickhouse.ClickHousePreparedStatement;
import com.clickhouse.jdbc.ClickHousePreparedStatement;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -11,9 +12,12 @@
/** Wrapper class for ClickHousePreparedStatement. */
public class ClickHouseStatementWrapper {
public final ClickHousePreparedStatement statement;
public final Connection connection;

public ClickHouseStatementWrapper(ClickHousePreparedStatement statement) {
public ClickHouseStatementWrapper(
Connection connection, ClickHousePreparedStatement statement) {
this.statement = statement;
this.connection = connection;
}

public void addBatch() throws SQLException {
Expand Down Expand Up @@ -77,7 +81,8 @@ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
}

public void setArray(int parameterIndex, Object[] array) throws SQLException {
statement.setArray(parameterIndex, array);
java.sql.Array sqlArray = connection.createArrayOf("VARCHAR", array);
statement.setArray(parameterIndex, sqlArray);
}

public void setObject(int parameterIndex, Object x) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;

import ru.yandex.clickhouse.ClickHousePreparedStatement;
import ru.yandex.clickhouse.response.ClickHouseResultSet;
import com.clickhouse.jdbc.ClickHousePreparedStatement;
import com.clickhouse.jdbc.ClickHouseResultSet;

import java.io.Serializable;
import java.math.BigDecimal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
import org.apache.flink.table.data.RowData;

import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHousePreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.sql.SQLException;

Expand Down Expand Up @@ -43,6 +43,7 @@ public ClickHouseBatchExecutor(
public void prepareStatement(ClickHouseConnection connection) throws SQLException {
statement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(insertSql));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import com.clickhouse.jdbc.ClickHouseConnection;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;

import java.io.Serializable;
import java.sql.SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
import org.apache.flink.table.data.RowData;

import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHousePreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.sql.SQLException;
import java.util.Arrays;
Expand Down Expand Up @@ -90,12 +90,15 @@ public ClickHouseUpsertExecutor(
public void prepareStatement(ClickHouseConnection connection) throws SQLException {
this.insertStatement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(this.insertSql));
this.updateStatement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(this.updateSql));
this.deleteStatement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(this.deleteSql));
}

Expand Down
Loading

0 comments on commit f8c8d48

Please sign in to comment.