Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DateTime64 pushdown to ClickHouse #23789

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
import io.trino.plugin.base.expression.ConnectorExpressionRewriter;
import io.trino.plugin.base.expression.ConnectorExpressionRule.RewriteContext;
import io.trino.plugin.base.mapping.IdentifierMapping;
import io.trino.plugin.clickhouse.expression.RewriteComparison;
import io.trino.plugin.clickhouse.expression.RewriteLike;
import io.trino.plugin.clickhouse.expression.RewriteStringComparison;
import io.trino.plugin.clickhouse.expression.RewriteStringIn;
import io.trino.plugin.clickhouse.expression.RewriteTimestampConstant;
import io.trino.plugin.jdbc.BaseJdbcClient;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ColumnMapping;
Expand All @@ -44,6 +45,7 @@
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.LongReadFunction;
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.ObjectReadFunction;
import io.trino.plugin.jdbc.ObjectWriteFunction;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
Expand Down Expand Up @@ -73,8 +75,11 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
Expand All @@ -97,18 +102,24 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static com.clickhouse.data.ClickHouseDataType.DateTime;
import static com.clickhouse.data.ClickHouseDataType.DateTime64;
import static com.clickhouse.data.ClickHouseDataType.FixedString;
import static com.clickhouse.data.ClickHouseValues.convertToQuotedString;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.emptyToNull;
Expand Down Expand Up @@ -145,12 +156,14 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.longTimestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.timestampWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryColumnMapping;
Expand All @@ -167,18 +180,21 @@
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static io.trino.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.trino.spi.type.DateTimeEncoding.unpackZoneKey;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.DecimalType.createDecimalType;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_SECONDS;
import static io.trino.spi.type.TimestampType.createTimestampType;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_SECONDS;
import static io.trino.spi.type.TimestampWithTimeZoneType.createTimestampWithTimeZoneType;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.javaUuidToTrinoUuid;
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;
Expand All @@ -200,6 +216,7 @@
public class ClickHouseClient
extends BaseJdbcClient
{
public static final int CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION = 9;
private static final Splitter TABLE_PROPERTY_SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private static final DecimalType UINT64_TYPE = createDecimalType(20, 0);
Expand Down Expand Up @@ -230,7 +247,9 @@ public ClickHouseClient(
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
.addStandardRules(this::quoted)
.add(new RewriteStringComparison())
.add(new RewriteTimestampConstant())
.add(new RewriteComparison(ImmutableList.of(CharType.class, VarcharType.class), ImmutableSet.of(FixedString, ClickHouseDataType.String)))
.add(new RewriteComparison(ImmutableList.of(TimestampType.class, TimestampWithTimeZoneType.class), ImmutableSet.of(DateTime, DateTime64)))
.add(new RewriteStringIn())
.add(new RewriteLike())
.map("$not(value: boolean)").to("NOT value")
Expand Down Expand Up @@ -721,17 +740,21 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect
timestampReadFunction(TIMESTAMP_SECONDS),
timestampSecondsWriteFunction(getClickHouseServerVersion(session))));
}
// TODO (https://github.com/trinodb/trino/issues/10537) Add support for Datetime64 type
return Optional.of(timestampColumnMapping(TIMESTAMP_MILLIS));

if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampColumnMapping(createTimestampType(column.getScale())));
}
break;
case Types.TIMESTAMP_WITH_TIMEZONE:
if (columnDataType == ClickHouseDataType.DateTime) {
// ClickHouse DateTime does not have sub-second precision
verify(typeHandle.requiredDecimalDigits() == 0, "Expected 0 as timestamp with time zone precision, but got %s", typeHandle.requiredDecimalDigits());
return Optional.of(ColumnMapping.longMapping(
TIMESTAMP_TZ_SECONDS,
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction()));
shortTimestampWithTimeZoneWriteFunction(column.getTimeZone())));
}
if (columnDataType == ClickHouseDataType.DateTime64) {
return Optional.of(timestampWithTimeZoneColumnMapping(column));
}
}

Expand Down Expand Up @@ -788,12 +811,85 @@ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
if (type == TIMESTAMP_SECONDS) {
return WriteMapping.longMapping("DateTime", timestampSecondsWriteFunction(getClickHouseServerVersion(session)));
}
if (type instanceof TimestampType timestampType) {
return timestampWriteMapping(timestampType);
}
if (type instanceof TimestampWithTimeZoneType timestampWithTimeZoneType) {
return timestampWithTimeZoneWriteMapping(timestampWithTimeZoneType);
}
if (type.equals(uuidType)) {
return WriteMapping.sliceMapping("UUID", uuidWriteFunction());
}
throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
}

private WriteMapping timestampWriteMapping(TimestampType timestampType)
{
int precision = timestampType.getPrecision();
String dataType = "DateTime64(%s)".formatted(precision);
if (precision <= TimestampType.MAX_SHORT_PRECISION) {
return WriteMapping.longMapping(dataType, timestampWriteFunction(createTimestampType(precision)));
}
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "Precision is out of range: %s", precision);
return WriteMapping.objectMapping(dataType, longTimestampWriteFunction(timestampType, precision));
}

