Skip to content

Commit

Permalink
add merge append
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Jun 3, 2024
1 parent 6803eba commit f0fc260
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 2 deletions.
208 changes: 206 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
# under the License.
from __future__ import annotations

import concurrent
import itertools
import uuid
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import Future
from copy import copy
from dataclasses import dataclass
from datetime import datetime
Expand Down Expand Up @@ -62,7 +65,7 @@
inclusive_projection,
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.io import FileIO, OutputFile, load_file_io
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
Expand All @@ -71,6 +74,7 @@
ManifestEntry,
ManifestEntryStatus,
ManifestFile,
ManifestWriter,
PartitionFieldSummary,
write_manifest,
write_manifest_list,
Expand Down Expand Up @@ -136,6 +140,7 @@
StructType,
transform_dict_value_to_str,
)
from pyiceberg.utils.bin_packing import ListPacker
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.datetime import datetime_to_millis
from pyiceberg.utils.singleton import _convert_to_hashable_type
Expand Down Expand Up @@ -240,6 +245,15 @@ class TableProperties:
FORMAT_VERSION = "format-version"
DEFAULT_FORMAT_VERSION = 2

MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 # 8 MB

MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge"
MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100

MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled"
MANIFEST_MERGE_ENABLED_DEFAULT = True


class PropertyUtil:
@staticmethod
Expand Down Expand Up @@ -2751,10 +2765,12 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List

class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
commit_uuid: uuid.UUID
_io: FileIO
_operation: Operation
_snapshot_id: int
_parent_snapshot_id: Optional[int]
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]

def __init__(
self,
Expand All @@ -2775,6 +2791,7 @@ def __init__(
)
self._added_data_files = []
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)

def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
self._added_data_files.append(data_file)
Expand All @@ -2786,6 +2803,10 @@ def _deleted_entries(self) -> List[ManifestEntry]: ...
@abstractmethod
def _existing_manifests(self) -> List[ManifestFile]: ...

def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
    """Hook for post-processing the manifest list before it is written to the new snapshot.

    The base implementation returns the manifests unchanged; subclasses
    (e.g. MergeAppendFiles) override this to rewrite or merge manifests.
    """
    return manifests

def _manifests(self) -> List[ManifestFile]:
def _write_added_manifest() -> List[ManifestFile]:
if self._added_data_files:
Expand Down Expand Up @@ -2840,7 +2861,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
delete_manifests = executor.submit(_write_delete_manifest)
existing_manifests = executor.submit(self._existing_manifests)

return added_manifests.result() + delete_manifests.result() + existing_manifests.result()
return self._process_manifests(added_manifests.result() + delete_manifests.result() + existing_manifests.result())

def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
ssc = SnapshotSummaryCollector()
Expand Down Expand Up @@ -2913,6 +2934,34 @@ def _commit(self) -> UpdatesAndRequirements:
),
)

@property
def snapshot_id(self) -> int:
    """Return the id of the snapshot this producer is creating."""
    return self._snapshot_id

def spec(self, spec_id: int) -> PartitionSpec:
    """Look up the partition spec registered under *spec_id* in the table metadata."""
    known_specs = self._transaction.table_metadata.specs()
    return known_specs[spec_id]

def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
    """Open a manifest writer targeting a freshly allocated manifest output file."""
    metadata = self._transaction.table_metadata
    return write_manifest(
        format_version=metadata.format_version,
        spec=spec,
        schema=metadata.schema(),
        output_file=self.new_manifest_output(),
        snapshot_id=self._snapshot_id,
    )

def new_manifest_output(self) -> OutputFile:
    """Allocate the output file for the next manifest written by this producer."""
    # each call advances the counter, so every manifest in this commit gets a distinct path
    manifest_path = _new_manifest_path(
        location=self._transaction.table_metadata.location,
        num=next(self._manifest_num_counter),
        commit_uuid=self.commit_uuid,
    )
    return self._io.new_output(manifest_path)

def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
    """Read the entries of *manifest* using this producer's file IO.

    Args:
        manifest: The manifest file to read entries from.
        discard_deleted: Forwarded to ManifestFile.fetch_manifest_entry;
            presumably filters out DELETED entries when True — confirm in the
            manifest module.
    """
    return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)


class FastAppendFiles(_MergingSnapshotProducer):
def _existing_manifests(self) -> List[ManifestFile]:
Expand Down Expand Up @@ -2943,6 +2992,61 @@ def _deleted_entries(self) -> List[ManifestEntry]:
return []


