Add per-partition metrics to snapshot summaries.

Introduces the `write.summary.partition-limit` table property (default 0).
SnapshotSummaryCollector now keeps one UpdateMetrics per partition path, reports
`changed-partition-count` when it is positive, and emits `partitions.<path>` entries
only when the number of changed partitions does not exceed the configured limit.

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c75a0a5983..517e6c86df 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -213,6 +213,9 @@ class TableProperties:
 
     METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"
 
+    WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
+    WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
+
     DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
     FORMAT_VERSION = "format-version"
     DEFAULT_FORMAT_VERSION = 2
@@ -2628,9 +2631,19 @@ def _write_delete_manifest() -> List[ManifestFile]:
 
     def _summary(self) -> Summary:
         ssc = SnapshotSummaryCollector()
+        partition_summary_limit = int(
+            self._transaction.table_metadata.properties.get(
+                TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+            )
+        )
+        ssc.set_partition_summary_limit(partition_summary_limit)
 
         for data_file in self._added_data_files:
-            ssc.add_file(data_file=data_file)
+            ssc.add_file(
+                data_file=data_file,
+                partition_spec=self._transaction.table_metadata.spec(),
+                schema=self._transaction.table_metadata.schema(),
+            )
 
         previous_snapshot = (
             self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index a2f15d4405..f74ac4b7d4 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -15,19 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 import time
+from collections import defaultdict
 from enum import Enum
-from typing import (
-    Any,
-    Dict,
-    List,
-    Mapping,
-    Optional,
-)
+from typing import Any, DefaultDict, Dict, List, Mapping, Optional
 
 from pydantic import Field, PrivateAttr, model_serializer
 
 from pyiceberg.io import FileIO
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
 from pyiceberg.typedef import IcebergBaseModel
 
 ADDED_DATA_FILES = 'added-data-files'
@@ -52,8 +49,8 @@
 TOTAL_DELETE_FILES = 'total-delete-files'
 TOTAL_RECORDS = 'total-records'
 TOTAL_FILE_SIZE = 'total-files-size'
-
-
+CHANGED_PARTITION_COUNT_PROP = 'changed-partition-count'
+CHANGED_PARTITION_PREFIX = "partitions."
 OPERATION = "operation"
 
 
@@ -77,6 +74,102 @@ def __repr__(self) -> str:
         return f"Operation.{self.name}"
 
 
+class UpdateMetrics:
+    """Counters for the files and records that one update adds to or removes from a table."""
+
+    added_file_size: int
+    removed_file_size: int
+    added_data_files: int
+    removed_data_files: int
+    added_eq_delete_files: int
+    removed_eq_delete_files: int
+    added_pos_delete_files: int
+    removed_pos_delete_files: int
+    added_delete_files: int
+    removed_delete_files: int
+    added_records: int
+    deleted_records: int
+    added_pos_deletes: int
+    removed_pos_deletes: int
+    added_eq_deletes: int
+    removed_eq_deletes: int
+
+    def __init__(self) -> None:
+        self.added_file_size = 0
+        self.removed_file_size = 0
+        self.added_data_files = 0
+        self.removed_data_files = 0
+        self.added_eq_delete_files = 0
+        self.removed_eq_delete_files = 0
+        self.added_pos_delete_files = 0
+        self.removed_pos_delete_files = 0
+        self.added_delete_files = 0
+        self.removed_delete_files = 0
+        self.added_records = 0
+        self.deleted_records = 0
+        self.added_pos_deletes = 0
+        self.removed_pos_deletes = 0
+        self.added_eq_deletes = 0
+        self.removed_eq_deletes = 0
+
+    def add_file(self, data_file: DataFile) -> None:
+        """Count an added file; raises ValueError when its content type is not a known one."""
+        self.added_file_size += data_file.file_size_in_bytes
+
+        if data_file.content == DataFileContent.DATA:
+            self.added_data_files += 1
+            self.added_records += data_file.record_count
+        elif data_file.content == DataFileContent.POSITION_DELETES:
+            self.added_delete_files += 1
+            self.added_pos_delete_files += 1
+            self.added_pos_deletes += data_file.record_count
+        elif data_file.content == DataFileContent.EQUALITY_DELETES:
+            self.added_delete_files += 1
+            self.added_eq_delete_files += 1
+            self.added_eq_deletes += data_file.record_count
+        else:
+            raise ValueError(f"Unknown data file content: {data_file.content}")
+
+    def remove_file(self, data_file: DataFile) -> None:
+        """Count a removed file; raises ValueError when its content type is not a known one."""
+        self.removed_file_size += data_file.file_size_in_bytes
+
+        if data_file.content == DataFileContent.DATA:
+            self.removed_data_files += 1
+            self.deleted_records += data_file.record_count
+        elif data_file.content == DataFileContent.POSITION_DELETES:
+            self.removed_delete_files += 1
+            self.removed_pos_delete_files += 1
+            self.removed_pos_deletes += data_file.record_count
+        elif data_file.content == DataFileContent.EQUALITY_DELETES:
+            self.removed_delete_files += 1
+            self.removed_eq_delete_files += 1
+            self.removed_eq_deletes += data_file.record_count
+        else:
+            raise ValueError(f"Unknown data file content: {data_file.content}")
+
+    def to_dict(self) -> Dict[str, str]:
+        """Return the counters that are greater than zero as summary properties."""
+        properties: Dict[str, str] = {}
+        set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
+        set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
+        set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
+        set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
+        set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
+        set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
+        set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
+        set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
+        set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
+        set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
+        set_when_positive(properties, self.added_records, ADDED_RECORDS)
+        set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
+        set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
+        set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
+        set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
+        set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)
+        return properties
+
+
 class Summary(IcebergBaseModel, Mapping[str, str]):
     """A class that stores the summary information for a Snapshot.
 
@@ -172,100 +265,61 @@ class SnapshotLogEntry(IcebergBaseModel):
 
 
 class SnapshotSummaryCollector:
-    added_file_size: int
-    removed_file_size: int
-    added_data_files: int
-    removed_data_files: int
-    added_eq_delete_files: int
-    removed_eq_delete_files: int
-    added_pos_delete_files: int
-    removed_pos_delete_files: int
-    added_delete_files: int
-    removed_delete_files: int
-    added_records: int
-    deleted_records: int
-    added_pos_deletes: int
-    removed_pos_deletes: int
-    added_eq_deletes: int
-    removed_eq_deletes: int
+    """Collect update metrics, overall and per partition, and build the snapshot summary properties."""
+
+    metrics: UpdateMetrics
+    partition_metrics: DefaultDict[str, UpdateMetrics]
+    max_changed_partitions_for_summaries: int
 
     def __init__(self) -> None:
-        self.added_file_size = 0
-        self.removed_file_size = 0
-        self.added_data_files = 0
-        self.removed_data_files = 0
-        self.added_eq_delete_files = 0
-        self.removed_eq_delete_files = 0
-        self.added_pos_delete_files = 0
-        self.removed_pos_delete_files = 0
-        self.added_delete_files = 0
-        self.removed_delete_files = 0
-        self.added_records = 0
-        self.deleted_records = 0
-        self.added_pos_deletes = 0
-        self.removed_pos_deletes = 0
-        self.added_eq_deletes = 0
-        self.removed_eq_deletes = 0
-
-    def add_file(self, data_file: DataFile) -> None:
-        self.added_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.added_data_files += 1
-            self.added_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.added_delete_files += 1
-            self.added_pos_delete_files += 1
-            self.added_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.added_delete_files += 1
-            self.added_eq_delete_files += 1
-            self.added_eq_deletes += data_file.record_count
-        else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
-
-    def remove_file(self, data_file: DataFile) -> None:
-        self.removed_file_size += data_file.file_size_in_bytes
-
-        if data_file.content == DataFileContent.DATA:
-            self.removed_data_files += 1
-            self.deleted_records += data_file.record_count
-        elif data_file.content == DataFileContent.POSITION_DELETES:
-            self.removed_delete_files += 1
-            self.removed_pos_delete_files += 1
-            self.removed_pos_deletes += data_file.record_count
-        elif data_file.content == DataFileContent.EQUALITY_DELETES:
-            self.removed_delete_files += 1
-            self.removed_eq_delete_files += 1
-            self.removed_eq_deletes += data_file.record_count
+        self.metrics = UpdateMetrics()
+        self.partition_metrics = defaultdict(UpdateMetrics)
+        self.max_changed_partitions_for_summaries = 0
+
+    def set_partition_summary_limit(self, limit: int) -> None:
+        """Set the maximum number of changed partitions for which per-partition summaries are emitted."""
+        self.max_changed_partitions_for_summaries = limit
+
+    def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None:
+        """Record an added file in the overall metrics and, when it has partition values, in its partition's metrics."""
+        self.metrics.add_file(data_file)
+        if data_file.partition.record_fields():
+            self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema)
+
+    def remove_file(
+        self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC
+    ) -> None:
+        """Record a removed file in the overall metrics and, when it has partition values, in its partition's metrics."""
+        self.metrics.remove_file(data_file)
+        if data_file.partition.record_fields():
+            self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema)
+
+    def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None:
+        """Add or remove the file in the metrics kept for its partition, keyed by the partition path."""
+        partition_path = partition_spec.partition_to_path(file.partition, schema)
+        partition_metrics: UpdateMetrics = self.partition_metrics[partition_path]
+
+        if is_add_file:
+            partition_metrics.add_file(file)
         else:
