From 6a2c76b84ebc386b7758d9c14ef6da7dc9d7496c Mon Sep 17 00:00:00 2001
From: "chenglong.wang"
Date: Wed, 5 Jun 2024 17:52:27 +0800
Subject: [PATCH] Fix some bugs

---
 .../clickhouse/catalog/ClickHouseCatalog.java | 15 ++--
 .../ClickHouseRowDataLookupFunction.java      |  2 +-
 .../internal/ClickHouseStatementFactory.java  |  5 +-
 .../ClickHouseConnectionProvider.java         |  3 +-
 .../ClickHouseStatementWrapper.java           |  9 +--
 .../internal/connection/ObjectArray.java      | 72 +++++++++++++++++++
 .../converter/ClickHouseConverterUtils.java   | 10 ++-
 .../converter/ClickHouseRowConverter.java     | 10 ++-
 .../executor/ClickHouseBatchExecutor.java     |  1 -
 .../internal/executor/ClickHouseExecutor.java | 11 ++-
 .../executor/ClickHouseUpsertExecutor.java    |  3 -
 .../clickhouse/internal/schema/ShardSpec.java |  2 +-
 .../clickhouse/util/ClickHouseUtil.java       |  8 ++-
 .../clickhouse/util/DataTypeUtil.java         |  7 +-
 .../clickhouse/util/FilterPushDownHelper.java | 23 +++---
 .../flink/connector/clickhouse/AppTest.java   | 17 ++---
 16 files changed, 136 insertions(+), 62 deletions(-)
 create mode 100644 flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ObjectArray.java

diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java
index e00f538..82e21e9 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/catalog/ClickHouseCatalog.java
@@ -37,6 +37,7 @@
 import com.clickhouse.jdbc.ClickHouseConnection;
 import com.clickhouse.jdbc.ClickHouseDriver;
 import com.clickhouse.jdbc.ClickHouseResultSetMetaData;
+import com.clickhouse.jdbc.ClickHouseStatement;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -291,13 +292,15 @@ private synchronized TableSchema createTableSchema(String databaseName, String t
         // types? 3. All queried data will be obtained before PreparedStatement is closed, so we
         // must add `limit 0` statement to avoid data transmission to the client, look at
         // `ChunkedInputStream.close()` for more info.
-        try (PreparedStatement stmt =
-                connection.prepareStatement(
-                        String.format(
-                                "SELECT * from `%s`.`%s` limit 0", databaseName, tableName))) {
+        try (ClickHouseStatement stmt = connection.createStatement();
+                ResultSet rs =
+                        stmt.executeQuery(
+                                String.format(
+                                        "SELECT * from `%s`.`%s` limit 0",
+                                        databaseName, tableName))) {
             ClickHouseResultSetMetaData metaData =
-                    stmt.getMetaData().unwrap(ClickHouseResultSetMetaData.class);
-            Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", int.class);
+                    rs.getMetaData().unwrap(ClickHouseResultSetMetaData.class);
+            Method getColMethod = metaData.getClass().getDeclaredMethod("getColumn", int.class);
             getColMethod.setAccessible(true);

             List primaryKeys = getPrimaryKeys(databaseName, tableName);
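Context for the hunk above: the catalog now derives the table schema from the metadata of an empty result set, and the `limit 0` clause keeps ClickHouse from streaming any row data to the client before the statement is closed. A minimal standalone sketch of the same probing idea over plain JDBC (URL, credentials and table name are illustrative, not taken from the patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class SchemaProbe {
        public static void main(String[] args) throws SQLException {
            // Illustrative URL; the connector builds its own from the catalog options.
            try (Connection conn = DriverManager.getConnection("jdbc:ch://localhost:8123/default");
                    Statement stmt = conn.createStatement();
                    // `limit 0` returns only metadata, so no rows are shipped to the client.
                    ResultSet rs = stmt.executeQuery("SELECT * from `default`.`t_user` limit 0")) {
                ResultSetMetaData meta = rs.getMetaData();
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    System.out.println(meta.getColumnName(i) + " : " + meta.getColumnTypeName(i));
                }
            }
        }
    }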
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java
index ae9d8d1..f69040e 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java
@@ -146,7 +146,7 @@ private void establishConnectionAndStatement() throws SQLException {
         Connection dbConn = connectionProvider.getOrCreateConnection();
         statement =
                 new ClickHouseStatementWrapper(
-                        dbConn, (ClickHousePreparedStatement) dbConn.prepareStatement(query));
+                        (ClickHousePreparedStatement) dbConn.prepareStatement(query));
     }

     @Override
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java
index e23a9b2..336c116 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java
@@ -41,7 +41,8 @@ public static String getSelectWhereStatement(
         return selectStatement + (conditionFields.length > 0 ? " WHERE " + whereClause : "");
     }

-    public static String getInsertIntoStatement(String tableName, String[] fieldNames) {
+    public static String getInsertIntoStatement(
+            String tableName, String databaseName, String[] fieldNames) {
         String columns =
                 Arrays.stream(fieldNames)
                         .map(ClickHouseUtil::quoteIdentifier)
@@ -50,7 +51,7 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName
         return String.join(
                 EMPTY,
                 "INSERT INTO ",
-                quoteIdentifier(tableName),
+                fromTableClause(tableName, databaseName),
                 "(",
                 columns,
                 ") VALUES (",
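The new `databaseName` parameter lets the generated INSERT target `database`.`table` instead of the bare table name; the actual quoting is delegated to fromTableClause, which is referenced here but not shown in this hunk. A simplified, self-contained stand-in showing the intended shape of the SQL:

    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class InsertSqlSketch {
        // Simplified stand-in for ClickHouseStatementFactory.getInsertIntoStatement;
        // the real identifier quoting lives in ClickHouseUtil.quoteIdentifier.
        static String insertInto(String databaseName, String tableName, String[] fieldNames) {
            String columns =
                    Arrays.stream(fieldNames).map(f -> "`" + f + "`").collect(Collectors.joining(","));
            String placeholders =
                    Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(","));
            return "INSERT INTO `" + databaseName + "`.`" + tableName + "`("
                    + columns + ") VALUES (" + placeholders + ")";
        }

        public static void main(String[] args) {
            // Prints: INSERT INTO `sales`.`t_order`(`id`,`amount`) VALUES (?,?)
            System.out.println(insertInto("sales", "t_order", new String[] {"id", "amount"}));
        }
    }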
" WHERE " + whereClause : ""); } - public static String getInsertIntoStatement(String tableName, String[] fieldNames) { + public static String getInsertIntoStatement( + String tableName, String databaseName, String[] fieldNames) { String columns = Arrays.stream(fieldNames) .map(ClickHouseUtil::quoteIdentifier) @@ -50,7 +51,7 @@ public static String getInsertIntoStatement(String tableName, String[] fieldName return String.join( EMPTY, "INSERT INTO ", - quoteIdentifier(tableName), + fromTableClause(tableName, databaseName), "(", columns, ") VALUES (", diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java index 103084e..93aa116 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java @@ -103,13 +103,14 @@ public List getShardUrls(String remoteCluster) throws SQLException { } return shardsMap.values().stream() - .map(urls -> "clickhouse://" + String.join(",", urls)) + .map(urls -> "jdbc:ch://" + String.join(",", urls)) .collect(toList()); } private ClickHouseConnection createConnection(String url, String database) throws SQLException { LOG.info("connecting to {}, database {}", url, database); Properties configuration = new Properties(); + configuration.putAll(connectionProperties); configuration.setProperty( ClickHouseDefaults.USER.getKey(), options.getUsername().orElse(null)); configuration.setProperty( diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseStatementWrapper.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseStatementWrapper.java index 3aabba2..5dc1e57 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseStatementWrapper.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseStatementWrapper.java @@ -3,7 +3,6 @@ import com.clickhouse.jdbc.ClickHousePreparedStatement; import java.math.BigDecimal; -import java.sql.Connection; import java.sql.Date; import java.sql.ResultSet; import java.sql.SQLException; @@ -12,12 +11,9 @@ /** Wrapper class for ClickHousePreparedStatement. 
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ObjectArray.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ObjectArray.java
new file mode 100644
index 0000000..ad92663
--- /dev/null
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ObjectArray.java
@@ -0,0 +1,72 @@
+package org.apache.flink.connector.clickhouse.internal.connection;
+
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.util.Map;
+
+/** Wrap object array. */
+public class ObjectArray implements Array {
+
+    private Object[] array;
+
+    public ObjectArray(Object[] array) {
+        this.array = Preconditions.checkNotNull(array);
+    }
+
+    @Override
+    public String getBaseTypeName() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getBaseType() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object getArray() {
+        return array;
+    }
+
+    @Override
+    public Object getArray(Map<String, Class<?>> map) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object getArray(long index, int count) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object getArray(long index, int count, Map<String, Class<?>> map) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultSet getResultSet() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultSet getResultSet(Map<String, Class<?>> map) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultSet getResultSet(long index, int count) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResultSet getResultSet(long index, int count, Map<String, Class<?>> map) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void free() {
+        this.array = null;
+    }
+}
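ObjectArray exists so the statement wrapper can hand an array parameter to setArray without holding a live Connection (previously needed for connection.createArrayOf). A tiny usage sketch:

    import org.apache.flink.connector.clickhouse.internal.connection.ObjectArray;

    public class ObjectArraySketch {
        public static void main(String[] args) {
            // Wrap a plain Object[]; no Connection.createArrayOf("VARCHAR", ...) is required.
            ObjectArray array = new ObjectArray(new Object[] {"a", "b", "c"});
            Object[] values = (Object[]) array.getArray();
            System.out.println(values.length); // prints 3
            array.free(); // drops the internal reference; getArray() returns null afterwards
        }
    }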
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java
index 5b7638f..bfbfd12 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java
@@ -20,10 +20,12 @@
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.HashMap;
 import java.util.Map;

+import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
 import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp;

 /** convert between internal and external data types. */
@@ -132,9 +134,10 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
                 return (int) (((Time) value).toLocalTime().toNanoOfDay() / 1_000_000L);
             case TIMESTAMP_WITH_TIME_ZONE:
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return TimestampData.fromTimestamp((Timestamp) value);
+                return TimestampData.fromLocalDateTime((LocalDateTime) value);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return TimestampData.fromInstant(((Timestamp) value).toInstant());
+                return TimestampData.fromInstant(
+                        ((LocalDateTime) value).atZone(getFlinkTimeZone().toZoneId()).toInstant());
             case CHAR:
             case VARCHAR:
                 return StringData.fromString((String) value);
@@ -144,7 +147,8 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
                             .findFirst()
                             .orElseThrow(
                                     () -> new RuntimeException("Unknown array element type"));
-            Object externalArray = ((Array) value).getArray();
+            Object externalArray =
+                    value.getClass().isArray() ? value : ((Array) value).getArray();
             int externalArrayLength = java.lang.reflect.Array.getLength(externalArray);
             Object[] internalArray = new Object[externalArrayLength];
             for (int i = 0; i < externalArrayLength; i++) {
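The converter now assumes the driver returns LocalDateTime for DateTime columns: plain timestamps keep their wall-clock value, while TIMESTAMP_LTZ values are interpreted in the session time zone before being stored as an instant. A self-contained sketch of the two paths (the default zone stands in for getFlinkTimeZone()):

    import org.apache.flink.table.data.TimestampData;

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.util.TimeZone;

    public class TimestampConversionSketch {
        public static void main(String[] args) {
            LocalDateTime fromDriver = LocalDateTime.of(2024, 6, 5, 17, 52, 27);

            // TIMESTAMP / TIMESTAMP WITHOUT TIME ZONE: keep the wall-clock value as-is.
            TimestampData withoutZone = TimestampData.fromLocalDateTime(fromDriver);

            // TIMESTAMP_LTZ: interpret the wall clock in the session zone, then store the instant.
            TimeZone zone = TimeZone.getDefault(); // stand-in for ClickHouseUtil.getFlinkTimeZone()
            Instant instant = fromDriver.atZone(zone.toZoneId()).toInstant();
            TimestampData withLocalZone = TimestampData.fromInstant(instant);

            System.out.println(withoutZone + " vs " + withLocalZone);
        }
    }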
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java
index 1ce183c..a5787cf 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java
@@ -26,10 +26,12 @@
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.UUID;

 import static org.apache.flink.connector.clickhouse.internal.converter.ClickHouseConverterUtils.BOOL_TRUE;
+import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
 import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp;

 /** Row converter,convert flink type to/from ClickHouse type. */
@@ -113,9 +115,13 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
                 return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
             case TIMESTAMP_WITH_TIME_ZONE:
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return val -> TimestampData.fromTimestamp((Timestamp) val);
+                return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return val -> TimestampData.fromInstant(((Timestamp) val).toInstant());
+                return val ->
+                        TimestampData.fromInstant(
+                                ((LocalDateTime) val)
+                                        .atZone(getFlinkTimeZone().toZoneId())
+                                        .toInstant());
             case CHAR:
             case VARCHAR:
                 return val ->
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
index 2b8aed8..a7c5345 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
@@ -43,7 +43,6 @@ public ClickHouseBatchExecutor(
     public void prepareStatement(ClickHouseConnection connection) throws SQLException {
         statement =
                 new ClickHouseStatementWrapper(
-                        connection,
                         (ClickHousePreparedStatement) connection.prepareStatement(insertSql));
     }

diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
index 7e0a5ca..7eee08d 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
@@ -87,16 +87,19 @@ static ClickHouseExecutor createClickHouseExecutor(
                     fieldTypes,
                     options);
         } else {
-            return createBatchExecutor(tableName, fieldNames, fieldTypes, options);
+            return createBatchExecutor(tableName, databaseName, fieldNames, fieldTypes, options);
         }
     }

     static ClickHouseBatchExecutor createBatchExecutor(
             String tableName,
+            String databaseName,
             String[] fieldNames,
             LogicalType[] fieldTypes,
             ClickHouseDmlOptions options) {
-        String insertSql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
+        String insertSql =
+                ClickHouseStatementFactory.getInsertIntoStatement(
+                        tableName, databaseName, fieldNames);
         ClickHouseRowConverter converter = new ClickHouseRowConverter(RowType.of(fieldTypes));
         return new ClickHouseBatchExecutor(insertSql, converter, options);
     }
@@ -110,7 +113,9 @@ static ClickHouseUpsertExecutor createUpsertExecutor(
             String[] partitionFields,
             LogicalType[] fieldTypes,
             ClickHouseDmlOptions options) {
-        String insertSql = ClickHouseStatementFactory.getInsertIntoStatement(tableName, fieldNames);
+        String insertSql =
+                ClickHouseStatementFactory.getInsertIntoStatement(
+                        tableName, databaseName, fieldNames);
         String updateSql =
                 ClickHouseStatementFactory.getUpdateStatement(
                         tableName,
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
index ded4f88..9e4f8ba 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
@@ -90,15 +90,12 @@ public ClickHouseUpsertExecutor(
     public void prepareStatement(ClickHouseConnection connection) throws SQLException {
         this.insertStatement =
                 new ClickHouseStatementWrapper(
-                        connection,
                         (ClickHousePreparedStatement) connection.prepareStatement(this.insertSql));
         this.updateStatement =
                 new ClickHouseStatementWrapper(
-                        connection,
                         (ClickHousePreparedStatement) connection.prepareStatement(this.updateSql));
         this.deleteStatement =
                 new ClickHouseStatementWrapper(
-                        connection,
                         (ClickHousePreparedStatement) connection.prepareStatement(this.deleteSql));
     }

diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/schema/ShardSpec.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/schema/ShardSpec.java
index 782d486..7172628 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/schema/ShardSpec.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/schema/ShardSpec.java
@@ -34,7 +34,7 @@ public ShardSpec(
     public String getJdbcUrls() {
         return replicas.stream()
                 .map(replicaSpec -> replicaSpec.getHost() + ":" + replicaSpec.getPort())
-                .collect(joining(",", "clickhouse://", ""));
+                .collect(joining(",", "jdbc:ch://", ""));
     }

     public void initShardRangeBound(List weights) {
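Like the connection provider earlier, ShardSpec now emits the `jdbc:ch://` prefix; the legacy `clickhouse://` form is no longer produced anywhere. A small sketch of the resulting URL shape (host names are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import static java.util.stream.Collectors.joining;

    public class ShardUrlSketch {
        public static void main(String[] args) {
            List<String> replicas = Arrays.asList("ch-node-1:8123", "ch-node-2:8123");
            // Mirrors ShardSpec.getJdbcUrls(): join the replicas and prepend the JDBC prefix.
            String jdbcUrls = replicas.stream().collect(joining(",", "jdbc:ch://", ""));
            System.out.println(jdbcUrls); // jdbc:ch://ch-node-1:8123,ch-node-2:8123
        }
    }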
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java
index 68af8bb..ddcafac 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java
@@ -12,6 +12,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.TimeZone;

 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
@@ -70,7 +71,7 @@ private static Expression parseFunctionExpr(String shardingExpr) {
         String subExprLiteral =
                 shardingExpr.substring(bracketStartIndex + 1, shardingExpr.lastIndexOf(")"));

-        if (subExprLiteral.trim().length() == 0) {
+        if (subExprLiteral.trim().isEmpty()) {
             return FunctionExpr.of(functionName, emptyList());
         }

@@ -86,4 +87,9 @@ private static Expression parseFunctionExpr(String shardingExpr) {
         Expression expression = parseFunctionExpr(subExprLiteral);
         return FunctionExpr.of(functionName, singletonList(expression));
     }
+
+    /** TODO The timezone configured via `table.local-time-zone` should be used. */
+    public static TimeZone getFlinkTimeZone() {
+        return TimeZone.getDefault();
+    }
 }
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java
index 52927be..3267e24 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java
@@ -83,9 +83,9 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
             case Array:
                 String arrayBaseType =
                         getInternalClickHouseType(clickHouseColumnInfo.getOriginalTypeName());
-                String arrayColumnName = clickHouseColumnInfo.getColumnName() + ".array_base";
+                String arrayBaseName = clickHouseColumnInfo.getColumnName() + ".array_base";
                 ClickHouseColumn clickHouseColumn =
-                        ClickHouseColumn.of(arrayColumnName, arrayBaseType);
+                        ClickHouseColumn.of(arrayBaseName, arrayBaseType);
                 return DataTypes.ARRAY(toFlinkType(clickHouseColumn));
             case Map:
                 return DataTypes.MAP(
@@ -106,8 +106,7 @@ private static String getInternalClickHouseType(String clickHouseTypeLiteral) {
             return matcher.group("type");
         } else {
             throw new CatalogException(
-                    java.lang.String.format(
-                            "No content found in the bucket of '%s'", clickHouseTypeLiteral));
+                    String.format("No content found in the bucket of '%s'", clickHouseTypeLiteral));
         }
     }
 }
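getFlinkTimeZone() above currently falls back to the JVM default zone; the TODO says it should eventually honor `table.local-time-zone`. A hedged sketch of what that lookup could look like through Flink's TableConfig (this is not what the patch does today, and it assumes flink-table-api-java plus a planner on the classpath):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    import java.time.ZoneId;
    import java.util.TimeZone;

    public class SessionTimeZoneSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); // table.local-time-zone

            // What getFlinkTimeZone() could return once the TODO is addressed.
            ZoneId sessionZone = tEnv.getConfig().getLocalTimeZone();
            TimeZone timeZone = TimeZone.getTimeZone(sessionZone);
            System.out.println(timeZone.getID());
        }
    }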
diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/FilterPushDownHelper.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/FilterPushDownHelper.java
index aaa32db..803c12e 100644
--- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/FilterPushDownHelper.java
+++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/FilterPushDownHelper.java
@@ -22,6 +22,7 @@

 import static java.util.stream.Collectors.joining;
 import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.EMPTY;
+import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
 import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.quoteIdentifier;
 import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toLocalDateTime;
 import static org.apache.flink.connector.clickhouse.util.SqlClause.AND;
@@ -177,38 +178,30 @@ private static Optional convertLiteral(ValueLiteralExpression expression
                 .map(
                         o -> {
                             TimeZone timeZone = getFlinkTimeZone();
-                            String value;
+                            Object value;
                             if (o instanceof Time) {
-                                ClickHouseDateTimeValue clickHouseDateTimeValue =
+                                value =
                                         ClickHouseDateTimeValue.of(
                                                 toLocalDateTime(((Time) o).toLocalTime()),
                                                 0,
                                                 timeZone);
-                                value = clickHouseDateTimeValue.asString();
                             } else if (o instanceof LocalTime) {
-                                ClickHouseDateTimeValue clickHouseDateTimeValue =
+                                value =
                                         ClickHouseDateTimeValue.of(
                                                 toLocalDateTime(((LocalTime) o)), 0, timeZone);
-                                value = clickHouseDateTimeValue.asString();
                             } else if (o instanceof Instant) {
                                 Instant instant = (Instant) o;
-                                ClickHouseDateTimeValue clickHouseDateTimeValue =
+                                value =
                                         ClickHouseDateTimeValue.of(
                                                 instant.atZone(timeZone.toZoneId())
                                                         .toLocalDateTime(),
                                                 0,
                                                 timeZone);
-                                value = clickHouseDateTimeValue.asString();
                             } else {
-                                // TODO Object and other
-                                value = ClickHouseValues.convertToQuotedString(o);
+                                value = o;
                             }
-                            return value;
-                        });
-    }
-
-    /** TODO The timezone configured via `table.local-time-zone` should be used. */
-    private static TimeZone getFlinkTimeZone() {
-        return TimeZone.getDefault();
+                            return ClickHouseValues.convertToSqlExpression(value);
+                        });
     }
 }
diff --git a/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java b/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java
index a2b97db..db2c92d 100644
--- a/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java
+++ b/flink-connector-clickhouse/src/test/java/org/apache/flink/connector/clickhouse/AppTest.java
@@ -16,7 +16,7 @@

 import java.io.Serializable;
 import java.math.BigDecimal;
-import java.sql.Timestamp;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -31,6 +31,7 @@
 import static org.apache.flink.connector.clickhouse.util.ClickHouseJdbcUtil.DISTRIBUTED_TABLE_ENGINE_PATTERN;
 import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.parseShardingKey;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;

 /** Unit test for simple App. */
@@ -42,23 +43,15 @@ public void timestampLtzTest() {
         TimestampData timestampData = TimestampData.fromInstant(now);
         Instant instant = timestampData.toInstant();
         LocalDateTime localDateTime = timestampData.toLocalDateTime();
-        String clickHouseDateTimeValueInstant =
-                ClickHouseDateTimeValue.of(
-                                instant.atZone(TimeZone.getDefault().toZoneId()).toLocalDateTime(),
-                                0,
-                                TimeZone.getDefault())
-                        .asString();
-        String clickHouseDateTimeValueLocalDateTime =
-                ClickHouseDateTimeValue.of(localDateTime, 0, TimeZone.getDefault()).asString();
-        System.out.println(clickHouseDateTimeValueInstant);
-        System.out.println(clickHouseDateTimeValueLocalDateTime);
+        LocalDateTime zoneDateTime =
+                instant.atZone(TimeZone.getDefault().toZoneId()).toLocalDateTime();
+        assertFalse(Duration.between(localDateTime, zoneDateTime).isZero());
     }

     @Test
     public void timeTest() {
         LocalTime localTime = LocalTime.ofSecondOfDay(60 * 60);
         LocalDateTime localDateTime = localTime.atDate(LocalDate.ofEpochDay(1));
-        Timestamp timestamp = Timestamp.valueOf(localDateTime);
         String dateTimeStr =
                 ClickHouseDateTimeValue.of(localDateTime, 0, TimeZone.getDefault()).asString();
         assertEquals("1970-01-02 01:00:00", dateTimeStr);
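With the FilterPushDownHelper change above, every pushed-down literal goes through ClickHouseValues.convertToSqlExpression, which leaves numbers bare and renders strings and date-time values as quoted SQL expressions. A rough sketch of the difference (output formatting is up to the clickhouse-java client, and the package of ClickHouseValues differs between client versions):

    // May live in com.clickhouse.client.ClickHouseValues on older clickhouse-java versions.
    import com.clickhouse.data.ClickHouseValues;

    import java.time.LocalDateTime;

    public class FilterLiteralSketch {
        public static void main(String[] args) {
            System.out.println(ClickHouseValues.convertToSqlExpression(42)); // stays unquoted
            System.out.println(ClickHouseValues.convertToSqlExpression("O'Brien")); // quoted and escaped
            System.out.println(
                    ClickHouseValues.convertToSqlExpression(LocalDateTime.of(2024, 6, 5, 0, 0)));
        }
    }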