From ddef7f7786cc2336e609847ace5a02c1d5faf7d7 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 14 Mar 2024 13:47:51 -0400 Subject: [PATCH] PARQUET-2450: Fix Avro projection for single-field repeated record types --- .../parquet/avro/AvroRecordConverter.java | 1 + .../parquet/avro/TestArrayCompatibility.java | 67 ++++++++++++++++++- .../parquet/avro/TestSpecificReadWrite.java | 35 ++++++++++ 3 files changed, 101 insertions(+), 2 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 87325a0af8..62d1f89fd0 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -930,6 +930,7 @@ public void end() { static boolean isElementType(Type repeatedType, Schema elementSchema) { if (repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1 + || repeatedType.getName().equals("array") || repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) { // The repeated type must be the element type because it is an invalid // synthetic wrapper. Must be a group with one optional or required field diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java index b4b5433402..cb3540cb45 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java @@ -38,6 +38,7 @@ import org.apache.parquet.DirectWriterTest; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -603,7 +604,7 @@ public void testAvroCompatOptionalGroupInList() throws Exception { public void testAvroCompatOptionalGroupInListWithSchema() throws Exception { Path test = writeDirect( "message AvroCompatOptionalGroupInListWithSchema {" + " optional group locations (LIST) {" - + " repeated group array {" + + " repeated group list {" + " optional group element {" + " required double latitude;" + " required double longitude;" @@ -616,7 +617,7 @@ public void testAvroCompatOptionalGroupInListWithSchema() throws Exception { rc.startField("locations", 0); rc.startGroup(); - rc.startField("array", 0); // start writing array contents + rc.startField("list", 0); // start writing array contents // write a non-null element rc.startGroup(); // array level @@ -1103,6 +1104,68 @@ public void testListOfSingleElementStructsWithElementField() throws Exception { assertReaderContains(newBehaviorReader(test, newDoubleSchema), newDoubleSchema, newDoubleRecord); } + @Test + public void testIsElementTypeRequiredRepeatedRecord() throws Exception { + // Test `_tuple` style naming + MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " required group list_field (LIST) {\n" + + " repeated group list_field_tuple (LIST) {\n" + + " repeated int32 int_field;\n" + + " }\n" + + " }\n" + + "}"); + Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), + avroSchema.getFields().get(0).schema())); + + // Test `array` style naming + parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " required group list_field (LIST) {\n" + + " repeated group array {\n" + + " required int32 a;\n" + + " }\n" + + " }\n" + + "}"); + avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field"), + avroSchema.getFields().get(0).schema())); + } + + @Test + public void testIsElementTypeOptionalRepeatedRecord() throws Exception { + // Test `_tuple` style naming + MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " optional group list_field (LIST) {\n" + + " repeated group list_field_tuple (LIST) {\n" + + " repeated int32 int_field;\n" + + " }\n" + + " }\n" + + "}"); + Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), + avroSchema.getFields().get(0).schema())); + + // Test `array` style naming + parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " optional group list_field (LIST) {\n" + + " repeated group array {\n" + + " required int32 a;\n" + + " }\n" + + " }\n" + + "}"); + avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field"), + avroSchema.getFields().get(0).schema())); + } + public AvroParquetReader oldBehaviorReader(Path path) throws IOException { return new AvroParquetReader(OLD_BEHAVIOR_CONF, path); } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java index c7874fe536..5dde12d301 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java @@ -38,6 +38,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetReader; @@ -232,6 +233,40 @@ public void testProjection() throws IOException { } } + @Test + public void testRepeatedRecordProjection() throws IOException { + Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false); + Configuration conf = new Configuration(testConf); + Schema schema = Car.getClassSchema(); + + // Project a single field from repeated record schema + final Schema projectedSchema = SchemaBuilder.builder(schema.getNamespace()) + .record("Car") + .fields() + .name("serviceHistory") + .type(SchemaBuilder.unionOf() + .nullBuilder() + .endNull() + .and() + .array() + .items(SchemaBuilder.builder(schema.getNamespace()) + .record("Service") + .fields() + .requiredString("mechanic") + .endRecord()) + .endUnion()) + .noDefault() + .endRecord(); + + AvroReadSupport.setRequestedProjection(conf, projectedSchema); + + try (ParquetReader reader = new AvroParquetReader<>(conf, path)) { + for (Car car = reader.read(); car != null; car = reader.read()) { + assertNotNull(car.getServiceHistory()); + } + } + } + @Test public void testAvroReadSchema() throws IOException { Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);