diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index 86404959735a..baae91dd1882 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -37,16 +37,28 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+/**
+ * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+ */
+@Deprecated
 public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
 
   private final Schema readSchema;
   private final ValueReader<RowData> reader;
   private Schema fileSchema = null;
 
+  /**
+   * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+   */
+  @Deprecated
   public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
     this(expectedSchema, readSchema, ImmutableMap.of());
   }
 
+  /**
+   * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+   */
+  @Deprecated
   @SuppressWarnings("unchecked")
   public FlinkAvroReader(
       org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
new file mode 100644
index 000000000000..b7a81752d4a0
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.avro.AvroWithPartnerVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
+import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+public class FlinkPlannedAvroReader implements DatumReader<RowData>, SupportsRowPosition {
+
+  private final Types.StructType expectedType;
+  private final Map<Integer, ?> idToConstant;
+  private ValueReader<RowData> reader;
+
+  public static FlinkPlannedAvroReader create(org.apache.iceberg.Schema schema) {
+    return create(schema, ImmutableMap.of());
+  }
+
+  public static FlinkPlannedAvroReader create(
+      org.apache.iceberg.Schema schema, Map<Integer, ?> constants) {
+    return new FlinkPlannedAvroReader(schema, constants);
+  }
+
+  private FlinkPlannedAvroReader(
+      org.apache.iceberg.Schema expectedSchema, Map<Integer, ?> constants) {
+    this.expectedType = expectedSchema.asStruct();
+    this.idToConstant = constants;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setSchema(Schema fileSchema) {
+    this.reader =
+        (ValueReader<RowData>)
+            AvroWithPartnerVisitor.visit(
+                expectedType,
+                fileSchema,
+                new ReadBuilder(idToConstant),
+                AvroWithPartnerVisitor.FieldIDAccessors.get());
+  }
+
+  @Override
+  public RowData read(RowData reuse, Decoder decoder) throws IOException {
+    return reader.read(decoder, reuse);
+  }
+
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
+  private static class ReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldReaders) {
+      if (partner == null) {
+        return ValueReaders.skipStruct(fieldReaders);
+      }
+
+      Types.StructType expected = partner.asStructType();
+      List<Pair<Integer, ValueReader<?>>> readPlan =
+          ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant);
+
+      // TODO: should this pass expected so that struct.get can reuse containers?
+      return FlinkValueReaders.struct(readPlan, expected.fields().size());
+    }
+
+    @Override
+    public ValueReader<?> union(Type partner, Schema union, List<ValueReader<?>> options) {
+      return ValueReaders.union(options);
+    }
+
+    @Override
+    public ValueReader<?> array(Type partner, Schema array, ValueReader<?> elementReader) {
+      return FlinkValueReaders.array(elementReader);
+    }
+
+    @Override
+    public ValueReader<?> arrayMap(
+        Type partner, Schema map, ValueReader<?> keyReader, ValueReader<?> valueReader) {
+      return FlinkValueReaders.arrayMap(keyReader, valueReader);
+    }
+
+    @Override
+    public ValueReader<?> map(Type partner, Schema map, ValueReader<?> valueReader) {
+      return FlinkValueReaders.map(FlinkValueReaders.strings(), valueReader);
+    }
+
+    @Override
+    public ValueReader<?> primitive(Type partner, Schema primitive) {
+      LogicalType logicalType = primitive.getLogicalType();
+      if (logicalType != null) {
+        switch (logicalType.getName()) {
+          case "date":
+            // Flink uses the same representation
+            return ValueReaders.ints();
+
+          case "time-micros":
+            return FlinkValueReaders.timeMicros();
+
+          case "timestamp-millis":
+            return FlinkValueReaders.timestampMills();
+
+          case "timestamp-micros":
+            return FlinkValueReaders.timestampMicros();
+
+          case "decimal":
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+            return FlinkValueReaders.decimal(
+                ValueReaders.decimalBytesReader(primitive),
+                decimal.getPrecision(),
+                decimal.getScale());
+
+          case "uuid":
+            return FlinkValueReaders.uuids();
+
+          default:
+            throw new IllegalArgumentException("Unknown logical type: " + logicalType.getName());
+        }
+      }
+
+      switch (primitive.getType()) {
+        case NULL:
+          return ValueReaders.nulls();
+        case BOOLEAN:
+          return ValueReaders.booleans();
+        case INT:
+          if (partner != null && partner.typeId() == Type.TypeID.LONG) {
+            return ValueReaders.intsAsLongs();
+          }
+          return ValueReaders.ints();
+        case LONG:
+          return ValueReaders.longs();
+        case FLOAT:
+          if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
+            return ValueReaders.floatsAsDoubles();
+          }
+          return ValueReaders.floats();
+        case DOUBLE:
+          return ValueReaders.doubles();
+        case STRING:
+          return FlinkValueReaders.strings();
+        case FIXED:
+          return ValueReaders.fixed(primitive.getFixedSize());
+        case BYTES:
+          return ValueReaders.bytes();
+        case ENUM:
+          return FlinkValueReaders.enums(primitive.getEnumSymbols());
+        default:
+          throw new IllegalArgumentException("Unsupported type: " + primitive);
+      }
+    }
+  }
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
index 32f6c3a2ccfd..0c6ff2411160 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 
 public class FlinkValueReaders {
 
@@ -86,6 +87,10 @@ static ValueReader<MapData> map(ValueReader<?> keyReader, ValueReader<?> valueRe
     return new MapReader(keyReader, valueReader);
   }
 
+  static ValueReader<RowData> struct(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
+    return new PlannedStructReader(readPlan, numFields);
+  }
+
   static ValueReader<RowData> struct(
       List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
     return new StructReader(readers, struct, idToConstant);
@@ -282,6 +287,33 @@ public MapData read(Decoder decoder, Object reuse) throws IOException {
     }
   }
 
+  private static class PlannedStructReader extends ValueReaders.PlannedStructReader<RowData> {
+    private final int numFields;
+
+    private PlannedStructReader(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
+      super(readPlan);
+      this.numFields = numFields;
+    }
+
+    @Override
+    protected RowData reuseOrCreate(Object reuse) {
+      if (reuse instanceof GenericRowData && ((GenericRowData) reuse).getArity() == numFields) {
+        return (RowData) reuse;
+      }
+      return new GenericRowData(numFields);
+    }
+
+    @Override
+    protected Object get(RowData struct, int pos) {
+      return null;
+    }
+
+    @Override
+    protected void set(RowData struct, int pos, Object value) {
+      ((GenericRowData) struct).setField(pos, value);
+    }
+  }
+
   private static class StructReader extends ValueReaders.StructReader<RowData> {
     private final int numFields;
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 88364f4e87b1..9c75a5e0f0fc 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -35,9 +35,9 @@
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkAvroReader;
 import org.apache.iceberg.flink.data.FlinkOrcReader;
 import org.apache.iceberg.flink.data.FlinkParquetReaders;
+import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
 import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.flink.data.RowDataUtil;
 import org.apache.iceberg.io.CloseableIterable;
@@ -154,7 +154,7 @@ private CloseableIterable<RowData> newAvroIterable(
         .reuseContainers()
         .project(schema)
         .split(task.start(), task.length())
-        .createReaderFunc(readSchema -> new FlinkAvroReader(schema, readSchema, idToConstant));
+        .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
 
     if (nameMapping != null) {
       builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
similarity index 96%
rename from flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
rename to flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
index a1039d27d888..cbf49ae6faa9 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
@@ -48,7 +48,7 @@
 import org.apache.iceberg.util.DateTimeUtil;
 import org.junit.jupiter.api.Test;
 
-public class TestFlinkAvroReaderWriter extends DataTest {
+public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
 
   private static final int NUM_RECORDS = 100;
 
@@ -70,6 +70,8 @@ protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(schema, expectedRecords, NUM_RECORDS);
   }
 
+  protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);
+
   private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
       throws IOException {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
@@ -88,11 +90,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
       writer.addAll(expectedRecords);
     }
 
-    try (CloseableIterable<RowData> reader =
-        Avro.read(Files.localInput(recordsFile))
-            .project(schema)
-            .createReaderFunc(FlinkAvroReader::new)
-            .build()) {
+    try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecord; i++) {
@@ -156,7 +154,6 @@ private Record recordNumType(
 
   @Test
   public void testNumericTypes() throws IOException {
-
     List<Record> expected =
         ImmutableList.of(
             recordNumType(
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
new file mode 100644
index 000000000000..03910f4fda47
--- /dev/null
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+/**
+ * @deprecated should be removed in 1.8.0; along with FlinkAvroReader.
+ */
+@Deprecated
+public class TestFlinkAvroDeprecatedReaderWriter extends AbstractTestFlinkAvroReaderWriter {
+
+  @Override
+  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) {
+    return Avro.read(Files.localInput(recordsFile))
+        .project(schema)
+        .createReaderFunc(FlinkAvroReader::new);
+  }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
new file mode 100644
index 000000000000..102a26a94784
--- /dev/null
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+public class TestFlinkAvroPlannedReaderWriter extends AbstractTestFlinkAvroReaderWriter {
+
+  @Override
+  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) {
+    return Avro.read(Files.localInput(recordsFile))
+        .project(schema)
+        .createResolvingReader(FlinkPlannedAvroReader::create);
+  }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
index e76452b7cea0..282a6055cbd3 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -24,6 +24,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -32,6 +34,9 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -40,13 +45,23 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRowProjection {
 
   @TempDir private Path temp;
 
+  @Parameter(index = 0)
+  protected Boolean useAvroPlannedReader;
+
+  @Parameters(name = "useAvroPlannedReader={0}")
+  protected static List<Object[]> parameters() {
+    return Arrays.asList(new Object[] {Boolean.FALSE}, new Object[] {Boolean.TRUE});
+  }
+
   private RowData writeAndRead(String desc, Schema writeSchema, Schema readSchema, RowData row)
       throws IOException {
     File file = File.createTempFile("junit", desc + ".avro", temp.toFile());
@@ -60,16 +75,23 @@ private RowData writeAndRead(String desc, Schema writeSchema, Schema readSchema,
       appender.add(row);
     }
 
-    Iterable<RowData> records =
+    Avro.ReadBuilder builder =
         Avro.read(Files.localInput(file))
             .project(readSchema)
-            .createReaderFunc(FlinkAvroReader::new)
-            .build();
+            .createReaderFunc(FlinkAvroReader::new);
+    if (useAvroPlannedReader) {
+      builder =
+          Avro.read(Files.localInput(file))
+              .project(readSchema)
+              .createResolvingReader(FlinkPlannedAvroReader::create);
+    }
+
+    Iterable<RowData> records = builder.build();
     return Iterables.getOnlyElement(records);
   }
 
-  @Test
+  @TestTemplate
   public void testFullProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -84,7 +106,7 @@ public void testFullProjection() throws Exception {
     assertThat(projected.getString(1)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testSpecialCharacterProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -104,7 +126,7 @@ public void testSpecialCharacterProjection() throws Exception {
     assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testReorderedFullProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -124,7 +146,7 @@ public void testReorderedFullProjection() throws Exception {
     assertThat(projected.getLong(1)).isEqualTo(34);
   }
 
-  @Test
+  @TestTemplate
   public void testReorderedProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -146,7 +168,7 @@ public void testReorderedProjection() throws Exception {
     assertThat(projected.isNullAt(2)).isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testRenamedAddedField() throws Exception {
     Schema schema =
         new Schema(
@@ -176,7 +198,7 @@ public void testRenamedAddedField() throws Exception {
     assertThat(projected.isNullAt(3)).as("Should contain empty value on new column 4").isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testEmptyProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -191,7 +213,7 @@ public void testEmptyProjection() throws Exception {
     assertThat(projected.getArity()).isEqualTo(0);
   }
 
-  @Test
+  @TestTemplate
   public void testBasicProjection() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -214,7 +236,7 @@ public void testBasicProjection() throws Exception {
     assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testRename() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -237,7 +259,7 @@ public void testRename() throws Exception {
         .isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testNestedStructProjection() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -303,7 +325,7 @@ public void testNestedStructProjection() throws Exception {
         .isEqualTo(-1.539054f, withPrecision(0.000001f));
   }
 
-  @Test
+  @TestTemplate
   public void testMapProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -357,7 +379,7 @@
     return stringMap;
   }
 
-  @Test
+  @TestTemplate
   public void testMapOfStructsProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -457,7 +479,7 @@ public void testMapOfStructsProjection() throws IOException {
         .isEqualTo(52.995143f, withPrecision(0.000001f));
   }
 
-  @Test
+  @TestTemplate
   public void testListProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -486,7 +508,7 @@ public void testListProjection() throws IOException {
     assertThat(projected.getArray(0)).isEqualTo(values);
   }
 
-  @Test
+  @TestTemplate
   @SuppressWarnings("unchecked")
   public void testListOfStructsProjection() throws IOException {
     Schema writeSchema =
@@ -563,7 +585,7 @@ public void testListOfStructsProjection() throws IOException {
     assertThat(projectedP2.isNullAt(0)).as("Should project null z").isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testAddedFieldsWithRequiredChildren() throws Exception {
     Schema schema = new Schema(Types.NestedField.required(1, "a", Types.LongType.get()));
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index 86404959735a..baae91dd1882 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -37,16 +37,28 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+/**
+ * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+ */
+@Deprecated
 public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
 
   private final Schema readSchema;
   private final ValueReader<RowData> reader;
   private Schema fileSchema = null;
 
+  /**
+   * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+   */
+  @Deprecated
   public FlinkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) {
     this(expectedSchema, readSchema, ImmutableMap.of());
   }
 
+  /**
+   * @deprecated will be removed in 1.8.0; use FlinkPlannedAvroReader instead.
+   */
+  @Deprecated
   @SuppressWarnings("unchecked")
   public FlinkAvroReader(
       org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map<Integer, ?> constants) {
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
new file mode 100644
index 000000000000..b7a81752d4a0
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.avro.AvroWithPartnerVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
+import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+public class FlinkPlannedAvroReader implements DatumReader<RowData>, SupportsRowPosition {
+
+  private final Types.StructType expectedType;
+  private final Map<Integer, ?> idToConstant;
+  private ValueReader<RowData> reader;
+
+  public static FlinkPlannedAvroReader create(org.apache.iceberg.Schema schema) {
+    return create(schema, ImmutableMap.of());
+  }
+
+  public static FlinkPlannedAvroReader create(
+      org.apache.iceberg.Schema schema, Map<Integer, ?> constants) {
+    return new FlinkPlannedAvroReader(schema, constants);
+  }
+
+  private FlinkPlannedAvroReader(
+      org.apache.iceberg.Schema expectedSchema, Map<Integer, ?> constants) {
+    this.expectedType = expectedSchema.asStruct();
+    this.idToConstant = constants;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setSchema(Schema fileSchema) {
+    this.reader =
+        (ValueReader<RowData>)
+            AvroWithPartnerVisitor.visit(
+                expectedType,
+                fileSchema,
+                new ReadBuilder(idToConstant),
+                AvroWithPartnerVisitor.FieldIDAccessors.get());
+  }
+
+  @Override
+  public RowData read(RowData reuse, Decoder decoder) throws IOException {
+    return reader.read(decoder, reuse);
+  }
+
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
+  private static class ReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldReaders) {
+      if (partner == null) {
+        return ValueReaders.skipStruct(fieldReaders);
+      }
+
+      Types.StructType expected = partner.asStructType();
+      List<Pair<Integer, ValueReader<?>>> readPlan =
+          ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant);
+
+      // TODO: should this pass expected so that struct.get can reuse containers?
+      return FlinkValueReaders.struct(readPlan, expected.fields().size());
+    }
+
+    @Override
+    public ValueReader<?> union(Type partner, Schema union, List<ValueReader<?>> options) {
+      return ValueReaders.union(options);
+    }
+
+    @Override
+    public ValueReader<?> array(Type partner, Schema array, ValueReader<?> elementReader) {
+      return FlinkValueReaders.array(elementReader);
+    }
+
+    @Override
+    public ValueReader<?> arrayMap(
+        Type partner, Schema map, ValueReader<?> keyReader, ValueReader<?> valueReader) {
+      return FlinkValueReaders.arrayMap(keyReader, valueReader);
+    }
+
+    @Override
+    public ValueReader<?> map(Type partner, Schema map, ValueReader<?> valueReader) {
+      return FlinkValueReaders.map(FlinkValueReaders.strings(), valueReader);
+    }
+
+    @Override
+    public ValueReader<?> primitive(Type partner, Schema primitive) {
+      LogicalType logicalType = primitive.getLogicalType();
+      if (logicalType != null) {
+        switch (logicalType.getName()) {
+          case "date":
+            // Flink uses the same representation
+            return ValueReaders.ints();
+
+          case "time-micros":
+            return FlinkValueReaders.timeMicros();
+
+          case "timestamp-millis":
+            return FlinkValueReaders.timestampMills();
+
+          case "timestamp-micros":
+            return FlinkValueReaders.timestampMicros();
+
+          case "decimal":
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+            return FlinkValueReaders.decimal(
+                ValueReaders.decimalBytesReader(primitive),
+                decimal.getPrecision(),
+                decimal.getScale());
+
+          case "uuid":
+            return FlinkValueReaders.uuids();
+
+          default:
+            throw new IllegalArgumentException("Unknown logical type: " + logicalType.getName());
+        }
+      }
+
+      switch (primitive.getType()) {
+        case NULL:
+          return ValueReaders.nulls();
+        case BOOLEAN:
+          return ValueReaders.booleans();
+        case INT:
+          if (partner != null && partner.typeId() == Type.TypeID.LONG) {
+            return ValueReaders.intsAsLongs();
+          }
+          return ValueReaders.ints();
+        case LONG:
+          return ValueReaders.longs();
+        case FLOAT:
+          if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
+            return ValueReaders.floatsAsDoubles();
+          }
+          return ValueReaders.floats();
+        case DOUBLE:
+          return ValueReaders.doubles();
+        case STRING:
+          return FlinkValueReaders.strings();
+        case FIXED:
+          return ValueReaders.fixed(primitive.getFixedSize());
+        case BYTES:
+          return ValueReaders.bytes();
+        case ENUM:
+          return FlinkValueReaders.enums(primitive.getEnumSymbols());
+        default:
+          throw new IllegalArgumentException("Unsupported type: " + primitive);
+      }
+    }
+  }
+}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
index 32f6c3a2ccfd..0c6ff2411160 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueReaders.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 
 public class FlinkValueReaders {
 
@@ -86,6 +87,10 @@ static ValueReader<MapData> map(ValueReader<?> keyReader, ValueReader<?> valueRe
     return new MapReader(keyReader, valueReader);
   }
 
+  static ValueReader<RowData> struct(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
+    return new PlannedStructReader(readPlan, numFields);
+  }
+
   static ValueReader<RowData> struct(
       List<ValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) {
     return new StructReader(readers, struct, idToConstant);
@@ -282,6 +287,33 @@ public MapData read(Decoder decoder, Object reuse) throws IOException {
     }
   }
 
+  private static class PlannedStructReader extends ValueReaders.PlannedStructReader<RowData> {
+    private final int numFields;
+
+    private PlannedStructReader(List<Pair<Integer, ValueReader<?>>> readPlan, int numFields) {
+      super(readPlan);
+      this.numFields = numFields;
+    }
+
+    @Override
+    protected RowData reuseOrCreate(Object reuse) {
+      if (reuse instanceof GenericRowData && ((GenericRowData) reuse).getArity() == numFields) {
+        return (RowData) reuse;
+      }
+      return new GenericRowData(numFields);
+    }
+
+    @Override
+    protected Object get(RowData struct, int pos) {
+      return null;
+    }
+
+    @Override
+    protected void set(RowData struct, int pos, Object value) {
+      ((GenericRowData) struct).setField(pos, value);
+    }
+  }
+
   private static class StructReader extends ValueReaders.StructReader<RowData> {
     private final int numFields;
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 88364f4e87b1..9c75a5e0f0fc 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -35,9 +35,9 @@
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkAvroReader;
 import org.apache.iceberg.flink.data.FlinkOrcReader;
 import org.apache.iceberg.flink.data.FlinkParquetReaders;
+import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
 import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.flink.data.RowDataUtil;
 import org.apache.iceberg.io.CloseableIterable;
@@ -154,7 +154,7 @@ private CloseableIterable<RowData> newAvroIterable(
         .reuseContainers()
         .project(schema)
         .split(task.start(), task.length())
-        .createReaderFunc(readSchema -> new FlinkAvroReader(schema, readSchema, idToConstant));
+        .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
 
     if (nameMapping != null) {
       builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
similarity index 96%
rename from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
rename to flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
index a1039d27d888..cbf49ae6faa9 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
@@ -48,7 +48,7 @@
 import org.apache.iceberg.util.DateTimeUtil;
 import org.junit.jupiter.api.Test;
 
-public class TestFlinkAvroReaderWriter extends DataTest {
+public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
 
   private static final int NUM_RECORDS = 100;
 
@@ -70,6 +70,8 @@ protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(schema, expectedRecords, NUM_RECORDS);
   }
 
+  protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);
+
   private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
       throws IOException {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
@@ -88,11 +90,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
       writer.addAll(expectedRecords);
     }
 
-    try (CloseableIterable<RowData> reader =
-        Avro.read(Files.localInput(recordsFile))
-            .project(schema)
-            .createReaderFunc(FlinkAvroReader::new)
-            .build()) {
+    try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecord; i++) {
@@ -156,7 +154,6 @@ private Record recordNumType(
 
   @Test
   public void testNumericTypes() throws IOException {
-
     List<Record> expected =
         ImmutableList.of(
             recordNumType(
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
new file mode 100644
index 000000000000..03910f4fda47
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+/**
+ * @deprecated should be removed in 1.8.0; along with FlinkAvroReader.
+ */
+@Deprecated
+public class TestFlinkAvroDeprecatedReaderWriter extends AbstractTestFlinkAvroReaderWriter {
+
+  @Override
+  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) {
+    return Avro.read(Files.localInput(recordsFile))
+        .project(schema)
+        .createReaderFunc(FlinkAvroReader::new);
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
new file mode 100644
index 000000000000..102a26a94784
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+public class TestFlinkAvroPlannedReaderWriter extends AbstractTestFlinkAvroReaderWriter {
+
+  @Override
+  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) {
+    return Avro.read(Files.localInput(recordsFile))
+        .project(schema)
+        .createResolvingReader(FlinkPlannedAvroReader::create);
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
index 7dd4e8759c0e..f76e4c4942bd 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -24,6 +24,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -32,6 +34,9 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -41,13 +46,23 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRowProjection {
 
   @TempDir private Path temp;
 
+  @Parameter(index = 0)
+  protected Boolean useAvroPlannedReader;
+
+  @Parameters(name = "useAvroPlannedReader={0}")
+  protected static List<Object[]> parameters() {
+    return Arrays.asList(new Object[] {Boolean.FALSE}, new Object[] {Boolean.TRUE});
+  }
+
   private RowData writeAndRead(String desc, Schema writeSchema, Schema readSchema, RowData row)
       throws IOException {
     File file = File.createTempFile("junit", desc + ".avro", temp.toFile());
@@ -61,16 +76,23 @@ private RowData writeAndRead(String desc, Schema writeSchema, Schema readSchema,
      appender.add(row);
     }
 
-    Iterable<RowData> records =
+    Avro.ReadBuilder builder =
         Avro.read(Files.localInput(file))
            .project(readSchema)
-            .createReaderFunc(FlinkAvroReader::new)
-            .build();
+            .createReaderFunc(FlinkAvroReader::new);
+    if (useAvroPlannedReader) {
+      builder =
+          Avro.read(Files.localInput(file))
+              .project(readSchema)
+              .createResolvingReader(FlinkPlannedAvroReader::create);
+    }
+
+    Iterable<RowData> records = builder.build();
    return Iterables.getOnlyElement(records);
   }
 
-  @Test
+  @TestTemplate
   public void testFullProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -85,7 +107,7 @@ public void testFullProjection() throws Exception {
     assertThat(projected.getString(1)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testSpecialCharacterProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -105,7 +127,7 @@ public void testSpecialCharacterProjection() throws Exception {
     assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testReorderedFullProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -125,7 +147,7 @@ public void testReorderedFullProjection() throws Exception {
     assertThat(projected.getLong(1)).isEqualTo(34);
   }
 
-  @Test
+  @TestTemplate
   public void testReorderedProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -147,7 +169,7 @@ public void testReorderedProjection() throws Exception {
     assertThat(projected.isNullAt(2)).isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testRenamedAddedField() throws Exception {
     Schema schema =
         new Schema(
@@ -177,7 +199,7 @@ public void testRenamedAddedField() throws Exception {
     assertThat(projected.isNullAt(3)).as("Should contain empty value on new column 4").isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testEmptyProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -192,7 +214,7 @@ public void testEmptyProjection() throws Exception {
     assertThat(projected.getArity()).isEqualTo(0);
   }
 
-  @Test
+  @TestTemplate
   public void testBasicProjection() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -216,7 +238,7 @@ public void testBasicProjection() throws Exception {
     assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testRename() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -239,7 +261,7 @@ public void testRename() throws Exception {
         .isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testNestedStructProjection() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -305,7 +327,7 @@ public void testNestedStructProjection() throws Exception {
         .isEqualTo(-1.539054f, withPrecision(0.000001f));
   }
 
-  @Test
+  @TestTemplate
   public void testMapProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -359,7 +381,7 @@
     return stringMap;
   }
 
-  @Test
+  @TestTemplate
   public void testMapOfStructsProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -459,7 +481,7 @@ public void testMapOfStructsProjection() throws IOException {
         .isEqualTo(52.995143f, withPrecision(0.000001f));
   }
 
-  @Test
+  @TestTemplate
   public void testListProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -488,7 +510,7 @@ public void testListProjection() throws IOException {
     assertThat(projected.getArray(0)).isEqualTo(values);
   }
 
-  @Test
+  @TestTemplate
   @SuppressWarnings("unchecked")
   public void testListOfStructsProjection() throws IOException {
     Schema writeSchema =
@@ -565,7 +587,7 @@ public void testListOfStructsProjection() throws IOException {
     assertThat(projectedP2.isNullAt(0)).as("Should project null z").isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testAddedFieldsWithRequiredChildren() throws Exception {
     Schema schema = new Schema(Types.NestedField.required(1, "a", Types.LongType.get()));
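---
Reviewer note (not part of the patch): a minimal before/after usage sketch of this change. The names `input` and `projectedSchema` are placeholders for an Iceberg InputFile and an org.apache.iceberg.Schema; the builder calls shown are the ones this diff exercises in its tests and in RowDataFileScanTaskReader.

    // Before (FlinkAvroReader, deprecated here with removal planned for 1.8.0):
    // the caller constructs the reader from both the expected Iceberg schema and
    // the Avro read schema handed to the reader function.
    CloseableIterable<RowData> rows =
        Avro.read(input)
            .project(projectedSchema)
            .createReaderFunc(readSchema -> new FlinkAvroReader(projectedSchema, readSchema))
            .build();

    // After (FlinkPlannedAvroReader): only the expected Iceberg schema is supplied;
    // setSchema() later receives the file schema and builds a read plan via
    // ValueReaders.buildReadPlan, so schema resolution happens inside the reader.
    CloseableIterable<RowData> plannedRows =
        Avro.read(input)
            .project(projectedSchema)
            .createResolvingReader(FlinkPlannedAvroReader::create)
            .build();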