Skip to content

Commit

Permalink
Fix reading struct-type data without an id in iceberg-parquet; previously it returned null values.
Browse files Browse the repository at this point in the history
  • Loading branch information
tianzhu.wen committed Nov 25, 2024
1 parent cb1ad79 commit 3bc6636
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public ParquetValueReader<?> struct(
if (fieldReader != null) {
Type fieldType = fields.get(i);
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
int id = fieldType.getId().intValue();
int id = findIdFromStruct(expected, fieldType);
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
typesById.put(id, fieldType);
if (idToConstant.containsKey(id)) {
Expand Down Expand Up @@ -291,6 +291,13 @@ public ParquetValueReader<?> struct(
return createStructReader(types, reorderedFields, expected);
}

/**
 * Resolves the Iceberg field id for a Parquet field.
 *
 * <p>Prefers the id embedded in the Parquet type. When the file was written without field
 * ids (e.g. by a non-Iceberg writer), falls back to matching the field by name against the
 * expected Iceberg struct.
 *
 * @param expected the expected Iceberg struct type from the read schema
 * @param field the Parquet field whose id is needed
 * @return the resolved field id
 * @throws IllegalArgumentException if the field carries no id and no field with the same
 *     name exists in the expected struct
 */
private int findIdFromStruct(Types.StructType expected, Type field) {
  if (field.getId() != null) {
    return field.getId().intValue();
  }

  // Fall back to name-based resolution for files written without field ids.
  Types.NestedField expectedField = expected.field(field.getName());
  if (expectedField == null) {
    // Fail with context instead of a bare NPE from expectedField.fieldId().
    throw new IllegalArgumentException(
        "Cannot determine field id: Parquet field has no id and no field named "
            + field.getName()
            + " exists in the expected struct: "
            + expected);
  }

  return expectedField.fieldId();
}

@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final long targetRowGroupSize;
private final Map<String, String> metadata;
private final ParquetProperties props;
private final CodecFactory.BytesCompressor compressor;
private final CodecFactory.BytesInputCompressor compressor;
private final MessageType parquetSchema;
private final ParquetValueWriter<T> model;
private final MetricsConfig metricsConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,25 @@ private static <T> List<T> visitFields(
Types.StructType struct, GroupType group, TypeWithSchemaVisitor<T> visitor) {
List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
for (Type field : group.getFields()) {
int id = -1;
if (field.getId() != null) {
id = field.getId().intValue();
}
Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null;
Types.NestedField iField = findField(struct, field);
results.add(visitField(iField, field, visitor));
}

return results;
}

/**
 * Looks up the expected Iceberg field corresponding to a Parquet field, matching by id
 * when the Parquet type carries one and by name otherwise.
 *
 * @param struct the expected Iceberg struct, or null when there is no expectation
 * @param field the Parquet field to resolve
 * @return the matching Iceberg field, or null when {@code struct} is null or no match exists
 */
private static Types.NestedField findField(Types.StructType struct, Type field) {
  if (struct == null) {
    return null;
  }

  // Prefer the embedded field id; files written without ids are matched by name.
  return field.getId() != null
      ? struct.field(field.getId().intValue())
      : struct.field(field.getName());
}

// Visitor callback for the top-level Parquet message; this base implementation returns
// null so subclasses only override the callbacks they care about.
public T message(Types.StructType iStruct, MessageType message, List<T> fields) {
  return null;
}
Expand Down
144 changes: 144 additions & 0 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -259,4 +262,145 @@ private Pair<File, Long> generateFile(
records.toArray(new GenericData.Record[] {}));
return Pair.of(file, size);
}

/**
 * Verifies that nested struct columns written without Parquet field ids are resolved by
 * name and read back correctly instead of coming up as nulls.
 */
@Test
public void testReadNestedStructWithoutId() throws IOException {
  Schema icebergSchema =
      new Schema(
          Types.NestedField.required(
              1,
              "outer_struct",
              Types.StructType.of(
                  Types.NestedField.optional(
                      2,
                      "middle_struct",
                      Types.StructType.of(
                          Types.NestedField.optional(
                              3,
                              "inner_struct",
                              Types.StructType.of(
                                  Types.NestedField.optional(
                                      4, "value_field", Types.StringType.get()))))))));

  // Write a data file using an Avro schema with no field ids, as a non-Iceberg
  // writer would produce.
  org.apache.avro.Schema avroSchema = createAvroSchemaWithoutIds();
  File parquetFile = createTempFile(temp);
  writeParquetFile(parquetFile, avroSchema);

  // Read the file back projecting the Iceberg schema and check that every nesting
  // level was populated.
  try (CloseableIterable<Record> records =
      Parquet.read(Files.localInput(parquetFile))
          .project(icebergSchema)
          .createReaderFunc(
              fileSchema -> GenericParquetReaders.buildReader(icebergSchema, fileSchema))
          .build()) {
    verifyNestedStructData(Iterables.getOnlyElement(records));
  }
}

/**
 * Builds an Avro schema mirroring the Iceberg test schema but carrying no field-id
 * properties, simulating a Parquet file produced by a non-Iceberg writer.
 */
private org.apache.avro.Schema createAvroSchemaWithoutIds() {
  org.apache.avro.Schema stringType =
      org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);

  org.apache.avro.Schema innerStructSchema =
      recordOf("inner_struct_type", nullableField("value_field", stringType));
  org.apache.avro.Schema middleStructSchema =
      recordOf("middle_struct_type", nullableField("inner_struct", innerStructSchema));
  org.apache.avro.Schema outerStructSchema =
      recordOf("outer_struct_type", nullableField("middle_struct", middleStructSchema));

  // The top-level field is required: plain schema, no default.
  return recordOf(
      "test_record",
      new org.apache.avro.Schema.Field("outer_struct", outerStructSchema, null, null));
}

// Creates a record schema with the given fields, no doc, and no namespace.
private org.apache.avro.Schema recordOf(
    String name, org.apache.avro.Schema.Field... fields) {
  org.apache.avro.Schema schema =
      org.apache.avro.Schema.createRecord(name, null, null, false);
  schema.setFields(List.of(fields));
  return schema;
}

// Wraps the given type in a [null, type] union field defaulting to null.
private org.apache.avro.Schema.Field nullableField(String name, org.apache.avro.Schema type) {
  org.apache.avro.Schema union =
      org.apache.avro.Schema.createUnion(
          List.of(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL), type));
  return new org.apache.avro.Schema.Field(
      name, union, null, org.apache.avro.JsonProperties.NULL_VALUE);
}