private static ColumnMapping timestampWithTimeZoneColumnMapping(ClickHouseColumn clickHouseColumn)
{
int precision = clickHouseColumn.getScale();
TimeZone columnTimeZone = clickHouseColumn.getTimeZone();
checkArgument(precision <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION, "unsupported precision value %s", precision);
TimestampWithTimeZoneType trinoType = createTimestampWithTimeZoneType(precision);
if (precision <= TimestampWithTimeZoneType.MAX_SHORT_PRECISION) {
return ColumnMapping.longMapping(
trinoType,
shortTimestampWithTimeZoneReadFunction(),
shortTimestampWithTimeZoneWriteFunction(columnTimeZone));
}
return ColumnMapping.objectMapping(
trinoType,
longTimestampWithTimeZoneReadFunction(),
longTimestampWithTimeZoneWriteFunction(columnTimeZone));
}

private static WriteMapping timestampWithTimeZoneWriteMapping(TimestampWithTimeZoneType timestampWithTimeZoneType)
{
verify(timestampWithTimeZoneType.getPrecision() <= CLICKHOUSE_MAX_SUPPORTED_TIMESTAMP_PRECISION);
// Clickhouse DateTime64(precision, [timezone])
// The time zone is not stored in the rows of the table (or in resultset), but is stored in the column metadata.
// Timezone agnostic Unix timestamp is stored in tables
// In trino timezone is not available at this point of time
// So create columns within UTC timezone
// TODO get timezone from session?
TimeZone timeZone = TimeZone.getTimeZone(ZoneId.of("UTC"));
String dataType = "DateTime64(%d, '%s')".formatted(timestampWithTimeZoneType.getPrecision(), timeZone.toZoneId());
if (timestampWithTimeZoneType.isShort()) {
return WriteMapping.longMapping(dataType, shortTimestampWithTimeZoneWriteFunction(timeZone));
}
return WriteMapping.objectMapping(dataType, longTimestampWithTimeZoneWriteFunction(timeZone));
}

private static ObjectReadFunction longTimestampWithTimeZoneReadFunction()
{
return ObjectReadFunction.of(LongTimestampWithTimeZone.class, (resultSet, columnIndex) -> {
ZonedDateTime timestamp = resultSet.getObject(columnIndex, ZonedDateTime.class);
return LongTimestampWithTimeZone.fromEpochSecondsAndFraction(timestamp.toEpochSecond(),
(long) timestamp.getNano() * PICOSECONDS_PER_NANOSECOND,
TimeZoneKey.getTimeZoneKey(timestamp.getZone().getId()));
});
}

private static ObjectWriteFunction longTimestampWithTimeZoneWriteFunction(TimeZone columnTimeZone)
{
return ObjectWriteFunction.of(LongTimestampWithTimeZone.class, (statement, index, value) -> {
long epochMillis = value.getEpochMillis();
long epochSeconds = Math.floorDiv(epochMillis, MILLISECONDS_PER_SECOND);
long adjustNanoSeconds = (long) Math.floorMod(epochMillis, MILLISECONDS_PER_SECOND) * NANOSECONDS_PER_MILLISECOND + value.getPicosOfMilli() / PICOSECONDS_PER_NANOSECOND;
Instant instant = Instant.ofEpochSecond(epochSeconds, adjustNanoSeconds);
statement.setObject(index, ZonedDateTime.ofInstant(instant, columnTimeZone.toZoneId()));
});
}

private ClickHouseVersion getClickHouseServerVersion(ConnectorSession session)
{
return clickHouseVersion.updateAndGet(current -> {
Expand Down Expand Up @@ -913,12 +1009,11 @@ private static LongReadFunction shortTimestampWithTimeZoneReadFunction()
};
}

private static LongWriteFunction shortTimestampWithTimeZoneWriteFunction()
private static LongWriteFunction shortTimestampWithTimeZoneWriteFunction(TimeZone columnTimeZone)
{
return (statement, index, value) -> {
long millisUtc = unpackMillisUtc(value);
TimeZoneKey timeZoneKey = unpackZoneKey(value);
statement.setObject(index, Instant.ofEpochMilli(millisUtc).atZone(timeZoneKey.getZoneId()));
statement.setObject(index, Instant.ofEpochMilli(millisUtc).atZone(columnTimeZone.toZoneId()));
};
}

Expand Down Expand Up @@ -986,17 +1081,14 @@ private static SliceWriteFunction uuidWriteFunction()
return (statement, index, value) -> statement.setObject(index, trinoUuidToJavaUuid(value), Types.OTHER);
}

