Skip to content

Commit

Permalink
fix snapshot inheritance
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Jun 3, 2024
1 parent cbb8cec commit bf63c03
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 24 deletions.
67 changes: 67 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,46 @@ class ManifestEntry(Record):
def __init__(self, *data: Any, **named_data: Any) -> None:
super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})

def _wrap(
self,
new_status: ManifestEntryStatus,
new_snapshot_id: int,
new_data_sequence_number: Optional[int],
new_file_sequence_number: Optional[int],
new_file: DataFile,
) -> ManifestEntry:
self.status = new_status
self.snapshot_id = new_snapshot_id
self.data_sequence_number = new_data_sequence_number
self.file_sequence_number = new_file_sequence_number
self.data_file = new_file
return self

def _wrap_append(self, new_snapshot_id: int, new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry:
    """Repurpose this entry as an ADDED entry for ``new_file``.

    The file sequence number is always reset to None; the remaining values are
    stored as given. The entry is mutated in place and returned.
    """
    return self._wrap(
        new_status=ManifestEntryStatus.ADDED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=None,
        new_file=new_file,
    )

def _wrap_delete(
    self,
    new_snapshot_id: int,
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Repurpose this entry as a DELETED entry for ``new_file``.

    All supplied values are stored as given; the entry is mutated in place and
    returned.
    """
    return self._wrap(
        new_status=ManifestEntryStatus.DELETED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )

def _wrap_existing(
    self,
    new_snapshot_id: int,
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Repurpose this entry as an EXISTING entry for ``new_file``.

    All supplied values are stored as given; the entry is mutated in place and
    returned.
    """
    return self._wrap(
        new_status=ManifestEntryStatus.EXISTING,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )


PARTITION_FIELD_SUMMARY_TYPE = StructType(
NestedField(509, "contains_null", BooleanType(), required=True),
Expand Down Expand Up @@ -654,6 +694,7 @@ class ManifestWriter(ABC):
_deleted_rows: int
_min_data_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry

def __init__(
self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str] = EMPTY_DICT
Expand All @@ -673,6 +714,7 @@ def __init__(
self._deleted_rows = 0
self._min_data_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()

def __enter__(self) -> ManifestWriter:
"""Open the writer."""
Expand Down Expand Up @@ -763,6 +805,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
self._writer.write_block([self.prepare_entry(entry)])
return self

def add(self, entry: ManifestEntry) -> ManifestWriter:
    """Write ``entry``'s data file to this manifest as an ADDED entry.

    The entry is re-stamped with this writer's snapshot id. Its data sequence
    number is carried over only when it is set and non-negative; otherwise
    None is written in its place.

    The shared ``_reused_entry_wrapper`` is mutated and passed to
    ``add_entry``, so no new ManifestEntry is allocated per call.

    Returns:
        This writer, to allow call chaining.
    """
    sequence_number = entry.data_sequence_number
    is_assigned = sequence_number is not None and sequence_number >= 0
    appended = self._reused_entry_wrapper._wrap_append(
        self._snapshot_id,
        sequence_number if is_assigned else None,
        entry.data_file,
    )
    self.add_entry(appended)
    return self

def delete(self, entry: ManifestEntry) -> ManifestWriter:
    """Write ``entry``'s data file to this manifest as a DELETED entry.

    The entry is re-stamped with this writer's snapshot id, while the data and
    file sequence numbers are copied from ``entry`` unchanged.

    The shared ``_reused_entry_wrapper`` is mutated and passed to
    ``add_entry``, so no new ManifestEntry is allocated per call.

    Returns:
        This writer, to allow call chaining.
    """
    deleted = self._reused_entry_wrapper._wrap_delete(
        new_snapshot_id=self._snapshot_id,
        new_data_sequence_number=entry.data_sequence_number,
        new_file_sequence_number=entry.file_sequence_number,
        new_file=entry.data_file,
    )
    self.add_entry(deleted)
    return self

def existing(self, entry: ManifestEntry) -> ManifestWriter:
    """Write ``entry``'s data file to this manifest as an EXISTING entry.

    An existing entry describes a file that was added by an earlier snapshot,
    so the entry's own ``snapshot_id`` is preserved rather than replaced with
    this writer's snapshot id. The data and file sequence numbers are likewise
    copied from ``entry`` unchanged.

    The shared ``_reused_entry_wrapper`` is mutated and passed to
    ``add_entry``, so no new ManifestEntry is allocated per call.

    Args:
        entry: The manifest entry whose file is carried over.

    Returns:
        This writer, to allow call chaining.
    """
    carried_over = self._reused_entry_wrapper._wrap_existing(
        # Keep the snapshot that originally added the file; stamping the
        # writer's snapshot id here would rewrite the file's history.
        entry.snapshot_id,
        entry.data_sequence_number,
        entry.file_sequence_number,
        entry.data_file,
    )
    self.add_entry(carried_over)
    return self


class ManifestWriterV1(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
Expand Down
40 changes: 16 additions & 24 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)

def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Shorthand API for appending a PyArrow table to a table transaction.
Shorthand API for appending a PyArrow table to a table transaction.
Args:
df: The Arrow dataframe that will be appended to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
Args:
df: The Arrow dataframe that will be appended to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
try:
import pyarrow as pa
except ModuleNotFoundError as e:
Expand All @@ -461,11 +461,11 @@ def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df,
io=self._table.io
table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
)
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def overwrite(
self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
) -> None:
Expand Down Expand Up @@ -1392,12 +1392,12 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)

def merge_append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Shorthand API for appending a PyArrow table to the table.
Shorthand API for appending a PyArrow table to the table.
Args:
df: The Arrow dataframe that will be appended to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
Args:
df: The Arrow dataframe that will be appended to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
with self.transaction() as tx:
tx.merge_append(df=df, snapshot_properties=snapshot_properties)

Expand Down Expand Up @@ -3919,26 +3919,18 @@ def _group_by_spec(
def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile:
with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer:
for manifest in manifest_bin:
for entry in self._snapshot_producer.fetch_manifest_entry(manifest):
for entry in self._snapshot_producer.fetch_manifest_entry(manifest=manifest, discard_deleted=False):
if entry.status == ManifestEntryStatus.DELETED:
# suppress deletes from previous snapshots. only files deleted by this snapshot
# should be added to the new manifest
if entry.snapshot_id == self._snapshot_producer.snapshot_id:
writer.add_entry(entry)
writer.delete(entry)
elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id:
# adds from this snapshot are still adds, otherwise they should be existing
writer.add_entry(entry)
writer.add(entry)
else:
# add all files from the old manifest as existing files
writer.add_entry(
ManifestEntry(
status=ManifestEntryStatus.EXISTING,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
)
writer.existing(entry)

return writer.to_manifest_file()

Expand Down

0 comments on commit bf63c03

Please sign in to comment.