Skip to content

Commit

Permalink
Fix several connector bugs: read table schema metadata through a ClickHouseStatement/ResultSet instead of PreparedStatement.getMetaData, qualify INSERT statements with the database name, use the `jdbc:ch://` JDBC URL prefix, wrap array parameters in a new ObjectArray, propagate user connection properties, and convert LocalDateTime results using the configured Flink time zone.
Browse files Browse the repository at this point in the history
  • Loading branch information
itinycheng committed Jun 12, 2024
1 parent f8c8d48 commit 6a2c76b
Show file tree
Hide file tree
Showing 16 changed files with 136 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDriver;
import com.clickhouse.jdbc.ClickHouseResultSetMetaData;
import com.clickhouse.jdbc.ClickHouseStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -291,13 +292,15 @@ private synchronized TableSchema createTableSchema(String databaseName, String t
// types? 3. All queried data will be obtained before PreparedStatement is closed, so we
// must add `limit 0` statement to avoid data transmission to the client, look at
// `ChunkedInputStream.close()` for more info.
try (PreparedStatement stmt =
connection.prepareStatement(
String.format(
"SELECT * from `%s`.`%s` limit 0", databaseName, tableName))) {
try (ClickHouseStatement stmt = connection.createStatement();
ResultSet rs =
stmt.executeQuery(
String.format(
"SELECT * from `%s`.`%s` limit 0",
databaseName, tableName))) {
ClickHouseResultSetMetaData metaData =
stmt.getMetaData().unwrap(ClickHouseResultSetMetaData.class);
Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", int.class);
rs.getMetaData().unwrap(ClickHouseResultSetMetaData.class);
Method getColMethod = metaData.getClass().getDeclaredMethod("getColumn", int.class);
getColMethod.setAccessible(true);

List<String> primaryKeys = getPrimaryKeys(databaseName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void establishConnectionAndStatement() throws SQLException {
Connection dbConn = connectionProvider.getOrCreateConnection();
statement =
new ClickHouseStatementWrapper(
dbConn, (ClickHousePreparedStatement) dbConn.prepareStatement(query));
(ClickHousePreparedStatement) dbConn.prepareStatement(query));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public static String getSelectWhereStatement(
return selectStatement + (conditionFields.length > 0 ? " WHERE " + whereClause : "");
}

public static String getInsertIntoStatement(String tableName, String[] fieldNames) {
public static String getInsertIntoStatement(
String tableName, String databaseName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(ClickHouseUtil::quoteIdentifier)
Expand All @@ -50,7 +51,7 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName
return String.join(
EMPTY,
"INSERT INTO ",
quoteIdentifier(tableName),
fromTableClause(tableName, databaseName),
"(",
columns,
") VALUES (",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,14 @@ public List<String> getShardUrls(String remoteCluster) throws SQLException {
}

return shardsMap.values().stream()
.map(urls -> "clickhouse://" + String.join(",", urls))
.map(urls -> "jdbc:ch://" + String.join(",", urls))
.collect(toList());
}

private ClickHouseConnection createConnection(String url, String database) throws SQLException {
LOG.info("connecting to {}, database {}", url, database);
Properties configuration = new Properties();
configuration.putAll(connectionProperties);
configuration.setProperty(
ClickHouseDefaults.USER.getKey(), options.getUsername().orElse(null));
configuration.setProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.clickhouse.jdbc.ClickHousePreparedStatement;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -12,12 +11,9 @@
/** Wrapper class for ClickHousePreparedStatement. */
public class ClickHouseStatementWrapper {
public final ClickHousePreparedStatement statement;
public final Connection connection;

public ClickHouseStatementWrapper(
Connection connection, ClickHousePreparedStatement statement) {
public ClickHouseStatementWrapper(ClickHousePreparedStatement statement) {
this.statement = statement;
this.connection = connection;
}

public void addBatch() throws SQLException {
Expand Down Expand Up @@ -81,8 +77,7 @@ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
}

public void setArray(int parameterIndex, Object[] array) throws SQLException {
java.sql.Array sqlArray = connection.createArrayOf("VARCHAR", array);
statement.setArray(parameterIndex, sqlArray);
statement.setArray(parameterIndex, new ObjectArray(array));
}

public void setObject(int parameterIndex, Object x) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.apache.flink.connector.clickhouse.internal.connection;

import org.apache.flink.util.Preconditions;

import java.sql.Array;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Objects;

/**
 * A minimal {@link Array} implementation that wraps a plain Java object array so it can be
 * passed to {@code PreparedStatement#setArray} without a live {@link java.sql.Connection}.
 *
 * <p>Only {@link #getArray()} and {@link #free()} are supported; every other accessor throws
 * {@link UnsupportedOperationException} because the ClickHouse driver never calls them for
 * parameter binding.
 */
public class ObjectArray implements Array {

    // Wrapped elements; set to null once free() has been called.
    private Object[] array;

    /**
     * Creates a wrapper around the given element array.
     *
     * @param array the elements exposed via {@link #getArray()}, must not be null
     * @throws NullPointerException if {@code array} is null
     */
    public ObjectArray(Object[] array) {
        this.array = Objects.requireNonNull(array, "array");
    }

    @Override
    public String getBaseTypeName() {
        throw new UnsupportedOperationException("getBaseTypeName is not supported");
    }

    @Override
    public int getBaseType() {
        throw new UnsupportedOperationException("getBaseType is not supported");
    }

    /**
     * Returns the wrapped array.
     *
     * @return the array passed to the constructor
     * @throws SQLException if {@link #free()} has already been called, per the JDBC
     *     {@code Array.free()} contract
     */
    @Override
    public Object getArray() throws SQLException {
        if (array == null) {
            throw new SQLException("Array has already been freed");
        }
        return array;
    }

    @Override
    public Object getArray(Map<String, Class<?>> map) {
        throw new UnsupportedOperationException("getArray(Map) is not supported");
    }

    @Override
    public Object getArray(long index, int count) {
        throw new UnsupportedOperationException("getArray(index, count) is not supported");
    }

    @Override
    public Object getArray(long index, int count, Map<String, Class<?>> map) {
        throw new UnsupportedOperationException("getArray(index, count, Map) is not supported");
    }

    @Override
    public ResultSet getResultSet() {
        throw new UnsupportedOperationException("getResultSet is not supported");
    }

    @Override
    public ResultSet getResultSet(Map<String, Class<?>> map) {
        throw new UnsupportedOperationException("getResultSet(Map) is not supported");
    }

    @Override
    public ResultSet getResultSet(long index, int count) {
        throw new UnsupportedOperationException("getResultSet(index, count) is not supported");
    }

    @Override
    public ResultSet getResultSet(long index, int count, Map<String, Class<?>> map) {
        throw new UnsupportedOperationException(
                "getResultSet(index, count, Map) is not supported");
    }

    /** Releases the wrapped array. Idempotent: repeated calls are no-ops. */
    @Override
    public void free() {
        this.array = null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp;

/** Converts between Flink internal and JDBC external data types. */
Expand Down Expand Up @@ -132,9 +134,10 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
return (int) (((Time) value).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampData.fromTimestamp((Timestamp) value);
return TimestampData.fromLocalDateTime((LocalDateTime) value);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return TimestampData.fromInstant(((Timestamp) value).toInstant());
return TimestampData.fromInstant(
((LocalDateTime) value).atZone(getFlinkTimeZone().toZoneId()).toInstant());
case CHAR:
case VARCHAR:
return StringData.fromString((String) value);
Expand All @@ -144,7 +147,8 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
.findFirst()
.orElseThrow(
() -> new RuntimeException("Unknown array element type"));
Object externalArray = ((Array) value).getArray();
Object externalArray =
value.getClass().isArray() ? value : ((Array) value).getArray();
int externalArrayLength = java.lang.reflect.Array.getLength(externalArray);
Object[] internalArray = new Object[externalArrayLength];
for (int i = 0; i < externalArrayLength; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.UUID;

import static org.apache.flink.connector.clickhouse.internal.converter.ClickHouseConverterUtils.BOOL_TRUE;
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp;

/** Row converter that converts Flink types to/from ClickHouse types. */
Expand Down Expand Up @@ -113,9 +115,13 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val -> TimestampData.fromTimestamp((Timestamp) val);
return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val -> TimestampData.fromInstant(((Timestamp) val).toInstant());
return val ->
TimestampData.fromInstant(
((LocalDateTime) val)
.atZone(getFlinkTimeZone().toZoneId())
.toInstant());
case CHAR:
case VARCHAR:
return val ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public ClickHouseBatchExecutor(
public void prepareStatement(ClickHouseConnection connection) throws SQLException {
statement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(insertSql));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,19 @@ static ClickHouseExecutor createClickHouseExecutor(
fieldTypes,
options);
} else {
return createBatchExecutor(tableName, fieldNames, fieldTypes, options);
return createBatchExecutor(tableName, databaseName, fieldNames, fieldTypes, options);
}
}

static ClickHouseBatchExecutor createBatchExecutor(
String tableName,
String databaseName,
String[] fieldNames,
LogicalType[] fieldTypes,
ClickHouseDmlOptions options) {
String insertSql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
String insertSql =
ClickHouseStatementFactory.getInsertIntoStatement(
tableName, databaseName, fieldNames);
ClickHouseRowConverter converter = new ClickHouseRowConverter(RowType.of(fieldTypes));
return new ClickHouseBatchExecutor(insertSql, converter, options);
}
Expand All @@ -110,7 +113,9 @@ static ClickHouseUpsertExecutor createUpsertExecutor(
String[] partitionFields,
LogicalType[] fieldTypes,
ClickHouseDmlOptions options) {
String insertSql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
String insertSql =
ClickHouseStatementFactory.getInsertIntoStatement(
tableName, databaseName, fieldNames);
String updateSql =
ClickHouseStatementFactory.getUpdateStatement(
tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,12 @@ public ClickHouseUpsertExecutor(
public void prepareStatement(ClickHouseConnection connection) throws SQLException {
this.insertStatement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(this.insertSql));
this.updateStatement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(this.updateSql));
this.deleteStatement =
new ClickHouseStatementWrapper(
connection,
(ClickHousePreparedStatement) connection.prepareStatement(this.deleteSql));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public ShardSpec(
public String getJdbcUrls() {
return replicas.stream()
.map(replicaSpec -> replicaSpec.getHost() + ":" + replicaSpec.getPort())
.collect(joining(",", "clickhouse://", ""));
.collect(joining(",", "jdbc:ch://", ""));
}

public void initShardRangeBound(List<Long> weights) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -70,7 +71,7 @@ private static Expression parseFunctionExpr(String shardingExpr) {
String subExprLiteral =
shardingExpr.substring(bracketStartIndex + 1, shardingExpr.lastIndexOf(")"));

if (subExprLiteral.trim().length() == 0) {
if (subExprLiteral.trim().isEmpty()) {
return FunctionExpr.of(functionName, emptyList());
}

Expand All @@ -86,4 +87,9 @@ private static Expression parseFunctionExpr(String shardingExpr) {
Expression expression = parseFunctionExpr(subExprLiteral);
return FunctionExpr.of(functionName, singletonList(expression));
}

/**
 * Returns the time zone used when converting ClickHouse timestamp values to Flink's internal
 * timestamp representation. Currently this is the JVM default zone.
 *
 * <p>TODO: honor the time zone configured via `table.local-time-zone` instead of the JVM
 * default, so conversions match the session configuration.
 */
public static TimeZone getFlinkTimeZone() {
    return TimeZone.getDefault();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
case Array:
String arrayBaseType =
getInternalClickHouseType(clickHouseColumnInfo.getOriginalTypeName());
String arrayColumnName = clickHouseColumnInfo.getColumnName() + ".array_base";
String arrayBaseName = clickHouseColumnInfo.getColumnName() + ".array_base";
ClickHouseColumn clickHouseColumn =
ClickHouseColumn.of(arrayColumnName, arrayBaseType);
ClickHouseColumn.of(arrayBaseName, arrayBaseType);
return DataTypes.ARRAY(toFlinkType(clickHouseColumn));
case Map:
return DataTypes.MAP(
Expand All @@ -106,8 +106,7 @@ private static String getInternalClickHouseType(String clickHouseTypeLiteral) {
return matcher.group("type");
} else {
throw new CatalogException(
java.lang.String.format(
"No content found in the bucket of '%s'", clickHouseTypeLiteral));
String.format("No content found in the bucket of '%s'", clickHouseTypeLiteral));
}
}
}
Loading

0 comments on commit 6a2c76b

Please sign in to comment.