Skip to content

Commit

Permalink
cache manifests
Browse files (browse the repository at this point in the history)
  • Loading branch information
chinmay-bhat committed Jun 3, 2024
1 parent 18448fd commit 173ddb9
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
manifest_lists_to_delete = set()
manifests_to_delete: List[ManifestFile] = []
for snapshot in metadata.snapshots:
manifests_to_delete += snapshot.manifests(io)
manifests_to_delete += snapshot.manifests(io, snapshot.manifest_list)
if snapshot.manifest_list is not None:
manifest_lists_to_delete.add(snapshot.manifest_list)

Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def files(self, table: Table, history: bool) -> None:
manifest_list_str = f": {snapshot.manifest_list}" if snapshot.manifest_list else ""
list_tree = snapshot_tree.add(f"Snapshot {snapshot.snapshot_id}, schema {snapshot.schema_id}{manifest_list_str}")

manifest_list = snapshot.manifests(io)
manifest_list = snapshot.manifests(io, manifest_list_str)
for manifest in manifest_list:
manifest_tree = list_tree.add(f"Manifest: {manifest.manifest_path}")
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=False):
Expand Down
12 changes: 6 additions & 6 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ def plan_files(self) -> Iterable[FileScanTask]:

manifests = [
manifest_file
for manifest_file in snapshot.manifests(self.io)
for manifest_file in snapshot.manifests(self.io, snapshot.manifest_list)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

Expand Down Expand Up @@ -2941,7 +2941,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
if previous_snapshot is None:
raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")

for manifest in previous_snapshot.manifests(io=self._io):
for manifest in previous_snapshot.manifests(io=self._io, manifest_list=previous_snapshot.manifest_list):
if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id:
existing_manifests.append(manifest)

Expand Down Expand Up @@ -2992,7 +2992,7 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
if entry.data_file.content == DataFileContent.DATA
]

list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io))
list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._io, previous_snapshot.manifest_list))
return list(chain(*list_of_entries))
else:
return []
Expand Down Expand Up @@ -3384,7 +3384,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:

entries = []
snapshot = self._get_snapshot(snapshot_id)
for manifest in snapshot.manifests(self.tbl.io):
for manifest in snapshot.manifests(self.tbl.io, snapshot.manifest_list):
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
column_sizes = entry.data_file.column_sizes or {}
value_counts = entry.data_file.value_counts or {}
Expand Down Expand Up @@ -3546,7 +3546,7 @@ def update_partitions_map(

partitions_map: Dict[Tuple[str, Any], Any] = {}
snapshot = self._get_snapshot(snapshot_id)
for manifest in snapshot.manifests(self.tbl.io):
for manifest in snapshot.manifests(self.tbl.io, snapshot.manifest_list):
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
partition = entry.data_file.partition
partition_record_dict = {
Expand Down Expand Up @@ -3624,7 +3624,7 @@ def _partition_summaries_to_rows(
specs = self.tbl.metadata.specs()
manifests = []
if snapshot := self.tbl.metadata.current_snapshot():
for manifest in snapshot.manifests(self.tbl.io):
for manifest in snapshot.manifests(self.tbl.io, snapshot.manifest_list):
is_data_file = manifest.content == ManifestContent.DATA
is_delete_file = manifest.content == ManifestContent.DELETES
manifests.append({
Expand Down
10 changes: 7 additions & 3 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import time
from collections import defaultdict
from enum import Enum
from functools import lru_cache
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer
Expand Down Expand Up @@ -247,9 +248,12 @@ def __str__(self) -> str:
result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
return result_str

def manifests(self, io: FileIO) -> List[ManifestFile]:
if self.manifest_list is not None:
file = io.new_input(self.manifest_list)
@staticmethod
@lru_cache
def manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
if manifest_list not in (None, ""):
file = io.new_input(manifest_list)
return list(read_manifest_list(file))
return []

Expand Down
8 changes: 6 additions & 2 deletions tests/integration/test_partitioning_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,14 @@ def test_partition_key(
snapshot = iceberg_table.current_snapshot()
assert snapshot
spark_partition_for_justification = (
snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.partition
snapshot.manifests(iceberg_table.io, snapshot.manifest_list)[0]
.fetch_manifest_entry(iceberg_table.io)[0]
.data_file.partition
)
spark_path_for_justification = (
snapshot.manifests(iceberg_table.io)[0].fetch_manifest_entry(iceberg_table.io)[0].data_file.file_path
snapshot.manifests(iceberg_table.io, snapshot.manifest_list)[0]
.fetch_manifest_entry(iceberg_table.io)[0]
.data_file.file_path
)
assert spark_partition_for_justification == expected_partition_record
assert expected_hive_partition_path_slice in spark_path_for_justification
2 changes: 1 addition & 1 deletion tests/integration/test_rest_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None:
if test_snapshot is None:
raise ValueError("Table has no current snapshot, check the docker environment")
io = table_test_all_types.io
test_manifest_file = test_snapshot.manifests(io)[0]
test_manifest_file = test_snapshot.manifests(io, test_snapshot.manifest_list)[0]
test_manifest_entries = test_manifest_file.fetch_manifest_entry(io)
entry = test_manifest_entries[0]
test_schema = table_test_all_types.schema()
Expand Down
8 changes: 4 additions & 4 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None:
summary=Summary(Operation.APPEND),
schema_id=3,
)
manifest_list = snapshot.manifests(io)[0]
manifest_list = snapshot.manifests(io, snapshot.manifest_list)[0]

assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
Expand Down Expand Up @@ -267,7 +267,7 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
summary=Summary(Operation.APPEND),
schema_id=3,
)
manifest_list = snapshot.manifests(io)[0]
manifest_list = snapshot.manifests(io, manifest_list=snapshot.manifest_list)[0]

assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
Expand Down Expand Up @@ -319,7 +319,7 @@ def test_write_manifest(
summary=Summary(Operation.APPEND),
schema_id=3,
)
demo_manifest_file = snapshot.manifests(io)[0]
demo_manifest_file = snapshot.manifests(io, snapshot.manifest_list)[0]
manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
test_schema = Schema(
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
Expand Down Expand Up @@ -491,7 +491,7 @@ def test_write_manifest_list(
schema_id=3,
)

demo_manifest_list = snapshot.manifests(io)
demo_manifest_list = snapshot.manifests(io, snapshot.manifest_list)
with TemporaryDirectory() as tmp_dir:
path = tmp_dir + "/manifest-list.avro"
output = io.new_output(path)
Expand Down

0 comments on commit 173ddb9

Please sign in to comment.