Core: add JSON serialization for BaseFilesTable.ManifestReadTask, AllManifestsTable.ManifestListReadTask, and BaseEntriesTable.ManifestReadTask #10735
Conversation
@@ -158,12 +168,14 @@ static class ManifestListReadTask implements DataTask {
private DataFile lazyDataFile = null;

ManifestListReadTask(
    Schema dataTableSchema,

Needs the table schema to be able to parse the partition spec; the other arg, schema, is for the MANIFEST_FILE_SCHEMA.
@@ -304,11 +306,6 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
: new Schema();
}

@VisibleForTesting

Not just for testing anymore; the JSON parser needs this getter too.
ManifestReadTask(
    Table table,

Did a bit of refactoring: this constructor can be used by the JSON parser, while the other constructor above is used for the previous purpose in the planFiles method.
@@ -105,6 +105,42 @@ public GenericManifestFile(Schema avroSchema) {
this.keyMetadata = null;
}

/** Adjust the arg order to avoid conflict with the public constructor below */
GenericManifestFile(

Used by the ManifestFileParser introduced in this PR.
DATA_TASK("data-task"),
FILES_TABLE_TASK("files-table-task"),
ALL_MANIFESTS_TABLE_TASK("all-manifests-table-task"),
MANIFEST_ENTRIES_TABLE_TASK("manifest-entries-task");

This is for the BaseEntriesTable task, which is the base class for ManifestEntriesTable and AllEntriesTable. Don't want to call it base-entries-task as it wouldn't be clear what kind of task.
@@ -71,6 +72,13 @@ public HadoopFileIO(Configuration hadoopConf) {

public HadoopFileIO(SerializableSupplier<Configuration> hadoopConf) {
this.hadoopConf = hadoopConf;
Map<String, String> props = Maps.newHashMapWithExpectedSize(hadoopConf.get().size());

These bugs were discovered during a serialization test (I don't remember if it was in the core module or the flink module).

Can we maybe extract this into a separate PR (with separate tests)?
will revert the change in this PR
.palantir/revapi.yml
@@ -874,6 +874,10 @@ acceptedBreaks:
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"

This is in the wrong version, isn't it?
Great catch. The initial change and gradle revapiAcceptBreak were run a few months ago. Will need to re-run it, maybe after the 1.16 release.
@stevenzwu can you please rebase the PR and make sure to run the revapi task that will automatically add this to the correct version in this file?
the diff in the file is still messed up
Fixed. I first copied the revapi.yml file from the latest main branch, then applied revapiAcceptBreak. There were a couple of additional diffs from the revapi action.
➜ iceberg git:(issue-9597-manifest-task) ✗ ./gradlew :iceberg-core:revapiAcceptBreak --justification "Serialization across versions is not supported" \
--code "java.class.defaultSerializationChanged" \
--old "class org.apache.iceberg.GenericManifestFile" \
--new "class org.apache.iceberg.GenericManifestFile"
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");

generator.writeFieldName(DATA_TABLE_SCHEMA);
SchemaParser.toJson(task.dataTableSchema(), generator);

Why are we adding the (base) table schema here?
I added the dataTableSchema to the AllManifestsTable.ManifestListReadTask because we need it to parse the partition spec. See line 89 below.
PartitionSpec spec = PartitionSpecParser.fromJson(dataTableSchema, specNode);
also mentioned in https://github.com/apache/iceberg/pull/10735/files#r1685644366
dfaa439 to bc32130
static void toJson(AllManifestsTable.ManifestListReadTask task, JsonGenerator generator)
    throws IOException {
Preconditions.checkArgument(task != null, "Invalid manifest task: null");

Please add unit tests to cover the scenarios where the task/generator is null.
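A minimal sketch of what such null-check tests could exercise, in plain Java rather than the project's JUnit/AssertJ setup. The class and method names below are hypothetical stand-ins for the package-private parser, though the error messages mirror the Preconditions in the diff:

```java
// Hypothetical sketch of null-argument checks in the style of the parser above.
// NullCheckSketch and messageFor are made-up names for illustration only.
public class NullCheckSketch {
  static void toJson(Object task, Object generator) {
    if (task == null) {
      throw new IllegalArgumentException("Invalid manifest task: null");
    }
    if (generator == null) {
      throw new IllegalArgumentException("Invalid JSON generator: null");
    }
    // ... actual serialization would happen here
  }

  // Returns the IllegalArgumentException message, or null if no exception was thrown
  static String messageFor(Object task, Object generator) {
    try {
      toJson(task, generator);
      return null;
    } catch (IllegalArgumentException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(messageFor(null, new Object()));   // Invalid manifest task: null
    System.out.println(messageFor(new Object(), null));   // Invalid JSON generator: null
  }
}
```

A real test would assert on these messages with assertThatThrownBy, as other parser tests in the codebase do.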
}

static AllManifestsTable.ManifestListReadTask fromJson(JsonNode jsonNode) {
Preconditions.checkArgument(jsonNode != null, "Invalid JSON node for manifest task: null");

Please also add unit tests to cover these here (same as for toJson). This should also be done for all the other JSON parsers.
FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null);
Schema schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, jsonNode));

JsonNode specsArray = jsonNode.get(SPECS);

Better to use JsonUtil.get(SPECS, jsonNode) here, as technically the field could be missing and JsonUtil will provide good error messaging in such a case. Please also update this in the other parsers.
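To illustrate the point, here is a minimal sketch of the checked-getter pattern: a missing field fails fast with a descriptive message instead of returning null and blowing up later. The names here are hypothetical, using a plain Map instead of Jackson's JsonNode:

```java
import java.util.Map;

// Hypothetical illustration of a checked getter; Iceberg's JsonUtil.get applies
// the same pattern to Jackson's JsonNode.
public class CheckedGet {
  static Object get(String field, Map<String, ?> node) {
    if (!node.containsKey(field)) {
      throw new IllegalArgumentException("Cannot parse missing field: " + field);
    }
    return node.get(field);
  }

  public static void main(String[] args) {
    Map<String, Object> node = Map.of("schema", "...");
    System.out.println(get("schema", node)); // present: value returned directly
    try {
      get("specs", node); // absent: fails immediately with a clear message
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```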
import org.apache.iceberg.util.PartitionUtil;
import org.junit.jupiter.api.Test;

public class TestAllManifestsTableTaskParser {

All of the tests across this PR only verify the happy path, but not if e.g. fields are missing or passed parameters are null. It would be good to add such tests, similar to the tests we have for other parsers.
@@ -283,19 +276,28 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final ManifestFile manifest;
private final Map<Integer, PartitionSpec> specsById;

ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) {

Should this be private maybe, since it isn't used anywhere outside this class?
@@ -140,18 +134,27 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema dataTableSchema;
private final Schema projection;

ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) {

Suggested change:
ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) {
private ManifestReadTask(Table table, ManifestFile manifest, Schema projection, Expression filter) {
FileIO fileIO = FileIOParser.fromJson(JsonUtil.get(FILE_IO, jsonNode), null);

JsonNode specsArray = jsonNode.get(SPECS);

Should also be using JsonUtil.get (same as I mentioned in the other parser).
this.deletedRowsCount = deletedRowsCount;
this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]);
this.fromProjectionPos = null;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);

Could keyMetadata be null?
ByteBuffers handles the null value properly:

public class ByteBuffers {
  public static byte[] toByteArray(ByteBuffer buffer) {
    if (buffer == null) {
      return null;
    }
    // ... (non-null path copies the buffer contents)
  }
}
ByteBuffer keyMetadata = null;
if (jsonNode.has(KEY_METADATA)) {
String hexStr = JsonUtil.getString(KEY_METADATA, jsonNode);

Why not use JsonUtil.getByteBufferOrNull here like in other parsers?
same for the other places in this PR
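For context, the hex-string round-trip that the key-metadata serialization above performs can be sketched as follows. The helper names are hypothetical and use the plain JDK HexFormat (Java 17+) rather than Iceberg's JsonUtil helpers:

```java
import java.nio.ByteBuffer;
import java.util.HexFormat;

// Hypothetical sketch of serializing a ByteBuffer as a hex string and back,
// the pattern the parser implements for KEY_METADATA.
public class KeyMetadataHex {
  static String toHex(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes); // duplicate so the caller's position is untouched
    return HexFormat.of().formatHex(bytes);
  }

  static ByteBuffer fromHex(String hex) {
    return ByteBuffer.wrap(HexFormat.of().parseHex(hex));
  }

  public static void main(String[] args) {
    ByteBuffer original = ByteBuffer.wrap(new byte[] {0x0a, (byte) 0xff});
    String hex = toHex(original);
    System.out.println(hex);                           // 0aff
    System.out.println(fromHex(hex).equals(original)); // true
  }
}
```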
}

if (manifestFile.keyMetadata() != null) {
generator.writeStringField(

Should this be doing the same thing as we're doing in
SingleValueParser.toJson(DataFile.KEY_METADATA.type(), contentFile.keyMetadata(), generator);
}

if (summary.lowerBound() != null) {
generator.writeStringField(

Should this be doing the same thing as in
SingleValueParser.toJson(DataFile.KEY_METADATA.type(), contentFile.keyMetadata(), generator);
…ManifestsTable.ManifestListReadTask, and BaseEntriesTable.ManifestReadTask
bc32130 to a294130
.hasMessageContaining("Invalid JSON node for manifest file: non-object ");
}

@TestTemplate

Suggested change:
@TestTemplate
@Test
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;

public class TestManifestFileParser extends TestBase {

Is this on purpose that this extends TestBase?
probably a mistake. fixed.
@@ -1025,14 +1025,11 @@ acceptedBreaks:
new: "class org.apache.iceberg.types.Types.NestedField"
justification: "new Constructor added"
org.apache.iceberg:iceberg-core:
- code: "java.method.visibilityReduced"

For other reviewers: I've checked and these blocks have been moved when running
./gradlew :iceberg-core:revapiAcceptBreak --justification "Serialization across versions is not supported" \
--code "java.class.defaultSerializationChanged" \
--old "class org.apache.iceberg.GenericManifestFile" \
--new "class org.apache.iceberg.GenericManifestFile"
assertTaskEquals(task, deserializedTask);
}

static AllManifestsTable.ManifestListReadTask createTask() {

Suggested change:
static AllManifestsTable.ManifestListReadTask createTask() {
private AllManifestsTable.ManifestListReadTask createTask() {
static AllManifestsTable.ManifestListReadTask createTask() {
Schema dataTableSchema = TestBase.SCHEMA;
HadoopFileIO fileIO = new HadoopFileIO();
fileIO.initialize(ImmutableMap.of("k1", "v1", "k2", "v2"));

We should also check what happens when the hadoop file io has a conf, but let's get this PR in first and we can discuss this & improve it on #10926.
assertTaskEquals(task, deserializedTask);
}

static BaseFilesTable.ManifestReadTask createTask() {

Suggested change:
static BaseFilesTable.ManifestReadTask createTask() {
private BaseFilesTable.ManifestReadTask createTask() {
private void assertTaskEquals(
    BaseFilesTable.ManifestReadTask expected, BaseFilesTable.ManifestReadTask actual) {
assertThat(expected.schema().sameSchema(actual.schema())).as("Schema should match").isTrue();

assertThat(actual.schema().asStruct()).isEqualTo(expected.schema().asStruct());
would be slightly better, because you would immediately see the differences in the schema when the assertion fails. Using isTrue would require debugging both schemas to understand the difference. Could you please update this also for the projection check in the next few lines and also do the same in TestAllManifestsTableTaskParser?
overall this LGTM, I just left a few small suggestions that should make it easier for debugging in case some assertions fail. Thanks @stevenzwu, let's get this PR in and we can focus on the issue you ran into with #10926
thanks @nastra for the review
…lManifestsTable.ManifestListReadTask, and BaseEntriesTable.ManifestReadTask (apache#10735)
This completes the JSON parsers for scan tasks; these scan tasks are for metadata tables.
This would unblock Flink to switch to the FLIP-27 source as the default. Flink unit tests pass with FLIP-27 as default, except for one test with a limit clause (which I will follow up on separately).
Closes issue #9597.