add_files support partitioned tables (apache#531)
* add_files support partitioned tables

* docs

* more

* adopt review feedback

* split-offsets required
sungwy authored Mar 21, 2024
1 parent 69b9e39 commit 6989b92
Showing 6 changed files with 388 additions and 176 deletions.
5 changes: 2 additions & 3 deletions mkdocs/docs/api.md
@@ -375,9 +375,8 @@ tbl.add_files(file_paths=file_paths)
!!! note "Name Mapping"
Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg schema, it requires the Iceberg table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (the Name Mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet files' metadata, and it creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
<!-- prettier-ignore-end -->
<!-- prettier-ignore-start -->
!!! note "Partitions"
`add_files` only requires the client to read the existing parquet files' metadata footers to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms such as `MonthTransform` and `TruncateTransform`, which preserve the order of the values after the transformation (any transform whose `preserves_order` property is `True` is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
!!! warning "Maintenance Operations"
Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them.
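
For illustration, a minimal usage sketch of adding pre-existing parquet files to a partitioned table; the catalog name, table identifier, and file paths below are hypothetical:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("logs.events")  # assumed to be partitioned by month(event_ts)

# Each file must contain exactly one partition value per partition field,
# otherwise the partition value cannot be inferred from the parquet footer.
tbl.add_files(file_paths=[
    "s3://warehouse/events/2024-03/file-1.parquet",
    "s3://warehouse/events/2024-03/file-2.parquet",
])
```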
148 changes: 98 additions & 50 deletions pyiceberg/io/pyarrow.py
@@ -111,6 +111,7 @@
DataFileContent,
FileFormat,
)
from pyiceberg.partitioning import PartitionField, PartitionSpec, partition_record_value
from pyiceberg.schema import (
PartnerAccessor,
PreOrderSchemaVisitor,
@@ -124,7 +125,7 @@
visit,
visit_with_partner,
)
from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
return result


def fill_parquet_file_metadata(
data_file: DataFile,
@dataclass(frozen=True)
class DataFileStatistics:
record_count: int
column_sizes: Dict[int, int]
value_counts: Dict[int, int]
null_value_counts: Dict[int, int]
nan_value_counts: Dict[int, int]
column_aggregates: Dict[int, StatsAggregator]
split_offsets: List[int]

def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
if partition_field.source_id not in self.column_aggregates:
return None

if not partition_field.transform.preserves_order:
raise ValueError(
f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
)

lower_value = partition_record_value(
partition_field=partition_field,
value=self.column_aggregates[partition_field.source_id].current_min,
schema=schema,
)
upper_value = partition_record_value(
partition_field=partition_field,
value=self.column_aggregates[partition_field.source_id].current_max,
schema=schema,
)
if lower_value != upper_value:
raise ValueError(
f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
)
return lower_value

def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})

def to_serialized_dict(self) -> Dict[str, Any]:
lower_bounds = {}
upper_bounds = {}

for k, agg in self.column_aggregates.items():
_min = agg.min_as_bytes()
if _min is not None:
lower_bounds[k] = _min
_max = agg.max_as_bytes()
if _max is not None:
upper_bounds[k] = _max
return {
"record_count": self.record_count,
"column_sizes": self.column_sizes,
"value_counts": self.value_counts,
"null_value_counts": self.null_value_counts,
"nan_value_counts": self.nan_value_counts,
"lower_bounds": lower_bounds,
"upper_bounds": upper_bounds,
"split_offsets": self.split_offsets,
}
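
To make the `_partition_value` rule above concrete, here is a small illustrative sketch (the timestamps are made up, and it assumes `pyiceberg.utils.datetime.timestamp_to_micros` for the string-to-micros conversion): a partition value can only be inferred when the source column's lower and upper bounds map to the same transformed value, e.g. both timestamps fall in the same month.

```python
from pyiceberg.transforms import MonthTransform
from pyiceberg.types import TimestampType
from pyiceberg.utils.datetime import timestamp_to_micros

to_month = MonthTransform().transform(TimestampType())
lower = to_month(timestamp_to_micros("2024-03-01T00:00:00"))
upper = to_month(timestamp_to_micros("2024-03-21T12:34:56"))

# Same month for min and max -> a single, unambiguous partition value;
# otherwise DataFileStatistics._partition_value raises ValueError.
assert lower == upper
```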


def data_file_statistics_from_parquet_metadata(
parquet_metadata: pq.FileMetaData,
stats_columns: Dict[int, StatisticsCollector],
parquet_column_mapping: Dict[str, int],
) -> None:
) -> DataFileStatistics:
"""
Compute and fill the following fields of the DataFile object.
Compute and return DataFileStatistics that includes the following.
- file_format
- record_count
- column_sizes
- value_counts
- null_value_counts
- nan_value_counts
- lower_bounds
- upper_bounds
- column_aggregates
- split_offsets
Args:
data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
set the mode for column metrics collection
parquet_column_mapping (Dict[str, int]): The mapping of the parquet file name to the field ID
"""
if parquet_metadata.num_columns != len(stats_columns):
raise ValueError(
@@ -1695,30 +1755,19 @@ def fill_parquet_file_metadata(

split_offsets.sort()

lower_bounds = {}
upper_bounds = {}

for k, agg in col_aggs.items():
_min = agg.min_as_bytes()
if _min is not None:
lower_bounds[k] = _min
_max = agg.max_as_bytes()
if _max is not None:
upper_bounds[k] = _max

for field_id in invalidate_col:
del lower_bounds[field_id]
del upper_bounds[field_id]
del col_aggs[field_id]
del null_value_counts[field_id]

data_file.record_count = parquet_metadata.num_rows
data_file.column_sizes = column_sizes
data_file.value_counts = value_counts
data_file.null_value_counts = null_value_counts
data_file.nan_value_counts = nan_value_counts
data_file.lower_bounds = lower_bounds
data_file.upper_bounds = upper_bounds
data_file.split_offsets = split_offsets
return DataFileStatistics(
record_count=parquet_metadata.num_rows,
column_sizes=column_sizes,
value_counts=value_counts,
null_value_counts=null_value_counts,
nan_value_counts=nan_value_counts,
column_aggregates=col_aggs,
split_offsets=split_offsets,
)
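
As a hedged usage sketch, the new helper can also be driven directly for a single local parquet file; the file path and the `table_metadata` object are assumptions, and the call pattern mirrors the one used in `write_file` and `parquet_files_to_data_files` below.

```python
import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import (
    compute_statistics_plan,
    data_file_statistics_from_parquet_metadata,
    parquet_path_to_id_mapping,
)

schema = table_metadata.schema()  # table_metadata: an existing TableMetadata (assumed)
parquet_metadata = pq.read_metadata("/tmp/existing-file.parquet")  # hypothetical path
statistics = data_file_statistics_from_parquet_metadata(
    parquet_metadata=parquet_metadata,
    stats_columns=compute_statistics_plan(schema, table_metadata.properties),
    parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
partition = statistics.partition(table_metadata.spec(), schema)  # Record of inferred partition values
serialized = statistics.to_serialized_dict()  # kwargs that can be splatted into DataFile(...)
```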


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
@@ -1747,6 +1796,11 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
writer.write_table(task.df, row_group_size=row_group_size)

statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=writer.writer.metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
data_file = DataFile(
content=DataFileContent.DATA,
file_path=file_path,
@@ -1761,47 +1815,41 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
**statistics.to_serialized_dict(),
)

fill_parquet_file_metadata(
data_file=data_file,
parquet_metadata=writer.writer.metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
return iter([data_file])


def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
for task in tasks:
input_file = io.new_input(task.file_path)
def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
for file_path in file_paths:
input_file = io.new_input(file_path)
with input_file.open() as input_stream:
parquet_metadata = pq.read_metadata(input_stream)

if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
raise NotImplementedError(
f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
)

schema = table_metadata.schema()
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=parquet_metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)
data_file = DataFile(
content=DataFileContent.DATA,
file_path=task.file_path,
file_path=file_path,
file_format=FileFormat.PARQUET,
partition=task.partition_field_value,
record_count=parquet_metadata.num_rows,
partition=statistics.partition(table_metadata.spec(), table_metadata.schema()),
file_size_in_bytes=len(input_file),
sort_order_id=None,
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
**statistics.to_serialized_dict(),
)
fill_parquet_file_metadata(
data_file=data_file,
parquet_metadata=parquet_metadata,
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
parquet_column_mapping=parquet_path_to_id_mapping(schema),
)

yield data_file
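
A hedged sketch of the field-ID guard above, checked up front for a candidate file (`_HasIds` is an internal helper of this module, and the path is hypothetical):

```python
import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import _HasIds, visit_pyarrow

parquet_metadata = pq.read_metadata("/tmp/candidate.parquet")  # hypothetical local file
if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
    print("File already carries field IDs; add_files will reject it.")
```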


25 changes: 21 additions & 4 deletions pyiceberg/partitioning.py
@@ -388,16 +388,33 @@ def partition(self) -> Record:  # partition key transformed with iceberg interna
if len(partition_fields) != 1:
raise ValueError("partition_fields must contain exactly one field.")
partition_field = partition_fields[0]
iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value)
transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
iceberg_typed_key_values[partition_field.name] = transformed_value
iceberg_typed_key_values[partition_field.name] = partition_record_value(
partition_field=partition_field,
value=raw_partition_field_value.value,
schema=self.schema,
)
return Record(**iceberg_typed_key_values)

def to_path(self) -> str:
return self.partition_spec.partition_to_path(self.partition, self.schema)


def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any:
"""
Return the Partition Record representation of the value.
The value is first converted to internal partition representation.
For example, UUID is converted to bytes[16], DateType to days since epoch, etc.
Then the corresponding PartitionField's transform is applied to return
the final partition record value.
"""
iceberg_type = schema.find_field(name_or_id=partition_field.source_id).field_type
iceberg_typed_value = _to_partition_representation(iceberg_type, value)
transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
return transformed_value
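
An illustrative sketch of `partition_record_value` (the schema, field IDs, and names are made up): a `date` value is first converted to days since epoch and then passed through the field's transform.

```python
from datetime import date

from pyiceberg.partitioning import PartitionField, partition_record_value
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DateType, NestedField

schema = Schema(NestedField(field_id=1, name="event_date", field_type=DateType(), required=False))
field = PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="event_date")

# date(2024, 3, 21) -> days since epoch, then identity-transformed
print(partition_record_value(partition_field=field, value=date(2024, 3, 21), schema=schema))
```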


@singledispatch
def _to_partition_representation(type: IcebergType, value: Any) -> Any:
raise TypeError(f"Unsupported partition field type: {type}")
18 changes: 1 addition & 17 deletions pyiceberg/table/__init__.py
@@ -33,7 +33,6 @@
Dict,
Generic,
Iterable,
Iterator,
List,
Literal,
Optional,
@@ -1179,9 +1178,6 @@ def add_files(self, file_paths: List[str]) -> None:
Raises:
FileNotFoundError: If the file does not exist.
"""
if len(self.spec().fields) > 0:
raise ValueError("Cannot add files to partitioned tables")

with self.transaction() as tx:
if self.name_mapping() is None:
tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
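
A hedged sketch of the same behavior from a user's point of view, assuming an existing `Table` object `tbl`; `add_files` performs this automatically when the table has no name mapping yet, as in the two lines just above.

```python
from pyiceberg.table import TableProperties

if tbl.name_mapping() is None:  # tbl: an existing pyiceberg Table (assumed)
    with tbl.transaction() as tx:
        tx.set_properties(
            **{TableProperties.DEFAULT_NAME_MAPPING: tbl.schema().name_mapping.model_dump_json()}
        )
```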
@@ -2524,17 +2520,6 @@ def _dataframe_to_data_files(
yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))


def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
raise ValueError("Cannot add files to partitioned tables")

for file_path in file_paths:
yield AddFileTask(
file_path=file_path,
partition_field_value=Record(),
)


def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.
@@ -2543,8 +2528,7 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List
"""
from pyiceberg.io.pyarrow import parquet_files_to_data_files

tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)
yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, file_paths=iter(file_paths))


class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
