diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 38fd69393023..42beb674fbff 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -245,7 +245,7 @@ public ParquetValueReader<?> struct(
         if (fieldReader != null) {
           Type fieldType = fields.get(i);
           int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
-          int id = fieldType.getId().intValue();
+          int id = findIdFromStruct(expected, fieldType);
           readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
           typesById.put(id, fieldType);
           if (idToConstant.containsKey(id)) {
@@ -291,6 +291,13 @@ public ParquetValueReader<?> struct(
     return createStructReader(types, reorderedFields, expected);
   }
 
+  private int findIdFromStruct(Types.StructType expected, Type field) {
+    if (field.getId() != null) {
+      return field.getId().intValue();
+    }
+    return expected.field(field.getName()).fieldId();
+  }
+
   @Override
   public ParquetValueReader<?> list(
       Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 099cffc33bb8..31ad16fabd02 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -49,7 +49,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final long targetRowGroupSize;
   private final Map<String, String> metadata;
   private final ParquetProperties props;
-  private final CodecFactory.BytesCompressor compressor;
+  private final CodecFactory.BytesInputCompressor compressor;
   private final MessageType parquetSchema;
   private final ParquetValueWriter<T> model;
   private final MetricsConfig metricsConfig;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java
index e0c07d31755e..302d1b42d84a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java
@@ -190,17 +190,25 @@ private static <T> List<T> visitFields(
       Types.StructType struct, GroupType group, TypeWithSchemaVisitor<T> visitor) {
     List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
     for (Type field : group.getFields()) {
-      int id = -1;
-      if (field.getId() != null) {
-        id = field.getId().intValue();
-      }
-      Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null;
+      Types.NestedField iField = findField(struct, field);
       results.add(visitField(iField, field, visitor));
     }
 
     return results;
   }
 
+  private static Types.NestedField findField(Types.StructType struct, Type field) {
+    if (struct == null) {
+      return null;
+    }
+
+    if (field.getId() != null) {
+      return struct.field(field.getId().intValue());
+    }
+
+    return struct.field(field.getName());
+  }
+
   public T message(Types.StructType iStruct, MessageType message, List<T> fields) {
     return null;
   }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index ae0a822d3464..080b80128b8b 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -259,4 +260,105 @@ private Pair<File, Long> generateFile(
         records.toArray(new GenericData.Record[] {}));
     return Pair.of(file, size);
   }
+
+  @Test
+  public void testReadNestedStructWithoutId() throws IOException {
+    // Create an Iceberg schema with a nested struct type (with field IDs)
+    Schema icebergSchema = new Schema(
+        Types.NestedField.required(1, "outer_struct",
+            Types.StructType.of(
+                Types.NestedField.optional(2, "middle_struct",
+                    Types.StructType.of(
+                        Types.NestedField.optional(3, "inner_struct",
+                            Types.StructType.of(
+                                Types.NestedField.optional(4, "value_field", Types.StringType.get())))))));
+
+    // Create an Avro schema without IDs (to simulate Parquet data written by a non-Iceberg writer)
+    String avroSchemaStr = "{\n"
+        + "  \"type\": \"record\",\n"
+        + "  \"name\": \"test_record\",\n"
+        + "  \"fields\": [{\n"
+        + "    \"name\": \"outer_struct\",\n"
+        + "    \"type\": {\n"
+        + "      \"type\": \"record\",\n"
+        + "      \"name\": \"outer_struct_type\",\n"
+        + "      \"fields\": [{\n"
+        + "        \"name\": \"middle_struct\",\n"
+        + "        \"type\": [\"null\", {\n"
+        + "          \"type\": \"record\",\n"
+        + "          \"name\": \"middle_struct_type\",\n"
+        + "          \"fields\": [{\n"
+        + "            \"name\": \"inner_struct\",\n"
+        + "            \"type\": [\"null\", {\n"
+        + "              \"type\": \"record\",\n"
+        + "              \"name\": \"inner_struct_type\",\n"
+        + "              \"fields\": [{\n"
+        + "                \"name\": \"value_field\",\n"
+        + "                \"type\": [\"null\", \"string\"]\n"
+        + "              }]\n"
+        + "            }]\n"
+        + "          }]\n"
+        + "        }]\n"
+        + "      }]\n"
+        + "    }\n"
+        + "  }]\n"
+        + "}";
+
+    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(avroSchemaStr);
+
+    // Create test data using the Avro schema (without IDs)
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+    org.apache.avro.Schema outerSchema = avroSchema.getField("outer_struct").schema();
+    org.apache.avro.Schema middleSchema = outerSchema.getField("middle_struct").schema().getTypes().get(1);
+    org.apache.avro.Schema innerSchema = middleSchema.getField("inner_struct").schema().getTypes().get(1);
+
+    // Build the nested structure
+    GenericRecordBuilder innerBuilder = new GenericRecordBuilder(innerSchema);
+    innerBuilder.set("value_field", "test_value");
+
+    GenericRecordBuilder middleBuilder = new GenericRecordBuilder(middleSchema);
+    middleBuilder.set("inner_struct", innerBuilder.build());
+
+    GenericRecordBuilder outerBuilder = new GenericRecordBuilder(outerSchema);
+    outerBuilder.set("middle_struct", middleBuilder.build());
+
+    recordBuilder.set("outer_struct", outerBuilder.build());
+    GenericData.Record record = recordBuilder.build();
+
+    // Write test data to Parquet file using Avro schema (without IDs)
+    File file = createTempFile(temp);
+    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
+            new org.apache.hadoop.fs.Path(file.toURI()))
+        .withSchema(avroSchema)
+        .withDataModel(GenericData.get())
+        .build()) {
+      writer.write(record);
+    }
+
+    // Read using Iceberg schema (with IDs) and verify
+    org.apache.iceberg.data.Record readRecord = Iterables.getOnlyElement(
+        Parquet.read(Files.localInput(file))
+            .project(icebergSchema)
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(icebergSchema, fileSchema))
+            .build());
+
+    // Verify the entire nested structure
+    org.apache.iceberg.data.Record readOuterStruct =
+        (org.apache.iceberg.data.Record) readRecord.get(0);
+    assertThat(readOuterStruct).isNotNull();
+
+    org.apache.iceberg.data.Record readMiddleStruct =
+        (org.apache.iceberg.data.Record) readOuterStruct.get(0);
+    assertThat(readMiddleStruct).isNotNull();
+
+    org.apache.iceberg.data.Record readInnerStruct =
+        (org.apache.iceberg.data.Record) readMiddleStruct.get(0);
+    assertThat(readInnerStruct).isNotNull();
+
+    assertThat(readInnerStruct.get(0).toString()).isEqualTo("test_value");
+  }
 }
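
Note (not part of the patch): both fixed call sites share the same resolution rule — prefer the Parquet field ID when the file carries one, and fall back to a name-based lookup against the expected Iceberg struct when it does not. A minimal standalone sketch of that rule, assuming the public Iceberg and parquet-mr APIs; the class and method names here are hypothetical, for illustration only:

  import org.apache.iceberg.types.Types;
  import org.apache.parquet.schema.Type;

  class FieldResolutionSketch {
    // Prefer the file's field ID; fall back to name matching when the Parquet
    // schema was written without IDs (e.g. by a non-Iceberg writer).
    static Types.NestedField resolve(Types.StructType expected, Type parquetField) {
      if (parquetField.getId() != null) {
        return expected.field(parquetField.getId().intValue());
      }
      return expected.field(parquetField.getName());
    }
  }

The name-based fallback assumes the file's column names match the current Iceberg field names; columns renamed since the file was written will not resolve this way.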