class MergeAppendFiles(FastAppendFiles):
    """Append snapshot producer that compacts small manifests on commit.

    Uses the same write path as FastAppendFiles, but before the new snapshot
    is written, data manifests are merged via _ManifestMergeManager according
    to the ``commit.manifest.target-size-bytes``,
    ``commit.manifest.min-count-to-merge`` and ``commit.manifest-merge.enabled``
    table properties.
    """

    # cached table properties controlling manifest compaction
    _target_size_bytes: int
    _min_count_to_merge: int
    _merge_enabled: bool

    def __init__(
        self,
        operation: Operation,
        transaction: Transaction,
        io: FileIO,
        commit_uuid: Optional[uuid.UUID] = None,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
    ) -> None:
        # Forward snapshot_properties to the parent so merge-append commits can
        # carry user-supplied snapshot summary properties, consistent with the
        # way fast_append constructs FastAppendFiles.
        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties=snapshot_properties)
        self._target_size_bytes = PropertyUtil.property_as_int(
            self._transaction.table_metadata.properties,
            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
            TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
        )  # type: ignore
        self._min_count_to_merge = PropertyUtil.property_as_int(
            self._transaction.table_metadata.properties,
            TableProperties.MANIFEST_MIN_MERGE_COUNT,
            TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
        )  # type: ignore
        self._merge_enabled = PropertyUtil.property_as_bool(
            self._transaction.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )

    def _deleted_entries(self) -> List[ManifestEntry]:
        """Return the manifest entries deleted by this snapshot.

        An append never deletes files, so there is nothing to record.
        """
        return []

    def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
        """Merge data manifests before they are written to the new snapshot.

        Data manifests are bin-packed and merged according to the target size
        and minimum merge count when automatic merging is enabled; delete
        manifests are passed through untouched.
        """
        unmerged_data_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DATA]
        unmerged_deletes_manifests = [manifest for manifest in manifests if manifest.content == ManifestContent.DELETES]

        data_manifest_merge_manager = _ManifestMergeManager(
            target_size_bytes=self._target_size_bytes,
            min_count_to_merge=self._min_count_to_merge,
            merge_enabled=self._merge_enabled,
            snapshot_producer=self,
        )

        return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests


class OverwriteFiles(_MergingSnapshotProducer):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
Expand Down Expand Up @@ -3001,6 +3105,9 @@ def fast_append(self) -> FastAppendFiles:
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def merge_append(self) -> MergeAppendFiles:
    """Create a MergeAppendFiles producer (append with manifest merging) for this transaction.

    NOTE(review): unlike fast_append, this does not forward
    self._snapshot_properties to the producer — confirm this is intentional.
    """
    return MergeAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io)

def overwrite(self) -> OverwriteFiles:
return OverwriteFiles(
operation=Operation.OVERWRITE
Expand Down Expand Up @@ -3735,3 +3842,100 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)

return table_partitions


class _ManifestMergeManager:
_target_size_bytes: int
_min_count_to_merge: int
_merge_enabled: bool
_snapshot_producer: _MergingSnapshotProducer

def __init__(
self, target_size_bytes: int, min_count_to_merge: int, merge_enabled: bool, snapshot_producer: _MergingSnapshotProducer
) -> None:
self._target_size_bytes = target_size_bytes
self._min_count_to_merge = min_count_to_merge
self._merge_enabled = merge_enabled
self._snapshot_producer = snapshot_producer

def _group_by_spec(
self, first_manifest: ManifestFile, remaining_manifests: List[ManifestFile]
) -> Dict[int, List[ManifestFile]]:
groups = defaultdict(list)
groups[first_manifest.partition_spec_id].append(first_manifest)
for manifest in remaining_manifests:
groups[manifest.partition_spec_id].append(manifest)
return groups

def _create_manifest(self, spec_id: int, manifest_bin: List[ManifestFile]) -> ManifestFile:
with self._snapshot_producer.new_manifest_writer(spec=self._snapshot_producer.spec(spec_id)) as writer:
for manifest in manifest_bin:
for entry in self._snapshot_producer.fetch_manifest_entry(manifest):
if entry.status == ManifestEntryStatus.DELETED:
# suppress deletes from previous snapshots. only files deleted by this snapshot
# should be added to the new manifest
if entry.snapshot_id == self._snapshot_producer.snapshot_id:
writer.add_entry(entry)
elif entry.status == ManifestEntryStatus.ADDED and entry.snapshot_id == self._snapshot_producer.snapshot_id:
# adds from this snapshot are still adds, otherwise they should be existing
writer.add_entry(entry)
else:
# add all files from the old manifest as existing files
writer.add_entry(
ManifestEntry(
status=ManifestEntryStatus.EXISTING,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
file_sequence_number=entry.file_sequence_number,
data_file=entry.data_file,
)
)

