Skip to content

Commit

Permalink
Core: add JSON serialization for BaseFilesTable.ManifestReadTask, All…
Browse files Browse the repository at this point in the history
…ManifestsTable.ManifestListReadTask, and BaseEntriesTable.ManifestReadTask
  • Loading branch information
stevenzwu committed Aug 2, 2024
1 parent af75440 commit bc32130
Show file tree
Hide file tree
Showing 17 changed files with 1,063 additions and 49 deletions.
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,10 @@ acceptedBreaks:
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.GenericManifestFile"
new: "class org.apache.iceberg.GenericManifestFile"
justification: "Serialization across versions is not supported"
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.PartitionData"
new: "class org.apache.iceberg.PartitionData"
Expand Down
41 changes: 39 additions & 2 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand All @@ -52,7 +53,8 @@ public class AllManifestsTable extends BaseMetadataTable {
public static final Types.NestedField REF_SNAPSHOT_ID =
Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get());

private static final Schema MANIFEST_FILE_SCHEMA =
@VisibleForTesting
static final Schema MANIFEST_FILE_SCHEMA =
new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
Types.NestedField.required(1, "path", Types.StringType.get()),
Expand Down Expand Up @@ -119,6 +121,7 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
protected CloseableIterable<FileScanTask> doPlanFiles() {
FileIO io = table().io();
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());
Schema dataTableSchema = table().schema();
Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();

