From bc3213093fe52a8b1157b63ad6290f81db107706 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Fri, 16 Feb 2024 08:26:38 -0800 Subject: [PATCH] Core: add JSON serialization for BaseFilesTable.ManifestReadTask, AllManifestsTable.ManifestListReadTask, and BaseEntriesTable.ManifestReadTask --- .palantir/revapi.yml | 4 + .../org/apache/iceberg/AllManifestsTable.java | 41 ++- .../iceberg/AllManifestsTableTaskParser.java | 107 +++++++ .../org/apache/iceberg/BaseEntriesTable.java | 51 ++-- .../org/apache/iceberg/BaseFilesTable.java | 50 +++- .../apache/iceberg/FilesTableTaskParser.java | 99 +++++++ .../apache/iceberg/GenericManifestFile.java | 36 +++ .../ManifestEntriesTableTaskParser.java | 98 +++++++ .../apache/iceberg/ManifestFileParser.java | 277 ++++++++++++++++++ .../org/apache/iceberg/ScanTaskParser.java | 28 +- .../org/apache/iceberg/TableMetadata.java | 13 +- .../apache/iceberg/hadoop/HadoopFileIO.java | 20 +- .../org/apache/iceberg/io/FileIOParser.java | 4 +- .../apache/iceberg/util/PartitionUtil.java | 10 + .../TestAllManifestsTableTaskParser.java | 108 +++++++ .../iceberg/TestFilesTableTaskParser.java | 92 ++++++ .../iceberg/TestManifestFileParser.java | 74 +++++ 17 files changed, 1063 insertions(+), 49 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java create mode 100644 core/src/main/java/org/apache/iceberg/FilesTableTaskParser.java create mode 100644 core/src/main/java/org/apache/iceberg/ManifestEntriesTableTaskParser.java create mode 100644 core/src/main/java/org/apache/iceberg/ManifestFileParser.java create mode 100644 core/src/test/java/org/apache/iceberg/TestAllManifestsTableTaskParser.java create mode 100644 core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java create mode 100644 core/src/test/java/org/apache/iceberg/TestManifestFileParser.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 3018840b4513..430c72120770 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -874,6 +874,10 @@ acceptedBreaks: justification: "Static utility class - should not have public constructor" "1.4.0": org.apache.iceberg:iceberg-core: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.GenericManifestFile" + new: "class org.apache.iceberg.GenericManifestFile" + justification: "Serialization across versions is not supported" - code: "java.class.defaultSerializationChanged" old: "class org.apache.iceberg.PartitionData" new: "class org.apache.iceberg.PartitionData" diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java index a9c6f50e0b36..641a7a3c9aec 100644 --- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java @@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -52,7 +53,8 @@ public class AllManifestsTable extends BaseMetadataTable { public static final Types.NestedField REF_SNAPSHOT_ID = Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get()); - private static final Schema MANIFEST_FILE_SCHEMA = + @VisibleForTesting + static final Schema MANIFEST_FILE_SCHEMA = new Schema( Types.NestedField.required(14, "content", Types.IntegerType.get()), Types.NestedField.required(1, "path", Types.StringType.get()), @@ -119,6 +121,7 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext protected CloseableIterable doPlanFiles() { FileIO io = table().io(); Map specs = Maps.newHashMap(table().specs()); + Schema dataTableSchema = table().schema(); Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter(); SnapshotEvaluator snapshotEvaluator = @@ -132,7 +135,13 @@ protected CloseableIterable doPlanFiles() { snap -> { if (snap.manifestListLocation() != null) { return new ManifestListReadTask( - io, schema(), specs, snap.manifestListLocation(), filter, snap.snapshotId()); + dataTableSchema, + io, + schema(), + specs, + snap.manifestListLocation(), + filter, + snap.snapshotId()); } else { return StaticDataTask.of( io.newInputFile( @@ -149,6 +158,7 @@ protected CloseableIterable doPlanFiles() { } static class ManifestListReadTask implements DataTask { + private final Schema dataTableSchema; private final FileIO io; private final Schema schema; private final Map specs; @@ -158,12 +168,14 @@ static class ManifestListReadTask implements DataTask { private DataFile lazyDataFile = null; ManifestListReadTask( + Schema dataTableSchema, FileIO io, Schema schema, Map specs, String manifestListLocation, Expression residual, long referenceSnapshotId) { + this.dataTableSchema = dataTableSchema; this.io = io; this.schema = schema; this.specs = specs; @@ -244,6 +256,31 @@ public Expression residual() { public Iterable split(long splitSize) { return ImmutableList.of(this); // don't split } + + @Override + public Schema schema() { + return schema; + } + + Schema dataTableSchema() { + return dataTableSchema; + } + + FileIO io() { + return io; + } + + Map specsById() { + return specs; + } + + String manifestListLocation() { + return manifestListLocation; + } + + long referenceSnapshotId() { + return referenceSnapshotId; + } } static StaticDataTask.Row manifestFileToRow( diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java b/core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java new file mode 100644 index 000000000000..1a372eed8eba --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.JsonUtil; +import org.apache.iceberg.util.PartitionUtil; + +class AllManifestsTableTaskParser { + private static final String DATA_TABLE_SCHEMA = "data-table-schema"; + private static final String FILE_IO = "file-io"; + private static final String SCHEMA = "schema"; + private static final String SPECS = "partition-specs"; + private static final String MANIFEST_LIST_LOCATION = "manifest-list-Location"; + private static final String RESIDUAL = "residual-filter"; + private static final String REFERENCE_SNAPSHOT_ID = "reference-snapshot-id"; + + private AllManifestsTableTaskParser() {} + + static void toJson(AllManifestsTable.ManifestListReadTask task, JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(task != null, "Invalid manifest task: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeFieldName(DATA_TABLE_SCHEMA); + SchemaParser.toJson(task.dataTableSchema(), generator); + + generator.writeFieldName(FILE_IO); + FileIOParser.toJson(task.io(), generator); + + generator.writeFieldName(SCHEMA); + SchemaParser.toJson(task.schema(), generator); + + generator.writeArrayFieldStart(SPECS); + for (PartitionSpec spec : task.specsById().values()) { + PartitionSpecParser.toJson(spec, generator); + } + + generator.writeEndArray(); + + generator.writeStringField(MANIFEST_LIST_LOCATION, task.manifestListLocation()); + + generator.writeFieldName(RESIDUAL); + ExpressionParser.toJson(task.residual(), generator); + + generator.writeNumberField(REFERENCE_SNAPSHOT_ID, task.referenceSnapshotId()); + } + + static AllManifestsTable.ManifestListReadTask fromJson(JsonNode jsonNode) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for manifest task: non-object (%s)", jsonNode); + + Schema dataTableSchema = SchemaParser.fromJson(JsonUtil.get(DATA_TABLE_SCHEMA, jsonNode)); + FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null); + Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode)); + + JsonNode specsArray = jsonNode.get(SPECS); + Preconditions.checkArgument( + specsArray.isArray(), "Invalid JSON node for partition specs: non-array (%s)", specsArray); + + ImmutableList.Builder specsBuilder = ImmutableList.builder(); + for (JsonNode specNode : specsArray) { + PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode); + specsBuilder.add(spec); + } + + Map specsById = PartitionUtil.indexSpecs(specsBuilder.build()); + String manifestListLocation = JsonUtil.getString(MANIFEST_LIST_LOCATION, jsonNode); + Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode)); + long referenceSnapshotId = JsonUtil.getLong(REFERENCE_SNAPSHOT_ID, jsonNode); + + return new AllManifestsTable.ManifestListReadTask( + dataTableSchema, + fileIO, + schema, + specsById, + manifestListLocation, + residualFilter, + referenceSnapshotId); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java index f4019d688cb8..dd6c2ae1c2a7 100644 --- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java @@ -33,7 +33,6 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -92,15 +91,9 @@ static CloseableIterable planFiles( evalCache.get(manifest.partitionSpecId()).eval(manifest) && manifestContentEvaluator.eval(manifest)); - String schemaString = SchemaParser.toJson(projectedSchema); - String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); - ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); - return CloseableIterable.transform( filteredManifests, - manifest -> - new ManifestReadTask( - table, manifest, projectedSchema, schemaString, specString, residuals)); + manifest -> new ManifestReadTask(table, manifest, projectedSchema, filter)); } /** @@ -283,19 +276,28 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { private final ManifestFile manifest; private final Map specsById; + ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) { + this(table.schema(), table.io(), table.specs(), manifest, projection, filter); + } + ManifestReadTask( - Table table, + Schema dataTableSchema, + FileIO io, + Map specsById, ManifestFile manifest, Schema projection, - String schemaString, - String specString, - ResidualEvaluator residuals) { - super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals); + Expression filter) { + super( + DataFiles.fromManifest(manifest), + null, + SchemaParser.toJson(projection), + PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), + ResidualEvaluator.unpartitioned(filter)); this.projection = projection; - this.io = table.io(); + this.io = io; this.manifest = manifest; - this.specsById = Maps.newHashMap(table.specs()); - this.dataTableSchema = table.schema(); + this.specsById = Maps.newHashMap(specsById); + this.dataTableSchema = dataTableSchema; Type fileProjectionType = projection.findType("data_file"); this.fileProjection = @@ -311,7 +313,6 @@ public long estimatedRowsCount() { + (long) manifest.existingFilesCount(); } - @VisibleForTesting ManifestFile manifest() { return manifest; } @@ -403,5 +404,21 @@ private MetricsUtil.ReadableMetricsStruct readableMetrics( public Iterable split(long splitSize) { return ImmutableList.of(this); // don't split } + + FileIO io() { + return io; + } + + Map specsById() { + return specsById; + } + + Schema dataTableSchema() { + return dataTableSchema; + } + + Schema projection() { + return projection; + } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java index 149edf950032..4880363f0d03 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java @@ -29,7 +29,6 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -80,16 +79,11 @@ private static CloseableIterable planFiles( CloseableIterable.filter( manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); - String schemaString = SchemaParser.toJson(projectedSchema); - String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; - ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); return CloseableIterable.transform( filteredManifests, - manifest -> - new ManifestReadTask( - table, manifest, projectedSchema, schemaString, specString, residuals)); + manifest -> new ManifestReadTask(table, manifest, projectedSchema, filter)); } abstract static class BaseFilesTableScan extends BaseMetadataTableScan { @@ -140,18 +134,27 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { private final Schema dataTableSchema; private final Schema projection; + ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) { + this(table.schema(), table.io(), table.specs(), manifest, projection, filter); + } + ManifestReadTask( - Table table, + Schema dataTableSchema, + FileIO io, + Map specsById, ManifestFile manifest, Schema projection, - String schemaString, - String specString, - ResidualEvaluator residuals) { - super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals); - this.io = table.io(); - this.specsById = Maps.newHashMap(table.specs()); + Expression filter) { + super( + DataFiles.fromManifest(manifest), + null, + SchemaParser.toJson(projection), + PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), + ResidualEvaluator.unpartitioned(filter)); + this.io = io; + this.specsById = Maps.newHashMap(specsById); this.manifest = manifest; - this.dataTableSchema = table.schema(); + this.dataTableSchema = dataTableSchema; this.projection = projection; } @@ -240,9 +243,24 @@ public Iterable split(long splitSize) { return ImmutableList.of(this); // don't split } - @VisibleForTesting + FileIO io() { + return io; + } + + Map specsById() { + return specsById; + } + ManifestFile manifest() { return manifest; } + + Schema dataTableSchema() { + return dataTableSchema; + } + + Schema projection() { + return projection; + } } } diff --git a/core/src/main/java/org/apache/iceberg/FilesTableTaskParser.java b/core/src/main/java/org/apache/iceberg/FilesTableTaskParser.java new file mode 100644 index 000000000000..3cc0040d173d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/FilesTableTaskParser.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.JsonUtil; +import org.apache.iceberg.util.PartitionUtil; + +class FilesTableTaskParser { + private static final String FILE_IO = "file-io"; + private static final String SPECS = "partition-specs"; + private static final String SCHEMA = "schema"; + private static final String PROJECTION = "projection"; + private static final String RESIDUAL = "residual-filter"; + private static final String MANIFEST = "manifest-file"; + + private FilesTableTaskParser() {} + + static void toJson(BaseFilesTable.ManifestReadTask task, JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(task != null, "Invalid manifest task: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeFieldName(SCHEMA); + SchemaParser.toJson(task.dataTableSchema(), generator); + + generator.writeFieldName(PROJECTION); + SchemaParser.toJson(task.projection(), generator); + + generator.writeFieldName(FILE_IO); + FileIOParser.toJson(task.io(), generator); + + generator.writeArrayFieldStart(SPECS); + for (PartitionSpec spec : task.specsById().values()) { + PartitionSpecParser.toJson(spec, generator); + } + + generator.writeEndArray(); + + generator.writeFieldName(RESIDUAL); + ExpressionParser.toJson(task.residual(), generator); + + generator.writeFieldName(MANIFEST); + ManifestFileParser.toJson(task.manifest(), generator); + } + + static BaseFilesTable.ManifestReadTask fromJson(JsonNode jsonNode) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for manifest task: non-object (%s)", jsonNode); + + Schema dataTableSchema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode)); + Schema projection = SchemaParser.fromJson(JsonUtil.get(PROJECTION, jsonNode)); + + FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null); + + JsonNode specsArray = jsonNode.get(SPECS); + Preconditions.checkArgument( + specsArray.isArray(), "Invalid JSON node for partition specs: non-array (%s)", specsArray); + + ImmutableList.Builder specsBuilder = ImmutableList.builder(); + for (JsonNode specNode : specsArray) { + PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode); + specsBuilder.add(spec); + } + + Map specsById = PartitionUtil.indexSpecs(specsBuilder.build()); + Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode)); + ManifestFile manifestFile = ManifestFileParser.fromJson(JsonUtil.get(MANIFEST, jsonNode)); + + return new BaseFilesTable.ManifestReadTask( + dataTableSchema, fileIO, specsById, manifestFile, projection, residualFilter); + } +} diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index ed94da5e1791..d081e0bdd568 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -105,6 +105,42 @@ public GenericManifestFile(Schema avroSchema) { this.keyMetadata = null; } + /** Adjust the arg order to avoid conflict with the public constructor below */ + GenericManifestFile( + String path, + long length, + int specId, + ManifestContent content, + long sequenceNumber, + long minSequenceNumber, + Long snapshotId, + List partitions, + ByteBuffer keyMetadata, + Integer addedFilesCount, + Long addedRowsCount, + Integer existingFilesCount, + Long existingRowsCount, + Integer deletedFilesCount, + Long deletedRowsCount) { + this.avroSchema = AVRO_SCHEMA; + this.manifestPath = path; + this.length = length; + this.specId = specId; + this.content = content; + this.sequenceNumber = sequenceNumber; + this.minSequenceNumber = minSequenceNumber; + this.snapshotId = snapshotId; + this.addedFilesCount = addedFilesCount; + this.addedRowsCount = addedRowsCount; + this.existingFilesCount = existingFilesCount; + this.existingRowsCount = existingRowsCount; + this.deletedFilesCount = deletedFilesCount; + this.deletedRowsCount = deletedRowsCount; + this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); + this.fromProjectionPos = null; + this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); + } + public GenericManifestFile( String path, long length, diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTableTaskParser.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTableTaskParser.java new file mode 100644 index 000000000000..4fa9e6967eb9 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTableTaskParser.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.JsonUtil; +import org.apache.iceberg.util.PartitionUtil; + +class ManifestEntriesTableTaskParser { + private static final String FILE_IO = "file-io"; + private static final String SPECS = "partition-specs"; + private static final String SCHEMA = "schema"; + private static final String PROJECTION = "projection"; + private static final String RESIDUAL = "residual-filter"; + private static final String MANIFEST = "manifest-file"; + + private ManifestEntriesTableTaskParser() {} + + static void toJson(BaseEntriesTable.ManifestReadTask task, JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(task != null, "Invalid manifest task: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeFieldName(SCHEMA); + SchemaParser.toJson(task.dataTableSchema(), generator); + + generator.writeFieldName(FILE_IO); + FileIOParser.toJson(task.io(), generator); + + generator.writeArrayFieldStart(SPECS); + for (PartitionSpec spec : task.specsById().values()) { + PartitionSpecParser.toJson(spec, generator); + } + + generator.writeEndArray(); + + generator.writeFieldName(MANIFEST); + ManifestFileParser.toJson(task.manifest(), generator); + + generator.writeFieldName(PROJECTION); + SchemaParser.toJson(task.projection(), generator); + + generator.writeFieldName(RESIDUAL); + ExpressionParser.toJson(task.residual(), generator); + } + + static BaseEntriesTable.ManifestReadTask fromJson(JsonNode jsonNode) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest task: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for manifest task: non-object (%s)", jsonNode); + + Schema dataTableSchema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode)); + FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null); + + JsonNode specsArray = jsonNode.get(SPECS); + Preconditions.checkArgument( + specsArray.isArray(), "Invalid JSON node for partition specs: non-array (%s)", specsArray); + ImmutableList.Builder specsBuilder = ImmutableList.builder(); + for (JsonNode specNode : specsArray) { + PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode); + specsBuilder.add(spec); + } + + Map specsById = PartitionUtil.indexSpecs(specsBuilder.build()); + + ManifestFile manifestFile = ManifestFileParser.fromJson(JsonUtil.get(MANIFEST, jsonNode)); + Schema projection = SchemaParser.fromJson(JsonUtil.get(PROJECTION, jsonNode)); + Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode)); + + return new BaseEntriesTable.ManifestReadTask( + dataTableSchema, fileIO, specsById, manifestFile, projection, residualFilter); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestFileParser.java b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java new file mode 100644 index 000000000000..af3bcde58f0e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.JsonUtil; + +class ManifestFileParser { + private static final String PATH = "path"; + private static final String LENGTH = "length"; + private static final String SPEC_ID = "partition-spec-id"; + private static final String CONTENT = "content"; + private static final String SEQUENCE_NUMBER = "sequence-number"; + private static final String MIN_SEQUENCE_NUMBER = "min-sequence-number"; + private static final String ADDED_SNAPSHOT_ID = "added-snapshot-id"; + private static final String ADDED_FILES_COUNT = "added-files-count"; + private static final String EXISTING_FILES_COUNT = "existing-files-count"; + private static final String DELETED_FILES_COUNT = "deleted-files-count"; + private static final String ADDED_ROWS_COUNT = "added-rows-count"; + private static final String EXISTING_ROWS_COUNT = "existing-rows-count"; + private static final String DELETED_ROWS_COUNT = "deleted-rows-count"; + private static final String PARTITION_FIELD_SUMMARY = "partition-field-summary"; + private static final String KEY_METADATA = "key-metadata"; + + private ManifestFileParser() {} + + static void toJson(ManifestFile manifestFile, JsonGenerator generator) throws IOException { + Preconditions.checkArgument(manifestFile != null, "Invalid manifest file: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeStartObject(); + + generator.writeStringField(PATH, manifestFile.path()); + generator.writeNumberField(LENGTH, manifestFile.length()); + generator.writeNumberField(SPEC_ID, manifestFile.partitionSpecId()); + + if (manifestFile.content() != null) { + generator.writeNumberField(CONTENT, manifestFile.content().id()); + } + + generator.writeNumberField(SEQUENCE_NUMBER, manifestFile.sequenceNumber()); + generator.writeNumberField(MIN_SEQUENCE_NUMBER, manifestFile.minSequenceNumber()); + + if (manifestFile.snapshotId() != null) { + generator.writeNumberField(ADDED_SNAPSHOT_ID, manifestFile.snapshotId()); + } + + if (manifestFile.addedFilesCount() != null) { + generator.writeNumberField(ADDED_FILES_COUNT, manifestFile.addedFilesCount()); + } + + if (manifestFile.existingFilesCount() != null) { + generator.writeNumberField(EXISTING_FILES_COUNT, manifestFile.existingFilesCount()); + } + + if (manifestFile.deletedFilesCount() != null) { + generator.writeNumberField(DELETED_FILES_COUNT, manifestFile.deletedFilesCount()); + } + + if (manifestFile.addedRowsCount() != null) { + generator.writeNumberField(ADDED_ROWS_COUNT, manifestFile.addedRowsCount()); + } + + if (manifestFile.existingRowsCount() != null) { + generator.writeNumberField(EXISTING_ROWS_COUNT, manifestFile.existingRowsCount()); + } + + if (manifestFile.deletedRowsCount() != null) { + generator.writeNumberField(DELETED_ROWS_COUNT, manifestFile.deletedRowsCount()); + } + + if (manifestFile.partitions() != null) { + generator.writeArrayFieldStart(PARTITION_FIELD_SUMMARY); + for (ManifestFile.PartitionFieldSummary summary : manifestFile.partitions()) { + PartitionFieldSummaryParser.toJson(summary, generator); + } + + generator.writeEndArray(); + } + + if (manifestFile.keyMetadata() != null) { + generator.writeStringField( + KEY_METADATA, + BaseEncoding.base16().encode(ByteBuffers.toByteArray(manifestFile.keyMetadata()))); + } + + generator.writeEndObject(); + } + + static ManifestFile fromJson(JsonNode jsonNode) { + Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest file: null"); + Preconditions.checkArgument( + jsonNode.isObject(), "Invalid JSON node for manifest file: non-object (%s)", jsonNode); + + String path = JsonUtil.getString(PATH, jsonNode); + long length = JsonUtil.getLong(LENGTH, jsonNode); + int specId = JsonUtil.getInt(SPEC_ID, jsonNode); + + ManifestContent manifestContent = null; + if (jsonNode.has(CONTENT)) { + manifestContent = ManifestContent.fromId(JsonUtil.getInt(CONTENT, jsonNode)); + } + + long sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, jsonNode); + long minSequenceNumber = JsonUtil.getLong(MIN_SEQUENCE_NUMBER, jsonNode); + + Long addedSnapshotId = null; + if (jsonNode.has(ADDED_SNAPSHOT_ID)) { + addedSnapshotId = JsonUtil.getLong(ADDED_SNAPSHOT_ID, jsonNode); + } + + Integer addedFilesCount = null; + if (jsonNode.has(ADDED_FILES_COUNT)) { + addedFilesCount = JsonUtil.getInt(ADDED_FILES_COUNT, jsonNode); + } + + Integer existingFilesCount = null; + if (jsonNode.has(EXISTING_FILES_COUNT)) { + existingFilesCount = JsonUtil.getInt(EXISTING_FILES_COUNT, jsonNode); + } + + Integer deletedFilesCount = null; + if (jsonNode.has(DELETED_FILES_COUNT)) { + deletedFilesCount = JsonUtil.getInt(DELETED_FILES_COUNT, jsonNode); + } + + Long addedRowsCount = null; + if (jsonNode.has(ADDED_ROWS_COUNT)) { + addedRowsCount = JsonUtil.getLong(ADDED_ROWS_COUNT, jsonNode); + } + + Long existingRowsCount = null; + if (jsonNode.has(EXISTING_ROWS_COUNT)) { + existingRowsCount = JsonUtil.getLong(EXISTING_ROWS_COUNT, jsonNode); + } + + Long deletedRowsCount = null; + if (jsonNode.has(DELETED_ROWS_COUNT)) { + deletedRowsCount = JsonUtil.getLong(DELETED_ROWS_COUNT, jsonNode); + } + + List partitionFieldSummaries = null; + if (jsonNode.has(PARTITION_FIELD_SUMMARY)) { + JsonNode summaryArray = jsonNode.get(PARTITION_FIELD_SUMMARY); + Preconditions.checkArgument( + summaryArray.isArray(), + "Invalid JSON node for partition field summaries: non-array (%s)", + summaryArray); + + ImmutableList.Builder builder = ImmutableList.builder(); + for (JsonNode summaryNode : summaryArray) { + ManifestFile.PartitionFieldSummary summary = + PartitionFieldSummaryParser.fromJson(summaryNode); + builder.add(summary); + } + + partitionFieldSummaries = builder.build(); + } + + ByteBuffer keyMetadata = null; + if (jsonNode.has(KEY_METADATA)) { + String hexStr = JsonUtil.getString(KEY_METADATA, jsonNode); + keyMetadata = ByteBuffer.wrap(BaseEncoding.base16().decode(hexStr)); + } + + return new GenericManifestFile( + path, + length, + specId, + manifestContent, + sequenceNumber, + minSequenceNumber, + addedSnapshotId, + partitionFieldSummaries, + keyMetadata, + addedFilesCount, + addedRowsCount, + existingFilesCount, + existingRowsCount, + deletedFilesCount, + deletedRowsCount); + } + + private static class PartitionFieldSummaryParser { + private static final String CONTAINS_NULL = "contains-null"; + private static final String CONTAINS_NAN = "contains-nan"; + private static final String LOWER_BOUND = "lower-bound"; + private static final String UPPER_BOUND = "upper-bound"; + + private PartitionFieldSummaryParser() {} + + static void toJson(ManifestFile.PartitionFieldSummary summary, JsonGenerator generator) + throws IOException { + Preconditions.checkArgument(summary != null, "Invalid partition field summary: null"); + Preconditions.checkArgument(generator != null, "Invalid JSON generator: null"); + + generator.writeStartObject(); + + generator.writeBooleanField(CONTAINS_NULL, summary.containsNull()); + + if (summary.containsNaN() != null) { + generator.writeBooleanField(CONTAINS_NAN, summary.containsNaN()); + } + + if (summary.lowerBound() != null) { + generator.writeStringField( + LOWER_BOUND, + BaseEncoding.base16().encode(ByteBuffers.toByteArray(summary.lowerBound()))); + } + + if (summary.upperBound() != null) { + generator.writeStringField( + UPPER_BOUND, + BaseEncoding.base16().encode(ByteBuffers.toByteArray(summary.upperBound()))); + } + + generator.writeEndObject(); + } + + static ManifestFile.PartitionFieldSummary fromJson(JsonNode jsonNode) { + Preconditions.checkArgument( + jsonNode != null, "Invalid JSON node for partition field summary: null"); + Preconditions.checkArgument( + jsonNode.isObject(), + "Invalid JSON node for partition field summary: non-object (%s)", + jsonNode); + + boolean containsNull = JsonUtil.getBool(CONTAINS_NULL, jsonNode); + Boolean containsNaN = null; + if (jsonNode.has(CONTAINS_NAN)) { + containsNaN = JsonUtil.getBool(CONTAINS_NAN, jsonNode); + } + + ByteBuffer lowerBound = null; + if (jsonNode.has(LOWER_BOUND)) { + String hexStr = JsonUtil.getString(LOWER_BOUND, jsonNode); + lowerBound = ByteBuffer.wrap(BaseEncoding.base16().decode(hexStr)); + } + + ByteBuffer upperBound = null; + if (jsonNode.has(UPPER_BOUND)) { + String hexStr = JsonUtil.getString(UPPER_BOUND, jsonNode); + upperBound = ByteBuffer.wrap(BaseEncoding.base16().decode(hexStr)); + } + + if (containsNaN != null) { + return new GenericPartitionFieldSummary(containsNull, containsNaN, lowerBound, upperBound); + } else { + return new GenericPartitionFieldSummary(containsNull, lowerBound, upperBound); + } + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/ScanTaskParser.java b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java index 9447d0668a1f..67e44cea7d07 100644 --- a/core/src/main/java/org/apache/iceberg/ScanTaskParser.java +++ b/core/src/main/java/org/apache/iceberg/ScanTaskParser.java @@ -30,7 +30,10 @@ public class ScanTaskParser { private enum TaskType { FILE_SCAN_TASK("file-scan-task"), - DATA_TASK("data-task"); + DATA_TASK("data-task"), + FILES_TABLE_TASK("files-table-task"), + ALL_MANIFESTS_TABLE_TASK("all-manifests-table-task"), + MANIFEST_ENTRIES_TABLE_TASK("manifest-entries-task"); private final String value; @@ -45,6 +48,12 @@ public static TaskType fromTypeName(String value) { return FILE_SCAN_TASK; } else if (DATA_TASK.typeName().equalsIgnoreCase(value)) { return DATA_TASK; + } else if (FILES_TABLE_TASK.typeName().equalsIgnoreCase(value)) { + return FILES_TABLE_TASK; + } else if (ALL_MANIFESTS_TABLE_TASK.typeName().equalsIgnoreCase(value)) { + return ALL_MANIFESTS_TABLE_TASK; + } else if (MANIFEST_ENTRIES_TABLE_TASK.typeName().equalsIgnoreCase(value)) { + return MANIFEST_ENTRIES_TABLE_TASK; } else { throw new IllegalArgumentException("Unknown task type: " + value); } @@ -74,6 +83,17 @@ private static void toJson(FileScanTask fileScanTask, JsonGenerator generator) if (fileScanTask instanceof StaticDataTask) { generator.writeStringField(TASK_TYPE, TaskType.DATA_TASK.typeName()); DataTaskParser.toJson((StaticDataTask) fileScanTask, generator); + } else if (fileScanTask instanceof BaseFilesTable.ManifestReadTask) { + generator.writeStringField(TASK_TYPE, TaskType.FILES_TABLE_TASK.typeName()); + FilesTableTaskParser.toJson((BaseFilesTable.ManifestReadTask) fileScanTask, generator); + } else if (fileScanTask instanceof AllManifestsTable.ManifestListReadTask) { + generator.writeStringField(TASK_TYPE, TaskType.ALL_MANIFESTS_TABLE_TASK.typeName()); + AllManifestsTableTaskParser.toJson( + (AllManifestsTable.ManifestListReadTask) fileScanTask, generator); + } else if (fileScanTask instanceof BaseEntriesTable.ManifestReadTask) { + generator.writeStringField(TASK_TYPE, TaskType.MANIFEST_ENTRIES_TABLE_TASK.typeName()); + ManifestEntriesTableTaskParser.toJson( + (BaseEntriesTable.ManifestReadTask) fileScanTask, generator); } else if (fileScanTask instanceof BaseFileScanTask || fileScanTask instanceof BaseFileScanTask.SplitScanTask) { generator.writeStringField(TASK_TYPE, TaskType.FILE_SCAN_TASK.typeName()); @@ -98,6 +118,12 @@ private static FileScanTask fromJson(JsonNode jsonNode, boolean caseSensitive) { return FileScanTaskParser.fromJson(jsonNode, caseSensitive); case DATA_TASK: return DataTaskParser.fromJson(jsonNode); + case FILES_TABLE_TASK: + return FilesTableTaskParser.fromJson(jsonNode); + case ALL_MANIFESTS_TABLE_TASK: + return AllManifestsTableTaskParser.fromJson(jsonNode); + case MANIFEST_ENTRIES_TABLE_TASK: + return ManifestEntriesTableTaskParser.fromJson(jsonNode); default: throw new UnsupportedOperationException("Unsupported task type: " + taskType.typeName()); } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index bd1c8a1a0371..aff047fe8234 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -43,6 +43,7 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PartitionUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SerializableSupplier; @@ -331,7 +332,7 @@ public String toString() { this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber); this.schemasById = indexSchemas(); - this.specsById = indexSpecs(specs); + this.specsById = PartitionUtil.indexSpecs(specs); this.sortOrdersById = indexSortOrders(sortOrders); this.refs = validateRefs(currentSnapshotId, refs, snapshotsById); this.statisticsFiles = ImmutableList.copyOf(statisticsFiles); @@ -806,14 +807,6 @@ private Map indexSchemas() { return builder.build(); } - private static Map indexSpecs(List specs) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (PartitionSpec spec : specs) { - builder.put(spec.specId(), spec); - } - return builder.build(); - } - private static Map indexSortOrders(List sortOrders) { ImmutableMap.Builder builder = ImmutableMap.builder(); for (SortOrder sortOrder : sortOrders) { @@ -1050,7 +1043,7 @@ public Builder setCurrentSchema(int schemaId) { this.specs = Lists.newArrayList(Iterables.transform(specs, spec -> updateSpecSchema(schema, spec))); specsById.clear(); - specsById.putAll(indexSpecs(specs)); + specsById.putAll(PartitionUtil.indexSpecs(specs)); this.sortOrders = Lists.newArrayList( diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java index 7aaa2b6a75b1..2b99e17e41dc 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java @@ -37,6 +37,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; @@ -71,6 +72,13 @@ public HadoopFileIO(Configuration hadoopConf) { public HadoopFileIO(SerializableSupplier hadoopConf) { this.hadoopConf = hadoopConf; + Map props = Maps.newHashMapWithExpectedSize(hadoopConf.get().size()); + Iterator> iter = hadoopConf.get().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + props.put(entry.getKey(), entry.getValue()); + } + this.properties = SerializableMap.copyOf(props); } public Configuration conf() { @@ -79,7 +87,17 @@ public Configuration conf() { @Override public void initialize(Map props) { - this.properties = SerializableMap.copyOf(props); + Map mergedMap = + Maps.newHashMapWithExpectedSize(props.size() + properties.size()); + mergedMap.putAll(properties.immutableMap()); + mergedMap.putAll(props); + this.properties = SerializableMap.copyOf(mergedMap); + + if (hadoopConf == null) { + this.hadoopConf = new SerializableConfiguration(new Configuration())::get; + } + + props.forEach(hadoopConf.get()::set); } @Override diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOParser.java b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java index 90ace239c2b7..2743b7b616cd 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileIOParser.java +++ b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java @@ -40,7 +40,7 @@ public static String toJson(FileIO io, boolean pretty) { return JsonUtil.generate(gen -> toJson(io, gen), pretty); } - private static void toJson(FileIO io, JsonGenerator generator) throws IOException { + public static void toJson(FileIO io, JsonGenerator generator) throws IOException { String impl = io.getClass().getName(); Map properties; try { @@ -72,7 +72,7 @@ public static FileIO fromJson(String json, Object conf) { return JsonUtil.parse(json, node -> fromJson(node, conf)); } - private static FileIO fromJson(JsonNode json, Object conf) { + public static FileIO fromJson(JsonNode json, Object conf) { Preconditions.checkArgument(json.isObject(), "Cannot parse FileIO from non-object: %s", json); String impl = JsonUtil.getString(FILE_IO_IMPL, json); Map properties = JsonUtil.getStringMap(PROPERTIES, json); diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index 4a5f0c792baa..83735939317b 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ -26,6 +26,7 @@ import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -98,4 +99,13 @@ public static StructLike coercePartition( projection.wrap(partition); return projection; } + + public static Map indexSpecs(List specs) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (PartitionSpec spec : specs) { + builder.put(spec.specId(), spec); + } + + return builder.build(); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestAllManifestsTableTaskParser.java b/core/src/test/java/org/apache/iceberg/TestAllManifestsTableTaskParser.java new file mode 100644 index 000000000000..be2e71e677f5 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestAllManifestsTableTaskParser.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Map; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.PartitionUtil; +import org.junit.jupiter.api.Test; + +public class TestAllManifestsTableTaskParser { + @Test + public void testParser() { + AllManifestsTable.ManifestListReadTask task = createTask(); + String jsonStr = ScanTaskParser.toJson(task); + assertThat(jsonStr).isEqualTo(taskJson()); + AllManifestsTable.ManifestListReadTask deserializedTask = + (AllManifestsTable.ManifestListReadTask) ScanTaskParser.fromJson(jsonStr, false); + assertTaskEquals(task, deserializedTask); + } + + static AllManifestsTable.ManifestListReadTask createTask() { + Schema dataTableSchema = TestBase.SCHEMA; + HadoopFileIO fileIO = new HadoopFileIO(); + fileIO.initialize(ImmutableMap.of("k1", "v1", "k2", "v2")); + Map specsById = + PartitionUtil.indexSpecs( + Arrays.asList(PartitionSpec.builderFor(dataTableSchema).bucket("data", 16).build())); + + return new AllManifestsTable.ManifestListReadTask( + dataTableSchema, + fileIO, + AllManifestsTable.MANIFEST_FILE_SCHEMA, + specsById, + "/path/manifest-list-file.avro", + Expressions.equal("id", 1), + 1L); + } + + private String taskJson() { + return "{\"task-type\":\"all-manifests-table-task\"," + + "\"data-table-schema\":{\"type\":\"struct\",\"schema-id\":0," + + "\"fields\":[{\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]}," + + "\"file-io\":{\"io-impl\":\"org.apache.iceberg.hadoop.HadoopFileIO\"," + + "\"properties\":{\"k1\":\"v1\",\"k2\":\"v2\"}}," + + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{" + + "\"id\":14,\"name\":\"content\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":1,\"name\":\"path\",\"required\":true,\"type\":\"string\"}," + + "{\"id\":2,\"name\":\"length\",\"required\":true,\"type\":\"long\"}," + + "{\"id\":3,\"name\":\"partition_spec_id\",\"required\":false,\"type\":\"int\"}," + + "{\"id\":4,\"name\":\"added_snapshot_id\",\"required\":false,\"type\":\"long\"}," + + "{\"id\":5,\"name\":\"added_data_files_count\",\"required\":false,\"type\":\"int\"}," + + "{\"id\":6,\"name\":\"existing_data_files_count\",\"required\":false,\"type\":\"int\"}," + + "{\"id\":7,\"name\":\"deleted_data_files_count\",\"required\":false,\"type\":\"int\"}," + + "{\"id\":15,\"name\":\"added_delete_files_count\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":16,\"name\":\"existing_delete_files_count\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":17,\"name\":\"deleted_delete_files_count\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":8,\"name\":\"partition_summaries\",\"required\":false,\"type\":" + + "{\"type\":\"list\",\"element-id\":9,\"element\":{\"type\":\"struct\",\"fields\":[{" + + "\"id\":10,\"name\":\"contains_null\",\"required\":true,\"type\":\"boolean\"}," + + "{\"id\":11,\"name\":\"contains_nan\",\"required\":true,\"type\":\"boolean\"}," + + "{\"id\":12,\"name\":\"lower_bound\",\"required\":false,\"type\":\"string\"}," + + "{\"id\":13,\"name\":\"upper_bound\",\"required\":false,\"type\":\"string\"}]},\"element-required\":true}}," + + "{\"id\":18,\"name\":\"reference_snapshot_id\",\"required\":true,\"type\":\"long\"}]}," + + "\"partition-specs\":[{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\"," + + "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]}]," + + "\"manifest-list-Location\":\"/path/manifest-list-file.avro\"," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}," + + "\"reference-snapshot-id\":1}"; + } + + private void assertTaskEquals( + AllManifestsTable.ManifestListReadTask expected, + AllManifestsTable.ManifestListReadTask actual) { + + HadoopFileIO expectedIO = (HadoopFileIO) expected.io(); + HadoopFileIO actualIO = (HadoopFileIO) expected.io(); + assertThat(actualIO.properties()).isEqualTo(expectedIO.properties()); + + assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue(); + + assertThat(actual.specsById()).isEqualTo(expected.specsById()); + assertThat(actual.manifestListLocation()).isEqualTo(expected.manifestListLocation()); + assertThat(actual.residual().toString()).isEqualTo(expected.residual().toString()); + assertThat(actual.referenceSnapshotId()).isEqualTo(expected.referenceSnapshotId()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java b/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java new file mode 100644 index 000000000000..24a027fa1816 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestFilesTableTaskParser.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Map; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.PartitionUtil; +import org.junit.jupiter.api.Test; + +public class TestFilesTableTaskParser { + @Test + public void testParser() { + BaseFilesTable.ManifestReadTask task = createTask(); + String jsonStr = ScanTaskParser.toJson(task); + assertThat(jsonStr).isEqualTo(taskJson()); + BaseFilesTable.ManifestReadTask deserializedTask = + (BaseFilesTable.ManifestReadTask) ScanTaskParser.fromJson(jsonStr, false); + assertTaskEquals(task, deserializedTask); + } + + static BaseFilesTable.ManifestReadTask createTask() { + Schema schema = TestBase.SCHEMA; + HadoopFileIO fileIO = new HadoopFileIO(); + fileIO.initialize(ImmutableMap.of("k1", "v1", "k2", "v2")); + Map specsById = + PartitionUtil.indexSpecs( + Arrays.asList(PartitionSpec.builderFor(schema).bucket("data", 16).build())); + ManifestFile manifestFile = TestManifestFileParser.createManifestFile(); + return new BaseFilesTable.ManifestReadTask( + schema, fileIO, specsById, manifestFile, schema, Expressions.equal("id", 1)); + } + + private String taskJson() { + return "{\"task-type\":\"files-table-task\"," + + "\"schema\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{" + + "\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]}," + + "\"projection\":{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{" + + "\"id\":3,\"name\":\"id\",\"required\":true,\"type\":\"int\"}," + + "{\"id\":4,\"name\":\"data\",\"required\":true,\"type\":\"string\"}]}," + + "\"file-io\":{\"io-impl\":\"org.apache.iceberg.hadoop.HadoopFileIO\"," + + "\"properties\":{\"k1\":\"v1\",\"k2\":\"v2\"}}," + + "\"partition-specs\":[{\"spec-id\":0,\"fields\":[{" + + "\"name\":\"data_bucket\",\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]}]," + + "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1}," + + "\"manifest-file\":{\"path\":\"/path/input.m0.avro\"," + + "\"length\":5878,\"partition-spec-id\":0,\"content\":0,\"sequence-number\":1,\"min-sequence-number\":2," + + "\"added-snapshot-id\":12345678901234567," + + "\"added-files-count\":1,\"existing-files-count\":3,\"deleted-files-count\":0," + + "\"added-rows-count\":10,\"existing-rows-count\":30,\"deleted-rows-count\":0," + + "\"partition-field-summary\":[{\"contains-null\":true,\"contains-nan\":false," + + "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}],\"key-metadata\":\"DB030000\"}}"; + } + + private void assertTaskEquals( + BaseFilesTable.ManifestReadTask expected, BaseFilesTable.ManifestReadTask actual) { + assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue(); + + assertThat(expected.projection().sameSchema(actual.projection())) + .as("Projected schema should match") + .isTrue(); + + HadoopFileIO expectedIO = (HadoopFileIO) expected.io(); + HadoopFileIO actualIO = (HadoopFileIO) expected.io(); + assertThat(actualIO.properties()).isEqualTo(expectedIO.properties()); + + assertThat(actual.specsById()).isEqualTo(expected.specsById()); + assertThat(actual.residual().toString()).isEqualTo(expected.residual().toString()); + assertThat(actual.manifest()).isEqualTo(expected.manifest()); + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java new file mode 100644 index 000000000000..302f0e559ae5 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.JsonUtil; +import org.junit.jupiter.api.TestTemplate; + +public class TestManifestFileParser extends TestBase { + + @TestTemplate + public void testParser() throws Exception { + ManifestFile manifest = createManifestFile(); + String jsonStr = JsonUtil.generate(gen -> ManifestFileParser.toJson(manifest, gen), false); + assertThat(jsonStr).isEqualTo(manifestFileJson()); + } + + static ManifestFile createManifestFile() { + ByteBuffer lowerBound = Conversions.toByteBuffer(Types.IntegerType.get(), 10); + ByteBuffer upperBound = Conversions.toByteBuffer(Types.IntegerType.get(), 100); + List partitionFieldSummaries = + Arrays.asList(new GenericPartitionFieldSummary(true, false, lowerBound, upperBound)); + ByteBuffer keyMetadata = Conversions.toByteBuffer(Types.IntegerType.get(), 987); + + return new GenericManifestFile( + "/path/input.m0.avro", + 5878L, + 0, + ManifestContent.DATA, + 1L, + 2L, + 12345678901234567L, + 1, + 10L, + 3, + 30L, + 0, + 0L, + partitionFieldSummaries, + keyMetadata); + } + + private String manifestFileJson() { + return "{\"path\":\"/path/input.m0.avro\"," + + "\"length\":5878,\"partition-spec-id\":0,\"content\":0,\"sequence-number\":1,\"min-sequence-number\":2," + + "\"added-snapshot-id\":12345678901234567,\"added-files-count\":1,\"existing-files-count\":3,\"deleted-files-count\":0," + + "\"added-rows-count\":10,\"existing-rows-count\":30,\"deleted-rows-count\":0," + + "\"partition-field-summary\":[{\"contains-null\":true,\"contains-nan\":false," + + "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}]," + + "\"key-metadata\":\"DB030000\"}"; + } +}