Fix when reading struct-type data without an id in iceberg-parquet #11378

Open · wants to merge 1 commit into base: main
@@ -245,7 +245,7 @@ public ParquetValueReader<?> struct(
       if (fieldReader != null) {
         Type fieldType = fields.get(i);
         int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
-        int id = fieldType.getId().intValue();
+        int id = findIdFromStruct(expected, fieldType);
         readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
         typesById.put(id, fieldType);
         if (idToConstant.containsKey(id)) {

@@ -291,6 +291,13 @@ public ParquetValueReader<?> struct(
     return createStructReader(types, reorderedFields, expected);
   }

+  private int findIdFromStruct(Types.StructType expected, Type field) {
+    if (field.getId() != null) {
+      return field.getId().intValue();
+    }
+    return expected.field(field.getName()).fieldId();
+  }

Review thread on findIdFromStruct:

Contributor: This is essentially falling back to name-based schema resolution.

Author: If the field IDs are null, how can I find the expected field?

Contributor: I also left a comment on the issue itself, #11214 (comment). I don't think this is a good solution, since it hides the problem: as mentioned before, if you rename a struct field, it will break the table. Instead, I would leverage name mapping, as suggested on the issue.
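
For reference, a minimal sketch of the name-mapping approach the reviewer is pointing at, assuming a table schema held in a variable icebergSchema and a Parquet file in inputFile (both hypothetical names here); MappingUtil.create and ReadBuilder#withNameMapping are the existing Iceberg APIs for this:

import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;

// Build a name-to-ID mapping from the table schema, then let the reader
// apply it to files whose Parquet schema carries no field IDs.
NameMapping nameMapping = MappingUtil.create(icebergSchema);

try (CloseableIterable<Record> reader =
    Parquet.read(inputFile)
        .project(icebergSchema)
        .withNameMapping(nameMapping) // resolves ID-less fields by name once, at read time
        .createReaderFunc(
            fileSchema -> GenericParquetReaders.buildReader(icebergSchema, fileSchema))
        .build()) {
  reader.forEach(System.out::println);
}

Tables can also persist such a mapping in the schema.name-mapping.default table property (serialized with NameMappingParser.toJson), so every engine resolves ID-less files the same way.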

Author: Thanks for your comments; I will reconsider and try to re-address the issue.

Contributor: Any time, and thank you for the PR, but I'm afraid it will cause issues downstream.
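
To make the downstream concern concrete, a hypothetical sketch (table and column names invented for illustration) of how the name-based fallback breaks after a rename that ID-based resolution would survive:

// The rename keeps the field's ID (2) and only changes its name.
table.updateSchema()
    .renameColumn("outer_struct.middle_struct", "mid")
    .commit();

// A data file written before the rename still labels the column
// "middle_struct". With the name-based fallback, the lookup
// expected.field("middle_struct") against the renamed schema returns null,
// so findIdFromStruct throws a NullPointerException, whereas an ID-based
// lookup (or a name mapping maintained alongside the rename) would still
// resolve the column to field 2.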


   @Override
   public ParquetValueReader<?> list(
       Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
@@ -49,7 +49,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final long targetRowGroupSize;
   private final Map<String, String> metadata;
   private final ParquetProperties props;
-  private final CodecFactory.BytesCompressor compressor;
+  private final CodecFactory.BytesInputCompressor compressor;

Contributor: This seems to be an unrelated change?

Author: Yes, I will remove it.

   private final MessageType parquetSchema;
   private final ParquetValueWriter<T> model;
   private final MetricsConfig metricsConfig;
@@ -190,17 +190,25 @@ private static <T> List<T> visitFields(
       Types.StructType struct, GroupType group, TypeWithSchemaVisitor<T> visitor) {
     List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
     for (Type field : group.getFields()) {
-      int id = -1;
-      if (field.getId() != null) {
-        id = field.getId().intValue();
-      }
-      Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null;
+      Types.NestedField iField = findField(struct, field);
       results.add(visitField(iField, field, visitor));
     }

     return results;
   }

+  private static Types.NestedField findField(Types.StructType struct, Type field) {
+    if (struct == null) {
+      return null;
+    }
+
+    if (field.getId() != null) {
+      return struct.field(field.getId().intValue());
+    }
+
+    return struct.field(field.getName());
+  }
+
   public T message(Types.StructType iStruct, MessageType message, List<T> fields) {
     return null;
   }
parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java · 143 additions, 0 deletions
@@ -44,6 +44,9 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

@@ -259,4 +262,144 @@ private Pair<File, Long> generateFile(
         records.toArray(new GenericData.Record[] {}));
     return Pair.of(file, size);
   }

@Test
public void testReadNestedStructWithoutId() throws IOException {
Schema icebergSchema =
new Schema(
Types.NestedField.required(
1,
"outer_struct",
Types.StructType.of(
Types.NestedField.optional(
2,
"middle_struct",
Types.StructType.of(
Types.NestedField.optional(
3,
"inner_struct",
Types.StructType.of(
Types.NestedField.optional(
4, "value_field", Types.StringType.get()))))))));

// Create Avro schema without IDs.
org.apache.avro.Schema avroSchema = createAvroSchemaWithoutIds();

// Write test data to Parquet file.
File file = createTempFile(temp);
writeParquetFile(file, avroSchema);

// Read and verify the data.
try (CloseableIterable<Record> reader =
Parquet.read(Files.localInput(file))
.project(icebergSchema)
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(icebergSchema, fileSchema))
.build()) {

org.apache.iceberg.data.Record readRecord = Iterables.getOnlyElement(reader);
verifyNestedStructData(readRecord);
}
}

private org.apache.avro.Schema createAvroSchemaWithoutIds() {
org.apache.avro.Schema innerStructSchema =
org.apache.avro.Schema.createRecord("inner_struct_type", null, null, false);
innerStructSchema.setFields(
List.of(
new org.apache.avro.Schema.Field(
"value_field",
org.apache.avro.Schema.createUnion(
List.of(
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL),
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING))),
null,
org.apache.avro.JsonProperties.NULL_VALUE)));

org.apache.avro.Schema middleStructSchema =
org.apache.avro.Schema.createRecord("middle_struct_type", null, null, false);
middleStructSchema.setFields(
List.of(
new org.apache.avro.Schema.Field(
"inner_struct",
org.apache.avro.Schema.createUnion(
List.of(
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL),
innerStructSchema)),
null,
org.apache.avro.JsonProperties.NULL_VALUE)));

org.apache.avro.Schema outerStructSchema =
org.apache.avro.Schema.createRecord("outer_struct_type", null, null, false);
outerStructSchema.setFields(
List.of(
new org.apache.avro.Schema.Field(
"middle_struct",
org.apache.avro.Schema.createUnion(
List.of(
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.NULL),
middleStructSchema)),
null,
org.apache.avro.JsonProperties.NULL_VALUE)));

org.apache.avro.Schema recordSchema =
org.apache.avro.Schema.createRecord("test_record", null, null, false);
recordSchema.setFields(
List.of(new org.apache.avro.Schema.Field("outer_struct", outerStructSchema, null, null)));

return recordSchema;
}

private void writeParquetFile(File file, org.apache.avro.Schema avroSchema) throws IOException {
// Create test data.
GenericData.Record record = createNestedRecord(avroSchema);

// Write to Parquet file.
try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new org.apache.hadoop.fs.Path(file.toURI()))
.withSchema(avroSchema)
.withDataModel(GenericData.get())
.build()) {
writer.write(record);
}
}

private GenericData.Record createNestedRecord(org.apache.avro.Schema avroSchema) {
org.apache.avro.Schema outerSchema = avroSchema.getField("outer_struct").schema();
org.apache.avro.Schema middleSchema =
outerSchema.getField("middle_struct").schema().getTypes().get(1);
org.apache.avro.Schema innerSchema =
middleSchema.getField("inner_struct").schema().getTypes().get(1);

GenericRecordBuilder innerBuilder = new GenericRecordBuilder(innerSchema);
innerBuilder.set("value_field", "test_value");

GenericRecordBuilder middleBuilder = new GenericRecordBuilder(middleSchema);
middleBuilder.set("inner_struct", innerBuilder.build());

GenericRecordBuilder outerBuilder = new GenericRecordBuilder(outerSchema);
outerBuilder.set("middle_struct", middleBuilder.build());

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
recordBuilder.set("outer_struct", outerBuilder.build());

return recordBuilder.build();
}

  private void verifyNestedStructData(org.apache.iceberg.data.Record record) {
    org.apache.iceberg.data.Record outerStruct = (org.apache.iceberg.data.Record) record.get(0);
    // withFailMessage must be called before the assertion it describes takes effect
    assertThat(outerStruct).withFailMessage("Outer struct should not be null").isNotNull();

    org.apache.iceberg.data.Record middleStruct =
        (org.apache.iceberg.data.Record) outerStruct.get(0);
    assertThat(middleStruct).withFailMessage("Middle struct should not be null").isNotNull();

    org.apache.iceberg.data.Record innerStruct =
        (org.apache.iceberg.data.Record) middleStruct.get(0);
    assertThat(innerStruct).withFailMessage("Inner struct should not be null").isNotNull();

    assertThat(innerStruct.get(0).toString())
        .withFailMessage("Inner value field should match expected value")
        .isEqualTo("test_value");
  }
}