Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core,API: Set 503: added_snapshot_id as required #11626

Merged
merged 3 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface ManifestFile {
Types.LongType.get(),
"Lowest sequence number in the manifest");
Types.NestedField SNAPSHOT_ID =
optional(
required(
503, "added_snapshot_id", Types.LongType.get(), "Snapshot ID that added the manifest");
Types.NestedField ADDED_FILES_COUNT =
optional(504, "added_files_count", Types.IntegerType.get(), "Added entry count");
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private void cacheManifests(FileIO fileIO) {
allManifests =
Lists.transform(
Arrays.asList(v1ManifestLocations),
location -> new GenericManifestFile(fileIO.newInputFile(location), 0));
location ->
new GenericManifestFile(fileIO.newInputFile(location), 0, this.snapshotId));
}

if (allManifests == null) {
Expand Down
28 changes: 27 additions & 1 deletion core/src/main/java/org/apache/iceberg/GenericManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ public GenericManifestFile(Schema avroSchema) {
this.keyMetadata = null;
}

GenericManifestFile(InputFile file, int specId, long snapshotId) {
super(ManifestFile.schema().columns().size());
this.avroSchema = AVRO_SCHEMA;
this.file = file;
this.manifestPath = file.location();
this.length = null; // lazily loaded from file
this.specId = specId;
this.sequenceNumber = 0;
this.minSequenceNumber = 0;
this.snapshotId = snapshotId;
this.addedFilesCount = null;
this.addedRowsCount = null;
this.existingFilesCount = null;
this.existingRowsCount = null;
this.deletedFilesCount = null;
this.deletedRowsCount = null;
this.partitions = null;
this.keyMetadata = null;
}

/** Adjust the arg order to avoid conflict with the public constructor below */
GenericManifestFile(
String path,
Expand Down Expand Up @@ -167,7 +187,13 @@ private GenericManifestFile(GenericManifestFile toCopy) {
super(toCopy);
this.avroSchema = toCopy.avroSchema;
this.manifestPath = toCopy.manifestPath;
this.length = toCopy.length;
try {
this.length = toCopy.length();
} catch (UnsupportedOperationException e) {
// Can be removed when embedded manifests are dropped
// DummyFileIO does not support .length()
this.length = null;
}
Comment on lines +190 to +196
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See

/**
* The main purpose of this class is to lazily retrieve the path from a v1 Snapshot that has
* manifest lists
*/
private static class DummyFileIO implements FileIO {
@Override
public InputFile newInputFile(String path) {
return new InputFile() {
@Override
public long getLength() {
throw new UnsupportedOperationException();
}
@Override
public SeekableInputStream newStream() {
throw new UnsupportedOperationException();
}
@Override
public String location() {
return path;
}
@Override
public boolean exists() {
return true;
}
};
}
@Override
public OutputFile newOutputFile(String path) {
throw new UnsupportedOperationException();
}
@Override
public void deleteFile(String path) {
throw new UnsupportedOperationException();
}
}

this.specId = toCopy.specId;
this.content = toCopy.content;
this.sequenceNumber = toCopy.sequenceNumber;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private V2Metadata() {}
ManifestFile.MANIFEST_CONTENT.asRequired(),
ManifestFile.SEQUENCE_NUMBER.asRequired(),
ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
ManifestFile.SNAPSHOT_ID.asRequired(),
ManifestFile.SNAPSHOT_ID,
ManifestFile.ADDED_FILES_COUNT.asRequired(),
ManifestFile.EXISTING_FILES_COUNT.asRequired(),
ManifestFile.DELETED_FILES_COUNT.asRequired(),
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/V3Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private V3Metadata() {}
ManifestFile.MANIFEST_CONTENT.asRequired(),
ManifestFile.SEQUENCE_NUMBER.asRequired(),
ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
ManifestFile.SNAPSHOT_ID.asRequired(),
ManifestFile.SNAPSHOT_ID,
ManifestFile.ADDED_FILES_COUNT.asRequired(),
ManifestFile.EXISTING_FILES_COUNT.asRequired(),
ManifestFile.DELETED_FILES_COUNT.asRequired(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,8 +1244,8 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS

List<ManifestFile> manifests =
ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0, snapshotId),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0, snapshotId));

try (ManifestListWriter writer =
ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS

List<ManifestFile> manifests =
ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0, snapshotId),
new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0, snapshotId));

try (ManifestListWriter writer =
ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,8 @@ private String createManifestListWithManifestFile(
try (ManifestListWriter writer =
ManifestLists.write(1, Files.localOutput(manifestList), snapshotId, parentSnapshotId, 0)) {
writer.addAll(
ImmutableList.of(new GenericManifestFile(localInput(manifestFile), SPEC_5.specId())));
ImmutableList.of(
new GenericManifestFile(localInput(manifestFile), SPEC_5.specId(), snapshotId)));
}

return localInput(manifestList).location();
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ public void testFullProjection() throws Exception {
assertThat(cmp).as("Should contain the correct data value").isEqualTo(0);
}

@Test
public void testReadOptionalAsRequired() throws Exception {
Schema writeSchema =
new Schema(
Types.NestedField.required(0, "id", Types.LongType.get()),
Types.NestedField.optional(1, "data", Types.StringType.get()));

Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
record.put("id", 34L);
record.put("data", "test");

Schema readSchema =
new Schema(
Types.NestedField.required(0, "id", Types.LongType.get()),
Types.NestedField.required(1, "data", Types.StringType.get()));

Record projected = writeAndRead("read_optional_as_required", writeSchema, readSchema, record);

int cmp = Comparators.charSequences().compare("test", (CharSequence) projected.get("data"));
assertThat(cmp).as("Should contain the correct data/renamed value").isEqualTo(0);
}

@Test
public void testReorderedFullProjection() throws Exception {
Schema schema =
Expand Down