Skip to content

Commit

Permalink
Add Snapshot logic and Summary generation (apache#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored Dec 7, 2023
1 parent 54a08f3 commit 0cbb71c
Show file tree
Hide file tree
Showing 3 changed files with 415 additions and 6 deletions.
246 changes: 244 additions & 2 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,40 @@
Any,
Dict,
List,
Mapping,
Optional,
)

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.typedef import IcebergBaseModel

ADDED_DATA_FILES = 'added-data-files'
ADDED_DELETE_FILES = 'added-delete-files'
ADDED_EQUALITY_DELETES = 'added-equality-deletes'
ADDED_FILE_SIZE = 'added-files-size'
ADDED_POSITION_DELETES = 'added-position-deletes'
ADDED_POSITION_DELETE_FILES = 'added-position-delete-files'
ADDED_RECORDS = 'added-records'
DELETED_DATA_FILES = 'deleted-data-files'
DELETED_RECORDS = 'deleted-records'
ADDED_EQUALITY_DELETE_FILES = 'added-equality-delete-files'
REMOVED_DELETE_FILES = 'removed-delete-files'
REMOVED_EQUALITY_DELETES = 'removed-equality-deletes'
REMOVED_EQUALITY_DELETE_FILES = 'removed-equality-delete-files'
REMOVED_FILE_SIZE = 'removed-files-size'
REMOVED_POSITION_DELETES = 'removed-position-deletes'
REMOVED_POSITION_DELETE_FILES = 'removed-position-delete-files'
TOTAL_EQUALITY_DELETES = 'total-equality-deletes'
TOTAL_POSITION_DELETES = 'total-position-deletes'
TOTAL_DATA_FILES = 'total-data-files'
TOTAL_DELETE_FILES = 'total-delete-files'
TOTAL_RECORDS = 'total-records'
TOTAL_FILE_SIZE = 'total-files-size'


OPERATION = "operation"


Expand All @@ -51,7 +76,7 @@ def __repr__(self) -> str:
return f"Operation.{self.name}"


