Skip to content

Commit

Permalink
Support DateTime64 in ClickHouse
Browse files Browse the repository at this point in the history
TIMESTAMP(p)
TIMESTAMP(p) WITH TIME ZONE
  • Loading branch information
ssheikin committed Oct 15, 2024
1 parent e44964f commit bb0b7b1
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.LongReadFunction;
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectReadFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
Expand Down Expand Up @@ -73,7 +74,11 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
Expand All @@ -96,6 +101,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -145,12 +151,14 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longTimestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryColumnMapping;
Expand All @@ -173,11 +181,15 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_SECONDS;
import static io.trino.spi.type.TimestampType.createTimestampType;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_SECONDS;
import static io.trino.spi.type.TimestampWithTimeZoneType.createTimestampWithTimeZoneType;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
Expand All @@ -199,6 +211,7 @@
public class ClickHouseClient
extends BaseJdbcClient
{
public static final int CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION = 9;
private static final Splitter TABLE_PROPERTY_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private static final DecimalType UINT64_TYPE = createDecimalType(20, 0);
Expand Down Expand Up @@ -720,9 +733,10 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
timestampReadFunction(TIMESTAMP_SECONDS),
timestampSecondsWriteFunction(getClickHouseServerVersion(session))));
}
// TODO (https://github.com/trinodb/trino/issues/10537) Add support for Datetime64 type
return Optional.of(timestampColumnMapping(TIMESTAMP_MILLIS));

if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampColumnMapping(createTimestampType(column.getScale())));
}
break;
case Types.TIMESTAMP_WITH_TIMEZONE:
if (columnDataType == ClickHouseDataType.DateTime) {
// ClickHouse DateTime does not have sub-second precision
Expand All @@ -732,6 +746,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction(column.getTimeZone())));
}
if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampWithTimeZoneColumnMapping(column));
}
}

if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
Expand Down Expand Up @@ -787,12 +804,85 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
if (type == TIMESTAMP_SECONDS) {
return WriteMapping.longMapping("DateTime", timestampSecondsWriteFunction(getClickHouseServerVersion(session)));
}
if (type instanceof TimestampType timestampType) {
return timestampWriteMapping(timestampType);
}
if (type instanceof TimestampWithTimeZoneType timestampWithTimeZoneType) {
return timestampWithTimeZoneWriteMapping(timestampWithTimeZoneType);
}
if (type.equals(uuidType)) {
return WriteMapping.sliceMapping("UUID", uuidWriteFunction());
}
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
}

private WriteMapping timestampWriteMapping(TimestampType timestampType)
{
int precision = timestampType.getPrecision();
String dataType = "DateTime64(%s)".formatted(precision);
if (precision <= TimestampType.MAX_SHORT_PRECISION) {
return WriteMapping.longMapping(dataType, timestampWriteFunction(createTimestampType(precision)));
}
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "Precision is out of range: %s", precision);
return WriteMapping.objectMapping(dataType, longTimestampWriteFunction(timestampType, precision));
}

private static ColumnMapping timestampWithTimeZoneColumnMapping(ClickHouseColumn clickHouseColumn)
{
int precision = clickHouseColumn.getScale();
TimeZone columnTimeZone = clickHouseColumn.getTimeZone();
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "unsupported precision value %s", precision);
TimestampWithTimeZoneType trinoType = createTimestampWithTimeZoneType(precision);
if (precision <= TimestampWithTimeZoneType.MAX_SHORT_PRECISION) {
return ColumnMapping.longMapping(
trinoType,
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction(columnTimeZone));
}
return ColumnMapping.objectMapping(
trinoType,
longTimestampWithTimeZoneReadFunction(),
longTimestampWithTimeZoneWriteFunction(columnTimeZone));
}

private static WriteMapping timestampWithTimeZoneWriteMapping(TimestampWithTimeZoneType timestampWithTimeZoneType)
{
verify(timestampWithTimeZoneType.getPrecision() <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION);
// Clickhouse DateTime64(precision, [timezone])
// The time zone is not stored in the rows of the table (or in resultset), but is stored in the column metadata.
// Timezone agnostic Unix timestamp is stored in tables
// In trino timezone is not available at this point of time
// So create columns within UTC timezone
// TODO get timezone from session?
TimeZone timeZone = TimeZone.getTimeZone(ZoneId.of("UTC"));
String dataType = "DateTime64(%d, '%s')".formatted(timestampWithTimeZoneType.getPrecision(), timeZone);
if (timestampWithTimeZoneType.isShort()) {
return WriteMapping.longMapping(dataType, shortTimestampWithTimeZoneWriteFunction(timeZone));
}
return WriteMapping.objectMapping(dataType, longTimestampWithTimeZoneWriteFunction(timeZone));
}

private static ObjectReadFunction longTimestampWithTimeZoneReadFunction()
{
return ObjectReadFunction.of(LongTimestampWithTimeZone.class, (resultSet, columnIndex) -> {
ZonedDateTime timestamp = resultSet.getObject(columnIndex, ZonedDateTime.class);
return LongTimestampWithTimeZone.fromEpochSecondsAndFraction(timestamp.toEpochSecond(),
(long) timestamp.getNano() * PICOSECONDS_PER_NANOSECOND,
TimeZoneKey.getTimeZoneKey(timestamp.getZone().getId()));
});
}

private static ObjectWriteFunction longTimestampWithTimeZoneWriteFunction(TimeZone columnTimeZone)
{
return ObjectWriteFunction.of(LongTimestampWithTimeZone.class, (statement, index, value) -> {
long epochMillis = value.getEpochMillis();
long epochSeconds = Math.floorDiv(epochMillis, MILLISECONDS_PER_SECOND);
long adjustNanoSeconds = (long) Math.floorMod(epochMillis, MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND + value.getPicosOfMilli() / PICOSECONDS_PER_NANOSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, adjustNanoSeconds);
statement.setObject(index, ZonedDateTime.ofInstant(instant, columnTimeZone.toZoneId()));
});
}

private ClickHouseVersion getClickHouseServerVersion(ConnectorSession session)
{
return clickHouseVersion.updateAndGet(current -> {
Expand Down
Loading

0 comments on commit bb0b7b1

Please sign in to comment.