-            raise ValueError(f"Unknown data file content: {data_file.content}")
+            partition_metrics.remove_file(file)
 
     def build(self) -> Dict[str, str]:
-        def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
-            if num > 0:
-                properties[property_name] = str(num)
-
-        properties: Dict[str, str] = {}
-        set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
-        set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
-        set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
-        set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
-        set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
-        set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
-        set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
-        set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
-        set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
-        set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
-        set_when_positive(properties, self.added_records, ADDED_RECORDS)
-        set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
-        set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
-        set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
-        set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
-        set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)
+        """Build the summary properties; per-partition entries are included only within the configured limit."""
+        properties = self.metrics.to_dict()
+        changed_partitions_size = len(self.partition_metrics)
+        set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
+        if changed_partitions_size <= self.max_changed_partitions_for_summaries:
+            for partition_path, update_metrics_partition in self.partition_metrics.items():
+                if summary := self._partition_summary(update_metrics_partition):
+                    properties[CHANGED_PARTITION_PREFIX + partition_path] = summary
 
         return properties
 
+    def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
+        """Render the positive metrics of one partition as comma-separated ``key=value`` pairs."""
+        return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()])
+
 
 def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
     for prop in {
@@ -366,3 +420,9 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
     )
 
     return summary
+
+
+def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
+    """Store ``num`` as a string under ``property_name`` only when it is greater than zero."""
+    if num > 0:
+        properties[property_name] = str(num)
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 82c4ace711..c7095cb71b 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -525,12 +525,15 @@ def test_summaries_with_only_nulls(
         'total-records': '2',
     }
 