class Summary(IcebergBaseModel):
class Summary(IcebergBaseModel, Mapping[str, str]):
"""A class that stores the summary information for a Snapshot.
The snapshot summary’s operation field is used by some operations,
Expand All @@ -65,6 +90,25 @@ def __init__(self, operation: Operation, **data: Any) -> None:
super().__init__(operation=operation, **data)
self._additional_properties = data

def __getitem__(self, __key: str) -> Optional[Any]: # type: ignore
"""Return a key as it is a map."""
if __key.lower() == 'operation':
return self.operation
else:
return self._additional_properties.get(__key)

def __setitem__(self, key: str, value: Any) -> None:
"""Set a key as it is a map."""
if key.lower() == 'operation':
self.operation = value
else:
self._additional_properties[key] = value

def __len__(self) -> int:
"""Return the number of keys in the summary."""
# Operation is required
return 1 + len(self._additional_properties)

@model_serializer
def ser_model(self) -> Dict[str, str]:
return {
Expand All @@ -81,6 +125,14 @@ def __repr__(self) -> str:
repr_properties = f", **{repr(self._additional_properties)}" if self._additional_properties else ""
return f"Summary({repr(self.operation)}{repr_properties})"

def __eq__(self, other: Any) -> bool:
"""Compare if the summary is equal to another summary."""
return (
self.operation == other.operation and self.additional_properties == other.additional_properties
if isinstance(other, Summary)
else False
)


class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
Expand Down Expand Up @@ -116,3 +168,193 @@ class MetadataLogEntry(IcebergBaseModel):
class SnapshotLogEntry(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
timestamp_ms: int = Field(alias="timestamp-ms")


class SnapshotSummaryCollector:
added_file_size: int
removed_file_size: int
added_data_files: int
removed_data_files: int
added_eq_delete_files: int
removed_eq_delete_files: int
added_pos_delete_files: int
removed_pos_delete_files: int
added_delete_files: int
removed_delete_files: int
added_records: int
deleted_records: int
added_pos_deletes: int
removed_pos_deletes: int
added_eq_deletes: int
removed_eq_deletes: int

def __init__(self) -> None:
self.added_file_size = 0
self.removed_file_size = 0
self.added_data_files = 0
self.removed_data_files = 0
self.added_eq_delete_files = 0
self.removed_eq_delete_files = 0
self.added_pos_delete_files = 0
self.removed_pos_delete_files = 0
self.added_delete_files = 0
self.removed_delete_files = 0
self.added_records = 0
self.deleted_records = 0
self.added_pos_deletes = 0
self.removed_pos_deletes = 0
self.added_eq_deletes = 0
self.removed_eq_deletes = 0

def add_file(self, data_file: DataFile) -> None:
self.added_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.added_data_files += 1
self.added_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.added_delete_files += 1
self.added_pos_delete_files += 1
self.added_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.added_delete_files += 1
self.added_eq_delete_files += 1
self.added_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def remove_file(self, data_file: DataFile) -> None:
self.removed_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.removed_data_files += 1
self.deleted_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.removed_delete_files += 1
self.removed_pos_delete_files += 1
self.removed_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.removed_delete_files += 1
self.removed_eq_delete_files += 1
self.removed_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def build(self) -> Dict[str, str]:
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)

properties: Dict[str, str] = {}
set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
set_when_positive(properties, self.added_records, ADDED_RECORDS)
set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)

return properties


def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
for prop in {
TOTAL_DATA_FILES,
TOTAL_DELETE_FILES,
TOTAL_RECORDS,
TOTAL_FILE_SIZE,
TOTAL_POSITION_DELETES,
TOTAL_EQUALITY_DELETES,
}:
summary[prop] = '0'

if value := previous_summary.get(TOTAL_DATA_FILES):
summary[DELETED_DATA_FILES] = value
if value := previous_summary.get(TOTAL_DELETE_FILES):
summary[REMOVED_DELETE_FILES] = value
if value := previous_summary.get(TOTAL_RECORDS):
summary[DELETED_RECORDS] = value
if value := previous_summary.get(TOTAL_FILE_SIZE):
summary[REMOVED_FILE_SIZE] = value
if value := previous_summary.get(TOTAL_POSITION_DELETES):
summary[REMOVED_POSITION_DELETES] = value
if value := previous_summary.get(TOTAL_EQUALITY_DELETES):
summary[REMOVED_EQUALITY_DELETES] = value

return summary


def _update_snapshot_summaries(
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
) -> Summary:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}:
raise ValueError(f"Operation not implemented: {summary.operation}")

if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
summary = _truncate_table_summary(summary, previous_summary)

if not previous_summary:
previous_summary = {
TOTAL_DATA_FILES: '0',
TOTAL_DELETE_FILES: '0',
TOTAL_RECORDS: '0',
TOTAL_FILE_SIZE: '0',
TOTAL_POSITION_DELETES: '0',
TOTAL_EQUALITY_DELETES: '0',
}

def _update_totals(total_property: str, added_property: str, removed_property: str) -> None:
if previous_total_str := previous_summary.get(total_property):
try:
new_total = int(previous_total_str)
if new_total >= 0 and (added := summary.get(added_property)):
new_total += int(added)
if new_total >= 0 and (removed := summary.get(removed_property)):
new_total -= int(removed)
except ValueError as e:
raise ValueError(f"Could not parse summary property {total_property} to an int: {previous_total_str}") from e

if new_total >= 0:
summary[total_property] = str(new_total)

_update_totals(
total_property=TOTAL_DATA_FILES,
added_property=ADDED_DATA_FILES,
removed_property=DELETED_DATA_FILES,
)
_update_totals(
total_property=TOTAL_DELETE_FILES,
added_property=ADDED_DELETE_FILES,
removed_property=REMOVED_DELETE_FILES,
)
_update_totals(
total_property=TOTAL_RECORDS,
added_property=ADDED_RECORDS,
removed_property=DELETED_RECORDS,
)
_update_totals(
total_property=TOTAL_FILE_SIZE,
added_property=ADDED_FILE_SIZE,
removed_property=REMOVED_FILE_SIZE,
)
_update_totals(
total_property=TOTAL_POSITION_DELETES,
added_property=ADDED_POSITION_DELETES,
removed_property=REMOVED_POSITION_DELETES,
)
_update_totals(
total_property=TOTAL_EQUALITY_DELETES,
added_property=ADDED_EQUALITY_DELETES,
removed_property=REMOVED_EQUALITY_DELETES,
)

return summary
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ markers = [
]

# Turns a warning into an error
filterwarnings = [
"error"
]
#filterwarnings = [
# "error"
#]

[tool.black]
line-length = 130
Expand Down
Loading

0 comments on commit 0cbb71c

Please sign in to comment.