SnapshotEvaluator snapshotEvaluator =
Expand All @@ -132,7 +135,13 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
snap -> {
if (snap.manifestListLocation() != null) {
return new ManifestListReadTask(
io, schema(), specs, snap.manifestListLocation(), filter, snap.snapshotId());
dataTableSchema,
io,
schema(),
specs,
snap.manifestListLocation(),
filter,
snap.snapshotId());
} else {
return StaticDataTask.of(
io.newInputFile(
Expand All @@ -149,6 +158,7 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
}

static class ManifestListReadTask implements DataTask {
private final Schema dataTableSchema;
private final FileIO io;
private final Schema schema;
private final Map<Integer, PartitionSpec> specs;
Expand All @@ -158,12 +168,14 @@ static class ManifestListReadTask implements DataTask {
private DataFile lazyDataFile = null;

ManifestListReadTask(
Schema dataTableSchema,
FileIO io,
Schema schema,
Map<Integer, PartitionSpec> specs,
String manifestListLocation,
Expression residual,
long referenceSnapshotId) {
this.dataTableSchema = dataTableSchema;
this.io = io;
this.schema = schema;
this.specs = specs;
Expand Down Expand Up @@ -244,6 +256,31 @@ public Expression residual() {
// Returns a single-element list containing this task itself; splitSize is not consulted.
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}

/** Returns the schema stored in this task's {@code schema} field by the constructor. */
@Override
public Schema schema() {
  return this.schema;
}

/**
 * Returns the data table schema this task was constructed with.
 *
 * <p>Package-private accessor; within this file it is read by the task's JSON parser.
 */
Schema dataTableSchema() {
  return this.dataTableSchema;
}

/** Returns the {@link FileIO} instance this task was constructed with. */
FileIO io() {
  return this.io;
}

/**
 * Returns the partition spec map held in the {@code specs} field.
 *
 * <p>The constructor stores the map it is given without copying it, so this is that same map.
 */
Map<Integer, PartitionSpec> specsById() {
  return this.specs;
}

/** Returns the value of this task's {@code manifestListLocation} field. */
String manifestListLocation() {
  return this.manifestListLocation;
}

/** Returns the value of this task's {@code referenceSnapshotId} field. */
long referenceSnapshotId() {
  return this.referenceSnapshotId;
}
}

static StaticDataTask.Row manifestFileToRow(
Expand Down
107 changes: 107 additions & 0 deletions core/src/main/java/org/apache/iceberg/AllManifestsTableTaskParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionParser;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.util.PartitionUtil;

/**
 * JSON serialization and deserialization for {@link AllManifestsTable.ManifestListReadTask}.
 *
 * <p>{@link #toJson} writes one JSON field per piece of task state; {@link #fromJson} reads the
 * same fields back and rebuilds the task through its constructor.
 */
class AllManifestsTableTaskParser {
  private static final String DATA_TABLE_SCHEMA = "data-table-schema";
  private static final String FILE_IO = "file-io";
  private static final String SCHEMA = "schema";
  private static final String SPECS = "partition-specs";
  // NOTE(review): the capital "L" is inconsistent with the other field names. It is kept verbatim
  // because this string is the serialized field name; changing it changes the JSON format.
  private static final String MANIFEST_LIST_LOCATION = "manifest-list-Location";
  private static final String RESIDUAL = "residual-filter";
  private static final String REFERENCE_SNAPSHOT_ID = "reference-snapshot-id";

  private AllManifestsTableTaskParser() {}

  /**
   * Writes the fields of {@code task} to {@code generator}.
   *
   * <p>Only field name/value pairs are written. This method does not emit the start or end of the
   * enclosing JSON object, so the caller is expected to have an object open on the generator.
   *
   * @param task the task to serialize; must not be null
   * @param generator the generator to write to; must not be null
   * @throws IllegalArgumentException if {@code task} or {@code generator} is null
   * @throws IOException if the generator fails to write
   */
  static void toJson(AllManifestsTable.ManifestListReadTask task, JsonGenerator generator)
      throws IOException {
    Preconditions.checkArgument(task != null, "Invalid manifest task: null");
    Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");

    generator.writeFieldName(DATA_TABLE_SCHEMA);
    SchemaParser.toJson(task.dataTableSchema(), generator);

    generator.writeFieldName(FILE_IO);
    FileIOParser.toJson(task.io(), generator);

    generator.writeFieldName(SCHEMA);
    SchemaParser.toJson(task.schema(), generator);

    // Specs are written as a plain array; fromJson re-indexes them into a map.
    generator.writeArrayFieldStart(SPECS);
    for (PartitionSpec spec : task.specsById().values()) {
      PartitionSpecParser.toJson(spec, generator);
    }

    generator.writeEndArray();

    generator.writeStringField(MANIFEST_LIST_LOCATION, task.manifestListLocation());

    generator.writeFieldName(RESIDUAL);
    ExpressionParser.toJson(task.residual(), generator);

    generator.writeNumberField(REFERENCE_SNAPSHOT_ID, task.referenceSnapshotId());
  }

  /**
   * Rebuilds a {@link AllManifestsTable.ManifestListReadTask} from the fields written by {@link
   * #toJson}.
   *
   * @param jsonNode the JSON object holding the task fields; must be a non-null object node
   * @return a new task constructed from the parsed fields
   * @throws IllegalArgumentException if {@code jsonNode} is null or not an object, or if the
   *     partition specs field is not an array
   */
  static AllManifestsTable.ManifestListReadTask fromJson(JsonNode jsonNode) {
    Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest task: null");
    Preconditions.checkArgument(
        jsonNode.isObject(), "Invalid JSON node for manifest task: non-object (%s)", jsonNode);

    Schema dataTableSchema = SchemaParser.fromJson(JsonUtil.get(DATA_TABLE_SCHEMA, jsonNode));
    FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null);
    Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));

    // Read through JsonUtil.get like the other required fields. A raw jsonNode.get(SPECS) returns
    // null when the field is absent, which made the isArray() check below fail with a
    // NullPointerException instead of a validation error.
    JsonNode specsArray = JsonUtil.get(SPECS, jsonNode);
    Preconditions.checkArgument(
        specsArray.isArray(), "Invalid JSON node for partition specs: non-array (%s)", specsArray);

    // Each spec is parsed against the data table schema.
    ImmutableList.Builder<PartitionSpec> specsBuilder = ImmutableList.builder();
    for (JsonNode specNode : specsArray) {
      PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode);
      specsBuilder.add(spec);
    }

    Map<Integer, PartitionSpec> specsById = PartitionUtil.indexSpecs(specsBuilder.build());
    String manifestListLocation = JsonUtil.getString(MANIFEST_LIST_LOCATION, jsonNode);
    Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode));
    long referenceSnapshotId = JsonUtil.getLong(REFERENCE_SNAPSHOT_ID, jsonNode);

    return new AllManifestsTable.ManifestListReadTask(
        dataTableSchema,
        fileIO,
        schema,
        specsById,
        manifestListLocation,
        residualFilter,
        referenceSnapshotId);
  }
}
51 changes: 34 additions & 17 deletions core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -92,15 +91,9 @@ static CloseableIterable<FileScanTask> planFiles(
evalCache.get(manifest.partitionSpecId()).eval(manifest)
&& manifestContentEvaluator.eval(manifest));

String schemaString = SchemaParser.toJson(projectedSchema);
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(
filteredManifests,
manifest ->
new ManifestReadTask(
table, manifest, projectedSchema, schemaString, specString, residuals));
manifest -> new ManifestReadTask(table, manifest, projectedSchema, filter));
}

/**
Expand Down Expand Up @@ -283,19 +276,28 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final ManifestFile manifest;
private final Map<Integer, PartitionSpec> specsById;

/**
 * Convenience constructor that takes the data table schema, {@link FileIO} and partition specs
 * from {@code table} and delegates to the constructor that accepts them individually.
 *
 * @param table source of the schema, IO and specs passed to the delegate
 * @param manifest passed through to the delegate constructor unchanged
 * @param projection passed through to the delegate constructor unchanged
 * @param filter passed through to the delegate constructor unchanged
 */
ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) {
this(table.schema(), table.io(), table.specs(), manifest, projection, filter);
}

