From d3ceb8aba9303f204fdb1b0166d95982a309edce Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Tue, 23 Apr 2024 18:01:38 +0200 Subject: [PATCH] test: test different rollover scenarios --- pyiceberg/manifest.py | 160 +++++++++++++-------------------- tests/utils/test_manifest.py | 166 ++++++----------------------------- 2 files changed, 89 insertions(+), 237 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 0e4ea97af0..caecc183b9 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -17,41 +17,30 @@ from __future__ import annotations import math -from abc import ABC -from abc import abstractmethod +from abc import ABC, abstractmethod from enum import Enum from types import TracebackType -from typing import Any, Generator -from typing import Callable -from typing import Dict -from typing import Iterator -from typing import List -from typing import Literal -from typing import Optional -from typing import Type - -from pyiceberg.avro.file import AvroFile -from pyiceberg.avro.file import AvroOutputFile +from typing import Any, Dict, Generator, Iterator, List, Literal, Optional, Type + +from pyiceberg.avro.file import AvroFile, AvroOutputFile from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ValidationError -from pyiceberg.io import FileIO -from pyiceberg.io import InputFile -from pyiceberg.io import OutputFile +from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.typedef import EMPTY_DICT -from pyiceberg.typedef import Record -from pyiceberg.typedef import TableVersion -from pyiceberg.types import BinaryType -from pyiceberg.types import BooleanType -from pyiceberg.types import IntegerType -from pyiceberg.types import ListType -from pyiceberg.types import LongType -from pyiceberg.types import MapType -from pyiceberg.types import NestedField -from pyiceberg.types import PrimitiveType -from pyiceberg.types 
import StringType -from pyiceberg.types import StructType +from pyiceberg.typedef import EMPTY_DICT, Record, TableVersion +from pyiceberg.types import ( + BinaryType, + BooleanType, + IntegerType, + ListType, + LongType, + MapType, + NestedField, + PrimitiveType, + StringType, + StructType, +) UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 @@ -101,9 +90,7 @@ def __repr__(self) -> str: DATA_FILE_TYPE: Dict[int, StructType] = { 1: StructType( - NestedField( - field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" - ), + NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), NestedField( field_id=101, name="file_format", @@ -118,9 +105,7 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition spec", ), - NestedField( - field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file" - ), + NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( field_id=104, name="file_size_in_bytes", @@ -203,9 +188,7 @@ def __repr__(self) -> str: doc="File format name: avro, orc, or parquet", initial_default=DataFileContent.DATA, ), - NestedField( - field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" - ), + NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), NestedField( field_id=101, name="file_format", @@ -220,9 +203,7 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition spec", ), - NestedField( - field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file" - ), + NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of 
records in the file"), NestedField( field_id=104, name="file_size_in_bytes", @@ -305,34 +286,30 @@ def __repr__(self) -> str: def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType: - data_file_partition_type = StructType( - *[ - NestedField( - field_id=field.field_id, - name=field.name, - field_type=field.field_type, - required=field.required, - ) - for field in partition_type.fields - ] - ) + data_file_partition_type = StructType(*[ + NestedField( + field_id=field.field_id, + name=field.name, + field_type=field.field_type, + required=field.required, + ) + for field in partition_type.fields + ]) - return StructType( - *[ - ( - NestedField( - field_id=102, - name="partition", - field_type=data_file_partition_type, - required=True, - doc="Partition data tuple, schema based on the partition spec", - ) - if field.field_id == 102 - else field + return StructType(*[ + ( + NestedField( + field_id=102, + name="partition", + field_type=data_file_partition_type, + required=True, + doc="Partition data tuple, schema based on the partition spec", ) - for field in DATA_FILE_TYPE[format_version].fields - ] - ) + if field.field_id == 102 + else field + ) + for field in DATA_FILE_TYPE[format_version].fields + ]) class DataFile(Record): @@ -413,18 +390,14 @@ def __eq__(self, other: Any) -> bool: ), } -MANIFEST_ENTRY_SCHEMAS_STRUCT = { - format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items() -} +MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()} def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema: - return Schema( - *[ - NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field - for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields - ] - ) + return Schema(*[ + NestedField(2, "data_file", data_file, required=True) if field.field_id == 
2 else field + for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields + ]) class ManifestEntry(Record): @@ -494,9 +467,7 @@ def update(self, value: Any) -> None: self._min = min(self._min, value) -def construct_partition_summaries( - spec: PartitionSpec, schema: Schema, partitions: List[Record] -) -> List[PartitionFieldSummary]: +def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]: types = [field.field_type for field in spec.partition_type(schema).fields] field_stats = [PartitionFieldStats(field_type) for field_type in types] for partition_keys in partitions: @@ -520,9 +491,7 @@ def construct_partition_summaries( NestedField(512, "added_rows_count", LongType(), required=False), NestedField(513, "existing_rows_count", LongType(), required=False), NestedField(514, "deleted_rows_count", LongType(), required=False), - NestedField( - 507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False - ), + NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), NestedField(519, "key_metadata", BinaryType(), required=False), ), 2: Schema( @@ -539,16 +508,12 @@ def construct_partition_summaries( NestedField(512, "added_rows_count", LongType(), required=True), NestedField(513, "existing_rows_count", LongType(), required=True), NestedField(514, "deleted_rows_count", LongType(), required=True), - NestedField( - 507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False - ), + NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), NestedField(519, "key_metadata", BinaryType(), required=False), ), } -MANIFEST_LIST_FILE_STRUCTS = { - format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items() -} +MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for 
format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} POSITIONAL_DELETE_SCHEMA = Schema( @@ -667,16 +632,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani # in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the data sequence number should be inherited iff the entry status is ADDED - if entry.data_sequence_number is None and ( - manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED - ): + if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): entry.data_sequence_number = manifest.sequence_number # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED - if entry.file_sequence_number is None and ( - manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED - ): + if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): # Only available in V2, always 0 in V1 entry.file_sequence_number = manifest.sequence_number @@ -827,7 +788,7 @@ class RollingManifestWriter: _current_file_rows: int def __init__( - self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes, target_number_of_rows + self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes: int, target_number_of_rows: int ) -> None: self._closed = False self._manifest_files = [] @@ -838,6 +799,7 @@ def __init__( self._current_file_rows = 0 def __enter__(self) -> RollingManifestWriter: + """Open the writer.""" self._get_current_writer().__enter__() return self @@ -847,6 +809,7 @@ def __exit__( exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: + """Close the writer.""" self.closed = True if self._current_writer: self._current_writer.__exit__(exc_type, exc_value, 
traceback) @@ -869,7 +832,7 @@ def _should_roll_to_new_file(self) -> bool: or len(self._current_writer._output_file) >= self._target_file_size_in_bytes ) - def _close_current_writer(self): + def _close_current_writer(self) -> None: if self._current_writer: self._current_writer.__exit__(None, None, None) current_file = self._current_writer.to_manifest_file() @@ -887,6 +850,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter: raise RuntimeError("Cannot add entry to closed manifest writer") self._get_current_writer().add_entry(entry) self._current_file_rows += entry.data_file.record_count + return self @@ -1025,9 +989,7 @@ class ManifestListWriterV2(ManifestListWriter): _commit_snapshot_id: int _sequence_number: int - def __init__( - self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int - ): + def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int): super().__init__( format_version=2, output_file=output_file, diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 756f9e0510..8f827f9048 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,7 +16,7 @@ # under the License. 
# pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory -from typing import Dict +from typing import Dict, Generator import fastavro import pytest @@ -30,6 +30,7 @@ ManifestContent, ManifestEntryStatus, ManifestFile, + ManifestWriter, PartitionFieldSummary, RollingManifestWriter, read_manifest_list, @@ -478,8 +479,22 @@ def test_write_manifest( @pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize( + "target_number_of_rows,target_file_size_in_bytes,expected_number_of_files", + [ + (19514, 388873, 1), # should not roll over + (19513, 388873, 2), # should roll over due to target_rows + (19514, 388872, 2), # should roll over due to target_bytes + (19513, 388872, 2), # should roll over due to target_rows and target_bytes + ], +) def test_rolling_manifest_writer( - generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion + generated_manifest_file_file_v1: str, + generated_manifest_file_file_v2: str, + format_version: TableVersion, + target_number_of_rows: int, + target_file_size_in_bytes: int, + expected_number_of_files: int, ) -> None: io = load_file_io() snapshot = Snapshot( @@ -501,13 +516,10 @@ def test_rolling_manifest_writer( spec_id=demo_manifest_file.partition_spec_id, ) with TemporaryDirectory() as tmpdir: - tmp_avro_file = tmpdir + "/test_write_manifest.avro" - tmp_avro_file = tmpdir + "/test_write_manifest-1.avro" - output = io.new_output(tmp_avro_file) - def supplier(): + + def supplier() -> Generator[ManifestWriter, None, None]: i = 0 while True: - i += 1 tmp_avro_file = tmpdir + f"/test_write_manifest-{i}.avro" output = io.new_output(tmp_avro_file) yield write_manifest( @@ -517,143 +529,21 @@ def test_write_manifest( output_file=output, snapshot_id=8744736658442914487, ) - with RollingManifestWriter(supplier=supplier(), target_file_size_in_bytes=388872 + 1, target_number_of_rows=20000) as writer: + i += 1 + + with RollingManifestWriter( + 
supplier=supplier(), + target_file_size_in_bytes=target_file_size_in_bytes, + target_number_of_rows=target_number_of_rows, + ) as writer: for entry in manifest_entries: writer.add_entry(entry) - new_manifest = writer.to_manifest_files()[0] + manifest_files = writer.to_manifest_files() + assert len(manifest_files) == expected_number_of_files with pytest.raises(RuntimeError): # It is already closed writer.add_entry(manifest_entries[0]) - expected_metadata = { - "schema": test_schema.model_dump_json(), - "partition-spec": test_spec.model_dump_json(), - "partition-spec-id": str(test_spec.spec_id), - "format-version": str(format_version), - } - _verify_metadata_with_fastavro( - tmp_avro_file, - expected_metadata, - ) - new_manifest_entries = new_manifest.fetch_manifest_entry(io) - - manifest_entry = new_manifest_entries[0] - - assert manifest_entry.status == ManifestEntryStatus.ADDED - assert manifest_entry.snapshot_id == 8744736658442914487 - assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3 - assert isinstance(manifest_entry.data_file, DataFile) - - data_file = manifest_entry.data_file - - assert data_file.content is DataFileContent.DATA - assert ( - data_file.file_path - == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" - ) - assert data_file.file_format == FileFormat.PARQUET - assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925) - assert data_file.record_count == 19513 - assert data_file.file_size_in_bytes == 388872 - assert data_file.column_sizes == { - 1: 53, - 2: 98153, - 3: 98693, - 4: 53, - 5: 53, - 6: 53, - 7: 17425, - 8: 18528, - 9: 53, - 10: 44788, - 11: 35571, - 12: 53, - 13: 1243, - 14: 2355, - 15: 12750, - 16: 4029, - 17: 110, - 18: 47194, - 19: 2948, - } - assert data_file.value_counts == { - 1: 19513, - 2: 19513, - 3: 19513, - 4: 19513, - 5: 19513, - 6: 19513, - 7: 19513, - 8: 19513, - 9: 19513, - 10: 19513, - 11: 19513, - 
12: 19513, - 13: 19513, - 14: 19513, - 15: 19513, - 16: 19513, - 17: 19513, - 18: 19513, - 19: 19513, - } - assert data_file.null_value_counts == { - 1: 19513, - 2: 0, - 3: 0, - 4: 19513, - 5: 19513, - 6: 19513, - 7: 0, - 8: 0, - 9: 19513, - 10: 0, - 11: 0, - 12: 19513, - 13: 0, - 14: 0, - 15: 0, - 16: 0, - 17: 0, - 18: 0, - 19: 0, - } - assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0} - assert data_file.lower_bounds == { - 2: b"2020-04-01 00:00", - 3: b"2020-04-01 00:12", - 7: b"\x03\x00\x00\x00", - 8: b"\x01\x00\x00\x00", - 10: b"\xf6(\\\x8f\xc2\x05S\xc0", - 11: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 13: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf", - 15: b")\\\x8f\xc2\xf5(\x08\xc0", - 16: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 17: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 18: b"\xf6(\\\x8f\xc2\xc5S\xc0", - 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0", - } - assert data_file.upper_bounds == { - 2: b"2020-04-30 23:5:", - 3: b"2020-05-01 00:41", - 7: b"\t\x01\x00\x00", - 8: b"\t\x01\x00\x00", - 10: b"\xcd\xcc\xcc\xcc\xcc,_@", - 11: b"\x1f\x85\xebQ\\\xe2\xfe@", - 13: b"\x00\x00\x00\x00\x00\x00\x12@", - 14: b"\x00\x00\x00\x00\x00\x00\xe0?", - 15: b"q=\n\xd7\xa3\xf01@", - 16: b"\x00\x00\x00\x00\x00`B@", - 17: b"333333\xd3?", - 18: b"\x00\x00\x00\x00\x00\x18b@", - 19: b"\x00\x00\x00\x00\x00\x00\x04@", - } - assert data_file.key_metadata is None - assert data_file.split_offsets == [4] - assert data_file.equality_ids is None - assert data_file.sort_order_id == 0 - @pytest.mark.parametrize("format_version", [1, 2]) def test_write_manifest_list(