-    assert summaries[0] == {
-        'total-data-files': '0',
-        'total-delete-files': '0',
+    assert summaries[2] == {
+        'removed-files-size': '4239',
         'total-equality-deletes': '0',
-        'total-files-size': '0',
         'total-position-deletes': '0',
+        'deleted-data-files': '1',
+        'total-delete-files': '0',
+        'total-files-size': '0',
+        'deleted-records': '2',
+        'total-data-files': '0',
         'total-records': '0',
     }
 
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 3591847ad6..e85ecce506 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -18,7 +18,17 @@
 import pytest
 
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
 from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import Record
+from pyiceberg.types import (
+    BooleanType,
+    IntegerType,
+    NestedField,
+    StringType,
+)
 
 
 @pytest.fixture
@@ -137,26 +147,66 @@ def manifest_file() -> ManifestFile:
     )
 
 
-@pytest.fixture
-def data_file() -> DataFile:
-    return DataFile(
-        content=DataFileContent.DATA,
-        record_count=100,
-        file_size_in_bytes=1234,
-    )
+@pytest.mark.integration
+def test_snapshot_summary_collector(table_schema_simple: Schema) -> None:
+    ssc = SnapshotSummaryCollector()
+
+    assert ssc.build() == {}
+    data_file = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record())
+    ssc.add_file(data_file, schema=table_schema_simple)
+
+    assert ssc.build() == {
+        'added-data-files': '1',
+        'added-files-size': '1234',
+        'added-records': '100',
+    }
+
 
+@pytest.mark.integration
+def test_snapshot_summary_collector_with_partition() -> None:
+    # Given
 
-def test_snapshot_summary_collector(data_file: DataFile) -> None:
     ssc = SnapshotSummaryCollector()
 
     assert ssc.build() == {}
+    schema = Schema(
+        NestedField(field_id=1, name="bool_field", field_type=BooleanType(), required=False),
+        NestedField(field_id=2, name="string_field", field_type=StringType(), required=False),
+        NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False),
+    )
+    spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name='int_field'))
+    data_file_1 = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1))
+    data_file_2 = DataFile(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(int_field=2))
+    # When
+    ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+    ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+    ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec)
+
+    # Then
+    assert ssc.build() == {
+        'added-files-size': '1234',
+        'removed-files-size': '5555',
+        'added-data-files': '1',
+        'deleted-data-files': '2',
+        'added-records': '100',
+        'deleted-records': '300',
+        'changed-partition-count': '2',
+    }
 
-    ssc.add_file(data_file)
+    # When
+    ssc.set_partition_summary_limit(10)
 
+    # Then
     assert ssc.build() == {
-        'added-data-files': '1',
         'added-files-size': '1234',
+        'removed-files-size': '5555',
+        'added-data-files': '1',
+        'deleted-data-files': '2',
         'added-records': '100',
+        'deleted-records': '300',
+        'changed-partition-count': '2',
+        'partitions.int_field=1': 'added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100',
+        'partitions.int_field=2': 'removed-files-size=4321,deleted-data-files=1,deleted-records=200',
     }
 
 