[CHORE] Ensure compatibility with deltalake version v0.19 (#2827)
Deltalake v0.19 changes its `_convert_pa_schema_to_delta` function to take a
`schema_conversion_mode` argument instead of `large_dtypes`. Pyarrow also
needed to be upgraded to v16.0.0 to be compatible with the new version of
deltalake.
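
For reference, a minimal sketch of the two signatures this PR has to bridge. Note that `_convert_pa_schema_to_delta` is a private deltalake function; its location in `deltalake.schema` is assumed here, so treat this as illustrative rather than authoritative:

```python
import deltalake
import pyarrow as pa
from packaging.version import parse

schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

if parse(deltalake.__version__) < parse("0.19.0"):
    # deltalake < 0.19: a boolean flag selects the large arrow types
    from deltalake.schema import _convert_pa_schema_to_delta

    delta_schema = _convert_pa_schema_to_delta(schema, large_dtypes=True)
else:
    # deltalake >= 0.19: an enum replaces the boolean flag
    from deltalake.schema import ArrowSchemaConversionMode, _convert_pa_schema_to_delta

    delta_schema = _convert_pa_schema_to_delta(
        schema, schema_conversion_mode=ArrowSchemaConversionMode.LARGE
    )
```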

The difference between this PR and #2754 is that this one still maintains
compatibility with older deltalake versions.

This PR also includes a change to arrow2 because, starting in version 0.19,
deltalake uses arrow-rs by default instead of pyarrow to write files when
calling `deltalake.write_deltalake`. We do not actually use this functionality,
but our tests do, and arrow-rs writes map arrays in a way that does not conform
to the parquet spec. I figured it would be good to add that compatibility
anyway, in case some users write their parquet files with arrow-rs.
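
The naming difference at issue can be inspected directly with pyarrow. This is a hedged sketch (the file name and data are illustrative): pyarrow writes the repeated middle group of a map column as `key_value`, per the parquet spec, while arrow-rs names the same group `entries`:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# One map<string, int64> column with a single row of two key/value pairs.
table = pa.table(
    {"m": pa.array([[("a", 1), ("b", 2)]], type=pa.map_(pa.string(), pa.int64()))}
)
pq.write_table(table, "map_example.parquet")

# Printing the physical parquet schema shows the repeated middle group
# that holds the key/value pairs; pyarrow names it "key_value" as the
# spec mandates, whereas arrow-rs emits "entries" for the same group.
print(pq.ParquetFile("map_example.parquet").schema)
```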

However, there are other issues with deltalake's rust writer as well, including
improper encoding of partitioned binary columns, so we will use its pyarrow
writer for testing.
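
Concretely, the tests select the writer as in this sketch (path and data are illustrative; deltalake v0.19 otherwise defaults to its rust writer):

```python
import pyarrow as pa
from deltalake import write_deltalake

data = pa.table({"part_idx": [0, 0, 1], "value": ["a", "b", "c"]})

# Explicitly pick the pyarrow-based writer rather than the rust (arrow-rs)
# writer that deltalake defaults to starting in v0.19.
write_deltalake("/tmp/example_delta_table", data, partition_by=["part_idx"], engine="pyarrow")
```
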
kevinzwang authored Sep 11, 2024
1 parent 4582192 commit 7048b97
Showing 10 changed files with 40 additions and 12 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/python-package.yml
@@ -26,7 +26,7 @@ jobs:
 matrix:
   python-version: ['3.8', '3.10']
   daft-runner: [py, ray]
-  pyarrow-version: [7.0.0, 15.0.0]
+  pyarrow-version: [7.0.0, 16.0.0]
   os: [ubuntu-20.04, windows-latest]
   exclude:
   - daft-runner: ray
@@ -36,6 +36,8 @@ jobs:
   python-version: '3.10'
   pyarrow-version: 7.0.0
   os: ubuntu-20.04
+- python-version: '3.8'
+  pyarrow-version: 16.0.0
 - os: windows-latest
   python-version: '3.8'
 - os: windows-latest
2 changes: 1 addition & 1 deletion benchmarking/parquet/benchmark-requirements.txt
@@ -1,5 +1,5 @@
 pytest==7.4.0
 pytest-benchmark==4.0.0
 pytest-memray==1.4.1
-pyarrow==15.0.0
+pyarrow==16.0.0
 boto3==1.28.3
9 changes: 6 additions & 3 deletions daft/dataframe/dataframe.py
@@ -796,6 +796,7 @@ def write_deltalake(

         from daft import from_pydict
         from daft.io import DataCatalogTable
+        from daft.io._deltalake import large_dtypes_kwargs
         from daft.io.object_store_options import io_config_to_storage_options

         if schema_mode == "merge":
@@ -835,12 +836,14 @@ def write_deltalake(
             warnings.warn("No DynamoDB table specified for Delta Lake locking. Defaulting to unsafe writes.")

         pyarrow_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in self.schema())
-        delta_schema = _convert_pa_schema_to_delta(pyarrow_schema, large_dtypes=True)
+
+        large_dtypes = True
+        delta_schema = _convert_pa_schema_to_delta(pyarrow_schema, **large_dtypes_kwargs(large_dtypes))

         if table:
             table.update_incremental()

-            table_schema = table.schema().to_pyarrow(as_large_types=True)
+            table_schema = table.schema().to_pyarrow(as_large_types=large_dtypes)
             if delta_schema != table_schema and not (mode == "overwrite" and schema_mode == "overwrite"):
                 raise ValueError(
                     "Schema of data does not match table schema\n"
@@ -865,7 +868,7 @@ def write_deltalake(
             table_uri,
             mode,
             version,
-            large_dtypes=True,
+            large_dtypes,
             io_config=io_config,
         )
         write_df = DataFrame(builder)
2 changes: 1 addition & 1 deletion daft/io/__init__.py
@@ -11,7 +11,7 @@
     S3Credentials,
 )
 from daft.io._csv import read_csv
-from daft.io._delta_lake import read_deltalake
+from daft.io._deltalake import read_deltalake
 from daft.io._hudi import read_hudi
 from daft.io._iceberg import read_iceberg
 from daft.io._json import read_json
15 changes: 14 additions & 1 deletion daft/io/_delta_lake.py → daft/io/_deltalake.py
@@ -1,6 +1,6 @@
 # isort: dont-add-import: from __future__ import annotations

-from typing import Optional, Union
+from typing import Any, Dict, Optional, Union

 from daft import context
 from daft.api_annotations import PublicAPI
@@ -76,3 +76,16 @@ def read_deltalake(
     handle = ScanOperatorHandle.from_python_scan_operator(delta_lake_operator)
     builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)
     return DataFrame(builder)
+
+
+def large_dtypes_kwargs(large_dtypes: bool) -> Dict[str, Any]:
+    import deltalake
+    from packaging.version import parse
+
+    if parse(deltalake.__version__) < parse("0.19.0"):
+        return {"large_dtypes": large_dtypes}
+    else:
+        from deltalake.schema import ArrowSchemaConversionMode
+
+        schema_conversion_mode = ArrowSchemaConversionMode.LARGE if large_dtypes else ArrowSchemaConversionMode.NORMAL
+        return {"schema_conversion_mode": schema_conversion_mode}
3 changes: 2 additions & 1 deletion daft/table/table_io.py
@@ -698,6 +698,7 @@ def write_deltalake(
     from packaging.version import parse
     from pyarrow.fs import PyFileSystem

+    from daft.io._deltalake import large_dtypes_kwargs
     from daft.io.object_store_options import io_config_to_storage_options
     from daft.utils import ARROW_VERSION

@@ -745,7 +746,7 @@ def file_visitor(written_file: Any) -> None:
     fs = PyFileSystem(DeltaStorageHandler(base_path, storage_options))

     arrow_table = mp.to_arrow()
-    arrow_batch = convert_pyarrow_table(arrow_table, large_dtypes)
+    arrow_batch = convert_pyarrow_table(arrow_table, **large_dtypes_kwargs(large_dtypes))

     execution_config = get_context().daft_execution_config
6 changes: 4 additions & 2 deletions requirements-dev.txt
@@ -35,7 +35,8 @@ opencv-python==4.8.1.78
 tiktoken==0.7.0

 # Pyarrow
-pyarrow==15.0.0
+pyarrow==16.0.0; python_version >= '3.9'
+pyarrow==15.0.0; python_version < '3.9'
 # Ray
 ray[data, client]==2.10.0; python_version == '3.8'
 ray[data, client]==2.34.0; python_version >= '3.9'
@@ -49,7 +50,8 @@ tenacity==8.2.3; python_version >= '3.8'

 # Delta Lake
 deltalake==0.5.8; platform_system == "Windows"
-deltalake==0.18.2; platform_system != "Windows" and python_version >= '3.8'
+deltalake==0.18.2; platform_system != "Windows" and python_version < '3.9'
+deltalake==0.19.2; platform_system != "Windows" and python_version >= '3.9'

 # Databricks
 databricks-sdk==0.12.0
8 changes: 7 additions & 1 deletion src/arrow2/src/io/parquet/read/schema/convert.rs
@@ -300,11 +300,17 @@ fn to_group_type(
 ) -> Option<DataType> {
     debug_assert!(!fields.is_empty());
     if field_info.repetition == Repetition::Repeated {
-        if (field_info.name == "key_value" || field_info.name == "map") && fields.len() == 2 {
+        if (field_info.name == "key_value"
+            || field_info.name == "map"
+            || field_info.name == "entries")
+            && fields.len() == 2
+        {
             // For map types, the middle level, named key_value, is a repeated group with "key_value" as the name
             // and two fields, "key" and "value". The "key" field is the key type and the "value" field is the value type.
             // For backward compatibility, the "key_value" group may be named "map" instead of "key_value".
             // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
+            // For compatibility with arrow-rs, the "key_value" group may also be named "entries" instead of "key_value".
+            // https://github.com/apache/arrow-rs/blob/704f90bbf541896387bed030e12a39be308047e8/arrow-array/src/builder/map_builder.rs#L81
             to_struct(fields, options)
         } else {
             Some(DataType::List(Box::new(Field::new(
@@ -17,7 +17,7 @@ uvicorn==0.23.2
 uvloop==0.17.0
 watchfiles==0.19.0
 websockets==11.0.3
-pyarrow==15.0.0
+pyarrow==16.0.0
 slowapi==0.1.8

 # Pin numpy version otherwise pyarrow doesn't work
1 change: 1 addition & 0 deletions tests/io/delta_lake/conftest.py
@@ -441,5 +441,6 @@ def deltalake_table(
         table,
         partition_by="part_idx" if partition_generator(0) is not None else None,
         storage_options=storage_options,
+        engine="pyarrow",
     )
     return path, catalog_table, io_config, parts
