Skip to content

Commit

Permalink
fix snapshot_id issue
Browse files: browse the repository at this point in the history
  • Loading branch information
HonahX committed Jul 10, 2024
1 parent 71a5fe0 commit c7e4095
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 5 deletions.
12 changes: 7 additions & 5 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None:
def _wrap(
self,
new_status: ManifestEntryStatus,
new_snapshot_id: int,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
Expand All @@ -419,12 +419,14 @@ def _wrap(
self.data_file = new_file
return self

def _wrap_append(self, new_snapshot_id: int, new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry:
# Delegate to _wrap with status ManifestEntryStatus.ADDED, forwarding the
# snapshot id, data sequence number and data file unchanged and passing None
# in the file-sequence-number position.
# new_snapshot_id and new_data_sequence_number are both annotated Optional,
# so None is an accepted value for either of them.
# Returns the result of _wrap (annotated as ManifestEntry).
def _wrap_append(
self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
) -> ManifestEntry:
return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file)

def _wrap_delete(
self,
new_snapshot_id: int,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
Expand All @@ -435,7 +437,7 @@ def _wrap_delete(

def _wrap_existing(
self,
new_snapshot_id: int,
new_snapshot_id: Optional[int],
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
Expand Down Expand Up @@ -838,7 +840,7 @@ def delete(self, entry: ManifestEntry) -> ManifestWriter:
def existing(self, entry: ManifestEntry) -> ManifestWriter:
self.add_entry(
self._reused_entry_wrapper._wrap_existing(
self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
)
)
return self
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,29 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab
# tbl_b and tbl_c should contain the same data
assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow())


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
tbl_a = _create_table(
session_catalog,
"default.merge_manifest_a",
{"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version},
[],
)

# tbl_a should merge all manifests into 1
tbl_a.append(arrow_table_with_null)

tbl_a_first_entries = tbl_a.inspect.entries().to_pydict()
first_snapshot_id = tbl_a_first_entries["snapshot_id"][0]
first_data_file_path = tbl_a_first_entries["data_file"][0]["file_path"]

tbl_a.append(arrow_table_with_null)
tbl_a.append(arrow_table_with_null)

assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore

# verify the sequence number of tbl_a's only manifest file
tbl_a_manifest = tbl_a.current_snapshot().manifests(tbl_a.io)[0] # type: ignore
assert tbl_a_manifest.sequence_number == (3 if format_version == 2 else 0)
Expand Down Expand Up @@ -1114,6 +1137,9 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab
assert tbl_a_data_file["equality_ids"] is None
assert tbl_a_data_file["file_format"] == "PARQUET"
assert tbl_a_data_file["file_path"].startswith("s3://warehouse/default/merge_manifest_a/data/")
if tbl_a_data_file["file_path"] == first_data_file_path:
# verify that the recorded snapshot id is the one in which the file was added
assert tbl_a_entries["snapshot_id"][i] == first_snapshot_id
assert tbl_a_data_file["key_metadata"] is None
assert tbl_a_data_file["lower_bounds"] == [
(1, b"\x00"),
Expand Down

0 comments on commit c7e4095

Please sign in to comment.