ManifestReadTask(
Table table,
Schema dataTableSchema,
FileIO io,
Map<Integer, PartitionSpec> specsById,
ManifestFile manifest,
Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
Expression filter) {
super(
DataFiles.fromManifest(manifest),
null,
SchemaParser.toJson(projection),
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
ResidualEvaluator.unpartitioned(filter));
this.projection = projection;
this.io = table.io();
this.io = io;
this.manifest = manifest;
this.specsById = Maps.newHashMap(table.specs());
this.dataTableSchema = table.schema();
this.specsById = Maps.newHashMap(specsById);
this.dataTableSchema = dataTableSchema;

Type fileProjectionType = projection.findType("data_file");
this.fileProjection =
Expand All @@ -311,7 +313,6 @@ public long estimatedRowsCount() {
+ (long) manifest.existingFilesCount();
}

@VisibleForTesting
// Package-private accessor for the manifest file this task was constructed with.
ManifestFile manifest() {
  return this.manifest;
}
Expand Down Expand Up @@ -403,5 +404,21 @@ private MetricsUtil.ReadableMetricsStruct readableMetrics(
// Returns a single-element list containing this task itself; splitSize is not consulted.
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}

/** Returns the {@link FileIO} instance this task was constructed with. */
FileIO io() {
  return this.io;
}

/**
 * Returns this task's partition spec map.
 *
 * <p>The constructor stores a {@code Maps.newHashMap} copy of the map it receives, so this is
 * the task's own map rather than the caller's original.
 */
Map<Integer, PartitionSpec> specsById() {
  return this.specsById;
}

/** Returns the data table schema this task was constructed with. */
Schema dataTableSchema() {
  return this.dataTableSchema;
}

/** Returns the projection schema this task was constructed with. */
Schema projection() {
  return this.projection;
}
}
}
50 changes: 34 additions & 16 deletions core/src/main/java/org/apache/iceberg/BaseFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -80,16 +79,11 @@ private static CloseableIterable<FileScanTask> planFiles(
CloseableIterable.filter(
manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));

String schemaString = SchemaParser.toJson(projectedSchema);
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(
filteredManifests,
manifest ->
new ManifestReadTask(
table, manifest, projectedSchema, schemaString, specString, residuals));
manifest -> new ManifestReadTask(table, manifest, projectedSchema, filter));
}

abstract static class BaseFilesTableScan extends BaseMetadataTableScan {
Expand Down Expand Up @@ -140,18 +134,27 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema dataTableSchema;
private final Schema projection;

/**
 * Convenience constructor that takes the data table schema, {@link FileIO} and partition specs
 * from {@code table} and delegates to the constructor that accepts them individually.
 *
 * @param table source of the schema, IO and specs passed to the delegate
 * @param manifest passed through to the delegate constructor unchanged
 * @param projection passed through to the delegate constructor unchanged
 * @param filter passed through to the delegate constructor unchanged
 */
ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) {
this(table.schema(), table.io(), table.specs(), manifest, projection, filter);
}

ManifestReadTask(
Table table,
Schema dataTableSchema,
FileIO io,
Map<Integer, PartitionSpec> specsById,
ManifestFile manifest,
Schema projection,
String schemaString,
String specString,
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.io = table.io();
this.specsById = Maps.newHashMap(table.specs());
Expression filter) {
super(
DataFiles.fromManifest(manifest),
null,
SchemaParser.toJson(projection),
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
ResidualEvaluator.unpartitioned(filter));
this.io = io;
this.specsById = Maps.newHashMap(specsById);
this.manifest = manifest;
this.dataTableSchema = table.schema();
this.dataTableSchema = dataTableSchema;
this.projection = projection;
}

Expand Down Expand Up @@ -240,9 +243,24 @@ public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}

@VisibleForTesting
// Package-private accessor for the FileIO instance this task was constructed with.
FileIO io() {
  return this.io;
}

/**
 * Returns this task's partition spec map.
 *
 * <p>The constructor stores a {@code Maps.newHashMap} copy of the map it receives, so this is
 * the task's own map rather than the caller's original.
 */
Map<Integer, PartitionSpec> specsById() {
  return this.specsById;
}

/** Returns the manifest file this task was constructed with. */
ManifestFile manifest() {
  return this.manifest;
}

/** Returns the data table schema this task was constructed with. */
Schema dataTableSchema() {
  return this.dataTableSchema;
}

/** Returns the projection schema this task was constructed with. */
Schema projection() {
  return this.projection;
}
}
}
Loading

0 comments on commit bc32130

Please sign in to comment.