Skip to content

Commit

Permalink
test: test different rollover scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
felixscherz committed May 4, 2024
1 parent d5204ea commit d3ceb8a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 237 deletions.
160 changes: 61 additions & 99 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,30 @@
from __future__ import annotations

import math
from abc import ABC
from abc import abstractmethod
from abc import ABC, abstractmethod
from enum import Enum
from types import TracebackType
from typing import Any, Generator
from typing import Callable
from typing import Dict
from typing import Iterator
from typing import List
from typing import Literal
from typing import Optional
from typing import Type

from pyiceberg.avro.file import AvroFile
from pyiceberg.avro.file import AvroOutputFile
from typing import Any, Dict, Generator, Iterator, List, Literal, Optional, Type

from pyiceberg.avro.file import AvroFile, AvroOutputFile
from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ValidationError
from pyiceberg.io import FileIO
from pyiceberg.io import InputFile
from pyiceberg.io import OutputFile
from pyiceberg.io import FileIO, InputFile, OutputFile
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT
from pyiceberg.typedef import Record
from pyiceberg.typedef import TableVersion
from pyiceberg.types import BinaryType
from pyiceberg.types import BooleanType
from pyiceberg.types import IntegerType
from pyiceberg.types import ListType
from pyiceberg.types import LongType
from pyiceberg.types import MapType
from pyiceberg.types import NestedField
from pyiceberg.types import PrimitiveType
from pyiceberg.types import StringType
from pyiceberg.types import StructType
from pyiceberg.typedef import EMPTY_DICT, Record, TableVersion
from pyiceberg.types import (
BinaryType,
BooleanType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
)

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
Expand Down Expand Up @@ -101,9 +90,7 @@ def __repr__(self) -> str:

DATA_FILE_TYPE: Dict[int, StructType] = {
1: StructType(
NestedField(
field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
),
NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(
field_id=101,
name="file_format",
Expand All @@ -118,9 +105,7 @@ def __repr__(self) -> str:
required=True,
doc="Partition data tuple, schema based on the partition spec",
),
NestedField(
field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
),
NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
NestedField(
field_id=104,
name="file_size_in_bytes",
Expand Down Expand Up @@ -203,9 +188,7 @@ def __repr__(self) -> str:
doc="File format name: avro, orc, or parquet",
initial_default=DataFileContent.DATA,
),
NestedField(
field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"
),
NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(
field_id=101,
name="file_format",
Expand All @@ -220,9 +203,7 @@ def __repr__(self) -> str:
required=True,
doc="Partition data tuple, schema based on the partition spec",
),
NestedField(
field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"
),
NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
NestedField(
field_id=104,
name="file_size_in_bytes",
Expand Down Expand Up @@ -305,34 +286,30 @@ def __repr__(self) -> str:


def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
data_file_partition_type = StructType(
*[
NestedField(
field_id=field.field_id,
name=field.name,
field_type=field.field_type,
required=field.required,
)
for field in partition_type.fields
]
)
data_file_partition_type = StructType(*[
NestedField(
field_id=field.field_id,
name=field.name,
field_type=field.field_type,
required=field.required,
)
for field in partition_type.fields
])

return StructType(
*[
(
NestedField(
field_id=102,
name="partition",
field_type=data_file_partition_type,
required=True,
doc="Partition data tuple, schema based on the partition spec",
)
if field.field_id == 102
else field
return StructType(*[
(
NestedField(
field_id=102,
name="partition",
field_type=data_file_partition_type,
required=True,
doc="Partition data tuple, schema based on the partition spec",
)
for field in DATA_FILE_TYPE[format_version].fields
]
)
if field.field_id == 102
else field
)
for field in DATA_FILE_TYPE[format_version].fields
])


class DataFile(Record):
Expand Down Expand Up @@ -413,18 +390,14 @@ def __eq__(self, other: Any) -> bool:
),
}

MANIFEST_ENTRY_SCHEMAS_STRUCT = {
format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()
}
MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()}


def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
return Schema(
*[
NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
]
)
return Schema(*[
NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
])


class ManifestEntry(Record):
Expand Down Expand Up @@ -494,9 +467,7 @@ def update(self, value: Any) -> None:
self._min = min(self._min, value)


def construct_partition_summaries(
spec: PartitionSpec, schema: Schema, partitions: List[Record]
) -> List[PartitionFieldSummary]:
def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]:
types = [field.field_type for field in spec.partition_type(schema).fields]
field_stats = [PartitionFieldStats(field_type) for field_type in types]
for partition_keys in partitions:
Expand All @@ -520,9 +491,7 @@ def construct_partition_summaries(
NestedField(512, "added_rows_count", LongType(), required=False),
NestedField(513, "existing_rows_count", LongType(), required=False),
NestedField(514, "deleted_rows_count", LongType(), required=False),
NestedField(
507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
),
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
NestedField(519, "key_metadata", BinaryType(), required=False),
),
2: Schema(
Expand All @@ -539,16 +508,12 @@ def construct_partition_summaries(
NestedField(512, "added_rows_count", LongType(), required=True),
NestedField(513, "existing_rows_count", LongType(), required=True),
NestedField(514, "deleted_rows_count", LongType(), required=True),
NestedField(
507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False
),
NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
NestedField(519, "key_metadata", BinaryType(), required=False),
),
}

MANIFEST_LIST_FILE_STRUCTS = {
format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()
}
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


POSITIONAL_DELETE_SCHEMA = Schema(
Expand Down Expand Up @@ -667,16 +632,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani

# in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
if entry.data_sequence_number is None and (
manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
):
if entry.data_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
entry.data_sequence_number = manifest.sequence_number

# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
if entry.file_sequence_number is None and (
manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
):
if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
# Only available in V2, always 0 in V1
entry.file_sequence_number = manifest.sequence_number

Expand Down Expand Up @@ -827,7 +788,7 @@ class RollingManifestWriter:
_current_file_rows: int

def __init__(
self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes, target_number_of_rows
self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes: int, target_number_of_rows: int
) -> None:
self._closed = False
self._manifest_files = []
Expand All @@ -838,6 +799,7 @@ def __init__(
self._current_file_rows = 0

def __enter__(self) -> RollingManifestWriter:
"""Open the writer."""
self._get_current_writer().__enter__()
return self

Expand All @@ -847,6 +809,7 @@ def __exit__(
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
"""Close the writer."""
self.closed = True
if self._current_writer:
self._current_writer.__exit__(exc_type, exc_value, traceback)
Expand All @@ -869,7 +832,7 @@ def _should_roll_to_new_file(self) -> bool:
or len(self._current_writer._output_file) >= self._target_file_size_in_bytes
)

def _close_current_writer(self):
def _close_current_writer(self) -> None:
if self._current_writer:
self._current_writer.__exit__(None, None, None)
current_file = self._current_writer.to_manifest_file()
Expand All @@ -887,6 +850,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
raise RuntimeError("Cannot add entry to closed manifest writer")
self._get_current_writer().add_entry(entry)
self._current_file_rows += entry.data_file.record_count

return self


Expand Down Expand Up @@ -1025,9 +989,7 @@ class ManifestListWriterV2(ManifestListWriter):
_commit_snapshot_id: int
_sequence_number: int

def __init__(
self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int
):
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
super().__init__(
format_version=2,
output_file=output_file,
Expand Down
Loading

0 comments on commit d3ceb8a

Please sign in to comment.