Skip to content

Commit

Permalink
Support DateTime64 in ClickHouse
Browse files Browse the repository at this point in the history
TIMESTAMP(p)
TIMESTAMP(p) WITH TIMEZONE
  • Loading branch information
ssheikin committed Oct 15, 2024
1 parent 2800452 commit 5ae0bf5
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
import io.trino.plugin.base.expression.ConnectorExpressionRewriter;
import io.trino.plugin.base.expression.ConnectorExpressionRule.RewriteContext;
import io.trino.plugin.base.mapping.IdentifierMapping;
import io.trino.plugin.clickhouse.expression.RewriteComparison;
import io.trino.plugin.clickhouse.expression.RewriteLike;
import io.trino.plugin.clickhouse.expression.RewriteStringComparison;
import io.trino.plugin.clickhouse.expression.RewriteStringIn;
import io.trino.plugin.clickhouse.expression.RewriteTimestampConstant;
import io.trino.plugin.jdbc.BaseJdbcClient;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ColumnMapping;
Expand All @@ -44,6 +45,7 @@
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.LongReadFunction;
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectReadFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
Expand Down Expand Up @@ -73,7 +75,11 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
Expand Down Expand Up @@ -103,12 +109,16 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static com.clickhouse.data.ClickHouseDataType.DateTime;
import static com.clickhouse.data.ClickHouseDataType.DateTime64;
import static com.clickhouse.data.ClickHouseDataType.FixedString;
import static com.clickhouse.data.ClickHouseValues.convertToQuotedString;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.emptyToNull;
Expand Down Expand Up @@ -145,12 +155,14 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longTimestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryColumnMapping;
Expand All @@ -173,11 +185,15 @@
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_SECONDS;
import static io.trino.spi.type.TimestampType.createTimestampType;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_SECONDS;
import static io.trino.spi.type.TimestampWithTimeZoneType.createTimestampWithTimeZoneType;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
Expand All @@ -199,6 +215,7 @@
public class ClickHouseClient
extends BaseJdbcClient
{
public static final int CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION = 9;
private static final Splitter TABLE_PROPERTY_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private static final DecimalType UINT64_TYPE = createDecimalType(20, 0);
Expand Down Expand Up @@ -229,7 +246,9 @@ public ClickHouseClient(
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
.addStandardRules(this::quoted)
.add(new RewriteStringComparison())
.add(new RewriteTimestampConstant())
.add(new RewriteComparison(ImmutableList.of(CharType.class, VarcharType.class), ImmutableSet.of(FixedString, ClickHouseDataType.String)))
.add(new RewriteComparison(ImmutableList.of(TimestampType.class, TimestampWithTimeZoneType.class), ImmutableSet.of(DateTime, DateTime64)))
.add(new RewriteStringIn())
.add(new RewriteLike())
.map("$not(value: boolean)").to("NOT value")
Expand Down Expand Up @@ -720,9 +739,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
timestampReadFunction(TIMESTAMP_SECONDS),
timestampSecondsWriteFunction(getClickHouseServerVersion(session))));
}
// TODO (https://github.com/trinodb/trino/issues/10537) Add support for Datetime64 type
return Optional.of(timestampColumnMapping(TIMESTAMP_MILLIS));

if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampColumnMapping(createTimestampType(column.getScale())));
}
case Types.TIMESTAMP_WITH_TIMEZONE:

Check failure on line 745 in plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java

View workflow job for this annotation

GitHub Actions / error-prone-checks

Execution may fall through from the previous case; add a `// fall through` comment before this line if it was deliberate

Check failure on line 745 in plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java

View workflow job for this annotation

GitHub Actions / error-prone-checks

Execution may fall through from the previous case; add a `// fall through` comment before this line if it was deliberate
if (columnDataType == ClickHouseDataType.DateTime) {
// ClickHouse DateTime does not have sub-second precision
Expand All @@ -732,6 +751,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction(column.getTimeZone())));
}
if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampWithTimeZoneColumnMapping(column));
}
}

if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
Expand Down Expand Up @@ -787,12 +809,85 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
if (type == TIMESTAMP_SECONDS) {
return WriteMapping.longMapping("DateTime", timestampSecondsWriteFunction(getClickHouseServerVersion(session)));
}
if (type instanceof TimestampType timestampType) {
return timestampWriteMapping(timestampType);
}
if (type instanceof TimestampWithTimeZoneType timestampWithTimeZoneType) {
return timestampWithTimeZoneWriteMapping(timestampWithTimeZoneType);
}
if (type.equals(uuidType)) {
return WriteMapping.sliceMapping("UUID", uuidWriteFunction());
}
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
}