public static boolean supportsPushdown(Variable variable, RewriteContext<ParameterizedExpression> context)
public static boolean supportsPushdown(Variable variable, RewriteContext<ParameterizedExpression> context, Set<ClickHouseDataType> nativeTypes)
{
JdbcTypeHandle typeHandle = ((JdbcColumnHandle) context.getAssignment(variable.getName()))
.getJdbcTypeHandle();
String jdbcTypeName = typeHandle.jdbcTypeName()
.orElseThrow(() -> new TrinoException(JDBC_ERROR, "Type name is missing: " + typeHandle));
ClickHouseColumn column = ClickHouseColumn.of("", jdbcTypeName);
ClickHouseDataType columnDataType = column.getDataType();
return switch (columnDataType) {
case FixedString, String -> true;
default -> false;
};
return nativeTypes.contains(columnDataType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package io.trino.plugin.clickhouse.expression;

import com.clickhouse.data.ClickHouseDataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.trino.matching.Capture;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
Expand All @@ -24,9 +26,8 @@
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Variable;
import io.trino.spi.type.CharType;
import io.trino.spi.type.VarcharType;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

Expand All @@ -43,26 +44,34 @@
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static java.lang.String.format;

public class RewriteStringComparison
public class RewriteComparison
implements ConnectorExpressionRule<Call, ParameterizedExpression>
{
private static final Capture<ConnectorExpression> LEFT = newCapture();
private static final Capture<ConnectorExpression> RIGHT = newCapture();
private static final Pattern<Call> PATTERN = call()
.with(type().equalTo(BOOLEAN))
.with(functionName().matching(Stream.of(ComparisonOperator.values())
.filter(comparison -> comparison != ComparisonOperator.IDENTICAL)
.map(ComparisonOperator::getFunctionName)
.collect(toImmutableSet())
::contains))
.with(argumentCount().equalTo(2))
.with(argument(0).matching(expression().with(type().matching(type -> type instanceof CharType || type instanceof VarcharType)).capturedAs(LEFT)))
.with(argument(1).matching(expression().with(type().matching(type -> type instanceof CharType || type instanceof VarcharType)).capturedAs(RIGHT)));

private final Pattern<Call> pattern;
private final ImmutableSet<ClickHouseDataType> nativeTypes;

public RewriteComparison(List<Class<?>> classes, ImmutableSet<ClickHouseDataType> nativeTypes)
{
pattern = call()
.with(type().equalTo(BOOLEAN))
.with(functionName().matching(Stream.of(ComparisonOperator.values())
.filter(comparison -> comparison != ComparisonOperator.IDENTICAL)
.map(ComparisonOperator::getFunctionName)
.collect(toImmutableSet())
::contains))
.with(argumentCount().equalTo(2))
.with(argument(0).matching(expression().with(type().matching(type -> classes.stream().anyMatch(aClass -> aClass.isInstance(type)))).capturedAs(LEFT)))
.with(argument(1).matching(expression().with(type().matching(type -> classes.stream().anyMatch(aClass -> aClass.isInstance(type)))).capturedAs(RIGHT)));
this.nativeTypes = nativeTypes;
}

@Override
public Pattern<Call> getPattern()
{
return PATTERN;
return pattern;
}

@Override
Expand All @@ -72,11 +81,11 @@ public Optional<ParameterizedExpression> rewrite(Call expression, Captures captu
ConnectorExpression leftExpression = captures.get(LEFT);
ConnectorExpression rightExpression = captures.get(RIGHT);

if (leftExpression instanceof Variable variable && !supportsPushdown(variable, context)) {
if (leftExpression instanceof Variable variable && !supportsPushdown(variable, context, nativeTypes)) {
return Optional.empty();
}

if (rightExpression instanceof Variable variable && !supportsPushdown(variable, context)) {
if (rightExpression instanceof Variable variable && !supportsPushdown(variable, context, nativeTypes)) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package io.trino.plugin.clickhouse.expression;

import com.clickhouse.data.ClickHouseDataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import io.trino.matching.Capture;
import io.trino.matching.Captures;
Expand All @@ -29,6 +31,7 @@

import java.util.Optional;

import static com.clickhouse.data.ClickHouseDataType.FixedString;
import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.matching.Capture.newCapture;
import static io.trino.plugin.base.expression.ConnectorExpressionPatterns.argument;
Expand Down Expand Up @@ -59,7 +62,7 @@ public class RewriteLike
.with(argumentCount().equalTo(2))
.with(argument(0).matching(variable()
.with(type().matching(type -> type instanceof CharType || type instanceof VarcharType))
.matching((Variable variable, RewriteContext<ParameterizedExpression> context) -> supportsPushdown(variable, context))
.matching((Variable variable, RewriteContext<ParameterizedExpression> context) -> supportsPushdown(variable, context, ImmutableSet.of(FixedString, ClickHouseDataType.String)))
.capturedAs(LIKE_VALUE)))
.with(argument(1).matching(constant()
.with(type().matching(type -> type instanceof CharType || type instanceof VarcharType))
Expand Down
Loading