Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PARQUET-2450: Fix Avro projection for single-field repeated record types
Browse files Browse the repository at this point in the history
clairemcginty committed Mar 14, 2024
1 parent 4d53f98 commit ddef7f7
Showing 3 changed files with 101 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -930,6 +930,7 @@ public void end() {
static boolean isElementType(Type repeatedType, Schema elementSchema) {
if (repeatedType.isPrimitive()
|| repeatedType.asGroupType().getFieldCount() > 1
|| repeatedType.getName().equals("array")
|| repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
// The repeated type must be the element type because it is an invalid
// synthetic wrapper. Must be a group with one optional or required field
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -603,7 +604,7 @@ public void testAvroCompatOptionalGroupInList() throws Exception {
public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
Path test = writeDirect(
"message AvroCompatOptionalGroupInListWithSchema {" + " optional group locations (LIST) {"
+ " repeated group array {"
+ " repeated group list {"
+ " optional group element {"
+ " required double latitude;"
+ " required double longitude;"
@@ -616,7 +617,7 @@ public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
rc.startField("locations", 0);

rc.startGroup();
rc.startField("array", 0); // start writing array contents
rc.startField("list", 0); // start writing array contents

// write a non-null element
rc.startGroup(); // array level
@@ -1103,6 +1104,68 @@ public void testListOfSingleElementStructsWithElementField() throws Exception {
assertReaderContains(newBehaviorReader(test, newDoubleSchema), newDoubleSchema, newDoubleRecord);
}

@Test
public void testIsElementTypeRequiredRepeatedRecord() throws Exception {
// Test `_tuple` style naming
MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+ " required group list_field (LIST) {\n"
+ " repeated group list_field_tuple (LIST) {\n"
+ " repeated int32 int_field;\n"
+ " }\n"
+ " }\n"
+ "}");
Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);

Assert.assertTrue(AvroRecordConverter.isElementType(
parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
avroSchema.getFields().get(0).schema()));

// Test `array` style naming
parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+ " required group list_field (LIST) {\n"
+ " repeated group array {\n"
+ " required int32 a;\n"
+ " }\n"
+ " }\n"
+ "}");
avroSchema = new AvroSchemaConverter().convert(parquetSchema);

Assert.assertTrue(AvroRecordConverter.isElementType(
parquetSchema.getType("list_field"),
avroSchema.getFields().get(0).schema()));
}

@Test
public void testIsElementTypeOptionalRepeatedRecord() throws Exception {
// Test `_tuple` style naming
MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+ " optional group list_field (LIST) {\n"
+ " repeated group list_field_tuple (LIST) {\n"
+ " repeated int32 int_field;\n"
+ " }\n"
+ " }\n"
+ "}");
Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);

Assert.assertTrue(AvroRecordConverter.isElementType(
parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
avroSchema.getFields().get(0).schema()));

// Test `array` style naming
parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+ " optional group list_field (LIST) {\n"
+ " repeated group array {\n"
+ " required int32 a;\n"
+ " }\n"
+ " }\n"
+ "}");
avroSchema = new AvroSchemaConverter().convert(parquetSchema);

Assert.assertTrue(AvroRecordConverter.isElementType(
parquetSchema.getType("list_field"),
avroSchema.getFields().get(0).schema()));
}

public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(Path path) throws IOException {
return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path);
}
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
@@ -232,6 +233,40 @@ public void testProjection() throws IOException {
}
}

@Test
public void testRepeatedRecordProjection() throws IOException {
Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
Configuration conf = new Configuration(testConf);
Schema schema = Car.getClassSchema();

// Project a single field from repeated record schema
final Schema projectedSchema = SchemaBuilder.builder(schema.getNamespace())
.record("Car")
.fields()
.name("serviceHistory")
.type(SchemaBuilder.unionOf()
.nullBuilder()
.endNull()
.and()
.array()
.items(SchemaBuilder.builder(schema.getNamespace())
.record("Service")
.fields()
.requiredString("mechanic")
.endRecord())
.endUnion())
.noDefault()
.endRecord();

AvroReadSupport.setRequestedProjection(conf, projectedSchema);

try (ParquetReader<Car> reader = new AvroParquetReader<>(conf, path)) {
for (Car car = reader.read(); car != null; car = reader.read()) {
assertNotNull(car.getServiceHistory());
}
}
}

@Test
public void testAvroReadSchema() throws IOException {
Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);

0 comments on commit ddef7f7

Please sign in to comment.