Fix reading struct-type data without field ids in iceberg-parquet, which returned null values.

tianzhu.wen committed Nov 25, 2024
1 parent cb1ad79 commit 73776cc
Showing 4 changed files with 124 additions and 7 deletions.
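
For context: Parquet files written outside Iceberg (for example by parquet-avro) carry no field-id annotations, so Type#getId() returns null on every field. A minimal sketch of that situation using the parquet-mr schema parser (class name and schema string are illustrative, not from this commit):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class MissingIdDemo {
  public static void main(String[] args) {
    // Parsed from a plain schema string: no Iceberg field-id annotations anywhere.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message test_record {"
            + "  optional group outer_struct {"
            + "    optional binary value_field (UTF8);"
            + "  }"
            + "}");
    // Before this fix, the readers keyed struct fields on getId(), which is null here.
    System.out.println(schema.getType("outer_struct").getId()); // prints: null
  }
}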
@@ -245,7 +245,7 @@ public ParquetValueReader<?> struct(
       if (fieldReader != null) {
         Type fieldType = fields.get(i);
         int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
-        int id = fieldType.getId().intValue();
+        int id = findIdFromStruct(expected, fieldType);
         readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
         typesById.put(id, fieldType);
         if (idToConstant.containsKey(id)) {
@@ -291,6 +291,13 @@ public ParquetValueReader<?> struct(
     return createStructReader(types, reorderedFields, expected);
   }
 
+  private int findIdFromStruct(Types.StructType expected, Type field) {
+    if (field.getId() != null) {
+      return field.getId().intValue();
+    }
+    return expected.field(field.getName()).fieldId();
+  }
+
   @Override
   public ParquetValueReader<?> list(
       Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
@@ -49,7 +49,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final long targetRowGroupSize;
   private final Map<String, String> metadata;
   private final ParquetProperties props;
-  private final CodecFactory.BytesCompressor compressor;
+  private final CodecFactory.BytesInputCompressor compressor;
   private final MessageType parquetSchema;
   private final ParquetValueWriter<T> model;
   private final MetricsConfig metricsConfig;
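Aside: in recent parquet-mr, CodecFactory.BytesCompressor is deprecated in favor of the CompressionCodecFactory.BytesInputCompressor interface (which CodecFactory implements), so the field type change above appears to track that deprecation without changing behavior.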
@@ -190,17 +190,25 @@ private static <T> List<T> visitFields(
       Types.StructType struct, GroupType group, TypeWithSchemaVisitor<T> visitor) {
     List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
     for (Type field : group.getFields()) {
-      int id = -1;
-      if (field.getId() != null) {
-        id = field.getId().intValue();
-      }
-      Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null;
+      Types.NestedField iField = findField(struct, field);
       results.add(visitField(iField, field, visitor));
     }
 
     return results;
   }
 
+  private static Types.NestedField findField(Types.StructType struct, Type field) {
+    if (struct == null) {
+      return null;
+    }
+
+    if (field.getId() != null) {
+      return struct.field(field.getId().intValue());
+    }
+
+    return struct.field(field.getName());
+  }
+
   public T message(Types.StructType iStruct, MessageType message, List<T> fields) {
     return null;
   }
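Both new helpers (findIdFromStruct and findField) apply the same rule: trust the Parquet field id when present, otherwise fall back to matching by name against the expected Iceberg struct. A standalone sketch of that rule (class name and field values are illustrative, not from this commit):

import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class NameFallbackSketch {
  public static void main(String[] args) {
    // Expected Iceberg struct: field ids are always present on this side.
    Types.StructType expected = Types.StructType.of(
        Types.NestedField.optional(4, "value_field", Types.StringType.get()));

    // A Parquet field built without .id(...) carries no field id,
    // like fields in files written by plain parquet-avro.
    Type noId = org.apache.parquet.schema.Types
        .optional(PrimitiveType.PrimitiveTypeName.BINARY)
        .named("value_field");

    // The commit's rule: use the id if present, otherwise resolve by name.
    int id = (noId.getId() != null)
        ? noId.getId().intValue()
        : expected.field(noId.getName()).fieldId();
    System.out.println(id); // prints: 4
  }
}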
102 changes: 102 additions & 0 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -259,4 +260,105 @@ private Pair<File, Long> generateFile(
         records.toArray(new GenericData.Record[] {}));
     return Pair.of(file, size);
   }
+
+  @Test
+  public void testReadNestedStructWithoutId() throws IOException {
+    // Create an Iceberg schema with nested struct types (with field ids)
+    Schema icebergSchema = new Schema(
+        Types.NestedField.required(1, "outer_struct",
+            Types.StructType.of(
+                Types.NestedField.optional(2, "middle_struct",
+                    Types.StructType.of(
+                        Types.NestedField.optional(3, "inner_struct",
+                            Types.StructType.of(
+                                Types.NestedField.optional(4, "value_field", Types.StringType.get())
+                            ))
+                    ))
+            ))
+    );
+
+    // Create an Avro schema without ids (simulating Parquet data written by a non-Iceberg writer)
+    String avroSchemaStr = "{\n" +
+        "  \"type\": \"record\",\n" +
+        "  \"name\": \"test_record\",\n" +
+        "  \"fields\": [{\n" +
+        "    \"name\": \"outer_struct\",\n" +
+        "    \"type\": {\n" +
+        "      \"type\": \"record\",\n" +
+        "      \"name\": \"outer_struct_type\",\n" +
+        "      \"fields\": [{\n" +
+        "        \"name\": \"middle_struct\",\n" +
+        "        \"type\": [\"null\", {\n" +
+        "          \"type\": \"record\",\n" +
+        "          \"name\": \"middle_struct_type\",\n" +
+        "          \"fields\": [{\n" +
+        "            \"name\": \"inner_struct\",\n" +
+        "            \"type\": [\"null\", {\n" +
+        "              \"type\": \"record\",\n" +
+        "              \"name\": \"inner_struct_type\",\n" +
+        "              \"fields\": [{\n" +
+        "                \"name\": \"value_field\",\n" +
+        "                \"type\": [\"null\", \"string\"]\n" +
+        "              }]\n" +
+        "            }]\n" +
+        "          }]\n" +
+        "        }]\n" +
+        "      }]\n" +
+        "    }\n" +
+        "  }]\n" +
+        "}";
+
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaStr);
+
+    // Create test data using the Avro schema (without ids)
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+    org.apache.avro.Schema outerSchema = avroSchema.getField("outer_struct").schema();
+    org.apache.avro.Schema middleSchema = outerSchema.getField("middle_struct").schema().getTypes().get(1);
+    org.apache.avro.Schema innerSchema = middleSchema.getField("inner_struct").schema().getTypes().get(1);
+
+    // Build the nested structure
+    GenericRecordBuilder innerBuilder = new GenericRecordBuilder(innerSchema);
+    innerBuilder.set("value_field", "test_value");
+
+    GenericRecordBuilder middleBuilder = new GenericRecordBuilder(middleSchema);
+    middleBuilder.set("inner_struct", innerBuilder.build());
+
+    GenericRecordBuilder outerBuilder = new GenericRecordBuilder(outerSchema);
+    outerBuilder.set("middle_struct", middleBuilder.build());
+
+    recordBuilder.set("outer_struct", outerBuilder.build());
+    GenericData.Record record = recordBuilder.build();
+
+    // Write the test data to a Parquet file using the Avro schema (without ids)
+    File file = createTempFile(temp);
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(
+            new org.apache.hadoop.fs.Path(file.toURI()))
+        .withSchema(avroSchema)
+        .withDataModel(GenericData.get())
+        .build()) {
+      writer.write(record);
+    }
+
+    // Read back using the Iceberg schema (with ids) and verify
+    org.apache.iceberg.data.Record readRecord = Iterables.getOnlyElement(
+        Parquet.read(Files.localInput(file))
+            .project(icebergSchema)
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(icebergSchema, fileSchema))
+            .build());
+
+    // Verify the entire nested structure
+    org.apache.iceberg.data.Record readOuterStruct =
+        (org.apache.iceberg.data.Record) readRecord.get(0);
+    assertThat(readOuterStruct).isNotNull();
+
+    org.apache.iceberg.data.Record readMiddleStruct =
+        (org.apache.iceberg.data.Record) readOuterStruct.get(0);
+    assertThat(readMiddleStruct).isNotNull();
+
+    org.apache.iceberg.data.Record readInnerStruct =
+        (org.apache.iceberg.data.Record) readMiddleStruct.get(0);
+    assertThat(readInnerStruct).isNotNull();
+
+    assertThat(readInnerStruct.get(0).toString()).isEqualTo("test_value");
+  }
 }
