Skip to content

Commit

Permalink
[Fix] Adjust data conversion logic for compatibility with clickhouse-…
Browse files Browse the repository at this point in the history
…jdbc 0.6 #136
  • Loading branch information
itinycheng committed Jul 8, 2024
1 parent 7346c14 commit 7cce8b1
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;

import com.clickhouse.data.value.UnsignedByte;
import com.clickhouse.data.value.UnsignedInteger;
import com.clickhouse.data.value.UnsignedLong;
import com.clickhouse.data.value.UnsignedShort;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.sql.Array;
import java.sql.Date;
import java.sql.SQLException;
Expand All @@ -39,8 +45,10 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp;
Expand Down Expand Up @@ -129,35 +137,51 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case INTEGER:
case BIGINT:
case TINYINT:
case BINARY:
case VARBINARY:
return value;
case TINYINT:
return ((Integer) value).byteValue();
case SMALLINT:
return value instanceof Integer ? ((Integer) value).shortValue() : value;
return value instanceof UnsignedByte ? ((UnsignedByte) value).shortValue() : value;
case INTEGER:
return value instanceof UnsignedShort ? ((UnsignedShort) value).intValue() : value;
case BIGINT:
return value instanceof UnsignedInteger
? ((UnsignedInteger) value).longValue()
: value;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
return value instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) value, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) value, precision, scale);
BigDecimal decimalValue =
value instanceof BigDecimal
? (BigDecimal) value
: new BigDecimal(
value instanceof UnsignedLong
? ((UnsignedLong) value).bigIntegerValue()
: (BigInteger) value);
return DecimalData.fromBigDecimal(decimalValue, precision, scale);
case DATE:
return (int) (((Date) value).toLocalDate().toEpochDay());
return (int) (((LocalDate) value).toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return (int) (((Time) value).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampData.fromLocalDateTime((LocalDateTime) value);
return TimestampData.fromLocalDateTime(
value instanceof OffsetDateTime
? ((OffsetDateTime) value).toLocalDateTime()
: (LocalDateTime) value);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return TimestampData.fromInstant(
((LocalDateTime) value).atZone(getFlinkTimeZone().toZoneId()).toInstant());
case CHAR:
case VARCHAR:
return StringData.fromString((String) value);
if (value instanceof UUID) {
return StringData.fromString(value.toString());
} else if (value instanceof InetAddress) {
return StringData.fromString(((InetAddress) value).getHostAddress());
} else {
return StringData.fromString((String) value);
}
case ARRAY:
LogicalType elementType =
type.getChildren().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;

import com.clickhouse.data.value.UnsignedByte;
import com.clickhouse.data.value.UnsignedInteger;
import com.clickhouse.data.value.UnsignedLong;
import com.clickhouse.data.value.UnsignedShort;
import com.clickhouse.jdbc.ClickHousePreparedStatement;
import com.clickhouse.jdbc.ClickHouseResultSet;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -45,6 +50,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.UUID;

import static org.apache.flink.connector.clickhouse.internal.converter.ClickHouseConverterUtils.BOOL_TRUE;
Expand Down Expand Up @@ -109,30 +115,41 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case INTEGER:
case BIGINT:
case TINYINT:
case BINARY:
case VARBINARY:
return val -> val;
case TINYINT:
return val -> ((Integer) val).byteValue();
case SMALLINT:
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
return val -> val instanceof UnsignedByte ? ((UnsignedByte) val).shortValue() : val;
case INTEGER:
return val -> val instanceof UnsignedShort ? ((UnsignedShort) val).intValue() : val;
case BIGINT:
return val ->
val instanceof UnsignedInteger ? ((UnsignedInteger) val).longValue() : val;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
return val -> {
BigDecimal decimalValue =
val instanceof BigDecimal
? (BigDecimal) val
: new BigDecimal(
val instanceof UnsignedLong
? ((UnsignedLong) val).bigIntegerValue()
: (BigInteger) val);
return DecimalData.fromBigDecimal(decimalValue, precision, scale);
};
case DATE:
return val -> (int) ((Date) val).toLocalDate().toEpochDay();
return val -> (int) ((LocalDate) val).toEpochDay();
case TIME_WITHOUT_TIME_ZONE:
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
return val ->
TimestampData.fromLocalDateTime(
val instanceof OffsetDateTime
? ((OffsetDateTime) val).toLocalDateTime()
: (LocalDateTime) val);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val ->
TimestampData.fromInstant(
Expand All @@ -141,10 +158,15 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
.toInstant());
case CHAR:
case VARCHAR:
return val ->
val instanceof UUID
? StringData.fromString(val.toString())
: StringData.fromString((String) val);
return val -> {
if (val instanceof UUID) {
return StringData.fromString(val.toString());
} else if (val instanceof InetAddress) {
return StringData.fromString(((InetAddress) val).getHostAddress());
} else {
return StringData.fromString((String) val);
}
};
case ARRAY:
case MAP:
return val -> ClickHouseConverterUtils.toInternal(val, type);
Expand Down Expand Up @@ -242,6 +264,7 @@ private SerializationConverter createToExternalConverter(LogicalType type) {

@FunctionalInterface
interface SerializationConverter extends Serializable {

/**
* Convert an internal field to java object and fill into the {@link
* ClickHousePreparedStatement}.
Expand All @@ -252,6 +275,7 @@ void serialize(RowData rowData, int index, ClickHouseStatementWrapper statement)

@FunctionalInterface
interface DeserializationConverter extends Serializable {

/**
* Convert an object of {@link ClickHouseResultSet} to the internal data structure object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
switch (clickHouseColumnInfo.getDataType()) {
case Int8:
return DataTypes.TINYINT();
case Int16:
case Bool:
return DataTypes.BOOLEAN();
case Int16:
case UInt8:
return DataTypes.SMALLINT();
case Int32:
Expand Down Expand Up @@ -92,6 +92,7 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
case UUID:
return DataTypes.VARCHAR(clickHouseColumnInfo.getPrecision());
case Date:
case Date32:
return DataTypes.DATE();
case DateTime:
case DateTime32:
Expand Down

0 comments on commit 7cce8b1

Please sign in to comment.