Skip to content

Commit

Permalink
Data: Fix Parquet and Avro defaults date/time representation (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue authored Dec 18, 2024
1 parent 7e1a4c9 commit d0effc6
Show file tree
Hide file tree
Showing 17 changed files with 742 additions and 652 deletions.
25 changes: 24 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
Expand All @@ -44,6 +45,7 @@
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.UUIDUtil;
Expand Down Expand Up @@ -199,6 +201,25 @@ public static List<Pair<Integer, ValueReader<?>>> buildReadPlan(
Schema record,
List<ValueReader<?>> fieldReaders,
Map<Integer, ?> idToConstant) {
return buildReadPlan(expected, record, fieldReaders, idToConstant, (type, value) -> value);
}

/**
* Builds a read plan for record classes that use planned reads instead of a ResolvingDecoder.
*
* @param expected expected StructType
* @param record Avro record schema
* @param fieldReaders list of readers for each field in the Avro record schema
* @param idToConstant a map of field ID to constants values
* @param convert function to convert from internal classes to the target object model
* @return a read plan that is a list of (position, reader) pairs
*/
public static List<Pair<Integer, ValueReader<?>>> buildReadPlan(
Types.StructType expected,
Schema record,
List<ValueReader<?>> fieldReaders,
Map<Integer, ?> idToConstant,
BiFunction<Type, Object, Object> convert) {
Map<Integer, Integer> idToPos = idToPos(expected);

List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
Expand Down Expand Up @@ -228,7 +249,9 @@ public static List<Pair<Integer, ValueReader<?>>> buildReadPlan(
if (constant != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
} else if (field.initialDefault() != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault())));
readPlan.add(
Pair.of(
pos, ValueReaders.constant(convert.apply(field.type(), field.initialDefault()))));
} else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
} else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
Expand Down
60 changes: 60 additions & 0 deletions core/src/main/java/org/apache/iceberg/data/GenericDataUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data;

import java.nio.ByteBuffer;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;

/** Utility methods for working with Iceberg's generic data model */
public class GenericDataUtil {
private GenericDataUtil() {}

/**
* Convert a value from Iceberg's internal data model to the generic data model.
*
* @param type a data type
* @param value value to convert
* @return the value in the generic data model representation
*/
public static Object internalToGeneric(Type type, Object value) {
if (null == value) {
return null;
}

switch (type.typeId()) {
case DATE:
return DateTimeUtil.dateFromDays((Integer) value);
case TIME:
return DateTimeUtil.timeFromMicros((Long) value);
case TIMESTAMP:
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
return DateTimeUtil.timestamptzFromMicros((Long) value);
} else {
return DateTimeUtil.timestampFromMicros((Long) value);
}
case FIXED:
return ByteBuffers.toByteArray((ByteBuffer) value);
}

return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.data.GenericDataUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -97,7 +98,8 @@ public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> f

Types.StructType expected = partner.asStructType();
List<Pair<Integer, ValueReader<?>>> readPlan =
ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant);
ValueReaders.buildReadPlan(
expected, record, fieldReaders, idToConstant, GenericDataUtil::internalToGeneric);

return GenericReaders.struct(readPlan, expected);
}
Expand Down
Loading

0 comments on commit d0effc6

Please sign in to comment.