Write support (apache#41)
Fokko authored Jan 18, 2024
1 parent 8614ba0 commit 8f7927b
Showing 13 changed files with 1,035 additions and 136 deletions.
98 changes: 98 additions & 0 deletions mkdocs/docs/api.md
@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(

The static-table is considered read-only.

## Write support

With PyIceberg 0.6.0, write support is added through Arrow. Let's consider an Arrow Table:

```python
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)
```
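
PyArrow infers an Arrow schema from the Python values; you can inspect it before creating the matching Iceberg table (the expected output is shown as comments):

```python
print(df.schema)
# city: string
# lat: double
# long: double
```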

Next, create a table based on the schema:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

catalog = load_catalog("default")

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table("default.cities", schema=schema)
```
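
The snippet above assumes a catalog named `default` has already been configured. If it has not, the connection properties can also be passed to `load_catalog` directly; the values below are placeholders for your own setup, and PyIceberg can otherwise read them from `~/.pyiceberg.yaml` or `PYICEBERG_*` environment variables:

```python
# Hypothetical catalog properties; adjust the URI and warehouse to your environment.
catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",  # e.g. a REST catalog endpoint
        "warehouse": "s3://warehouse/",
    },
)
```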

Now write the data to the table:

<!-- prettier-ignore-start -->

!!! note inline end "Fast append"
    PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables quick writes, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a normal commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.

<!-- prettier-ignore-end -->

```python
tbl.append(df)
# or
tbl.overwrite(df)
```

The data is written to the table, and when the table is read back using `tbl.scan().to_arrow()`, it returns:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"]]
lat: [[52.371807,37.773972,53.11254,48.864716]]
long: [[4.896029,-122.431297,6.0989,2.349014]]
```

You can use either `append(df)` or `overwrite(df)` since there is no data yet. If we want to add more data, we can use `.append()` again:

```python
df = pa.Table.from_pylist(
[{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
)
tbl.append(df)
```

When reading the table with `tbl.scan().to_arrow()`, you can see that `Groningen` is now also part of the table:

```
pyarrow.Table
city: string
lat: double
long: double
----
city: [["Amsterdam","San Francisco","Drachten","Paris"],["Groningen"]]
lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
```

The nested lists indicate the separate Arrow buffers: the first write results in one buffer, and the second append in another. This is expected, since the scan reads two Parquet files.
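
If you prefer a single contiguous table on the Arrow side, the chunks can be merged with PyArrow's `combine_chunks`; this happens entirely in PyArrow and does not change anything in the Iceberg table:

```python
arrow_table = tbl.scan().to_arrow().combine_chunks()
# Each column is now backed by a single chunk instead of one chunk per data file.
```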

<!-- prettier-ignore-start -->

!!! example "Under development"
    Write support in PyIceberg is still under development. Support for [partial overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to [partitioned tables](https://github.com/apache/iceberg-python/issues/208) is planned and being worked on.

<!-- prettier-ignore-end -->
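
Each append also creates a new snapshot. As a quick check, the snapshots can be listed through the table metadata (a minimal sketch; the attribute names follow PyIceberg's `Snapshot` model):

```python
for snapshot in tbl.metadata.snapshots:
    # Prints the snapshot-id and a summary such as the operation ("append") and file counts.
    print(snapshot.snapshot_id, snapshot.summary)
```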

## Schema evolution

PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are made (this can be overridden).
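
For example, adding a new optional column is a non-breaking change. A short sketch, reusing the `tbl` from the write examples above:

```python
from pyiceberg.types import LongType

with tbl.update_schema() as update:
    # Adds an optional `population` column; the field-ID is assigned automatically.
    update.add_column("population", LongType(), doc="Number of inhabitants")
```
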
36 changes: 32 additions & 4 deletions poetry.lock


82 changes: 62 additions & 20 deletions pyiceberg/io/pyarrow.py
@@ -105,7 +105,11 @@
     OutputFile,
     OutputStream,
 )
-from pyiceberg.manifest import DataFile, FileFormat
+from pyiceberg.manifest import (
+    DataFile,
+    DataFileContent,
+    FileFormat,
+)
 from pyiceberg.schema import (
     PartnerAccessor,
     PreOrderSchemaVisitor,
@@ -119,8 +123,9 @@
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import WriteTask
 from pyiceberg.transforms import TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties
+from pyiceberg.typedef import EMPTY_DICT, Properties, Record
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1443,18 +1448,15 @@ def parquet_path_to_id_mapping(
 
 
 def fill_parquet_file_metadata(
-    df: DataFile,
+    data_file: DataFile,
     parquet_metadata: pq.FileMetaData,
-    file_size: int,
     stats_columns: Dict[int, StatisticsCollector],
     parquet_column_mapping: Dict[str, int],
 ) -> None:
     """
     Compute and fill the following fields of the DataFile object.
 
-    - file_format
     - record_count
-    - file_size_in_bytes
     - column_sizes
     - value_counts
     - null_value_counts
@@ -1464,11 +1466,8 @@ def fill_parquet_file_metadata(
     - split_offsets
 
     Args:
-        df (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
+        data_file (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled.
         parquet_metadata (pyarrow.parquet.FileMetaData): A pyarrow metadata object.
-        file_size (int): The total compressed file size cannot be retrieved from the metadata and hence has to
-            be passed here. Depending on the kind of file system and pyarrow library call used, different
-            ways to obtain this value might be appropriate.
         stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
            set the mode for column metrics collection
     """
@@ -1565,13 +1564,56 @@ def fill_parquet_file_metadata(
         del upper_bounds[field_id]
         del null_value_counts[field_id]
 
-    df.file_format = FileFormat.PARQUET
-    df.record_count = parquet_metadata.num_rows
-    df.file_size_in_bytes = file_size
-    df.column_sizes = column_sizes
-    df.value_counts = value_counts
-    df.null_value_counts = null_value_counts
-    df.nan_value_counts = nan_value_counts
-    df.lower_bounds = lower_bounds
-    df.upper_bounds = upper_bounds
-    df.split_offsets = split_offsets
+    data_file.record_count = parquet_metadata.num_rows
+    data_file.column_sizes = column_sizes
+    data_file.value_counts = value_counts
+    data_file.null_value_counts = null_value_counts
+    data_file.nan_value_counts = nan_value_counts
+    data_file.lower_bounds = lower_bounds
+    data_file.upper_bounds = upper_bounds
+    data_file.split_offsets = split_offsets
+
+
+def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+    task = next(tasks)
+
+    try:
+        _ = next(tasks)
+        # If there are more tasks, raise an exception
+        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
+    except StopIteration:
+        pass
+
+    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+    file_schema = schema_to_pyarrow(table.schema())
+
+    collected_metrics: List[pq.FileMetaData] = []
+    fo = table.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+            writer.write_table(task.df)
+
+    data_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_path,
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        file_size_in_bytes=len(fo),
+        sort_order_id=task.sort_order_id,
+        # Just copy these from the table for now
+        spec_id=table.spec().spec_id,
+        equality_ids=None,
+        key_metadata=None,
+    )
+
+    if len(collected_metrics) != 1:
+        # One file has been written
+        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
+
+    fill_parquet_file_metadata(
+        data_file=data_file,
+        parquet_metadata=collected_metrics[0],
+        stats_columns=compute_statistics_plan(table.schema(), table.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+    return iter([data_file])