Skip to content

Commit

Permalink
PARQUET-2992: Gate LocalTimestamp references in AvroSchemaConverter
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Aug 19, 2024
1 parent d4384d3 commit 7c2513d
Showing 1 changed file with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.Optional.of;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
import static org.apache.parquet.avro.AvroRecordConverter.getRuntimeAvroVersion;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
Expand Down Expand Up @@ -488,15 +489,22 @@ private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
return timeType(true, MICROS);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
return timestampType(true, MILLIS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
return timestampType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return timestampType(true, MICROS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
return timestampType(false, MICROS);
} else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) {
return uuidType();
} else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) {
return uuidType();
}

if (avroVersionSupportsLocalTimestampTypes()) {
if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
return timestampType(false, MILLIS);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
return timestampType(false, MICROS);
}
}

return null;
}

Expand Down Expand Up @@ -538,7 +546,7 @@ public Optional<LogicalType> visit(
LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC();

if (isAdjustedToUTC) {
if (isAdjustedToUTC || !avroVersionSupportsLocalTimestampTypes()) {
switch (unit) {
case MILLIS:
return of(LogicalTypes.timestampMillis());
Expand Down Expand Up @@ -605,4 +613,14 @@ private static String namespace(String name, Map<String, Integer> names) {
Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue + 1);
return nameCount > 1 ? name + nameCount : null;
}

/* Avro <= 1.9 does not support conversions to LocalTimestamp{Micros, Millis} classes */
private static boolean avroVersionSupportsLocalTimestampTypes() {
final String avroVersion = getRuntimeAvroVersion();

return avroVersion == null
|| !(avroVersion.startsWith("1.7.")
|| avroVersion.startsWith("1.8.")
|| avroVersion.startsWith("1.9."));
}
}

0 comments on commit 7c2513d

Please sign in to comment.