Skip to content

Commit

Permalink
Core,API: Set 503: added_snapshot_id as required (#11626)
Browse files Browse the repository at this point in the history
* Core,API: Set `503: added_snapshot_id` as required

`503: added_snapshot_id` field should be written as a required field,
but currently it is written as optional.

As the reference implementation should produce metadata that is
as close to the spec as possible.

For reading, this isn't a problem with the current Java implementation
as it will still read optional fields as required, but only thrown an
error when it encounters a `null` value.

* Improve constructor instead
  • Loading branch information
Fokko authored Nov 28, 2024
1 parent 3a04257 commit 8fccdec
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 10 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface ManifestFile {
Types.LongType.get(),
"Lowest sequence number in the manifest");
Types.NestedField SNAPSHOT_ID =
optional(
required(
503, "added_snapshot_id", Types.LongType.get(), "Snapshot ID that added the manifest");
Types.NestedField ADDED_FILES_COUNT =
optional(504, "added_files_count", Types.IntegerType.get(), "Added entry count");
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private void cacheManifests(FileIO fileIO) {
allManifests =
Lists.transform(
Arrays.asList(v1ManifestLocations),
location -> new GenericManifestFile(fileIO.newInputFile(location), 0));
location ->
new GenericManifestFile(fileIO.newInputFile(location), 0, this.snapshotId));
}

if (allManifests == null) {
Expand Down
28 changes: 27 additions & 1 deletion core/src/main/java/org/apache/iceberg/GenericManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ public GenericManifestFile(Schema avroSchema) {
this.keyMetadata = null;
}

GenericManifestFile(InputFile file, int specId, long snapshotId) {
super(ManifestFile.schema().columns().size());
this.avroSchema = AVRO_SCHEMA;
this.file = file;
this.manifestPath = file.location();
this.length = null; // lazily loaded from file
this.specId = specId;
this.sequenceNumber = 0;
this.minSequenceNumber = 0;
this.snapshotId = snapshotId;
this.addedFilesCount = null;
this.addedRowsCount = null;
this.existingFilesCount = null;
this.existingRowsCount = null;
this.deletedFilesCount = null;
this.deletedRowsCount = null;
this.partitions = null;
this.keyMetadata = null;
}

/** Adjust the arg order to avoid conflict with the public constructor below */
GenericManifestFile(
String path,
Expand Down Expand Up @@ -167,7 +187,13 @@ private GenericManifestFile(GenericManifestFile toCopy) {
super(toCopy);
this.avroSchema = toCopy.avroSchema;
this.manifestPath = toCopy.manifestPath;
this.length = toCopy.length;
try {
this.length = toCopy.length();
} catch (UnsupportedOperationException e) {
// Can be removed when embedded manifests are dropped
// DummyFileIO does not support .length()
this.length = null;
}
this.specId = toCopy.specId;
this.content = toCopy.content;
this.sequenceNumber = toCopy.sequenceNumber;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private V2Metadata() {}
ManifestFile.MANIFEST_CONTENT.asRequired(),
ManifestFile.SEQUENCE_NUMBER.asRequired(),
ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
ManifestFile.SNAPSHOT_ID.asRequired(),
ManifestFile.SNAPSHOT_ID,
ManifestFile.ADDED_FILES_COUNT.asRequired(),
ManifestFile.EXISTING_FILES_COUNT.asRequired(),
ManifestFile.DELETED_FILES_COUNT.asRequired(),
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V3Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private V3Metadata() {}
ManifestFile.MANIFEST_CONTENT.asRequired(),
ManifestFile.SEQUENCE_NUMBER.asRequired(),
ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
ManifestFile.SNAPSHOT_ID.asRequired(),
ManifestFile.SNAPSHOT_ID,
ManifestFile.ADDED_FILES_COUNT.asRequired(),
ManifestFile.EXISTING_FILES_COUNT.asRequired(),
ManifestFile.DELETED_FILES_COUNT.asRequired(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,8 +1244,8 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS

List<ManifestFile> manifests =
ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0, snapshotId),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0, snapshotId));

try (ManifestListWriter writer =
ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS

List<ManifestFile> manifests =
ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0, snapshotId),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0, snapshotId));

try (ManifestListWriter writer =
ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,8 @@ private String createManifestListWithManifestFile(
try (ManifestListWriter writer =
ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
writer.addAll(
ImmutableList.of(new GenericManifestFile(localInput(manifestFile), SPEC_5.specId())));
ImmutableList.of(
new GenericManifestFile(localInput(manifestFile), SPEC_5.specId(), snapshotId)));
}

return localInput(manifestList).location();
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ public void testFullProjection() throws Exception {
assertThat(cmp).as("Should contain the correct data value").isEqualTo(0);
}

@Test
public void testReadOptionalAsRequired() throws Exception {
Schema writeSchema =
new Schema(
Types.NestedField.required(0, "id", Types.LongType.get()),
Types.NestedField.optional(1, "data", Types.StringType.get()));

Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
record.put("id", 34L);
record.put("data", "test");

Schema readSchema =
new Schema(
Types.NestedField.required(0, "id", Types.LongType.get()),
Types.NestedField.required(1, "data", Types.StringType.get()));

Record projected = writeAndRead("read_optional_as_required", writeSchema, readSchema, record);

int cmp = Comparators.charSequences().compare("test", (CharSequence) projected.get("data"));
assertThat(cmp).as("Should contain the correct data/renamed value").isEqualTo(0);
}

@Test
public void testReorderedFullProjection() throws Exception {
Schema schema =
Expand Down

0 comments on commit 8fccdec

Please sign in to comment.