From 54dab35aa07d20546acb93e20b2c7c904000e2cf Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 3 Jul 2022 21:24:28 +0800 Subject: [PATCH] Add support for date, time_millis, and timestamp_millis to AvroColumnDecoder squash. handle exactly the time_millis and timestamp_millis types squash. test avro decoder squash. test avro schema converter squash. handle DateType when serializing records squash. fix style and revert trino-kafka changes squash. revert all trino-kafka changes squash. Update lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/AvroColumnDecoder.java squash. pass logical type in tests Signed-off-by: tison --- .../trino/decoder/avro/AvroColumnDecoder.java | 27 +++++++++++++-- .../trino/decoder/avro/TestAvroDecoder.java | 33 +++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/AvroColumnDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/AvroColumnDecoder.java index f276a52606463..b6903244e7de6 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/AvroColumnDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/AvroColumnDecoder.java @@ -25,6 +25,7 @@ import io.trino.spi.type.ArrayType; import io.trino.spi.type.BigintType; import io.trino.spi.type.BooleanType; +import io.trino.spi.type.DateType; import io.trino.spi.type.DoubleType; import io.trino.spi.type.IntegerType; import io.trino.spi.type.MapType; @@ -32,6 +33,9 @@ import io.trino.spi.type.RowType; import io.trino.spi.type.RowType.Field; import io.trino.spi.type.SmallintType; +import io.trino.spi.type.TimeType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.Timestamps; import io.trino.spi.type.TinyintType; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; @@ -65,7 +69,10 @@ public class AvroColumnDecoder BigintType.BIGINT, RealType.REAL, DoubleType.DOUBLE, - VarbinaryType.VARBINARY); + 
VarbinaryType.VARBINARY, + DateType.DATE, + TimeType.TIME_MILLIS, + TimestampType.TIMESTAMP_MILLIS); private final Type columnType; private final String columnMapping; @@ -185,6 +192,12 @@ public boolean getBoolean() public long getLong() { if (value instanceof Long || value instanceof Integer) { + if (columnType == TimestampType.TIMESTAMP_MILLIS) { + return ((Number) value).longValue() * Timestamps.MICROSECONDS_PER_MILLISECOND; + } + if (columnType == TimeType.TIME_MILLIS) { + return ((Number) value).longValue() * Timestamps.PICOSECONDS_PER_MILLISECOND; + } return ((Number) value).longValue(); } if (value instanceof Float && columnType == RealType.REAL) { @@ -275,7 +288,17 @@ private static void serializePrimitive(BlockBuilder blockBuilder, Object value, return; } - if ((value instanceof Integer || value instanceof Long) && (type instanceof BigintType || type instanceof IntegerType || type instanceof SmallintType || type instanceof TinyintType)) { + if (type == TimestampType.TIMESTAMP_MILLIS) { + type.writeLong(blockBuilder, ((Number) value).longValue() * Timestamps.MICROSECONDS_PER_MILLISECOND); + return; + } + + if (type == TimeType.TIME_MILLIS) { + type.writeLong(blockBuilder, ((Number) value).longValue() * Timestamps.PICOSECONDS_PER_MILLISECOND); + return; + } + + if ((value instanceof Integer || value instanceof Long) && (type instanceof BigintType || type instanceof IntegerType || type instanceof SmallintType || type instanceof TinyintType || type instanceof DateType)) { type.writeLong(blockBuilder, ((Number) value).longValue()); return; } diff --git a/lib/trino-record-decoder/src/test/java/io/trino/decoder/avro/TestAvroDecoder.java b/lib/trino-record-decoder/src/test/java/io/trino/decoder/avro/TestAvroDecoder.java index 914fad9a87d0e..fa0193f97462f 100644 --- a/lib/trino-record-decoder/src/test/java/io/trino/decoder/avro/TestAvroDecoder.java +++ b/lib/trino-record-decoder/src/test/java/io/trino/decoder/avro/TestAvroDecoder.java @@ -29,6 +29,7 @@ import 
io.trino.spi.type.DecimalType; import io.trino.spi.type.DoubleType; import io.trino.spi.type.RowType; +import io.trino.spi.type.Timestamps; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; import org.apache.avro.AvroTypeException; @@ -64,10 +65,13 @@ import static io.trino.decoder.util.DecoderTestUtil.checkValue; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TimeType.TIME_MILLIS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.TypeSignature.mapType; import static io.trino.spi.type.VarbinaryType.VARBINARY; @@ -369,6 +373,14 @@ public void testSchemaEvolutionToIncompatibleType() .hasMessageMatching("Decoding Avro record failed."); } + @Test + public void testLongDecodedAsTimestampMillis() + { + DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", TIMESTAMP_MILLIS, "id", null, null, false, false, false); + Map decodedRow = buildAndDecodeColumn(row, "id", "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}", 1658463479L); + checkValue(decodedRow, row, 1658463479L * Timestamps.MICROSECONDS_PER_MILLISECOND); + } + @Test public void testLongDecodedAsBigint() { @@ -378,6 +390,24 @@ public void testLongDecodedAsBigint() checkValue(decodedRow, row, 493857959588286460L); } + @Test + public void testIntDecodedAsDate() + { + DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", DATE, "id", null, null, false, false, false); + Map decodedRow = buildAndDecodeColumn(row, "id", "{\"type\":\"int\",\"logicalType\":\"date\"}", 100); + + checkValue(decodedRow, row, 100); + } + + @Test + 
public void testIntDecodedAsTimeMillis() + { + DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", TIME_MILLIS, "id", null, null, false, false, false); + Map decodedRow = buildAndDecodeColumn(row, "id", "{\"type\":\"int\",\"logicalType\":\"time-millis\"}", 100); + + checkValue(decodedRow, row, 100L * Timestamps.PICOSECONDS_PER_MILLISECOND); + } + @Test public void testIntDecodedAsBigint() { @@ -1200,6 +1230,9 @@ public void testSupportedDataTypeValidation() singleColumnDecoder(new ArrayType(BigintType.BIGINT)); singleColumnDecoder(VARCHAR_MAP_TYPE); singleColumnDecoder(DOUBLE_MAP_TYPE); + singleColumnDecoder(DATE); + singleColumnDecoder(TIME_MILLIS); + singleColumnDecoder(TIMESTAMP_MILLIS); // some unsupported types assertUnsupportedColumnTypeException(() -> singleColumnDecoder(DecimalType.createDecimalType(10, 4)));