Change Append/Overwrite API to accept snapshot properties (apache#419)
* added test for snapshot properties

* change append/overwrite to accept snapshot_properties

* Update tests/catalog/test_glue.py

Co-authored-by: Fokko Driesprong <[email protected]>

* Update pyiceberg/table/__init__.py

Co-authored-by: Fokko Driesprong <[email protected]>

* updated docs, docstrings

* fix linting

* Update mkdocs/docs/api.md

Co-authored-by: Fokko Driesprong <[email protected]>

* Update pyiceberg/table/__init__.py

Co-authored-by: Fokko Driesprong <[email protected]>

---------

Co-authored-by: Fokko Driesprong <[email protected]>
Gowthami03B and Fokko authored Mar 19, 2024
1 parent d3db840 commit 5506fd0
Showing 3 changed files with 102 additions and 11 deletions.
14 changes: 14 additions & 0 deletions mkdocs/docs/api.md
@@ -563,6 +563,20 @@ table = table.transaction().remove_properties("abc").commit_transaction()
assert table.properties == {}
```

## Snapshot properties

Optionally, snapshot properties can be set while writing to a table using the `append` or `overwrite` API:

```python
tbl.append(df, snapshot_properties={"abc": "def"})

# or

tbl.overwrite(df, snapshot_properties={"abc": "def"})

assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
```

## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:
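Editor's note (not part of the diff): because the custom keys are written into the snapshot summary itself, they can also be read back through the snapshot accessors rather than `tbl.metadata`. A minimal sketch, assuming the `tbl` and the `{"abc": "def"}` property from the `api.md` example above, and that the append also recorded the standard `added-records` metric:

```python
# Sketch only: read the custom property back from the latest snapshot.
snapshot = tbl.current_snapshot()
assert snapshot is not None and snapshot.summary is not None

# Custom properties sit next to the standard write metrics in the same summary.
print(snapshot.summary["abc"])            # "def"
print(snapshot.summary["added-records"])  # standard metric recorded by the same commit
```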
33 changes: 22 additions & 11 deletions pyiceberg/table/__init__.py
@@ -332,13 +332,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
name_mapping=self._table.name_mapping(),
)

def update_snapshot(self) -> UpdateSnapshot:
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
"""Create a new UpdateSnapshot to produce a new snapshot for the table.
Returns:
A new UpdateSnapshot
"""
return UpdateSnapshot(self, io=self._table.io)
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

def update_spec(self) -> UpdateSpec:
"""Create a new UpdateSpec to update the partitioning of the table.
@@ -1095,12 +1095,13 @@ def name_mapping(self) -> Optional[NameMapping]:
else:
return None

def append(self, df: pa.Table) -> None:
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Shorthand API for appending a PyArrow table to the table.
Args:
df: The Arrow dataframe that will be appended to the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
import pyarrow as pa
@@ -1116,7 +1117,7 @@ def append(self, df: pa.Table) -> None:
_check_schema(self.schema(), other_schema=df.schema)

with self.transaction() as txn:
with txn.update_snapshot().fast_append() as update_snapshot:
with txn.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
@@ -1125,14 +1126,17 @@ def append(self, df: pa.Table) -> None:
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
def overwrite(
self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
) -> None:
"""
Shorthand for overwriting the table with a PyArrow table.
Args:
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
import pyarrow as pa
@@ -1151,7 +1155,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
_check_schema(self.schema(), other_schema=df.schema)

with self.transaction() as txn:
with txn.update_snapshot().overwrite() as update_snapshot:
with txn.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
@@ -2551,6 +2555,7 @@ def __init__(
transaction: Transaction,
io: FileIO,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
super().__init__(transaction)
self.commit_uuid = commit_uuid or uuid.uuid4()
@@ -2562,6 +2567,7 @@ def __init__(
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
)
self._added_data_files = []
self.snapshot_properties = snapshot_properties

def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
self._added_data_files.append(data_file)
@@ -2629,7 +2635,7 @@ def _write_delete_manifest() -> List[ManifestFile]:

return added_manifests.result() + delete_manifests.result() + existing_manifests.result()

def _summary(self) -> Summary:
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
ssc = SnapshotSummaryCollector()
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
@@ -2652,7 +2658,7 @@ def _summary(self) -> Summary:
)

return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build()),
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=self._operation == Operation.OVERWRITE,
)
@@ -2661,7 +2667,7 @@ def _commit(self) -> UpdatesAndRequirements:
new_manifests = self._manifests()
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

summary = self._summary()
summary = self._summary(self.snapshot_properties)

manifest_list_file_path = _generate_manifest_list_path(
location=self._transaction.table_metadata.location,
@@ -2776,13 +2782,17 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
class UpdateSnapshot:
_transaction: Transaction
_io: FileIO
_snapshot_properties: Dict[str, str]

def __init__(self, transaction: Transaction, io: FileIO) -> None:
def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None:
self._transaction = transaction
self._io = io
self._snapshot_properties = snapshot_properties

def fast_append(self) -> FastAppendFiles:
return FastAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io)
return FastAppendFiles(
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def overwrite(self) -> OverwriteFiles:
return OverwriteFiles(
@@ -2791,6 +2801,7 @@ def overwrite(self) -> OverwriteFiles:
else Operation.APPEND,
transaction=self._transaction,
io=self._io,
snapshot_properties=self._snapshot_properties,
)


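Editor's note (not part of the diff): the new `snapshot_properties` argument is threaded from `Transaction.update_snapshot()` through `UpdateSnapshot` into the `_MergingSnapshotProducer` subclasses (`FastAppendFiles`, `OverwriteFiles`), and `_summary()` folds it into the `Summary` via keyword unpacking. A simplified, self-contained sketch of that merge, using plain dicts in place of `SnapshotSummaryCollector` and `Summary` (the helper name is illustrative):

```python
from typing import Dict


def build_summary(operation: str, collected_metrics: Dict[str, str], snapshot_properties: Dict[str, str]) -> Dict[str, str]:
    # Mirrors Summary(operation=..., **ssc.build(), **snapshot_properties): the custom
    # properties are unpacked last, so a property that reuses a metric name raises a
    # duplicate-keyword error rather than silently overriding the metric.
    return dict(operation=operation, **collected_metrics, **snapshot_properties)


summary = build_summary(
    "append",
    {"added-data-files": "1", "added-records": "1"},
    {"snapshot_prop_a": "test_prop_a"},
)
assert summary["snapshot_prop_a"] == "test_prop_a"
assert summary["added-records"] == "1"
```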
66 changes: 66 additions & 0 deletions tests/catalog/test_glue.py
@@ -32,6 +32,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
Expand Down Expand Up @@ -692,3 +693,68 @@ def test_commit_table_properties(
updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}


@mock_aws
def test_commit_append_table_snapshot_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)

assert test_catalog._parse_metadata_version(table.metadata_location) == 0

table.append(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 1, "baz": False}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
)

updated_table_metadata = table.metadata
summary = updated_table_metadata.snapshots[-1].summary
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert summary is not None
assert summary["snapshot_prop_a"] == "test_prop_a"


@mock_aws
def test_commit_overwrite_table_snapshot_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)

assert test_catalog._parse_metadata_version(table.metadata_location) == 0

table.append(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 1, "baz": False}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
)

assert test_catalog._parse_metadata_version(table.metadata_location) == 1

table.overwrite(
pa.Table.from_pylist(
[{"foo": "foo_val", "bar": 2, "baz": True}],
schema=schema_to_pyarrow(table_schema_simple),
),
snapshot_properties={"snapshot_prop_b": "test_prop_b"},
)

updated_table_metadata = table.metadata
summary = updated_table_metadata.snapshots[-1].summary
assert test_catalog._parse_metadata_version(table.metadata_location) == 2
assert summary is not None
assert summary["snapshot_prop_a"] is None
assert summary["snapshot_prop_b"] == "test_prop_b"
