Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for date, time_millis, and timestamp_millis to AvroColumnDecoder #13070

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
Expand Down Expand Up @@ -65,7 +69,10 @@ public class AvroColumnDecoder
BigintType.BIGINT,
RealType.REAL,
DoubleType.DOUBLE,
VarbinaryType.VARBINARY);
VarbinaryType.VARBINARY,
DateType.DATE,
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
TimeType.TIME_MILLIS,
TimestampType.TIMESTAMP_MILLIS);

private final Type columnType;
private final String columnMapping;
Expand Down Expand Up @@ -185,6 +192,12 @@ public boolean getBoolean()
public long getLong()
{
if (value instanceof Long || value instanceof Integer) {
if (columnType == TimestampType.TIMESTAMP_MILLIS) {
return ((Number) value).longValue() * Timestamps.MICROSECONDS_PER_MILLISECOND;
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
}
if (columnType == TimeType.TIME_MILLIS) {
return ((Number) value).longValue() * Timestamps.PICOSECONDS_PER_MILLISECOND;
}
return ((Number) value).longValue();
}
if (value instanceof Float && columnType == RealType.REAL) {
Expand Down Expand Up @@ -275,7 +288,17 @@ private static void serializePrimitive(BlockBuilder blockBuilder, Object value,
return;
}

if ((value instanceof Integer || value instanceof Long) && (type instanceof BigintType || type instanceof IntegerType || type instanceof SmallintType || type instanceof TinyintType)) {
if (type == TimestampType.TIMESTAMP_MILLIS) {
type.writeLong(blockBuilder, ((Number) value).longValue() * Timestamps.MICROSECONDS_PER_MILLISECOND);
return;
}

if (type == TimeType.TIME_MILLIS) {
type.writeLong(blockBuilder, ((Number) value).longValue() * Timestamps.PICOSECONDS_PER_MILLISECOND);
return;
}

if ((value instanceof Integer || value instanceof Long) && (type instanceof BigintType || type instanceof IntegerType || type instanceof SmallintType || type instanceof TinyintType || type instanceof DateType)) {
type.writeLong(blockBuilder, ((Number) value).longValue());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import org.apache.avro.AvroTypeException;
Expand Down Expand Up @@ -64,10 +65,13 @@
import static io.trino.decoder.util.DecoderTestUtil.checkValue;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.TypeSignature.mapType;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
Expand Down Expand Up @@ -369,6 +373,14 @@ public void testSchemaEvolutionToIncompatibleType()
.hasMessageMatching("Decoding Avro record failed.");
}

@Test
public void testLongDecodedAsTimestampMillis()
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", TIMESTAMP_MILLIS, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}", 1658463479L);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it is the correct way to test it. Since we pass the value to be serialized as long - Avro serializes/de-serializes it as long but as per specification it looks like for Java we would pass the input as Instant for Timestamp logical type - register the conversion for GeneircRecord and serialize the data so when we de-serialize we would get them as Instant instead of long (we could do the conversion manually). Similar mapping/conversion is implemented for Java classes generated by Avro.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());

TypeConversion can be added using similar snippets.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avro serializes/de-serializes it as long but as per specification it looks like for Java we would pass the input as Instant for Timestamp logical type

Yes. To support many and many cases, there's a lot of work to do. I don't think one PR should handle all of it or you can try it out. I'd say the patch here can resolve the specific issue of bumping trino version for Pulsar Trino Plugin.

GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());

What does it mean? If I get it right you suggest one more possible case avro timestamp can be represented as. Then my answer is as above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree but I'm afraid if this is the correct approach. One major thing which bugs me is that we manually do the conversion from a long to a micro-second precision compatible format - but ideally it should happen as a part of Avro library and it does support it. And one additional benefit is that in case if Avro modifies their internal representation it shouldn't affect us.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but ideally it should happen as a part of Avro library and it does support it

Can you show me how Avro "does support it"? I'm glad to adapt such a way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I'm looking into it this week :)

Copy link
Author

@tisonkun tisonkun Sep 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Praveen2112 after a review I think the conversion is inevitable because Trino has it's own representation, introduced by:

I take a look at the TimeConvertions, but it seems to be used on the fly data in-memory processing, not during deserialization. Pulsar Trino Plugin users meet a use case that the actual payload is int/long according to the logical type, and there's no Instant or LocalTime to be converted.

I don't have a good idea to make further revision on this codepath. You're welcome to show a patch.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any update on this patch?

cc @Praveen2112 @kokosing @hashhar

checkValue(decodedRow, row, 1658463479L * Timestamps.MICROSECONDS_PER_MILLISECOND);
}

@Test
public void testLongDecodedAsBigint()
{
Expand All @@ -378,6 +390,24 @@ public void testLongDecodedAsBigint()
checkValue(decodedRow, row, 493857959588286460L);
}

@Test
public void testIntDecodedAsDate()
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", DATE, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "{\"type\":\"int\",\"logicalType\":\"date\"}", 100);

checkValue(decodedRow, row, 100);
}

@Test
public void testIntDecodedAsTimeMillis()
{
DecoderTestColumnHandle row = new DecoderTestColumnHandle(0, "row", TIME_MILLIS, "id", null, null, false, false, false);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = buildAndDecodeColumn(row, "id", "{\"type\":\"int\",\"logicalType\":\"time-millis\"}", 100);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about for time-micros ? I guess it is also an valid Avro logical type ? Similarly for timestamp

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They can work. For the original purpose, I prepare this patch only to adapt Pulsar Trino Plugin's use case. Since the review process goes so long now, I'd prefer after a good result on the mills part and I may continue the micros part. Otherwise, it's extending the scope and I don't see an end.

BTW, Trino itself handles layout changes of TimestampType and TimeType in different PRs, so I think it's not a requirement we must do it at once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree we can have different PRs for a dedicated datatype. But is there a way to differentiate between if the data type is of time-micros or time-mills. If a datatype is annotated with a LogicalType time-micro how do we restrict decoding that datatype ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the convertor's responsibility to pass the correct Trino type. AvroColumnDecoder accepts the Trino type the caller passes to it.


checkValue(decodedRow, row, 100L * Timestamps.PICOSECONDS_PER_MILLISECOND);
}

@Test
public void testIntDecodedAsBigint()
{
Expand Down Expand Up @@ -1200,6 +1230,9 @@ public void testSupportedDataTypeValidation()
singleColumnDecoder(new ArrayType(BigintType.BIGINT));
singleColumnDecoder(VARCHAR_MAP_TYPE);
singleColumnDecoder(DOUBLE_MAP_TYPE);
singleColumnDecoder(DATE);
singleColumnDecoder(TIME_MILLIS);
singleColumnDecoder(TIMESTAMP_MILLIS);

// some unsupported types
assertUnsupportedColumnTypeException(() -> singleColumnDecoder(DecimalType.createDecimalType(10, 4)));
Expand Down