From 6803ebad1caca83da0898a11920c3c7366da8953 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 2 Jun 2024 18:39:15 -0700 Subject: [PATCH 01/17] add ListPacker + tests --- pyiceberg/utils/bin_packing.py | 26 +++++++++++++++++++ tests/utils/test_bin_packing.py | 46 ++++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/pyiceberg/utils/bin_packing.py b/pyiceberg/utils/bin_packing.py index ddebde13e2..0291619685 100644 --- a/pyiceberg/utils/bin_packing.py +++ b/pyiceberg/utils/bin_packing.py @@ -104,3 +104,29 @@ def remove_bin(self) -> Bin[T]: return bin_ else: return self.bins.pop(0) + + +class ListPacker(Generic[T]): + _target_weight: int + _lookback: int + _largest_bin_first: bool + + def __init__(self, target_weight: int, lookback: int, largest_bin_first: bool) -> None: + self._target_weight = target_weight + self._lookback = lookback + self._largest_bin_first = largest_bin_first + + def pack(self, items: List[T], weight_func: Callable[[T], int]) -> List[List[T]]: + return list( + PackingIterator( + items=items, + target_weight=self._target_weight, + lookback=self._lookback, + weight_func=weight_func, + largest_bin_first=self._largest_bin_first, + ) + ) + + def pack_end(self, items: List[T], weight_func: Callable[[T], int]) -> List[List[T]]: + packed = self.pack(items=list(reversed(items)), weight_func=weight_func) + return [list(reversed(bin_items)) for bin_items in reversed(packed)] diff --git a/tests/utils/test_bin_packing.py b/tests/utils/test_bin_packing.py index 054ea79556..3bfacdf481 100644 --- a/tests/utils/test_bin_packing.py +++ b/tests/utils/test_bin_packing.py @@ -20,7 +20,9 @@ import pytest -from pyiceberg.utils.bin_packing import PackingIterator +from pyiceberg.utils.bin_packing import ListPacker, PackingIterator + +INT_MAX = 2147483647 @pytest.mark.parametrize( @@ -83,4 +85,46 @@ def test_bin_packing_lookback( def weight_func(x: int) -> int: return x + packer: ListPacker[int] = ListPacker(target_weight, lookback, largest_bin_first) + assert list(PackingIterator(splits, target_weight, lookback, weight_func, largest_bin_first)) == expected_lists + assert list(packer.pack(splits, weight_func)) == expected_lists + + +@pytest.mark.parametrize( + "splits, target_weight, lookback, largest_bin_first, expected_lists", + [ + # Single Lookback Tests + ([1, 2, 3, 4, 5], 3, 1, False, [[1, 2], [3], [4], [5]]), + ([1, 2, 3, 4, 5], 4, 1, False, [[1, 2], [3], [4], [5]]), + ([1, 2, 3, 4, 5], 5, 1, False, [[1], [2, 3], [4], [5]]), + ([1, 2, 3, 4, 5], 6, 1, False, [[1, 2, 3], [4], [5]]), + ([1, 2, 3, 4, 5], 7, 1, False, [[1, 2], [3, 4], [5]]), + ([1, 2, 3, 4, 5], 8, 1, False, [[1, 2], [3, 4], [5]]), + ([1, 2, 3, 4, 5], 9, 1, False, [[1, 2, 3], [4, 5]]), + ([1, 2, 3, 4, 5], 11, 1, False, [[1, 2, 3], [4, 5]]), + ([1, 2, 3, 4, 5], 12, 1, False, [[1, 2], [3, 4, 5]]), + ([1, 2, 3, 4, 5], 14, 1, False, [[1], [2, 3, 4, 5]]), + ([1, 2, 3, 4, 5], 15, 1, False, [[1, 2, 3, 4, 5]]), + # Unlimited Lookback Tests + ([1, 2, 3, 4, 5], 3, INT_MAX, False, [[1, 2], [3], [4], [5]]), + ([1, 2, 3, 4, 5], 4, INT_MAX, False, [[2], [1, 3], [4], [5]]), + ([1, 2, 3, 4, 5], 5, INT_MAX, False, [[2, 3], [1, 4], [5]]), + ([1, 2, 3, 4, 5], 6, INT_MAX, False, [[3], [2, 4], [1, 5]]), + ([1, 2, 3, 4, 5], 7, INT_MAX, False, [[1], [3, 4], [2, 5]]), + ([1, 2, 3, 4, 5], 8, INT_MAX, False, [[1, 2, 4], [3, 5]]), + ([1, 2, 3, 4, 5], 9, INT_MAX, False, [[1, 2, 3], [4, 5]]), + ([1, 2, 3, 4, 5], 10, INT_MAX, False, [[2, 3], [1, 4, 5]]), + ([1, 2, 3, 4, 5], 11, INT_MAX, False, [[1, 3], [2, 4, 5]]), 
+ ([1, 2, 3, 4, 5], 12, INT_MAX, False, [[1, 2], [3, 4, 5]]), + ([1, 2, 3, 4, 5], 13, INT_MAX, False, [[2], [1, 3, 4, 5]]), + ([1, 2, 3, 4, 5], 14, INT_MAX, False, [[1], [2, 3, 4, 5]]), + ([1, 2, 3, 4, 5], 15, INT_MAX, False, [[1, 2, 3, 4, 5]]), + ], +) +def test_reverse_bin_packing_lookback( + splits: List[int], target_weight: int, lookback: int, largest_bin_first: bool, expected_lists: List[List[int]] +) -> None: + packer: ListPacker[int] = ListPacker(target_weight, lookback, largest_bin_first) + result = packer.pack_end(splits, lambda x: x) + assert result == expected_lists From f0fc2603c1c9350a420d9d4fb80d036484ca4168 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 2 Jun 2024 19:20:54 -0700 Subject: [PATCH 02/17] add merge append --- pyiceberg/table/__init__.py | 208 ++++++++++++++++++- tests/integration/test_writes/test_writes.py | 49 +++++ 2 files changed, 255 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f160ab2441..2c5bb977dd 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,10 +16,13 @@ # under the License. from __future__ import annotations +import concurrent import itertools import uuid import warnings from abc import ABC, abstractmethod +from collections import defaultdict +from concurrent.futures import Future from copy import copy from dataclasses import dataclass from datetime import datetime @@ -62,7 +65,7 @@ inclusive_projection, manifest_evaluator, ) -from pyiceberg.io import FileIO, load_file_io +from pyiceberg.io import FileIO, OutputFile, load_file_io from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, DataFile, @@ -71,6 +74,7 @@ ManifestEntry, ManifestEntryStatus, ManifestFile, + ManifestWriter, PartitionFieldSummary, write_manifest, write_manifest_list, @@ -136,6 +140,7 @@ StructType, transform_dict_value_to_str, ) +from pyiceberg.utils.bin_packing import ListPacker from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.datetime import datetime_to_millis from pyiceberg.utils.singleton import _convert_to_hashable_type @@ -240,6 +245,15 @@ class TableProperties: FORMAT_VERSION = "format-version" DEFAULT_FORMAT_VERSION = 2 + MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes" + MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 # 8 MB + + MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge" + MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100 + + MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled" + MANIFEST_MERGE_ENABLED_DEFAULT = True + class PropertyUtil: @staticmethod @@ -2751,10 +2765,12 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): commit_uuid: uuid.UUID + _io: FileIO _operation: Operation _snapshot_id: int _parent_snapshot_id: Optional[int] _added_data_files: List[DataFile] + _manifest_num_counter: itertools.count[int] def __init__( self, @@ -2775,6 +2791,7 @@ def __init__( ) self._added_data_files = [] self.snapshot_properties = snapshot_properties + self._manifest_num_counter = itertools.count(0) def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer: self._added_data_files.append(data_file) @@ -2786,6 +2803,10 @@ def _deleted_entries(self) -> List[ManifestEntry]: ... @abstractmethod def _existing_manifests(self) -> List[ManifestFile]: ... 
+ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: + """To perform any post-processing on the manifests before writing them to the new snapshot.""" + return manifests + def _manifests(self) -> List[ManifestFile]: def _write_added_manifest() -> List[ManifestFile]: if self._added_data_files: @@ -2840,7 +2861,7 @@ def _write_delete_manifest() -> List[ManifestFile]: delete_manifests = executor.submit(_write_delete_manifest) existing_manifests = executor.submit(self._existing_manifests) - return added_manifests.result() + delete_manifests.result() + existing_manifests.result() + return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result()) def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: ssc = SnapshotSummaryCollector() @@ -2913,6 +2934,34 @@ def _commit(self) -> UpdatesAndRequirements: ), ) + @property + def snapshot_id(self) -> int: + return self._snapshot_id + + def spec(self, spec_id: int) -> PartitionSpec: + return self._transaction.table_metadata.specs()[spec_id] + + def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: + return write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=spec, + schema=self._transaction.table_metadata.schema(), + output_file=self.new_manifest_output(), + snapshot_id=self._snapshot_id, + ) + + def new_manifest_output(self) -> OutputFile: + return self._io.new_output( + _new_manifest_path( + location=self._transaction.table_metadata.location, + num=next(self._manifest_num_counter), + commit_uuid=self.commit_uuid, + ) + ) + + def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]: + return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted) + class FastAppendFiles(_MergingSnapshotProducer): def _existing_manifests(self) -> List[ManifestFile]: @@ -2943,6 +2992,61 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] +class MergeAppendFiles(FastAppendFiles): + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + + def __init__( + self, + operation: Operation, + transaction: Transaction, + io: FileIO, + commit_uuid: Optional[uuid.UUID] = None, + ) -> None: + super().__init__(operation, transaction, io, commit_uuid) + self._target_size_bytes = PropertyUtil.property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._min_count_to_merge = PropertyUtil.property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_MIN_MERGE_COUNT, + TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, + ) # type: ignore + self._merge_enabled = PropertyUtil.property_as_bool( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_MERGE_ENABLED, + TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, + ) + + def _deleted_entries(self) -> List[ManifestEntry]: + """To determine if we need to record any deleted manifest entries. + + In case of an append, nothing is deleted. + """ + return [] + + def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: + """To perform any post-processing on the manifests before writing them to the new snapshot. + + In MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge + if automatic merge is enabled. 
+ """ + unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA] + unmerged_deletes_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DELETES] + + data_manifest_merge_manager = _ManifestMergeManager( + target_size_bytes=self._target_size_bytes, + min_count_to_merge=self._min_count_to_merge, + merge_enabled=self._merge_enabled, + snapshot_producer=self, + ) + + return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests + + class OverwriteFiles(_MergingSnapshotProducer): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. @@ -3001,6 +3105,9 @@ def fast_append(self) -> FastAppendFiles: operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties ) + def merge_append(self) -> MergeAppendFiles: + return MergeAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io) + def overwrite(self) -> OverwriteFiles: return OverwriteFiles( operation=Operation.OVERWRITE @@ -3735,3 +3842,100 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions) return table_partitions + + +class _ManifestMergeManager: + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + _snapshot_producer: _MergingSnapshotProducer + + def __init__( + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _MergingSnapshotProducer + ) -> None: + self._target_size_bytes = target_size_bytes + self._min_count_to_merge = min_count_to_merge + self._merge_enabled = merge_enabled + self._snapshot_producer = snapshot_producer + + def _group_by_spec( + self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile] + ) -> Dict[int, List[ManifestFile]]: + groups = defaultdict(list) + groups[first_manifest.partition_spec_id].append(first_manifest) + for manifest in remaining_manifests: + groups[manifest.partition_spec_id].append(manifest) + return groups + + def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile: + with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: + for manifest in manifest_bin: + for entry in self._snapshot_producer.fetch_manifest_entry(manifest): + if entry.status == ManifestEntryStatus.DELETED: + # suppress deletes from previous snapshots. 
only files deleted by this snapshot + # should be added to the new manifest + if entry.snapshot_id == self._snapshot_producer.snapshot_id: + writer.add_entry(entry) + elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id: + # adds from this snapshot are still adds, otherwise they should be existing + writer.add_entry(entry) + else: + # add all files from the old manifest as existing files + writer.add_entry( + ManifestEntry( + status=ManifestEntryStatus.EXISTING, + snapshot_id=entry.snapshot_id, + data_sequence_number=entry.data_sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + ) + + return writer.to_manifest_file() + + def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]: + packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False) + bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length) + + def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: + output_manifests = [] + if len(manifest_bin) == 1: + output_manifests.append(manifest_bin[0]) + elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge: + # if the bin has the first manifest (the new data files or an appended manifest file) + # then only merge it + # if the number of manifests is above the minimum count. this is applied only to bins + # with an in-memory + # manifest so that large manifests don't prevent merging older groups. + output_manifests.extend(manifest_bin) + else: + output_manifests.append(self._create_manifest(spec_id, manifest_bin)) + + return output_manifests + + executor = ExecutorFactory.get_or_create() + futures = [executor.submit(merge_bin, b) for b in bins] + + # for consistent ordering, we need to maintain future order + futures_index = {f: i for i, f in enumerate(futures)} + completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], key=lambda f: futures_index[f]) + for future in concurrent.futures.as_completed(futures): + completed_futures.add(future) + + bin_results: List[List[ManifestFile]] = [f.result() for f in completed_futures if f.result()] + + return [manifest for bin_result in bin_results for manifest in bin_result] + + def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: + if not self._merge_enabled or len(manifests) == 0: + return manifests + + first_manifest = manifests[0] + remaining_manifests = manifests[1:] + groups = self._group_by_spec(first_manifest, remaining_manifests) + + merged_manifests = [] + for spec_id in reversed(groups.keys()): + merged_manifests.extend(self._merge_group(first_manifest, spec_id, groups[spec_id])) + + return merged_manifests diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index e329adcd5c..4886aa2ee6 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -871,3 +871,52 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null tbl.append(arrow_table_without_some_columns) # overwrite and then append should produce twice the data assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2 + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_merge_manifest_min_count_to_merge( + spark: SparkSession, session_catalog: 
Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + tbl_a = _create_table( + session_catalog, + "default.merge_manifest_a", + {"commit.manifest.min-count-to-merge": "1", "format-version": format_version}, + [], + ) + tbl_b = _create_table( + session_catalog, + "default.merge_manifest_b", + {"commit.manifest.min-count-to-merge": "1", "commit.manifest.target-size-bytes": "1", "format-version": format_version}, + [], + ) + tbl_c = _create_table( + session_catalog, + "default.merge_manifest_c", + {"commit.manifest-merge.enabled": "false", "format-version": format_version}, + [], + ) + + # tbl_a should merge all manifests into 1 + tbl_a.append(arrow_table_with_null) + tbl_a.append(arrow_table_with_null) + tbl_a.append(arrow_table_with_null) + + # tbl_b should not merge any manifests because the target size is too small + tbl_b.append(arrow_table_with_null) + tbl_b.append(arrow_table_with_null) + tbl_b.append(arrow_table_with_null) + + # tbl_c should not merge any manifests because merging is disabled + tbl_c.append(arrow_table_with_null) + tbl_c.append(arrow_table_with_null) + tbl_c.append(arrow_table_with_null) + + assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore + assert len(tbl_b.current_snapshot().manifests(tbl_b.io)) == 3 # type: ignore + assert len(tbl_c.current_snapshot().manifests(tbl_c.io)) == 3 # type: ignore + + # tbl_a and tbl_c should contain the same data + assert tbl_a.scan().to_arrow().equals(tbl_c.scan().to_arrow()) + # tbl_b and tbl_c should contain the same data + assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow()) From cbb8cecee9a226a5b6568e36316f13d24e9acc3c Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 3 Jun 2024 03:32:21 +0000 Subject: [PATCH 03/17] add merge_append --- pyiceberg/table/__init__.py | 49 ++++++++++++++++++++ tests/integration/test_writes/test_writes.py | 20 ++++---- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2c5bb977dd..851e589a87 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -428,6 +428,44 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) for data_file in data_files: update_snapshot.append_data_file(data_file) + def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + """ + Shorthand API for appending a PyArrow table to a table transaction. + + Args: + df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ + try: + import pyarrow as pa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e + + if not isinstance(df, pa.Table): + raise ValueError(f"Expected PyArrow table, got: {df}") + + if unsupported_partitions := [ + field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform + ]: + raise ValueError( + f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}." 
+ ) + + _check_schema_compatible(self._table.schema(), other_schema=df.schema) + # cast if the two schemas are compatible but not equal + table_arrow_schema = self._table.schema().as_arrow() + if table_arrow_schema != df.schema: + df = df.cast(table_arrow_schema) + + with self.update_snapshot(snapshot_properties=snapshot_properties).merge_append() as update_snapshot: + # skip writing data files if the dataframe is empty + if df.shape[0] > 0: + data_files = _dataframe_to_data_files( + table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, + io=self._table.io + ) + for data_file in data_files: + update_snapshot.append_data_file(data_file) def overwrite( self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: @@ -1352,6 +1390,17 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties) + def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + """ + Shorthand API for appending a PyArrow table to the table. + + Args: + df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ + with self.transaction() as tx: + tx.merge_append(df=df, snapshot_properties=snapshot_properties) + def overwrite( self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 4886aa2ee6..a5094a2657 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -876,7 +876,7 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_merge_manifest_min_count_to_merge( - spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int + session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: tbl_a = _create_table( session_catalog, @@ -898,19 +898,19 @@ def test_merge_manifest_min_count_to_merge( ) # tbl_a should merge all manifests into 1 - tbl_a.append(arrow_table_with_null) - tbl_a.append(arrow_table_with_null) - tbl_a.append(arrow_table_with_null) + tbl_a.merge_append(arrow_table_with_null) + tbl_a.merge_append(arrow_table_with_null) + tbl_a.merge_append(arrow_table_with_null) # tbl_b should not merge any manifests because the target size is too small - tbl_b.append(arrow_table_with_null) - tbl_b.append(arrow_table_with_null) - tbl_b.append(arrow_table_with_null) + tbl_b.merge_append(arrow_table_with_null) + tbl_b.merge_append(arrow_table_with_null) + tbl_b.merge_append(arrow_table_with_null) # tbl_c should not merge any manifests because merging is disabled - tbl_c.append(arrow_table_with_null) - tbl_c.append(arrow_table_with_null) - tbl_c.append(arrow_table_with_null) + tbl_c.merge_append(arrow_table_with_null) + tbl_c.merge_append(arrow_table_with_null) + tbl_c.merge_append(arrow_table_with_null) assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore assert len(tbl_b.current_snapshot().manifests(tbl_b.io)) == 3 # type: ignore From bf63c0391684e6987cbdc8928c9278f6d27c34a0 Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 3 Jun 2024 00:26:18 -0700 
Subject: [PATCH 04/17] fix snapshot inheritance --- pyiceberg/manifest.py | 67 +++++++++++++++++++++++++++++++++++++ pyiceberg/table/__init__.py | 40 +++++++++------------- 2 files changed, 83 insertions(+), 24 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index defe5958c5..027af2a2aa 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -402,6 +402,46 @@ class ManifestEntry(Record): def __init__(self, *data: Any, **named_data: Any) -> None: super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data}) + def _wrap( + self, + new_status: ManifestEntryStatus, + new_snapshot_id: int, + new_data_sequence_number: Optional[int], + new_file_sequence_number: Optional[int], + new_file: DataFile, + ) -> ManifestEntry: + self.status = new_status + self.snapshot_id = new_snapshot_id + self.data_sequence_number = new_data_sequence_number + self.file_sequence_number = new_file_sequence_number + self.data_file = new_file + return self + + def _wrap_append(self, new_snapshot_id: int, new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry: + return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file) + + def _wrap_delete( + self, + new_snapshot_id: int, + new_data_sequence_number: Optional[int], + new_file_sequence_number: Optional[int], + new_file: DataFile, + ) -> ManifestEntry: + return self._wrap( + ManifestEntryStatus.DELETED, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file + ) + + def _wrap_existing( + self, + new_snapshot_id: int, + new_data_sequence_number: Optional[int], + new_file_sequence_number: Optional[int], + new_file: DataFile, + ) -> ManifestEntry: + return self._wrap( + ManifestEntryStatus.EXISTING, new_snapshot_id, new_data_sequence_number, new_file_sequence_number, new_file + ) + PARTITION_FIELD_SUMMARY_TYPE = StructType( NestedField(509, "contains_null", BooleanType(), required=True), @@ -654,6 +694,7 @@ class ManifestWriter(ABC): _deleted_rows: int _min_data_sequence_number: Optional[int] _partitions: List[Record] + _reused_entry_wrapper: ManifestEntry def __init__( self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str] = EMPTY_DICT @@ -673,6 +714,7 @@ def __init__( self._deleted_rows = 0 self._min_data_sequence_number = None self._partitions = [] + self._reused_entry_wrapper = ManifestEntry() def __enter__(self) -> ManifestWriter: """Open the writer.""" @@ -763,6 +805,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter: self._writer.write_block([self.prepare_entry(entry)]) return self + def add(self, entry: ManifestEntry) -> ManifestWriter: + if entry.data_sequence_number is not None and entry.data_sequence_number >= 0: + self.add_entry( + self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file) + ) + else: + self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file)) + return self + + def delete(self, entry: ManifestEntry) -> ManifestWriter: + self.add_entry( + self._reused_entry_wrapper._wrap_delete( + self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + ) + ) + return self + + def existing(self, entry: ManifestEntry) -> ManifestWriter: + self.add_entry( + self._reused_entry_wrapper._wrap_existing( + self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + ) + ) + return self + class 
ManifestWriterV1(ManifestWriter): def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 851e589a87..3af95d4c78 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -430,12 +430,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ - Shorthand API for appending a PyArrow table to a table transaction. + Shorthand API for appending a PyArrow table to a table transaction. - Args: - df: The Arrow dataframe that will be appended to overwrite the table - snapshot_properties: Custom properties to be added to the snapshot summary - """ + Args: + df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ try: import pyarrow as pa except ModuleNotFoundError as e: @@ -461,11 +461,11 @@ def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( - table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, - io=self._table.io + table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io ) for data_file in data_files: update_snapshot.append_data_file(data_file) + def overwrite( self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: @@ -1392,12 +1392,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ - Shorthand API for appending a PyArrow table to the table. + Shorthand API for appending a PyArrow table to the table. - Args: - df: The Arrow dataframe that will be appended to overwrite the table - snapshot_properties: Custom properties to be added to the snapshot summary - """ + Args: + df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ with self.transaction() as tx: tx.merge_append(df=df, snapshot_properties=snapshot_properties) @@ -3919,26 +3919,18 @@ def _group_by_spec( def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile: with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: for manifest in manifest_bin: - for entry in self._snapshot_producer.fetch_manifest_entry(manifest): + for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): if entry.status == ManifestEntryStatus.DELETED: # suppress deletes from previous snapshots. 
only files deleted by this snapshot # should be added to the new manifest if entry.snapshot_id == self._snapshot_producer.snapshot_id: - writer.add_entry(entry) + writer.delete(entry) elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id: # adds from this snapshot are still adds, otherwise they should be existing - writer.add_entry(entry) + writer.add(entry) else: # add all files from the old manifest as existing files - writer.add_entry( - ManifestEntry( - status=ManifestEntryStatus.EXISTING, - snapshot_id=entry.snapshot_id, - data_sequence_number=entry.data_sequence_number, - file_sequence_number=entry.file_sequence_number, - data_file=entry.data_file, - ) - ) + writer.existing(entry) return writer.to_manifest_file() From 9dd69af6ab58a1ad69b42976175c892fae39a265 Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 3 Jun 2024 22:16:37 -0700 Subject: [PATCH 05/17] test manifest file and entries --- pyiceberg/table/__init__.py | 2 +- tests/integration/test_writes/test_writes.py | 93 ++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 3af95d4c78..b6d07aa47f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2870,7 +2870,7 @@ def _write_added_manifest() -> List[ManifestFile]: snapshot_id=self._snapshot_id, ) as writer: for data_file in self._added_data_files: - writer.add_entry( + writer.add( ManifestEntry( status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index a5094a2657..df834cf7aa 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -920,3 +920,96 @@ def test_merge_manifest_min_count_to_merge( assert tbl_a.scan().to_arrow().equals(tbl_c.scan().to_arrow()) # tbl_b and tbl_c should contain the same data assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow()) + + # verify the sequence number of tbl_a's only manifest file + tbl_a_manifest = tbl_a.current_snapshot().manifests(tbl_a.io)[0] # type: ignore + assert tbl_a_manifest.sequence_number == (3 if format_version == 2 else 0) + assert tbl_a_manifest.min_sequence_number == (1 if format_version == 2 else 0) + + # verify the manifest entries of tbl_a, in which the manifests are merged + tbl_a_entries = tbl_a.inspect.entries().to_pydict() + assert tbl_a_entries["status"] == [1, 0, 0] + assert tbl_a_entries["sequence_number"] == [3, 2, 1] if format_version == 2 else [0, 0, 0] + assert tbl_a_entries["file_sequence_number"] == [3, 2, 1] if format_version == 2 else [0, 0, 0] + for i in range(3): + tbl_a_data_file = tbl_a_entries["data_file"][i] + assert tbl_a_data_file["column_sizes"] == [ + (1, 49), + (2, 78), + (3, 128), + (4, 94), + (5, 118), + (6, 94), + (7, 118), + (8, 118), + (9, 118), + (10, 94), + (11, 78), + (12, 109), + ] + assert tbl_a_data_file["content"] == 0 + assert tbl_a_data_file["equality_ids"] is None + assert tbl_a_data_file["file_format"] == "PARQUET" + assert tbl_a_data_file["file_path"].startswith("s3://warehouse/default/merge_manifest_a/data/") + assert tbl_a_data_file["key_metadata"] is None + assert tbl_a_data_file["lower_bounds"] == [ + (1, b"\x00"), + (2, b"a"), + (3, b"aaaaaaaaaaaaaaaa"), + (4, b"\x01\x00\x00\x00"), + (5, b"\x01\x00\x00\x00\x00\x00\x00\x00"), + (6, b"\x00\x00\x00\x80"), + (7, b"\x00\x00\x00\x00\x00\x00\x00\x80"), + (8, b"\x00\x9bj\xca8\xf1\x05\x00"), 
+ (9, b"\x00\x9bj\xca8\xf1\x05\x00"), + (10, b"\x9eK\x00\x00"), + (11, b"\x01"), + (12, b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" b"\x00\x00\x00\x00"), + ] + assert tbl_a_data_file["nan_value_counts"] == [] + assert tbl_a_data_file["null_value_counts"] == [ + (1, 1), + (2, 1), + (3, 1), + (4, 1), + (5, 1), + (6, 1), + (7, 1), + (8, 1), + (9, 1), + (10, 1), + (11, 1), + (12, 1), + ] + assert tbl_a_data_file["partition"] == {} + assert tbl_a_data_file["record_count"] == 3 + assert tbl_a_data_file["sort_order_id"] is None + assert tbl_a_data_file["split_offsets"] == [4] + assert tbl_a_data_file["upper_bounds"] == [ + (1, b"\x01"), + (2, b"z"), + (3, b"zzzzzzzzzzzzzzz{"), + (4, b"\t\x00\x00\x00"), + (5, b"\t\x00\x00\x00\x00\x00\x00\x00"), + (6, b"fff?"), + (7, b"\xcd\xcc\xcc\xcc\xcc\xcc\xec?"), + (8, b"\x00\xbb\r\xab\xdb\xf5\x05\x00"), + (9, b"\x00\xbb\r\xab\xdb\xf5\x05\x00"), + (10, b"\xd9K\x00\x00"), + (11, b"\x12"), + (12, b"\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11\x11" b"\x11\x11\x11\x11"), + ] + assert tbl_a_data_file["value_counts"] == [ + (1, 3), + (2, 3), + (3, 3), + (4, 3), + (5, 3), + (6, 3), + (7, 3), + (8, 3), + (9, 3), + (10, 3), + (11, 3), + (12, 3), + ] From 4921a7f21c7b0dc3339fa85790dba2f0c87092f1 Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 3 Jun 2024 22:39:12 -0700 Subject: [PATCH 06/17] add doc --- mkdocs/docs/api.md | 4 ++++ mkdocs/docs/configuration.md | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 70b5fd62eb..306b9207dd 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -273,6 +273,10 @@ tbl.append(df) # or +tbl.merge_append(df) + +# or + tbl.overwrite(df) ``` diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index f8a69119c8..044cc80da6 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -61,6 +61,21 @@ Iceberg tables support table properties to configure table behavior. | `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group | | `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit | +## Table behavior options + +| Key | Options | Default | Description | +| ------------------------------------ | ------------------- | ------- | ----------------------------------------------------------- | +| `commit.manifest.target-size-bytes` | Size in bytes | 8MB | Target size when merging manifest files | +| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Target size when merging manifest files | +| `commit.manifest-merge.enabled` | Boolean | True | Controls whether to automatically merge manifests on writes | + + + +!!! note "Fast append" + PyIceberg default to the [fast append](https://iceberg.apache.org/spec/#snapshots) which ignores `commit.manifest*` and does not merge manifests on writes. To make table commit respect `commit.manifest*`, use [`merge_append`](api.md#write-support) instead. + + + # FileIO Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed. 
From 984ca4185e4b85c9c4f2d9610bc58d6d0067cb04 Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 3 Jun 2024 22:46:56 -0700 Subject: [PATCH 07/17] fix lint --- mkdocs/docs/configuration.md | 2 +- pyiceberg/table/__init__.py | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 044cc80da6..6d995bd832 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -72,7 +72,7 @@ Iceberg tables support table properties to configure table behavior. !!! note "Fast append" - PyIceberg default to the [fast append](https://iceberg.apache.org/spec/#snapshots) which ignores `commit.manifest*` and does not merge manifests on writes. To make table commit respect `commit.manifest*`, use [`merge_append`](api.md#write-support) instead. + PyIceberg default to the [fast append](https://iceberg.apache.org/spec/#snapshots) which ignores `commit.manifest*` and does not merge manifests on writes. To make table merge manifests on writes and respect `commit.manifest*`, use [`merge_append`](api.md#write-support) instead. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b6d07aa47f..1a56022ebc 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -430,7 +430,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ - Shorthand API for appending a PyArrow table to a table transaction. + Shorthand API for appending a PyArrow table to a table transaction and merging manifests on write. + + The manifest merge behavior is controlled by table properties: + - commit.manifest.target-size-bytes + - commit.manifest.min-count-to-merge + - commit.manifest-merge.enabled Args: df: The Arrow dataframe that will be appended to overwrite the table @@ -1392,7 +1397,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ - Shorthand API for appending a PyArrow table to the table. + Shorthand API for appending a PyArrow table to a table transaction and merging manifests on write. + + The manifest merge behavior is controlled by table properties: + - commit.manifest.target-size-bytes + - commit.manifest.min-count-to-merge + - commit.manifest-merge.enabled Args: df: The Arrow dataframe that will be appended to overwrite the table @@ -3070,13 +3080,6 @@ def __init__( TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) - def _deleted_entries(self) -> List[ManifestEntry]: - """To determine if we need to record any deleted manifest entries. - - In case of an append, nothing is deleted. - """ - return [] - def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: """To perform any post-processing on the manifests before writing them to the new snapshot. 
From 8510f7105c76b5fa243cadeb1e5d1bb9bfdfcdf3 Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 3 Jun 2024 22:48:08 -0700 Subject: [PATCH 08/17] change test name --- tests/integration/test_writes/test_writes.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index df834cf7aa..676093cd45 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -875,9 +875,7 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) -def test_merge_manifest_min_count_to_merge( - session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int -) -> None: +def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: tbl_a = _create_table( session_catalog, "default.merge_manifest_a", From a7da318b4d1def07bd63f7768be58cfcb31cfa69 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 9 Jun 2024 23:15:37 -0700 Subject: [PATCH 09/17] address review comments --- pyiceberg/table/__init__.py | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0974ba9d04..a30345076b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3922,12 +3922,9 @@ def __init__( self._merge_enabled = merge_enabled self._snapshot_producer = snapshot_producer - def _group_by_spec( - self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile] - ) -> Dict[int, List[ManifestFile]]: + def _group_by_spec(self, manifests: List[ManifestFile]) -> Dict[int, List[ManifestFile]]: groups = defaultdict(list) - groups[first_manifest.partition_spec_id].append(first_manifest) - for manifest in remaining_manifests: + for manifest in manifests: groups[manifest.partition_spec_id].append(manifest) return groups @@ -3935,16 +3932,14 @@ def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> Ma with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer: for manifest in manifest_bin: for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False): - if entry.status == ManifestEntryStatus.DELETED: - # suppress deletes from previous snapshots. 
only files deleted by this snapshot - # should be added to the new manifest - if entry.snapshot_id == self._snapshot_producer.snapshot_id: - writer.delete(entry) + if entry.status == ManifestEntryStatus.DELETED and entry.snapshot_id == self._snapshot_producer.snapshot_id: + # only files deleted by this snapshot should be added to the new manifest + writer.delete(entry) elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id: - # adds from this snapshot are still adds, otherwise they should be existing + # added entries from this snapshot are still added, otherwise they should be existing writer.add(entry) - else: - # add all files from the old manifest as existing files + elif entry.status != ManifestEntryStatus.DELETED: + # add all non-deleted files from the old manifest as existing files writer.existing(entry) return writer.to_manifest_file() @@ -3958,11 +3953,9 @@ def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]: if len(manifest_bin) == 1: output_manifests.append(manifest_bin[0]) elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge: - # if the bin has the first manifest (the new data files or an appended manifest file) - # then only merge it - # if the number of manifests is above the minimum count. this is applied only to bins - # with an in-memory - # manifest so that large manifests don't prevent merging older groups. + # if the bin has the first manifest (the new data files or an appended manifest file) then only + # merge it if the number of manifests is above the minimum count. this is applied only to bins + # with an in-memory manifest so that large manifests don't prevent merging older groups. output_manifests.extend(manifest_bin) else: output_manifests.append(self._create_manifest(spec_id, manifest_bin)) @@ -3987,8 +3980,7 @@ def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]: return manifests first_manifest = manifests[0] - remaining_manifests = manifests[1:] - groups = self._group_by_spec(first_manifest, remaining_manifests) + groups = self._group_by_spec(manifests) merged_manifests = [] for spec_id in reversed(groups.keys()): From c4feda5db83cfb230caefa124d7a8f2600d920f7 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 9 Jun 2024 23:17:27 -0700 Subject: [PATCH 10/17] rename _MergingSnapshotProducer to _SnapshotProducer --- pyiceberg/table/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index a30345076b..8d629a36f1 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2834,7 +2834,7 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths)) -class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): +class _SnapshotProducer(UpdateTableMetadata["_SnapshotProducer"]): commit_uuid: uuid.UUID _io: FileIO _operation: Operation @@ -2864,7 +2864,7 @@ def __init__( self.snapshot_properties = snapshot_properties self._manifest_num_counter = itertools.count(0) - def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer: + def append_data_file(self, data_file: DataFile) -> _SnapshotProducer: self._added_data_files.append(data_file) return self @@ -3034,7 +3034,7 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T return 
manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted) -class FastAppendFiles(_MergingSnapshotProducer): +class FastAppendFiles(_SnapshotProducer): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. @@ -3111,7 +3111,7 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests -class OverwriteFiles(_MergingSnapshotProducer): +class OverwriteFiles(_SnapshotProducer): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. @@ -3912,10 +3912,10 @@ class _ManifestMergeManager: _target_size_bytes: int _min_count_to_merge: int _merge_enabled: bool - _snapshot_producer: _MergingSnapshotProducer + _snapshot_producer: _SnapshotProducer def __init__( - self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _MergingSnapshotProducer + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer ) -> None: self._target_size_bytes = target_size_bytes self._min_count_to_merge = min_count_to_merge From 9777e9b1ec0ff5b2ffbb1c722e1173f5d6f662ba Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 1 Jul 2024 01:34:44 -0700 Subject: [PATCH 11/17] fix a serious bug --- pyiceberg/table/__init__.py | 11 ++--------- tests/catalog/test_sql.py | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 756ccb15a1..80a7705e7c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3081,14 +3081,11 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile def _manifests(self) -> List[ManifestFile]: def _write_added_manifest() -> List[ManifestFile]: if self._added_data_files: - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, num=0, commit_uuid=self.commit_uuid - ) with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.spec(), schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: for data_file in self._added_data_files: @@ -3109,15 +3106,11 @@ def _write_delete_manifest() -> List[ManifestFile]: # Check if we need to mark the files as deleted deleted_entries = self._deleted_entries() if len(deleted_entries) > 0: - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, num=1, commit_uuid=self.commit_uuid - ) - with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.spec(), schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: for delete_entry in deleted_entries: diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 24adfb88ab..a40d3385c5 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1490,3 +1490,30 @@ def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, table_id # Act and Assert for a non-existing table assert catalog.table_exists(("non", "exist")) is False + + 
+@pytest.mark.parametrize(
+    "catalog",
+    [
+        lazy_fixture("catalog_memory"),
+        lazy_fixture("catalog_sqlite"),
+    ],
+)
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_merge_manifests_file_integrity(catalog: SqlCatalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
+    # temporary test for proof of correctness
+    catalog.create_namespace_if_not_exists("default")
+    try:
+        catalog.drop_table("default.test_merge_manifest")
+    except NoSuchTableError:
+        pass
+    tbl = catalog.create_table(
+        "default.test_merge_manifest",
+        arrow_table_with_null.schema,
+        properties={"commit.manifest.min-count-to-merge": "1", "format-version": format_version},
+    )
+
+    for _ in range(5):
+        tbl.merge_append(arrow_table_with_null)
+
+    assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)

From 66dddbe28777af9cbbba595d6d280631e9a10a2b Mon Sep 17 00:00:00 2001
From: HonahX
Date: Mon, 1 Jul 2024 22:24:34 -0700
Subject: [PATCH 12/17] update the doc

---
 mkdocs/docs/api.md           |  4 ----
 mkdocs/docs/configuration.md | 12 ++++++------
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0705dcecb0..6da2fc3a8b 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -273,10 +273,6 @@ tbl.append(df)

 # or

-tbl.merge_append(df)
-
-# or
-
 tbl.overwrite(df)
 ```

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 90cca0f266..25834311e4 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -63,16 +63,16 @@ Iceberg tables support table properties to configure table behavior.

 ## Table behavior options

-| Key | Options | Default | Description |
-| ------------------------------------ | ------------------- | ------- | ----------------------------------------------------------- |
-| `commit.manifest.target-size-bytes` | Size in bytes | 8MB | Target size when merging manifest files |
-| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Target size when merging manifest files |
-| `commit.manifest-merge.enabled` | Boolean | True | Controls whether to automatically merge manifests on writes |
+| Key | Options | Default | Description |
+| ------------------------------------ | ------------------- | ------------- | ----------------------------------------------------------- |
+| `commit.manifest.target-size-bytes` | Size in bytes | 8388608 (8MB) | Target size when merging manifest files |
+| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Minimum number of manifests to accumulate before merging |
+| `commit.manifest-merge.enabled` | Boolean | False | Controls whether to automatically merge manifests on writes |

 !!! note "Fast append"

-    PyIceberg default to the [fast append](https://iceberg.apache.org/spec/#snapshots) which ignores `commit.manifest*` and does not merge manifests on writes. To make table merge manifests on writes and respect `commit.manifest*`, use [`merge_append`](api.md#write-support) instead.
+    Unlike the Java implementation, PyIceberg defaults to the [fast append](api.md#write-support), and thus `commit.manifest-merge.enabled` is set to `False` by default.
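The doc update above describes the end state that the remaining patches wire up: manifest merging becomes an opt-in table property consulted by the regular `append` path, rather than a separate public `merge_append` API. A minimal sketch of that final behavior, mirroring the integration tests later in the series (the table identifier is a placeholder, and `catalog` is assumed to be configured as in the earlier sketch):

```python
import pyarrow as pa

df = pa.table({"id": [1, 2, 3]})
tbl = catalog.create_table(
    "default.merged_appends",  # hypothetical identifier
    schema=df.schema,
    properties={
        "commit.manifest-merge.enabled": "true",  # opt in; False by default in PyIceberg
        "commit.manifest.min-count-to-merge": "1",
    },
)

for _ in range(3):
    tbl.append(df)  # append() merges manifests because the property is enabled

# With min-count-to-merge=1, every commit collapses the manifest list to one.
assert len(tbl.current_snapshot().manifests(tbl.io)) == 1
```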
From aff1bea2c69c364081bc43475b45c9eac0b6bba0 Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 1 Jul 2024 22:41:18 -0700 Subject: [PATCH 13/17] remove merge_append as public API --- pyiceberg/table/__init__.py | 78 ++++---------------- tests/catalog/test_sql.py | 2 +- tests/integration/test_writes/test_writes.py | 18 ++--- 3 files changed, 25 insertions(+), 73 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 80a7705e7c..79583cfaa6 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -503,57 +503,22 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) if table_arrow_schema != df.schema: df = df.cast(table_arrow_schema) - with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: - # skip writing data files if the dataframe is empty - if df.shape[0] > 0: - data_files = _dataframe_to_data_files( - table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io - ) - for data_file in data_files: - update_snapshot.append_data_file(data_file) - - def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: - """ - Shorthand API for appending a PyArrow table to a table transaction and merging manifests on write. - - The manifest merge behavior is controlled by table properties: - - commit.manifest.target-size-bytes - - commit.manifest.min-count-to-merge - - commit.manifest-merge.enabled - - Args: - df: The Arrow dataframe that will be appended to overwrite the table - snapshot_properties: Custom properties to be added to the snapshot summary - """ - try: - import pyarrow as pa - except ModuleNotFoundError as e: - raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e - - if not isinstance(df, pa.Table): - raise ValueError(f"Expected PyArrow table, got: {df}") - - if unsupported_partitions := [ - field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform - ]: - raise ValueError( - f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}." 
- ) - - _check_schema_compatible(self._table.schema(), other_schema=df.schema) - # cast if the two schemas are compatible but not equal - table_arrow_schema = self._table.schema().as_arrow() - if table_arrow_schema != df.schema: - df = df.cast(table_arrow_schema) + manifest_merge_enabled = PropertyUtil.property_as_bool( + self.table_metadata.properties, + TableProperties.MANIFEST_MERGE_ENABLED, + TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, + ) + update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties) + append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append - with self.update_snapshot(snapshot_properties=snapshot_properties).merge_append() as update_snapshot: + with append_method() as append_files: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( - table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io + table_metadata=self._table.metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io ) for data_file in data_files: - update_snapshot.append_data_file(data_file) + append_files.append_data_file(data_file) def overwrite( self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT @@ -1511,22 +1476,6 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) with self.transaction() as tx: tx.append(df=df, snapshot_properties=snapshot_properties) - def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: - """ - Shorthand API for appending a PyArrow table to a table transaction and merging manifests on write. - - The manifest merge behavior is controlled by table properties: - - commit.manifest.target-size-bytes - - commit.manifest.min-count-to-merge - - commit.manifest-merge.enabled - - Args: - df: The Arrow dataframe that will be appended to overwrite the table - snapshot_properties: Custom properties to be added to the snapshot summary - """ - with self.transaction() as tx: - tx.merge_append(df=df, snapshot_properties=snapshot_properties) - def overwrite( self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT ) -> None: @@ -3264,8 +3213,9 @@ def __init__( transaction: Transaction, io: FileIO, commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: - super().__init__(operation, transaction, io, commit_uuid) + super().__init__(operation, transaction, io, commit_uuid, snapshot_properties) self._target_size_bytes = PropertyUtil.property_as_int( self._transaction.table_metadata.properties, TableProperties.MANIFEST_TARGET_SIZE_BYTES, @@ -3360,7 +3310,9 @@ def fast_append(self) -> FastAppendFiles: ) def merge_append(self) -> MergeAppendFiles: - return MergeAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io) + return MergeAppendFiles( + operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + ) def overwrite(self) -> OverwriteFiles: return OverwriteFiles( diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index a40d3385c5..ec6f75c3ba 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1514,6 +1514,6 @@ def test_merge_manifests_file_integrity(catalog: SqlCatalog, arrow_table_with_nu ) for _ in range(5): - 
tbl.merge_append(arrow_table_with_null)
+        tbl.append(arrow_table_with_null)
 
     assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)
 
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 754aed22af..ec0920d38a 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -950,19 +950,19 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab
     )
 
     # tbl_a should merge all manifests into 1
-    tbl_a.merge_append(arrow_table_with_null)
-    tbl_a.merge_append(arrow_table_with_null)
-    tbl_a.merge_append(arrow_table_with_null)
+    tbl_a.append(arrow_table_with_null)
+    tbl_a.append(arrow_table_with_null)
+    tbl_a.append(arrow_table_with_null)
 
     # tbl_b should not merge any manifests because the target size is too small
-    tbl_b.merge_append(arrow_table_with_null)
-    tbl_b.merge_append(arrow_table_with_null)
-    tbl_b.merge_append(arrow_table_with_null)
+    tbl_b.append(arrow_table_with_null)
+    tbl_b.append(arrow_table_with_null)
+    tbl_b.append(arrow_table_with_null)
 
     # tbl_c should not merge any manifests because merging is disabled
-    tbl_c.merge_append(arrow_table_with_null)
-    tbl_c.merge_append(arrow_table_with_null)
-    tbl_c.merge_append(arrow_table_with_null)
+    tbl_c.append(arrow_table_with_null)
+    tbl_c.append(arrow_table_with_null)
+    tbl_c.append(arrow_table_with_null)
 
     assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1  # type: ignore
     assert len(tbl_b.current_snapshot().manifests(tbl_b.io)) == 3  # type: ignore

From 7625857072a5fc0c1e58ad3422b26ec437a6dd33 Mon Sep 17 00:00:00 2001
From: HonahX
Date: Mon, 1 Jul 2024 22:45:02 -0700
Subject: [PATCH 14/17] disable manifest merging by default

---
 pyiceberg/table/__init__.py                  |  2 +-
 tests/integration/test_writes/test_writes.py | 11 ++++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 79583cfaa6..cb41223a9a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -254,7 +254,7 @@ class TableProperties:
     MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100
 
     MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"
-    MANIFEST_MERGE_ENABLED_DEFAULT = True
+    MANIFEST_MERGE_ENABLED_DEFAULT = False
 
 
 class PropertyUtil:
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index ec0920d38a..ba0b9759a3 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -933,19 +933,24 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab
     tbl_a = _create_table(
         session_catalog,
         "default.merge_manifest_a",
-        {"commit.manifest.min-count-to-merge": "1", "format-version": format_version},
+        {"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version},
         [],
     )
     tbl_b = _create_table(
         session_catalog,
         "default.merge_manifest_b",
-        {"commit.manifest.min-count-to-merge": "1", "commit.manifest.target-size-bytes": "1", "format-version": format_version},
+        {
+            "commit.manifest-merge.enabled": "true",
+            "commit.manifest.min-count-to-merge": "1",
+            "commit.manifest.target-size-bytes": "1",
+            "format-version": format_version,
+        },
         [],
     )
     tbl_c = _create_table(
         session_catalog,
         "default.merge_manifest_c",
-        {"commit.manifest-merge.enabled": "false", "format-version": format_version},
+        {"commit.manifest.min-count-to-merge": "1", "format-version": format_version},
         [],
     )

From 
3e3a1b495318a2c5251c304b8c200a2bf4d3b05b Mon Sep 17 00:00:00 2001 From: HonahX Date: Mon, 1 Jul 2024 23:22:07 -0700 Subject: [PATCH 15/17] add test description --- tests/catalog/test_sql.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index ec6f75c3ba..32a1233add 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1500,8 +1500,9 @@ def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, table_id ], ) @pytest.mark.parametrize("format_version", [1, 2]) -def test_merge_manifests_file_integrity(catalog: SqlCatalog, arrow_table_with_null: pa.Table, format_version: int) -> None: - # temporary test for proof of correctness +def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with_null: pa.Table, format_version: int) -> None: + # To catch manifest file name collision bug during merge: + # https://github.com/apache/iceberg-python/pull/363#discussion_r1660691918 catalog.create_namespace_if_not_exists("default") try: catalog.drop_table("default.test_merge_manifest") @@ -1510,7 +1511,11 @@ def test_merge_manifests_file_integrity(catalog: SqlCatalog, arrow_table_with_nu tbl = catalog.create_table( "default.test_merge_manifest", arrow_table_with_null.schema, - properties={"commit.manifest.min-count-to-merge": "1", "format-version": format_version}, + properties={ + "commit.manifest-merge.enabled": "true", + "commit.manifest.min-count-to-merge": "2", + "format-version": format_version, + }, ) for _ in range(5): From 71a5fe0b74beba949e6a77009203d88528171042 Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 9 Jul 2024 23:20:53 -0700 Subject: [PATCH 16/17] fix merge conflict --- pyiceberg/table/__init__.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 344cad0a8f..39bcfc2ef6 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -3094,7 +3094,6 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] - _manifest_counter: itertools.count[int] def __init__( self, @@ -3117,7 +3116,6 @@ def __init__( self._deleted_data_files = set() self.snapshot_properties = snapshot_properties self._manifest_num_counter = itertools.count(0) - self._manifest_counter = itertools.count(0) def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) @@ -3292,7 +3290,7 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted) -class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]): +class DeleteFiles(_SnapshotProducer["DeleteFiles"]): """Will delete manifest entries from the current snapshot based on the predicate. 
This will produce a DELETE snapshot: @@ -3393,16 +3391,11 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> # Rewrite the manifest if len(existing_entries) > 0: - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_counter), - commit_uuid=self.commit_uuid, - ) with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: for existing_entry in existing_entries: @@ -3433,7 +3426,8 @@ def files_affected(self) -> bool: """Indicate if any manifest-entries can be dropped.""" return len(self._deleted_entries()) > 0 -class FastAppendFiles(_SnapshotProducer): + +class FastAppendFiles(_SnapshotProducer["FastAppendFiles"]): def _existing_manifests(self) -> List[ManifestFile]: """To determine if there are any existing manifest files. @@ -3529,18 +3523,13 @@ def _existing_manifests(self) -> List[ManifestFile]: if len(found_deleted_data_files) == 0: existing_files.append(manifest_file) else: - # We have to rewrite the - output_file_location = _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_counter), - commit_uuid=self.commit_uuid, - ) + # We have to rewrite the manifest file without the deleted data files if any(entry.data_file not in found_deleted_data_files for entry in entries): with write_manifest( format_version=self._transaction.table_metadata.format_version, spec=self._transaction.table_metadata.spec(), schema=self._transaction.table_metadata.schema(), - output_file=self._io.new_output(output_file_location), + output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, ) as writer: [ @@ -4522,14 +4511,14 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T return table_partitions -class _ManifestMergeManager: +class _ManifestMergeManager(Generic[U]): _target_size_bytes: int _min_count_to_merge: int _merge_enabled: bool - _snapshot_producer: _SnapshotProducer + _snapshot_producer: _SnapshotProducer[U] def __init__( - self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer + self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _SnapshotProducer[U] ) -> None: self._target_size_bytes = target_size_bytes self._min_count_to_merge = min_count_to_merge From c7e4095535289c49f93437ba7d18306480ddd6f6 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 10 Jul 2024 00:03:21 -0700 Subject: [PATCH 17/17] fix snapshot_id issue --- pyiceberg/manifest.py | 12 +++++---- tests/integration/test_writes/test_writes.py | 26 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index b555b0493f..6148d9a69a 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -407,7 +407,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None: def _wrap( self, new_status: ManifestEntryStatus, - new_snapshot_id: int, + new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file_sequence_number: Optional[int], new_file: DataFile, @@ -419,12 +419,14 @@ def _wrap( self.data_file = new_file return self - def _wrap_append(self, new_snapshot_id: int, 
new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry:
+    def _wrap_append(
+        self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
+    ) -> ManifestEntry:
         return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file)
 
     def _wrap_delete(
         self,
-        new_snapshot_id: int,
+        new_snapshot_id: Optional[int],
         new_data_sequence_number: Optional[int],
         new_file_sequence_number: Optional[int],
         new_file: DataFile,
@@ -435,7 +437,7 @@ def _wrap_delete(
 
     def _wrap_existing(
         self,
-        new_snapshot_id: int,
+        new_snapshot_id: Optional[int],
         new_data_sequence_number: Optional[int],
         new_file_sequence_number: Optional[int],
         new_file: DataFile,
@@ -838,7 +840,7 @@ def delete(self, entry: ManifestEntry) -> ManifestWriter:
     def existing(self, entry: ManifestEntry) -> ManifestWriter:
         self.add_entry(
             self._reused_entry_wrapper._wrap_existing(
-                self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
+                entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
             )
         )
         return self
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index ae2def1fb4..2542fbdb38 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1084,6 +1084,29 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab
     # tbl_b and tbl_c should contain the same data
     assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow())
 
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
+    tbl_a = _create_table(
+        session_catalog,
+        "default.merge_manifest_a",
+        {"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version},
+        [],
+    )
+
+    # tbl_a should merge all manifests into 1
+    tbl_a.append(arrow_table_with_null)
+
+    tbl_a_first_entries = tbl_a.inspect.entries().to_pydict()
+    first_snapshot_id = tbl_a_first_entries["snapshot_id"][0]
+    first_data_file_path = tbl_a_first_entries["data_file"][0]["file_path"]
+
+    tbl_a.append(arrow_table_with_null)
+    tbl_a.append(arrow_table_with_null)
+
+    assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1  # type: ignore
+
 # verify the sequence number of tbl_a's only manifest file
 tbl_a_manifest = tbl_a.current_snapshot().manifests(tbl_a.io)[0]  # type: ignore
 assert tbl_a_manifest.sequence_number == (3 if format_version == 2 else 0)
@@ -1114,6 +1137,9 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab
     assert tbl_a_data_file["equality_ids"] is None
     assert tbl_a_data_file["file_format"] == "PARQUET"
     assert tbl_a_data_file["file_path"].startswith("s3://warehouse/default/merge_manifest_a/data/")
+    if tbl_a_data_file["file_path"] == first_data_file_path:
+        # verify that the recorded snapshot id is the one under which the file was added
+        assert tbl_a_entries["snapshot_id"][i] == first_snapshot_id
     assert tbl_a_data_file["key_metadata"] is None
     assert tbl_a_data_file["lower_bounds"] == [
         (1, b"\x00"),
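
Taken together, patches 13-17 turn manifest merging into an opt-in, property-driven behavior of the ordinary append() call instead of a separate public merge_append() API. Below is a minimal usage sketch of the end state. The catalog name, table name, and toy Arrow table are illustrative assumptions, not taken from the patches; the commit.* property names and the append() / current_snapshot() / inspect.entries() calls are the ones exercised in the code above.

    import pyarrow as pa

    from pyiceberg.catalog import load_catalog

    # Assumption: a catalog named "default" is configured (e.g. in ~/.pyiceberg.yaml).
    catalog = load_catalog("default")
    catalog.create_namespace_if_not_exists("default")

    df = pa.table({"n": [1, 2, 3]})

    # Merging is disabled by default after PATCH 14, so opt in per table.
    tbl = catalog.create_table(
        "default.merge_demo",  # illustrative table name
        schema=df.schema,
        properties={
            "commit.manifest-merge.enabled": "true",
            "commit.manifest.min-count-to-merge": "1",  # merge eagerly, as in the tests
        },
    )

    # merge_append() is no longer public (PATCH 13); plain append() reads the
    # table property and dispatches to the merge path internally.
    for _ in range(3):
        tbl.append(df)

    # All three appends should sit behind a single merged manifest.
    assert len(tbl.current_snapshot().manifests(tbl.io)) == 1

    # Each rewritten entry keeps the snapshot id that originally added its
    # data file (PATCH 17); the entries metadata table exposes it.
    entries = tbl.inspect.entries().to_pydict()
    assert len(entries["snapshot_id"]) == 3  # expecting one data file per small append

Dispatching inside append() keeps a single public write path while the same Java-compatible table properties (commit.manifest-merge.enabled, commit.manifest.min-count-to-merge, commit.manifest.target-size-bytes) control how aggressively manifests are compacted.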