private WriteMapping timestampWriteMapping(TimestampType timestampType)
{
int precision = timestampType.getPrecision();
String dataType = "DateTime64(%s)".formatted(precision);
if (precision <= TimestampType.MAX_SHORT_PRECISION) {
return WriteMapping.longMapping(dataType, timestampWriteFunction(createTimestampType(precision)));
}
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "Precision is out of range: %s", precision);
return WriteMapping.objectMapping(dataType, longTimestampWriteFunction(timestampType, precision));
}

private static ColumnMapping timestampWithTimeZoneColumnMapping(ClickHouseColumn clickHouseColumn)
{
int precision = clickHouseColumn.getScale();
TimeZone columnTimeZone = clickHouseColumn.getTimeZone();
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "unsupported precision value %s", precision);
TimestampWithTimeZoneType trinoType = createTimestampWithTimeZoneType(precision);
if (precision <= TimestampWithTimeZoneType.MAX_SHORT_PRECISION) {
return ColumnMapping.longMapping(
trinoType,
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction(columnTimeZone));
}
return ColumnMapping.objectMapping(
trinoType,
longTimestampWithTimeZoneReadFunction(),
longTimestampWithTimeZoneWriteFunction(columnTimeZone));
}

private static WriteMapping timestampWithTimeZoneWriteMapping(TimestampWithTimeZoneType timestampWithTimeZoneType)
{
verify(timestampWithTimeZoneType.getPrecision() <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION);
// Clickhouse DateTime64(precision, [timezone])
// The time zone is not stored in the rows of the table (or in resultset), but is stored in the column metadata.
// Timezone agnostic Unix timestamp is stored in tables
// In trino timezone is not available at this point of time
// So create columns within UTC timezone
// TODO get timezone from session?
String dataType = format("DateTime64(%d, 'UTC')", timestampWithTimeZoneType.getPrecision());
if (timestampWithTimeZoneType.isShort()) {
return WriteMapping.longMapping(dataType, shortTimestampWithTimeZoneWriteFunction(TimeZone.getTimeZone("UTC")));
}
return WriteMapping.objectMapping(dataType, longTimestampWithTimeZoneWriteFunction(TimeZone.getTimeZone("UTC")));
}

private static ObjectReadFunction longTimestampWithTimeZoneReadFunction()
{
return ObjectReadFunction.of(LongTimestampWithTimeZone.class, (resultSet, columnIndex) -> {
ZonedDateTime timestamp = resultSet.getObject(columnIndex, ZonedDateTime.class);
return LongTimestampWithTimeZone.fromEpochSecondsAndFraction(timestamp.toEpochSecond(),
(long) timestamp.getNano() * PICOSECONDS_PER_NANOSECOND,
TimeZoneKey.getTimeZoneKey(timestamp.getZone().getId()));
});
}

private static ObjectWriteFunction longTimestampWithTimeZoneWriteFunction(TimeZone columnTimeZone)
{
return ObjectWriteFunction.of(LongTimestampWithTimeZone.class, (statement, index, value) -> {
long epochMillis = value.getEpochMillis();
long epochSeconds = Math.floorDiv(epochMillis, MILLISECONDS_PER_SECOND);
long adjustNanoSeconds = (long) Math.floorMod(epochMillis, MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND + value.getPicosOfMilli() / PICOSECONDS_PER_NANOSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, adjustNanoSeconds);
statement.setObject(index, ZonedDateTime.ofInstant(instant, columnTimeZone.toZoneId()));
});
}