/**
 * Writes a single nested test record to {@code file} using plain parquet-avro, so the
 * resulting Parquet schema carries no Iceberg field ids.
 */
private void writeParquetFile(File file, org.apache.avro.Schema avroSchema) throws IOException {
  GenericData.Record testRecord = createNestedRecord(avroSchema);

  org.apache.hadoop.fs.Path outputPath = new org.apache.hadoop.fs.Path(file.toURI());
  try (ParquetWriter<GenericRecord> parquetWriter =
      AvroParquetWriter.<GenericRecord>builder(outputPath)
          .withSchema(avroSchema)
          .withDataModel(GenericData.get())
          .build()) {
    parquetWriter.write(testRecord);
  }
}

/**
 * Builds the nested record outer_struct → middle_struct → inner_struct with
 * value_field set to "test_value".
 */
private GenericData.Record createNestedRecord(org.apache.avro.Schema avroSchema) {
  org.apache.avro.Schema outerSchema = avroSchema.getField("outer_struct").schema();
  // Optional fields are (null, type) unions; the concrete record schema is at index 1.
  org.apache.avro.Schema middleSchema =
      outerSchema.getField("middle_struct").schema().getTypes().get(1);
  org.apache.avro.Schema innerSchema =
      middleSchema.getField("inner_struct").schema().getTypes().get(1);

  GenericData.Record inner =
      new GenericRecordBuilder(innerSchema).set("value_field", "test_value").build();
  GenericData.Record middle =
      new GenericRecordBuilder(middleSchema).set("inner_struct", inner).build();
  GenericData.Record outer =
      new GenericRecordBuilder(outerSchema).set("middle_struct", middle).build();

  return new GenericRecordBuilder(avroSchema).set("outer_struct", outer).build();
}

/**
 * Asserts that the record read back contains a non-null struct at every nesting level
 * and the expected leaf value.
 *
 * <p>Note: AssertJ's {@code withFailMessage} must be called <em>before</em> the assertion
 * method; chaining it after (as the previous version did) has no effect because the
 * assertion has already executed by then.
 */
private void verifyNestedStructData(org.apache.iceberg.data.Record record) {
  org.apache.iceberg.data.Record outerStruct = (org.apache.iceberg.data.Record) record.get(0);
  assertThat(outerStruct).withFailMessage("Outer struct should not be null").isNotNull();

  org.apache.iceberg.data.Record middleStruct =
      (org.apache.iceberg.data.Record) outerStruct.get(0);
  assertThat(middleStruct).withFailMessage("Middle struct should not be null").isNotNull();

  org.apache.iceberg.data.Record innerStruct =
      (org.apache.iceberg.data.Record) middleStruct.get(0);
  assertThat(innerStruct).withFailMessage("Inner struct should not be null").isNotNull();

  // hasToString avoids an NPE when the leaf value is null and still compares the
  // rendered value, matching the original toString()-based comparison.
  assertThat(innerStruct.get(0))
      .withFailMessage("Inner value field should match expected value")
      .hasToString("test_value");
}
}

0 comments on commit 3bc6636

Please sign in to comment.