diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 67f53d3636a6..66cc2495b016 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.function.BiFunction; import java.util.function.Supplier; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -44,6 +45,7 @@ import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.UUIDUtil; @@ -199,6 +201,25 @@ public static List>> buildReadPlan( Schema record, List> fieldReaders, Map idToConstant) { + return buildReadPlan(expected, record, fieldReaders, idToConstant, (type, value) -> value); + } + + /** + * Builds a read plan for record classes that use planned reads instead of a ResolvingDecoder. + * + * @param expected expected StructType + * @param record Avro record schema + * @param fieldReaders list of readers for each field in the Avro record schema + * @param idToConstant a map of field ID to constants values + * @param convert function to convert from internal classes to the target object model + * @return a read plan that is a list of (position, reader) pairs + */ + public static List>> buildReadPlan( + Types.StructType expected, + Schema record, + List> fieldReaders, + Map idToConstant, + BiFunction convert) { Map idToPos = idToPos(expected); List>> readPlan = Lists.newArrayList(); @@ -228,7 +249,9 @@ public static List>> buildReadPlan( if (constant != null) { readPlan.add(Pair.of(pos, ValueReaders.constant(constant))); } else if (field.initialDefault() != null) { - readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault()))); + readPlan.add( + Pair.of( + pos, ValueReaders.constant(convert.apply(field.type(), field.initialDefault())))); } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) { readPlan.add(Pair.of(pos, ValueReaders.constant(false))); } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) { diff --git a/core/src/main/java/org/apache/iceberg/data/GenericDataUtil.java b/core/src/main/java/org/apache/iceberg/data/GenericDataUtil.java new file mode 100644 index 000000000000..152ef31ac876 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/data/GenericDataUtil.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.nio.ByteBuffer; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.DateTimeUtil; + +/** Utility methods for working with Iceberg's generic data model */ +public class GenericDataUtil { + private GenericDataUtil() {} + + /** + * Convert a value from Iceberg's internal data model to the generic data model. + * + * @param type a data type + * @param value value to convert + * @return the value in the generic data model representation + */ + public static Object internalToGeneric(Type type, Object value) { + if (null == value) { + return null; + } + + switch (type.typeId()) { + case DATE: + return DateTimeUtil.dateFromDays((Integer) value); + case TIME: + return DateTimeUtil.timeFromMicros((Long) value); + case TIMESTAMP: + if (((Types.TimestampType) type).shouldAdjustToUTC()) { + return DateTimeUtil.timestamptzFromMicros((Long) value); + } else { + return DateTimeUtil.timestampFromMicros((Long) value); + } + case FIXED: + return ByteBuffers.toByteArray((ByteBuffer) value); + } + + return value; + } +} diff --git a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java index c7ec2e6091cc..64b3e943e270 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java @@ -32,6 +32,7 @@ import org.apache.iceberg.avro.SupportsRowPosition; import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; +import org.apache.iceberg.data.GenericDataUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -97,7 +98,8 @@ public ValueReader record(Type partner, Schema record, List> f Types.StructType expected = partner.asStructType(); List>> readPlan = - ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant); + ValueReaders.buildReadPlan( + expected, record, fieldReaders, idToConstant, GenericDataUtil::internalToGeneric); return GenericReaders.struct(readPlan, expected); } diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java b/data/src/test/java/org/apache/iceberg/data/DataTest.java index 638a344cd2bc..657fa805e5a6 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTest.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java @@ -20,24 +20,45 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.nio.file.Path; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.DateTimeUtil; +import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public abstract class DataTest { protected abstract void writeAndValidate(Schema schema) throws IOException; + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + throw new UnsupportedEncodingException( + "Cannot run test, writeAndValidate(Schema, Schema) is not implemented"); + } + + protected boolean supportsDefaultValues() { + return false; + } + protected static final StructType SUPPORTED_PRIMITIVES = StructType.of( required(100, "id", LongType.get()), @@ -194,4 +215,275 @@ public void testMixedTypes() throws IOException { writeAndValidate(schema); } + + @Test + public void testMissingRequiredWithoutDefault() { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = new Schema(required(1, "id", Types.LongType.get())); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.required("missing_str") + .withId(6) + .ofType(Types.StringType.get()) + .withDoc("Missing required field with no default") + .build()); + + assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Missing required field: missing_str"); + } + + @Test + public void testDefaultValues() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.required("missing_str") + .withId(6) + .ofType(Types.StringType.get()) + .withInitialDefault("orange") + .build(), + Types.NestedField.optional("missing_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testNullDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("missing_date") + .withId(3) + .ofType(Types.DateType.get()) + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testNestedDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + Types.NestedField.optional("nested") + .withId(3) + .ofType(Types.StructType.of(required(4, "inner", Types.StringType.get()))) + .withDoc("Used to test nested field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("nested") + .withId(3) + .ofType( + Types.StructType.of( + required(4, "inner", Types.StringType.get()), + Types.NestedField.optional("missing_inner_float") + .withId(5) + .ofType(Types.FloatType.get()) + .withInitialDefault(-0.0F) + .build())) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testMapNestedDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + Types.NestedField.optional("nested_map") + .withId(3) + .ofType( + Types.MapType.ofOptional( + 4, + 5, + Types.StringType.get(), + Types.StructType.of(required(6, "value_str", Types.StringType.get())))) + .withDoc("Used to test nested map value field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("nested_map") + .withId(3) + .ofType( + Types.MapType.ofOptional( + 4, + 5, + Types.StringType.get(), + Types.StructType.of( + required(6, "value_str", Types.StringType.get()), + Types.NestedField.optional("value_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()))) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testListNestedDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + Types.NestedField.optional("nested_list") + .withId(3) + .ofType( + Types.ListType.ofOptional( + 4, Types.StructType.of(required(5, "element_str", Types.StringType.get())))) + .withDoc("Used to test nested field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("nested_list") + .withId(3) + .ofType( + Types.ListType.ofOptional( + 4, + Types.StructType.of( + required(5, "element_str", Types.StringType.get()), + Types.NestedField.optional("element_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()))) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + private static Stream primitiveTypesAndDefaults() { + return Stream.of( + Arguments.of(Types.BooleanType.get(), false), + Arguments.of(Types.IntegerType.get(), 34), + Arguments.of(Types.LongType.get(), 4900000000L), + Arguments.of(Types.FloatType.get(), 12.21F), + Arguments.of(Types.DoubleType.get(), -0.0D), + Arguments.of(Types.DateType.get(), DateTimeUtil.isoDateToDays("2024-12-17")), + Arguments.of(Types.TimeType.get(), DateTimeUtil.isoTimeToMicros("23:59:59.999999")), + Arguments.of( + Types.TimestampType.withZone(), + DateTimeUtil.isoTimestamptzToMicros("2024-12-17T23:59:59.999999+00:00")), + Arguments.of( + Types.TimestampType.withoutZone(), + DateTimeUtil.isoTimestampToMicros("2024-12-17T23:59:59.999999")), + Arguments.of(Types.StringType.get(), "iceberg"), + Arguments.of(Types.UUIDType.get(), UUID.randomUUID()), + Arguments.of( + Types.FixedType.ofLength(4), ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})), + Arguments.of(Types.BinaryType.get(), ByteBuffer.wrap(new byte[] {0x0a, 0x0b})), + Arguments.of(Types.DecimalType.of(9, 2), new BigDecimal("12.34"))); + } + + @ParameterizedTest + @MethodSource("primitiveTypesAndDefaults") + public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defaultValue) + throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = new Schema(required(1, "id", Types.LongType.get())); + + Schema readSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("col_with_default") + .withId(2) + .ofType(type) + .withInitialDefault(defaultValue) + .build()); + + writeAndValidate(writeSchema, readSchema); + } } diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index 72e3973382af..f8536dfd01c5 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -36,7 +36,10 @@ public static void assertEquals(Types.StructType struct, Record expected, Record assertEquals( field.type(), expected.getField(expectedField.name()), actual.getField(field.name())); } else { - assertThat(actual.getField(field.name())).isEqualTo(field.initialDefault()); + assertEquals( + field.type(), + GenericDataUtil.internalToGeneric(field.type(), field.initialDefault()), + actual.getField(field.name())); } } } diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java index 651df22cfc15..bf5160fd18dc 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java @@ -18,9 +18,7 @@ */ package org.apache.iceberg.data.avro; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -35,8 +33,6 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -import org.junit.jupiter.api.Test; public class TestGenericData extends DataTest { @Override @@ -44,6 +40,7 @@ protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); } + @Override protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { List expected = RandomGenericData.generate(writeSchema, 100, 0L); @@ -75,219 +72,8 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw } } - @Test - public void testMissingRequiredWithoutDefault() { - Schema writeSchema = new Schema(required(1, "id", Types.LongType.get())); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.required("missing_str") - .withId(6) - .ofType(Types.StringType.get()) - .withDoc("Missing required field with no default") - .build()); - - assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Missing required field: missing_str"); - } - - @Test - public void testDefaultValues() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.required("missing_str") - .withId(6) - .ofType(Types.StringType.get()) - .withInitialDefault("orange") - .build(), - Types.NestedField.optional("missing_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testNullDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("missing_date") - .withId(3) - .ofType(Types.DateType.get()) - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - Types.NestedField.optional("nested") - .withId(3) - .ofType(Types.StructType.of(required(4, "inner", Types.StringType.get()))) - .withDoc("Used to test nested field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("nested") - .withId(3) - .ofType( - Types.StructType.of( - required(4, "inner", Types.StringType.get()), - Types.NestedField.optional("missing_inner_float") - .withId(5) - .ofType(Types.FloatType.get()) - .withInitialDefault(-0.0F) - .build())) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testMapNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - Types.NestedField.optional("nested_map") - .withId(3) - .ofType( - Types.MapType.ofOptional( - 4, - 5, - Types.StringType.get(), - Types.StructType.of(required(6, "value_str", Types.StringType.get())))) - .withDoc("Used to test nested map value field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("nested_map") - .withId(3) - .ofType( - Types.MapType.ofOptional( - 4, - 5, - Types.StringType.get(), - Types.StructType.of( - required(6, "value_str", Types.StringType.get()), - Types.NestedField.optional("value_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()))) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testListNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - Types.NestedField.optional("nested_list") - .withId(3) - .ofType( - Types.ListType.ofOptional( - 4, Types.StructType.of(required(5, "element_str", Types.StringType.get())))) - .withDoc("Used to test nested field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("nested_list") - .withId(3) - .ofType( - Types.ListType.ofOptional( - 4, - Types.StructType.of( - required(5, "element_str", Types.StringType.get()), - Types.NestedField.optional("element_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()))) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); + @Override + protected boolean supportsDefaultValues() { + return true; } } diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index 5c7c11f1d231..5a63f7a1fc9d 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -19,9 +19,7 @@ package org.apache.iceberg.data.parquet; import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -43,7 +41,6 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; -import org.apache.iceberg.types.Types.NestedField; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.junit.jupiter.api.Test; @@ -54,6 +51,7 @@ protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); } + @Override protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { List expected = RandomGenericData.generate(writeSchema, 100, 12228L); @@ -98,6 +96,11 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw } } + @Override + protected boolean supportsDefaultValues() { + return true; + } + @Test public void testTwoLevelList() throws IOException { Schema schema = @@ -144,217 +147,4 @@ public void testTwoLevelList() throws IOException { assertThat(Lists.newArrayList(reader).size()).isEqualTo(1); } } - - @Test - public void testMissingRequiredWithoutDefault() { - Schema writeSchema = new Schema(required(1, "id", Types.LongType.get())); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.required("missing_str") - .withId(6) - .ofType(Types.StringType.get()) - .withDoc("Missing required field with no default") - .build()); - - assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Missing required field: missing_str"); - } - - @Test - public void testDefaultValues() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - NestedField.required("missing_str") - .withId(6) - .ofType(Types.StringType.get()) - .withInitialDefault("orange") - .build(), - NestedField.optional("missing_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testNullDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - NestedField.optional("missing_date").withId(3).ofType(Types.DateType.get()).build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - NestedField.optional("nested") - .withId(3) - .ofType(Types.StructType.of(required(4, "inner", Types.StringType.get()))) - .withDoc("Used to test nested field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - NestedField.optional("nested") - .withId(3) - .ofType( - Types.StructType.of( - required(4, "inner", Types.StringType.get()), - NestedField.optional("missing_inner_float") - .withId(5) - .ofType(Types.FloatType.get()) - .withInitialDefault(-0.0F) - .build())) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testMapNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - NestedField.optional("nested_map") - .withId(3) - .ofType( - Types.MapType.ofOptional( - 4, - 5, - Types.StringType.get(), - Types.StructType.of(required(6, "value_str", Types.StringType.get())))) - .withDoc("Used to test nested map value field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - NestedField.optional("nested_map") - .withId(3) - .ofType( - Types.MapType.ofOptional( - 4, - 5, - Types.StringType.get(), - Types.StructType.of( - required(6, "value_str", Types.StringType.get()), - Types.NestedField.optional("value_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()))) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testListNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - NestedField.optional("nested_list") - .withId(3) - .ofType( - Types.ListType.ofOptional( - 4, Types.StructType.of(required(5, "element_str", Types.StringType.get())))) - .withDoc("Used to test nested field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - NestedField.optional("nested_list") - .withId(3) - .ofType( - Types.ListType.ofOptional( - 4, - Types.StructType.of( - required(5, "element_str", Types.StringType.get()), - Types.NestedField.optional("element_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()))) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index b3a1948d0a35..efdf9cc9b01d 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -76,6 +76,10 @@ protected ParquetValueReader createReader( protected abstract ParquetValueReader createStructReader( List types, List> fieldReaders, Types.StructType structType); + protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { + return value; + } + private class FallbackReadBuilder extends ReadBuilder { private FallbackReadBuilder(MessageType type, Map idToConstant) { super(type, idToConstant); @@ -283,7 +287,7 @@ public ParquetValueReader struct( } else if (field.initialDefault() != null) { reorderedFields.add( ParquetValueReaders.constant( - field.initialDefault(), + convertConstant(field.type(), field.initialDefault()), maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); types.add(typesById.get(id)); } else if (field.isOptional()) { diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 99b8c9baad64..8023cef71dae 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericDataUtil; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.parquet.ParquetValueReader; @@ -51,6 +52,11 @@ protected ParquetValueReader createStructReader( return new RecordReader(types, fieldReaders, structType); } + @Override + protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { + return GenericDataUtil.internalToGeneric(type, value); + } + private static class RecordReader extends StructReader { private final GenericRecord template; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java index 4bd2e9c21551..c88a907e0f29 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java @@ -287,7 +287,7 @@ private static String toExecutorLocation(BlockManagerId id) { * @param value a value that is an instance of {@link Type.TypeID#javaClass()} * @return the value converted for Spark */ - public static Object convertConstant(Type type, Object value) { + public static Object internalToSpark(Type type, Object value) { if (value == null) { return null; } @@ -295,6 +295,7 @@ public static Object convertConstant(Type type, Object value) { switch (type.typeId()) { case DECIMAL: return Decimal.apply((BigDecimal) value); + case UUID: case STRING: if (value instanceof Utf8) { Utf8 utf8 = (Utf8) value; @@ -325,7 +326,7 @@ public static Object convertConstant(Type type, Object value) { Types.NestedField field = fields.get(index); Type fieldType = field.type(); values[index] = - convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass())); + internalToSpark(fieldType, struct.get(index, fieldType.typeId().javaClass())); } return new GenericInternalRow(values); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 65e5843e39b3..3ce54d2d9ffa 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -186,7 +186,7 @@ public ParquetValueReader struct( } else if (field.initialDefault() != null) { reorderedFields.add( ParquetValueReaders.constant( - SparkUtil.convertConstant(field.type(), field.initialDefault()), + SparkUtil.internalToSpark(field.type(), field.initialDefault()), maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); types.add(typesById.get(id)); } else if (field.isOptional()) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkPlannedAvroReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkPlannedAvroReader.java index dc4af24685b3..7bcd8881c10b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkPlannedAvroReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkPlannedAvroReader.java @@ -32,6 +32,7 @@ import org.apache.iceberg.avro.ValueReader; import org.apache.iceberg.avro.ValueReaders; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; @@ -97,7 +98,8 @@ public ValueReader record(Type partner, Schema record, List> f Types.StructType expected = partner.asStructType(); List>> readPlan = - ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant); + ValueReaders.buildReadPlan( + expected, record, fieldReaders, idToConstant, SparkUtil::internalToSpark); // TODO: should this pass expected so that struct.get can reuse containers? return SparkValueReaders.struct(readPlan, expected.fields().size()); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 6f0ee1d2e2a0..6b3c3d3f2cf3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -184,9 +184,9 @@ private Map inputFiles() { protected Map constantsMap(ContentScanTask task, Schema readSchema) { if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) { StructType partitionType = Partitioning.partitionType(table); - return PartitionUtil.constantsMap(task, partitionType, SparkUtil::convertConstant); + return PartitionUtil.constantsMap(task, partitionType, SparkUtil::internalToSpark); } else { - return PartitionUtil.constantsMap(task, SparkUtil::convertConstant); + return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java index 8f90a51a6e30..4f7eab30a47d 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java @@ -20,28 +20,49 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.ListType; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.spark.sql.internal.SQLConf; +import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public abstract class AvroDataTest { protected abstract void writeAndValidate(Schema schema) throws IOException; + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + throw new UnsupportedEncodingException( + "Cannot run test, writeAndValidate(Schema, Schema) is not implemented"); + } + + protected boolean supportsDefaultValues() { + return false; + } + protected static final StructType SUPPORTED_PRIMITIVES = StructType.of( required(100, "id", LongType.get()), @@ -238,6 +259,277 @@ public void testTimestampWithoutZone() throws IOException { writeAndValidate(schema); } + @Test + public void testMissingRequiredWithoutDefault() { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = new Schema(required(1, "id", Types.LongType.get())); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.required("missing_str") + .withId(6) + .ofType(Types.StringType.get()) + .withDoc("Missing required field with no default") + .build()); + + assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Missing required field: missing_str"); + } + + @Test + public void testDefaultValues() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.required("missing_str") + .withId(6) + .ofType(Types.StringType.get()) + .withInitialDefault("orange") + .build(), + Types.NestedField.optional("missing_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testNullDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("missing_date") + .withId(3) + .ofType(Types.DateType.get()) + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testNestedDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + Types.NestedField.optional("nested") + .withId(3) + .ofType(Types.StructType.of(required(4, "inner", Types.StringType.get()))) + .withDoc("Used to test nested field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("nested") + .withId(3) + .ofType( + Types.StructType.of( + required(4, "inner", Types.StringType.get()), + Types.NestedField.optional("missing_inner_float") + .withId(5) + .ofType(Types.FloatType.get()) + .withInitialDefault(-0.0F) + .build())) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testMapNestedDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + Types.NestedField.optional("nested_map") + .withId(3) + .ofType( + Types.MapType.ofOptional( + 4, + 5, + Types.StringType.get(), + Types.StructType.of(required(6, "value_str", Types.StringType.get())))) + .withDoc("Used to test nested map value field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("nested_map") + .withId(3) + .ofType( + Types.MapType.ofOptional( + 4, + 5, + Types.StringType.get(), + Types.StructType.of( + required(6, "value_str", Types.StringType.get()), + Types.NestedField.optional("value_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()))) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testListNestedDefaultValue() throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + Types.NestedField.optional("nested_list") + .withId(3) + .ofType( + Types.ListType.ofOptional( + 4, Types.StructType.of(required(5, "element_str", Types.StringType.get())))) + .withDoc("Used to test nested field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + Types.NestedField.optional("nested_list") + .withId(3) + .ofType( + Types.ListType.ofOptional( + 4, + Types.StructType.of( + required(5, "element_str", Types.StringType.get()), + Types.NestedField.optional("element_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()))) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + private static Stream primitiveTypesAndDefaults() { + return Stream.of( + Arguments.of(Types.BooleanType.get(), false), + Arguments.of(Types.IntegerType.get(), 34), + Arguments.of(Types.LongType.get(), 4900000000L), + Arguments.of(Types.FloatType.get(), 12.21F), + Arguments.of(Types.DoubleType.get(), -0.0D), + Arguments.of(Types.DateType.get(), DateTimeUtil.isoDateToDays("2024-12-17")), + // Arguments.of(Types.TimeType.get(), DateTimeUtil.isoTimeToMicros("23:59:59.999999")), + Arguments.of( + Types.TimestampType.withZone(), + DateTimeUtil.isoTimestamptzToMicros("2024-12-17T23:59:59.999999+00:00")), + Arguments.of( + Types.TimestampType.withoutZone(), + DateTimeUtil.isoTimestampToMicros("2024-12-17T23:59:59.999999")), + Arguments.of(Types.StringType.get(), "iceberg"), + Arguments.of(Types.UUIDType.get(), UUID.randomUUID()), + Arguments.of( + Types.FixedType.ofLength(4), ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})), + Arguments.of(Types.BinaryType.get(), ByteBuffer.wrap(new byte[] {0x0a, 0x0b})), + Arguments.of(Types.DecimalType.of(9, 2), new BigDecimal("12.34"))); + } + + @ParameterizedTest + @MethodSource("primitiveTypesAndDefaults") + public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defaultValue) + throws IOException { + Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema writeSchema = new Schema(required(1, "id", Types.LongType.get())); + + Schema readSchema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.optional("col_with_default") + .withId(2) + .ofType(type) + .withInitialDefault(defaultValue) + .build()); + + writeAndValidate(writeSchema, readSchema); + } + protected void withSQLConf(Map conf, Action action) throws IOException { SQLConf sqlConf = SQLConf.get(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index 5511ce24337e..d3d69e4b9d86 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -60,6 +60,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DeleteFileSet; import org.apache.orc.storage.serde2.io.DateWritable; import org.apache.spark.sql.Column; @@ -356,11 +357,21 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual .isEqualTo(String.valueOf(expected)); break; case FIXED: - assertThat(expected).as("Should expect a Fixed").isInstanceOf(GenericData.Fixed.class); + // generated data is written using Avro or Parquet/Avro so generated rows use + // GenericData.Fixed, but default values are converted from Iceberg's internal + // representation so the expected value may be either class. + byte[] expectedBytes; + if (expected instanceof ByteBuffer) { + expectedBytes = ByteBuffers.toByteArray((ByteBuffer) expected); + } else if (expected instanceof GenericData.Fixed) { + expectedBytes = ((GenericData.Fixed) expected).bytes(); + } else { + throw new IllegalStateException( + "Invalid expected value, not byte[] or Fixed: " + expected); + } + assertThat(actual).as("Should be a byte[]").isInstanceOf(byte[].class); - assertThat(actual) - .as("Bytes should match") - .isEqualTo(((GenericData.Fixed) expected).bytes()); + assertThat(actual).as("Bytes should match").isEqualTo(expectedBytes); break; case BINARY: assertThat(expected).as("Should expect a ByteBuffer").isInstanceOf(ByteBuffer.class); @@ -384,12 +395,12 @@ private static void assertEqualsUnsafe(Type type, Object expected, Object actual assertThat(expected).as("Should expect a Collection").isInstanceOf(Collection.class); assertThat(actual).as("Should be an ArrayData").isInstanceOf(ArrayData.class); assertEqualsUnsafe( - type.asNestedType().asListType(), (Collection) expected, (ArrayData) actual); + type.asNestedType().asListType(), (Collection) expected, (ArrayData) actual); break; case MAP: assertThat(expected).as("Should expect a Map").isInstanceOf(Map.class); assertThat(actual).as("Should be an ArrayBasedMapData").isInstanceOf(MapData.class); - assertEqualsUnsafe(type.asNestedType().asMapType(), (Map) expected, (MapData) actual); + assertEqualsUnsafe(type.asNestedType().asMapType(), (Map) expected, (MapData) actual); break; case TIME: default: diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java index 7f9bcbacf298..002539a97620 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReader.java @@ -36,13 +36,18 @@ public class TestSparkAvroReader extends AvroDataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { - List expected = RandomData.generateList(schema, 100, 0L); + writeAndValidate(schema, schema); + } + + @Override + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expected = RandomData.generateList(writeSchema, 100, 0L); File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).as("Delete should succeed").isTrue(); try (FileAppender writer = - Avro.write(Files.localOutput(testFile)).schema(schema).named("test").build()) { + Avro.write(Files.localOutput(testFile)).schema(writeSchema).named("test").build()) { for (Record rec : expected) { writer.add(rec); } @@ -52,13 +57,18 @@ protected void writeAndValidate(Schema schema) throws IOException { try (AvroIterable reader = Avro.read(Files.localInput(testFile)) .createResolvingReader(SparkPlannedAvroReader::create) - .project(schema) + .project(expectedSchema) .build()) { rows = Lists.newArrayList(reader); } for (int i = 0; i < expected.size(); i += 1) { - assertEqualsUnsafe(schema.asStruct(), expected.get(i), rows.get(i)); + assertEqualsUnsafe(expectedSchema.asStruct(), expected.get(i), rows.get(i)); } } + + @Override + protected boolean supportsDefaultValues() { + return true; + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java index 0ac0bb530c77..1cd4fccfdd3f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java @@ -67,6 +67,7 @@ protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); } + @Override protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { assumeThat( TypeUtil.find( @@ -99,6 +100,11 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw } } + @Override + protected boolean supportsDefaultValues() { + return true; + } + protected List rowsFromFile(InputFile inputFile, Schema schema) throws IOException { try (CloseableIterable reader = Parquet.read(inputFile) @@ -225,202 +231,4 @@ public void testMissingRequiredWithoutDefault() { .isInstanceOf(IllegalArgumentException.class) .hasMessage("Missing required field: missing_str"); } - - @Test - public void testDefaultValues() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.required("missing_str") - .withId(6) - .ofType(Types.StringType.get()) - .withInitialDefault("orange") - .build(), - Types.NestedField.optional("missing_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testNullDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("missing_date") - .withId(3) - .ofType(Types.DateType.get()) - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - Types.NestedField.optional("nested") - .withId(3) - .ofType(Types.StructType.of(required(4, "inner", Types.StringType.get()))) - .withDoc("Used to test nested field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("nested") - .withId(3) - .ofType( - Types.StructType.of( - required(4, "inner", Types.StringType.get()), - Types.NestedField.optional("missing_inner_float") - .withId(5) - .ofType(Types.FloatType.get()) - .withInitialDefault(-0.0F) - .build())) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testMapNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - Types.NestedField.optional("nested_map") - .withId(3) - .ofType( - Types.MapType.ofOptional( - 4, - 5, - Types.StringType.get(), - Types.StructType.of(required(6, "value_str", Types.StringType.get())))) - .withDoc("Used to test nested map value field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("nested_map") - .withId(3) - .ofType( - Types.MapType.ofOptional( - 4, - 5, - Types.StringType.get(), - Types.StructType.of( - required(6, "value_str", Types.StringType.get()), - Types.NestedField.optional("value_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()))) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } - - @Test - public void testListNestedDefaultValue() throws IOException { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build(), - Types.NestedField.optional("nested_list") - .withId(3) - .ofType( - Types.ListType.ofOptional( - 4, Types.StructType.of(required(5, "element_str", Types.StringType.get())))) - .withDoc("Used to test nested field defaults") - .build()); - - Schema expectedSchema = - new Schema( - required(1, "id", Types.LongType.get()), - Types.NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), - Types.NestedField.optional("nested_list") - .withId(3) - .ofType( - Types.ListType.ofOptional( - 4, - Types.StructType.of( - required(5, "element_str", Types.StringType.get()), - Types.NestedField.optional("element_int") - .withId(7) - .ofType(Types.IntegerType.get()) - .withInitialDefault(34) - .build()))) - .withDoc("Used to test nested field defaults") - .build()); - - writeAndValidate(writeSchema, expectedSchema); - } }