Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support merge manifests on writes (MergeAppend) #363

Merged
merged 22 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ Iceberg tables support table properties to configure table behavior.
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.parquet.row-group-limit` | Number of rows | 122880 | The Parquet row group limit |

## Table behavior options

| Key | Options | Default | Description |
| ------------------------------------ | ------------------- | ------------- | ----------------------------------------------------------- |
| `commit.manifest.target-size-bytes` | Size in bytes | 8388608 (8MB) | Target size when merging manifest files |
| `commit.manifest.min-count-to-merge` | Number of manifests | 100 | Minimum number of manifests to accumulate before merging |
| `commit.manifest-merge.enabled` | Boolean | False | Controls whether to automatically merge manifests on writes |

<!-- prettier-ignore-start -->

!!! note "Fast append"
Unlike the Java implementation, PyIceberg defaults to the [fast append](api.md#write-support), and thus `commit.manifest-merge.enabled` is set to `False` by default.

<!-- prettier-ignore-end -->

# FileIO

Iceberg works with the concept of a FileIO which is a pluggable module for reading, writing, and deleting files. By default, PyIceberg will try to initialize the FileIO that's suitable for the scheme (`s3://`, `gs://`, etc.) and will use the first one that's installed.
Expand Down
69 changes: 69 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,48 @@ class ManifestEntry(Record):
def __init__(self, *data: Any, **named_data: Any) -> None:
super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMAS_STRUCT[DEFAULT_READ_VERSION], **named_data})

def _wrap(
    self,
    new_status: ManifestEntryStatus,
    new_snapshot_id: Optional[int],
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Overwrite this entry's fields in place and return the entry itself.

    This lets a single ManifestEntry instance be recycled across many
    writes instead of allocating a fresh entry per data file.
    """
    # One structured assignment keeps the field/value pairing visually aligned.
    (
        self.status,
        self.snapshot_id,
        self.data_sequence_number,
        self.file_sequence_number,
        self.data_file,
    ) = (
        new_status,
        new_snapshot_id,
        new_data_sequence_number,
        new_file_sequence_number,
        new_file,
    )
    return self

def _wrap_append(
    self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile
) -> ManifestEntry:
    """Recycle this entry as an ADDED entry; appends carry no file sequence number."""
    return self._wrap(
        new_status=ManifestEntryStatus.ADDED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=None,
        new_file=new_file,
    )

def _wrap_delete(
    self,
    new_snapshot_id: Optional[int],
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Recycle this entry as a DELETED entry carrying the given sequence numbers."""
    return self._wrap(
        new_status=ManifestEntryStatus.DELETED,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )

def _wrap_existing(
    self,
    new_snapshot_id: Optional[int],
    new_data_sequence_number: Optional[int],
    new_file_sequence_number: Optional[int],
    new_file: DataFile,
) -> ManifestEntry:
    """Recycle this entry as an EXISTING entry carrying the given sequence numbers."""
    return self._wrap(
        new_status=ManifestEntryStatus.EXISTING,
        new_snapshot_id=new_snapshot_id,
        new_data_sequence_number=new_data_sequence_number,
        new_file_sequence_number=new_file_sequence_number,
        new_file=new_file,
    )


PARTITION_FIELD_SUMMARY_TYPE = StructType(
NestedField(509, "contains_null", BooleanType(), required=True),
Expand Down Expand Up @@ -655,6 +697,7 @@ class ManifestWriter(ABC):
_deleted_rows: int
_min_data_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry

def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
self.closed = False
Expand All @@ -671,6 +714,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
self._deleted_rows = 0
self._min_data_sequence_number = None
self._partitions = []
self._reused_entry_wrapper = ManifestEntry()

def __enter__(self) -> ManifestWriter:
"""Open the writer."""
Expand Down Expand Up @@ -776,6 +820,31 @@ def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
self._writer.write_block([self.prepare_entry(entry)])
return self

def add(self, entry: ManifestEntry) -> ManifestWriter:
if entry.data_sequence_number is not None and entry.data_sequence_number >= 0:
self.add_entry(
self._reused_entry_wrapper._wrap_append(self._snapshot_id, entry.data_sequence_number, entry.data_file)
)
else:
self.add_entry(self._reused_entry_wrapper._wrap_append(self._snapshot_id, None, entry.data_file))
return self

def delete(self, entry: ManifestEntry) -> ManifestWriter:
    """Write *entry* as a DELETED entry stamped with this writer's snapshot id.

    The entry's own data/file sequence numbers are carried over unchanged.
    Returns self so calls can be chained.
    """
    wrapped = self._reused_entry_wrapper._wrap_delete(
        self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
    )
    self.add_entry(wrapped)
    return self

def existing(self, entry: ManifestEntry) -> ManifestWriter:
    """Write *entry* as an EXISTING entry.

    Unlike add()/delete(), the entry keeps its original snapshot id as well
    as its data/file sequence numbers. Returns self so calls can be chained.
    """
    wrapped = self._reused_entry_wrapper._wrap_existing(
        entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file
    )
    self.add_entry(wrapped)
    return self


class ManifestWriterV1(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
Expand Down
Loading