private ClickHouseVersion getClickHouseServerVersion(ConnectorSession session)
{
return clickHouseVersion.updateAndGet(current -> {
Expand Down Expand Up @@ -984,17 +1079,14 @@ private static SliceWriteFunction uuidWriteFunction()
return (statement, index, value) -> statement.setObject(index, trinoUuidToJavaUuid(value), Types.OTHER);
}

public static boolean supportsPushdown(Variable variable, RewriteContext<ParameterizedExpression> context)
public static boolean supportsPushdown(Variable variable, RewriteContext<ParameterizedExpression> context, Set<ClickHouseDataType> nativeTypes)
{
JdbcTypeHandle typeHandle = ((JdbcColumnHandle) context.getAssignment(variable.getName()))
.getJdbcTypeHandle();
String jdbcTypeName = typeHandle.jdbcTypeName()
.orElseThrow(() -> new TrinoException(JDBC_ERROR, "Type name is missing: " + typeHandle));
ClickHouseColumn column = ClickHouseColumn.of("", jdbcTypeName);
ClickHouseDataType columnDataType = column.getDataType();
return switch (columnDataType) {
case FixedString, String -> true;
default -> false;
};
return nativeTypes.contains(columnDataType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package io.trino.plugin.clickhouse.expression;

import com.clickhouse.data.ClickHouseDataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.matching.Capture;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
Expand All @@ -24,9 +26,8 @@
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Variable;
import io.trino.spi.type.CharType;
import io.trino.spi.type.VarcharType;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

Expand All @@ -43,26 +44,34 @@
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static java.lang.String.format;

public class RewriteStringComparison
public class RewriteComparison
implements ConnectorExpressionRule<Call, ParameterizedExpression>
{
private static final Capture<ConnectorExpression> LEFT = newCapture();
private static final Capture<ConnectorExpression> RIGHT = newCapture();
private static final Pattern<Call> PATTERN = call()
.with(type().equalTo(BOOLEAN))
.with(functionName().matching(Stream.of(ComparisonOperator.values())
.filter(comparison -> comparison != ComparisonOperator.IDENTICAL)
.map(ComparisonOperator::getFunctionName)
.collect(toImmutableSet())
::contains))
.with(argumentCount().equalTo(2))
.with(argument(0).matching(expression().with(type().matching(type -> type instanceof CharType || type instanceof VarcharType)).capturedAs(LEFT)))
.with(argument(1).matching(expression().with(type().matching(type -> type instanceof CharType || type instanceof VarcharType)).capturedAs(RIGHT)));

private final Pattern<Call> pattern;
private final ImmutableSet<ClickHouseDataType> nativeTypes;

public RewriteComparison(List<Class<?>> classes, ImmutableSet<ClickHouseDataType> nativeTypes)
{
pattern = call()
.with(type().equalTo(BOOLEAN))
.with(functionName().matching(Stream.of(ComparisonOperator.values())
.filter(comparison -> comparison != ComparisonOperator.IDENTICAL)
.map(ComparisonOperator::getFunctionName)
.collect(toImmutableSet())
::contains))
.with(argumentCount().equalTo(2))
.with(argument(0).matching(expression().with(type().matching(type -> classes.stream().anyMatch(aClass -> aClass.isInstance(type)))).capturedAs(LEFT)))
.with(argument(1).matching(expression().with(type().matching(type -> classes.stream().anyMatch(aClass -> aClass.isInstance(type)))).capturedAs(RIGHT)));
this.nativeTypes = nativeTypes;
}

@Override
public Pattern<Call> getPattern()
{
return PATTERN;
return pattern;
}

@Override
Expand All @@ -72,11 +81,11 @@ public Optional<ParameterizedExpression> rewrite(Call expression, Captures captu
ConnectorExpression leftExpression = captures.get(LEFT);
ConnectorExpression rightExpression = captures.get(RIGHT);

if (leftExpression instanceof Variable variable && !supportsPushdown(variable, context)) {
if (leftExpression instanceof Variable variable && !supportsPushdown(variable, context, nativeTypes)) {
return Optional.empty();
}

if (rightExpression instanceof Variable variable && !supportsPushdown(variable, context)) {
if (rightExpression instanceof Variable variable && !supportsPushdown(variable, context, nativeTypes)) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package io.trino.plugin.clickhouse.expression;

import com.clickhouse.data.ClickHouseDataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import io.trino.matching.Capture;
import io.trino.matching.Captures;
Expand All @@ -29,6 +31,7 @@

import java.util.Optional;

import static com.clickhouse.data.ClickHouseDataType.FixedString;
import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.matching.Capture.newCapture;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.argument;
Expand Down Expand Up @@ -59,7 +62,7 @@ public class RewriteLike
.with(argumentCount().equalTo(2))
.with(argument(0).matching(variable()
.with(type().matching(type -> type instanceof CharType || type instanceof VarcharType))
.matching((Variable variable, RewriteContext<ParameterizedExpression> context) -> supportsPushdown(variable, context))
.matching((Variable variable, RewriteContext<ParameterizedExpression> context) -> supportsPushdown(variable, context, ImmutableSet.of(FixedString, ClickHouseDataType.String)))
.capturedAs(LIKE_VALUE)))
.with(argument(1).matching(constant()
.with(type().matching(type -> type instanceof CharType || type instanceof VarcharType))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package io.trino.plugin.clickhouse.expression;

import com.clickhouse.data.ClickHouseDataType;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.matching.Capture;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
Expand All @@ -31,6 +33,7 @@
import java.util.List;
import java.util.Optional;

import static com.clickhouse.data.ClickHouseDataType.FixedString;
import static com.google.common.base.Verify.verify;
import static io.trino.matching.Capture.newCapture;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.argument;
Expand Down Expand Up @@ -59,7 +62,7 @@ public class RewriteStringIn
.with(argumentCount().equalTo(2))
.with(argument(0).matching(variable()
.with(type().matching(type -> type instanceof CharType || type instanceof VarcharType))
.matching((Variable variable, RewriteContext<ParameterizedExpression> context) -> supportsPushdown(variable, context))
.matching((Variable variable, RewriteContext<ParameterizedExpression> context) -> supportsPushdown(variable, context, ImmutableSet.of(FixedString, ClickHouseDataType.String)))
.capturedAs(VALUE)))
.with(argument(1).matching(call()
.with(functionName().equalTo(ARRAY_CONSTRUCTOR_FUNCTION_NAME))
Expand Down
Loading

0 comments on commit 5ae0bf5

Please sign in to comment.