diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 4056bc3aec..3cd0d312ff 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -563,6 +563,20 @@ table = table.transaction().remove_properties("abc").commit_transaction() assert table.properties == {} ``` +## Snapshot properties + +Optionally, snapshot properties can be set while writing to a table using the `append` or `overwrite` API: + +```python +tbl.append(df, snapshot_properties={"abc": "def"}) + +# or + +tbl.overwrite(df, snapshot_properties={"abc": "def"}) + +assert tbl.metadata.snapshots[-1].summary["abc"] == "def" +``` + ## Query the data To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 517e6c86df..437a2640e8 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -332,13 +332,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self._table.name_mapping(), ) - def update_snapshot(self) -> UpdateSnapshot: + def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table. Returns: A new UpdateSnapshot """ - return UpdateSnapshot(self, io=self._table.io) + return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) def update_spec(self) -> UpdateSpec: """Create a new UpdateSpec to update the partitioning of the table. @@ -1095,12 +1095,13 @@ def name_mapping(self) -> Optional[NameMapping]: else: return None - def append(self, df: pa.Table) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for appending a PyArrow table to the table. 
Args: df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary """ try: import pyarrow as pa @@ -1116,7 +1117,7 @@ def append(self, df: pa.Table) -> None: _check_schema(self.schema(), other_schema=df.schema) with self.transaction() as txn: - with txn.update_snapshot().fast_append() as update_snapshot: + with txn.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -1125,7 +1126,9 @@ def append(self, df: pa.Table) -> None: for data_file in data_files: update_snapshot.append_data_file(data_file) - def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None: + def overwrite( + self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + ) -> None: """ Shorthand for overwriting the table with a PyArrow table. 
@@ -1133,6 +1136,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T df: The Arrow dataframe that will be used to overwrite the table overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite + snapshot_properties: Custom properties to be added to the snapshot summary """ try: import pyarrow as pa @@ -1151,7 +1155,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T _check_schema(self.schema(), other_schema=df.schema) with self.transaction() as txn: - with txn.update_snapshot().overwrite() as update_snapshot: + with txn.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -2551,6 +2555,7 @@ def __init__( transaction: Transaction, io: FileIO, commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -2562,6 +2567,7 @@ def __init__( snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) self._added_data_files = [] + self.snapshot_properties = snapshot_properties def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer: self._added_data_files.append(data_file) @@ -2629,7 +2635,7 @@ def _write_delete_manifest() -> List[ManifestFile]: return added_manifests.result() + delete_manifests.result() + existing_manifests.result() - def _summary(self) -> Summary: + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: ssc = SnapshotSummaryCollector() partition_summary_limit = int( self._transaction.table_metadata.properties.get( @@ -2652,7 +2658,7 @@ def _summary(self) -> Summary: ) return update_snapshot_summaries( - summary=Summary(operation=self._operation, **ssc.build()), + 
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties), previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, truncate_full_table=self._operation == Operation.OVERWRITE, ) @@ -2661,7 +2667,7 @@ def _commit(self) -> UpdatesAndRequirements: new_manifests = self._manifests() next_sequence_number = self._transaction.table_metadata.next_sequence_number() - summary = self._summary() + summary = self._summary(self.snapshot_properties) manifest_list_file_path = _generate_manifest_list_path( location=self._transaction.table_metadata.location, @@ -2776,13 +2782,17 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]: class UpdateSnapshot: _transaction: Transaction _io: FileIO + _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO) -> None: + def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None: self._transaction = transaction self._io = io + self._snapshot_properties = snapshot_properties def fast_append(self) -> FastAppendFiles: - return FastAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io) + return FastAppendFiles( + operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + ) def overwrite(self) -> OverwriteFiles: return OverwriteFiles( @@ -2791,6 +2801,7 @@ def overwrite(self) -> OverwriteFiles: else Operation.APPEND, transaction=self._transaction, io=self._io, + snapshot_properties=self._snapshot_properties, ) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 6d44d92724..d4ed085c51 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -32,6 +32,7 @@ NoSuchTableError, TableAlreadyExistsError, ) +from pyiceberg.io.pyarrow import schema_to_pyarrow from pyiceberg.schema import Schema from pyiceberg.types import IntegerType from tests.conftest import BUCKET_NAME, 
TABLE_METADATA_LOCATION_REGEX @@ -692,3 +693,68 @@ def test_commit_table_properties( updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + + +@mock_aws +def test_commit_append_table_snapshot_properties( + _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + + table.append( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 1, "baz": False}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_a": "test_prop_a"}, + ) + + updated_table_metadata = table.metadata + summary = updated_table_metadata.snapshots[-1].summary + assert test_catalog._parse_metadata_version(table.metadata_location) == 1 + assert summary is not None + assert summary["snapshot_prop_a"] == "test_prop_a" + + +@mock_aws +def test_commit_overwrite_table_snapshot_properties( + _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + + table.append( + pa.Table.from_pylist( + 
[{"foo": "foo_val", "bar": 1, "baz": False}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_a": "test_prop_a"}, + ) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 1 + + table.overwrite( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 2, "baz": True}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_b": "test_prop_b"}, + ) + + updated_table_metadata = table.metadata + summary = updated_table_metadata.snapshots[-1].summary + assert test_catalog._parse_metadata_version(table.metadata_location) == 2 + assert summary is not None + assert summary["snapshot_prop_a"] is None + assert summary["snapshot_prop_b"] == "test_prop_b"