From 669cea5ef0f81e6b0ed71aa242d40415ee257f09 Mon Sep 17 00:00:00 2001 From: edigonzales Date: Fri, 16 Jun 2023 20:14:20 +0200 Subject: [PATCH] expose local timestamp millis and micro to schema converter --- .../parquet/avro/AvroSchemaConverter.java | 28 ++++++++--- .../parquet/avro/TestAvroSchemaConverter.java | 50 +++++++++++++++++++ 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 7d1f3cab9f..0314bcd71a 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -456,8 +456,12 @@ private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) { return timeType(true, MICROS); } else if (logicalType instanceof LogicalTypes.TimestampMillis) { return timestampType(true, MILLIS); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) { + return timestampType(false, MILLIS); } else if (logicalType instanceof LogicalTypes.TimestampMicros) { return timestampType(true, MICROS); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) { + return timestampType(false, MICROS); } else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) { return uuidType(); } @@ -494,13 +498,25 @@ public Optional visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotati @Override public Optional visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit(); - switch (unit) { - case MILLIS: - return of(LogicalTypes.timestampMillis()); - case MICROS: - return of(LogicalTypes.timestampMicros()); + boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC(); + + if (isAdjustedToUTC) { + switch (unit) { + case MILLIS: + return of(LogicalTypes.timestampMillis()); + case MICROS: + return of(LogicalTypes.timestampMicros()); + } + return empty(); + } else { + switch (unit) { + case MILLIS: + return of(LogicalTypes.localTimestampMillis()); + case MICROS: + return of(LogicalTypes.localTimestampMicros()); + } + return empty(); } - return empty(); } @Override diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index 1bafdec1e3..642c48ca23 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -712,6 +712,31 @@ public void testTimestampMillisType() throws Exception { } } + @Test + public void testLocalTimestampMillisType() throws Exception { + Schema date = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(LONG)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("timestamp", date, null, null))); + + testRoundTripConversion(expected, + "message myrecord {\n" + + " required int64 timestamp (TIMESTAMP(MILLIS,false));\n" + + "}\n"); + + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] + {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { + final PrimitiveType type; + if (primitive == FIXED_LEN_BYTE_ARRAY) { + type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS); + } else { + type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS); + } + + assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive, + IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type))); + } + } + @Test public void testTimestampMicrosType() throws Exception { Schema date = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG)); @@ -737,6 +762,31 @@ public void testTimestampMicrosType() throws Exception { } } + @Test + public void testLocalTimestampMicrosType() throws Exception { + Schema date = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(LONG)); + Schema expected = Schema.createRecord("myrecord", null, null, false, + Arrays.asList(new Schema.Field("timestamp", date, null, null))); + + testRoundTripConversion(expected, + "message myrecord {\n" + + " required int64 timestamp (TIMESTAMP(MICROS,false));\n" + + "}\n"); + + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] + {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { + final PrimitiveType type; + if (primitive == FIXED_LEN_BYTE_ARRAY) { + type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS); + } else { + type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS); + } + + assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive, + IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type))); + } + } + @Test public void testReuseNameInNestedStructure() throws Exception { Schema innerA1 = record("a1", "a12",