return writer.to_manifest_file()

def _merge_group(self, first_manifest: ManifestFile, spec_id: int, manifests: List[ManifestFile]) -> List[ManifestFile]:
packer: ListPacker[ManifestFile] = ListPacker(target_weight=self._target_size_bytes, lookback=1, largest_bin_first=False)
bins: List[List[ManifestFile]] = packer.pack_end(manifests, lambda m: m.manifest_length)

def merge_bin(manifest_bin: List[ManifestFile]) -> List[ManifestFile]:
output_manifests = []
if len(manifest_bin) == 1:
output_manifests.append(manifest_bin[0])
elif first_manifest in manifest_bin and len(manifest_bin) < self._min_count_to_merge:
# if the bin has the first manifest (the new data files or an appended manifest file)
# then only merge it
# if the number of manifests is above the minimum count. this is applied only to bins
# with an in-memory
# manifest so that large manifests don't prevent merging older groups.
output_manifests.extend(manifest_bin)
else:
output_manifests.append(self._create_manifest(spec_id, manifest_bin))

return output_manifests

executor = ExecutorFactory.get_or_create()
futures = [executor.submit(merge_bin, b) for b in bins]

# for consistent ordering, we need to maintain future order
futures_index = {f: i for i, f in enumerate(futures)}
completed_futures: SortedList[Future[List[ManifestFile]]] = SortedList(iterable=[], key=lambda f: futures_index[f])
for future in concurrent.futures.as_completed(futures):
completed_futures.add(future)

bin_results: List[List[ManifestFile]] = [f.result() for f in completed_futures if f.result()]

return [manifest for bin_result in bin_results for manifest in bin_result]

def merge_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile]:
if not self._merge_enabled or len(manifests) == 0:
return manifests

first_manifest = manifests[0]
remaining_manifests = manifests[1:]
groups = self._group_by_spec(first_manifest, remaining_manifests)

merged_manifests = []
for spec_id in reversed(groups.keys()):
merged_manifests.extend(self._merge_group(first_manifest, spec_id, groups[spec_id]))

return merged_manifests
49 changes: 49 additions & 0 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,3 +871,52 @@ def table_write_subset_of_schema(session_catalog: Catalog, arrow_table_with_null
tbl.append(arrow_table_without_some_columns)
# overwrite and then append should produce twice the data
assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_merge_manifest_min_count_to_merge(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    """Verify the commit.manifest.* table properties control manifest merging on append."""
    tbl_a = _create_table(
        session_catalog,
        "default.merge_manifest_a",
        {"commit.manifest.min-count-to-merge": "1", "format-version": format_version},
        [],
    )
    tbl_b = _create_table(
        session_catalog,
        "default.merge_manifest_b",
        {"commit.manifest.min-count-to-merge": "1", "commit.manifest.target-size-bytes": "1", "format-version": format_version},
        [],
    )
    tbl_c = _create_table(
        session_catalog,
        "default.merge_manifest_c",
        {"commit.manifest-merge.enabled": "false", "format-version": format_version},
        [],
    )

    # three appends per table:
    # - tbl_a merges everything into a single manifest (min-count-to-merge is 1)
    # - tbl_b keeps all manifests (target size too small for any bin to hold two)
    # - tbl_c keeps all manifests (merging disabled)
    for table in (tbl_a, tbl_b, tbl_c):
        for _ in range(3):
            table.append(arrow_table_with_null)

    assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1  # type: ignore
    assert len(tbl_b.current_snapshot().manifests(tbl_b.io)) == 3  # type: ignore
    assert len(tbl_c.current_snapshot().manifests(tbl_c.io)) == 3  # type: ignore

    # merging must not change the table contents: all three tables hold identical data
    assert tbl_a.scan().to_arrow().equals(tbl_c.scan().to_arrow())
    assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow())

0 comments on commit f0fc260

Please sign in to comment.