Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-2315: Expose local timestamp millis and micro to schema converter #1115

Merged
merged 1 commit into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,12 @@ private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
return timeType(true, MICROS);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return timestampType(true, MILLIS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
return timestampType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return timestampType(true, MICROS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
return timestampType(false, MICROS);
} else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) {
return uuidType();
}
Expand Down Expand Up @@ -494,13 +498,25 @@ public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotati
@Override
public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
switch (unit) {
case MILLIS:
return of(LogicalTypes.timestampMillis());
case MICROS:
return of(LogicalTypes.timestampMicros());
boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();

if (isAdjustedToUTC) {
switch (unit) {
case MILLIS:
return of(LogicalTypes.timestampMillis());
case MICROS:
return of(LogicalTypes.timestampMicros());
}
return empty();
} else {
switch (unit) {
case MILLIS:
return of(LogicalTypes.localTimestampMillis());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this change does break compatibility with Avro 1.8 and 1.9 -- LogicalTypes#localTimestamp{Millis, Micros} weren't introduced until Avro 1.10 and are not present in earlier versions.

Maybe we could gate this change around a check of Avro runtime version, like we do here?
cc @wgtmac

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds good!

case MICROS:
return of(LogicalTypes.localTimestampMicros());
}
return empty();
}
return empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,31 @@ public void testTimestampMillisType() throws Exception {
}
}

@Test
public void testLocalTimestampMillisType() throws Exception {
Schema date = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(LONG));
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("timestamp", date, null, null)));

testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 timestamp (TIMESTAMP(MILLIS,false));\n" +
"}\n");

for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
{INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
if (primitive == FIXED_LEN_BYTE_ARRAY) {
type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS);
} else {
type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS);
}

assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}

@Test
public void testTimestampMicrosType() throws Exception {
Schema date = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
Expand All @@ -737,6 +762,31 @@ public void testTimestampMicrosType() throws Exception {
}
}

@Test
public void testLocalTimestampMicrosType() throws Exception {
Schema date = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(LONG));
Schema expected = Schema.createRecord("myrecord", null, null, false,
Arrays.asList(new Schema.Field("timestamp", date, null, null)));

testRoundTripConversion(expected,
"message myrecord {\n" +
" required int64 timestamp (TIMESTAMP(MICROS,false));\n" +
"}\n");

for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
{INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
final PrimitiveType type;
if (primitive == FIXED_LEN_BYTE_ARRAY) {
type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS);
} else {
type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS);
}

assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type)));
}
}

@Test
public void testReuseNameInNestedStructure() throws Exception {
Schema innerA1 = record("a1", "a12",
Expand Down