Python: Infer Iceberg schema from the Parquet file #6997

Merged: 47 commits into master from python_infer_schema_from_parquet, May 2, 2023
Changes from 44 commits

Commits (47)
566ab9b
parquet schema visitor init draft
JonasJ-ap Mar 3, 2023
0b4aec0
add unit test for visitor
JonasJ-ap Mar 4, 2023
f542007
use singledispatch to refactor the visitor
JonasJ-ap Mar 4, 2023
8bf01a8
refactor null-check
JonasJ-ap Mar 4, 2023
59be7e6
add null check for get field id
JonasJ-ap Mar 4, 2023
2505752
fix format
JonasJ-ap Mar 4, 2023
7c04fad
fix schema to pyarrow conversion and add round test
JonasJ-ap Mar 8, 2023
af42d2b
fix pyarrow test due to schema to pyarrow change
JonasJ-ap Mar 8, 2023
b5417a8
nit fix
JonasJ-ap Mar 8, 2023
0a355e0
nit and typing fix
JonasJ-ap Mar 8, 2023
b742b63
switch from range to enumerate
JonasJ-ap Mar 8, 2023
d1c1e37
separate pyarrow test into different files
JonasJ-ap Mar 9, 2023
991150a
add unit tests for new visitor
JonasJ-ap Mar 9, 2023
e53f1c9
make conversions consistent and add tests
JonasJ-ap Mar 9, 2023
9f1e5ff
refactor the control flow
JonasJ-ap Mar 9, 2023
d78b1f0
add round-trip tests for types
JonasJ-ap Mar 9, 2023
35e076d
limit the scope of metadata fetching
JonasJ-ap Mar 9, 2023
396990e
resolve conflict
JonasJ-ap Mar 9, 2023
fe9e3a7
remove unused fixture
JonasJ-ap Mar 9, 2023
a222a57
extract str to constants
JonasJ-ap Mar 11, 2023
13b1361
substitute string with constants
JonasJ-ap Mar 11, 2023
b18ddd3
accommodate the fact that pa.struct is iterable
JonasJ-ap Mar 11, 2023
93956e9
attach PYTHON before the name
JonasJ-ap Mar 11, 2023
f07259c
fix doc typo
JonasJ-ap Mar 11, 2023
3162919
Revert "attach PYTHON before the name"
JonasJ-ap Mar 11, 2023
40d39a6
rollback to be more flexible about the key
JonasJ-ap Mar 12, 2023
736332c
rollback to use id as the end key name
JonasJ-ap Mar 12, 2023
8dbb5ad
enumerate schema directly
JonasJ-ap Mar 12, 2023
be83eb0
remove redundant field initialization
JonasJ-ap Mar 12, 2023
e7871e6
fix nit
JonasJ-ap Mar 13, 2023
3b51c7f
change to field_id
JonasJ-ap Mar 13, 2023
6da6bae
restrict the flexibility
JonasJ-ap Mar 13, 2023
35b04d2
rebase
JonasJ-ap Mar 23, 2023
d3ae307
add integration test for schema conversion from pyarrow to iceberg
JonasJ-ap Mar 23, 2023
1304481
Merge branch 'master' into python_infer_schema_from_parquet
JonasJ-ap Apr 6, 2023
b561273
fix merge issue
JonasJ-ap Apr 6, 2023
52138cc
Merge branch 'master' into python_infer_schema_from_parquet
JonasJ-ap Apr 18, 2023
9c26acc
rollback refactor to pyarrow test
JonasJ-ap Apr 18, 2023
6c28d51
clean test
JonasJ-ap Apr 18, 2023
5fffa63
revise nit, fix visitor signature, update visit function
JonasJ-ap Apr 19, 2023
615b700
split get_field_id_and_doc to two methods
JonasJ-ap Apr 19, 2023
ccd4606
simplify field callbacks
JonasJ-ap Apr 21, 2023
4c1c3cd
let timezone accept 00:00 offset too
JonasJ-ap Apr 21, 2023
0f17563
add test for 0 offset
JonasJ-ap Apr 21, 2023
878054d
Merge branch 'master' into python_infer_schema_from_parquet
JonasJ-ap Apr 27, 2023
ed45a98
incorporate PR comment
JonasJ-ap Apr 27, 2023
cd6eeb5
simplify primitive conversion
JonasJ-ap Apr 27, 2023
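
Taken together, these commits add a PyArrow-to-Iceberg schema visitor that serves as a fallback when a Parquet file carries no embedded iceberg.schema metadata. A minimal sketch of the resulting behavior (the file path is hypothetical; pyarrow_to_schema is the new function from the diff below, and schema_arrow is standard PyArrow):

import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import pyarrow_to_schema

# Arrow-level schema of an existing Parquet file (hypothetical path)
parquet_file = pq.ParquetFile("/tmp/example.parquet")

# Convert to an Iceberg schema; fields are matched via the
# PARQUET:field_id / field_id metadata keys, and fields without a
# field id are skipped by the visitor.
iceberg_schema = pyarrow_to_schema(parquet_file.schema_arrow)
print(iceberg_schema)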
223 changes: 211 additions & 12 deletions python/pyiceberg/io/pyarrow.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# pylint: disable=redefined-outer-name,arguments-renamed
+# pylint: disable=redefined-outer-name,arguments-renamed,fixme
"""FileIO implementation for reading and writing table files that uses pyarrow.fs

This file contains a FileIO implementation that relies on the filesystem interface provided
Expand All @@ -26,19 +26,23 @@

import multiprocessing
import os
-from functools import lru_cache
+from abc import ABC, abstractmethod
+from functools import lru_cache, singledispatch
from multiprocessing.pool import ThreadPool
from multiprocessing.sharedctypes import Synchronized
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generic,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import urlparse

@@ -122,6 +126,12 @@
ONE_MEGABYTE = 1024 * 1024
BUFFER_SIZE = "buffer-size"
ICEBERG_SCHEMA = b"iceberg.schema"
FIELD_ID = "field_id"
DOC = "doc"
PYARROW_FIELD_ID_KEYS = [b"PARQUET:field_id", b"field_id"]
PYARROW_FIELD_DOC_KEYS = [b"PARQUET:field_doc", b"field_doc", b"doc"]

T = TypeVar("T")

class PyArrowFile(InputFile, OutputFile):
@@ -358,14 +368,17 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:
            name=field.name,
            type=field_result,
            nullable=field.optional,
-            metadata={"doc": field.doc, "id": str(field.field_id)} if field.doc else {},
+            metadata={DOC: field.doc, FIELD_ID: str(field.field_id)} if field.doc else {FIELD_ID: str(field.field_id)},
        )

-    def list(self, _: ListType, element_result: pa.DataType) -> pa.DataType:
-        return pa.list_(value_type=element_result)
+    def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType:
+        element_field = self.field(list_type.element_field, element_result)
+        return pa.list_(value_type=element_field)

-    def map(self, _: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
-        return pa.map_(key_type=key_result, item_type=value_result)
+    def map(self, map_type: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
+        key_field = self.field(map_type.key_field, key_result)
+        value_field = self.field(map_type.value_field, value_result)
+        return pa.map_(key_type=key_field, item_type=value_field)

    def visit_fixed(self, fixed_type: FixedType) -> pa.DataType:
        return pa.binary(len(fixed_type))
@@ -486,6 +499,195 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
    return boolean_expression_visit(expr, _ConvertToArrowExpression())


def pyarrow_to_schema(schema: pa.Schema) -> Schema:
    visitor = _ConvertToIceberg()
    return visit_pyarrow(schema, visitor)


@singledispatch
def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
    """A generic function for applying a PyArrow schema visitor to any point within a schema.

    The function traverses the schema in post-order fashion.

    Args:
        obj (Union[pa.DataType, pa.Schema]): An instance of a pa.Schema or a pa.DataType
        visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyArrowSchemaVisitor base class

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type
    """
    raise NotImplementedError("Cannot visit non-type: %s" % obj)


@visit_pyarrow.register(pa.Schema)
def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    struct_results: List[Optional[T]] = []
    for field in obj:
        visitor.before_field(field)
        struct_result = visit_pyarrow(field.type, visitor)
        visitor.after_field(field)
        struct_results.append(struct_result)

    return visitor.schema(obj, struct_results)


@visit_pyarrow.register(pa.StructType)
def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    struct_results: List[Optional[T]] = []
    for field in obj:
        visitor.before_field(field)
        struct_result = visit_pyarrow(field.type, visitor)
        visitor.after_field(field)
        struct_results.append(struct_result)

    return visitor.struct(obj, struct_results)


@visit_pyarrow.register(pa.ListType)
def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    visitor.before_field(obj.value_field)
    list_result = visit_pyarrow(obj.value_field.type, visitor)
    visitor.after_field(obj.value_field)
    return visitor.list(obj, list_result)


@visit_pyarrow.register(pa.MapType)
def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    visitor.before_field(obj.key_field)
    key_result = visit_pyarrow(obj.key_field.type, visitor)
    visitor.after_field(obj.key_field)
    visitor.before_field(obj.item_field)
    value_result = visit_pyarrow(obj.item_field.type, visitor)
    visitor.after_field(obj.item_field)
    return visitor.map(obj, key_result, value_result)


@visit_pyarrow.register(pa.DataType)
def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    if pa.types.is_nested(obj):
        raise TypeError(f"Expected primitive type, got: {type(obj)}")
    return visitor.primitive(obj)


class PyArrowSchemaVisitor(Generic[T], ABC):
    def before_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    @abstractmethod
    def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> Optional[T]:
        """visit a schema"""

    @abstractmethod
    def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) -> Optional[T]:
        """visit a struct"""

    @abstractmethod
    def list(self, list_type: pa.ListType, element_result: Optional[T]) -> Optional[T]:
        """visit a list"""

    @abstractmethod
    def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: Optional[T]) -> Optional[T]:
        """visit a map"""

    @abstractmethod
    def primitive(self, primitive: pa.DataType) -> Optional[T]:
        """visit a primitive type"""


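# An illustrative visitor (hypothetical, not part of this diff): a concrete
# subclass only needs the five abstract callbacks above. This one counts the
# primitive fields reachable from a schema.
class _CountPrimitives(PyArrowSchemaVisitor[int]):
    def schema(self, schema: pa.Schema, field_results: List[Optional[int]]) -> int:
        return sum(result or 0 for result in field_results)

    def struct(self, struct: pa.StructType, field_results: List[Optional[int]]) -> int:
        return sum(result or 0 for result in field_results)

    def list(self, list_type: pa.ListType, element_result: Optional[int]) -> int:
        return element_result or 0

    def map(self, map_type: pa.MapType, key_result: Optional[int], value_result: Optional[int]) -> int:
        return (key_result or 0) + (value_result or 0)

    def primitive(self, primitive: pa.DataType) -> int:
        return 1

# e.g. visit_pyarrow(pa.schema([pa.field("id", pa.int64())]), _CountPrimitives()) == 1
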
def _get_field_id(field: pa.Field) -> Optional[int]:
    for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
        if field_id_str := field.metadata.get(pyarrow_field_id_key):
            return int(field_id_str.decode())
    return None


def _get_field_doc(field: pa.Field) -> Optional[str]:
    for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
        if doc_str := field.metadata.get(pyarrow_doc_key):
            return doc_str.decode()
    return None


class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
    def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema:
        fields = []
        for i, field in enumerate(schema):
            field_id = _get_field_id(field)
            field_doc = _get_field_doc(field)
            field_type = field_results[i]
            if field_type is not None and field_id is not None:
                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc))
        return Schema(*fields)

    def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType:
        fields = []
        for i, field in enumerate(struct):
            field_id = _get_field_id(field)
            field_doc = _get_field_doc(field)
            field_type = field_results[i]
            if field_type is not None and field_id is not None:
                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc))
        return StructType(*fields)

    def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]:
        element_field = list_type.value_field
        element_id = _get_field_id(element_field)
        if element_result is not None and element_id is not None:
            return ListType(element_id, element_result, element_required=not element_field.nullable)
        return None

    def map(
        self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType]
    ) -> Optional[IcebergType]:
        key_field = map_type.key_field
        key_id = _get_field_id(key_field)
        value_field = map_type.item_field
        value_id = _get_field_id(value_field)
        if key_result is not None and value_result is not None and key_id is not None and value_id is not None:
            return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)
        return None

    def primitive(self, primitive: pa.DataType) -> IcebergType:
        if pa.types.is_boolean(primitive):
            return BooleanType()
        elif pa.types.is_int32(primitive):
            return IntegerType()
        elif pa.types.is_int64(primitive):
            return LongType()
        elif pa.types.is_float32(primitive):
            return FloatType()
        elif pa.types.is_float64(primitive):
            return DoubleType()
        elif pa.types.is_decimal(primitive) and isinstance(primitive, pa.Decimal128Type):
            primitive = cast(pa.Decimal128Type, primitive)
            return DecimalType(primitive.precision, primitive.scale)
        elif pa.types.is_string(primitive):
            return StringType()
        elif pa.types.is_date32(primitive):
            return DateType()
        elif pa.types.is_time(primitive):
Contributor:

Do we need to check time or timestamp precision?

Contributor:

@Fokko, I think this could be a problem. We should make sure there are good tests for these types.

The read code operates using PyIceberg schemas. So that is going to convert PyArrow to PyIceberg, prune columns, and then convert back to request a projection from PyArrow. If there is a lossy conversion then that will cause a problem. For example, if a time type is in millis but is converted without that information, then the requested type will be a timestamp in micros and that may cause a problem.

When looking at this, I found another problem. Rather than requesting a specific schema, the projected Iceberg schema is used to request top-level columns:

        arrow_table = pq.read_table(
            source=fout,
            schema=parquet_schema,
            pre_buffer=True,
            buffer_size=8 * ONE_MEGABYTE,
            filters=pyarrow_filter,
            columns=[col.name for col in file_project_schema.columns],
        )

I assume that's going to read full nested structures because the only information it has is the top-level column name. If that happens, I'm concerned that we're reading leaf columns that we don't actually need to. I think projection will correctly ignore them, but it would be nice to avoid reading them.

Contributor:

> The read code operates using PyIceberg schemas. So that is going to convert PyArrow to PyIceberg, prune columns, and then convert back to request a projection from PyArrow. If there is a lossy conversion then that will cause a problem. For example, if a time type is in millis but is converted without that information, then the requested type will be a timestamp in micros and that may cause a problem.

Good point, I think we should have round-trip tests here, where we go from Iceberg to PyArrow, and back to Iceberg.

> I assume that's going to read full nested structures because the only information it has is the top-level column name. If that happens, I'm concerned that we're reading leaf columns that we don't actually need to. I think projection will correctly ignore them, but it would be nice to avoid reading them.

I agree. I would have to look into the details of whether we prune the nested fields properly. In the future I'd like to replace this with the custom evolution strategy: apache/arrow#33972 (comment)
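
A minimal sketch of such a round-trip test, reusing names that appear in this diff and thread (TimestamptzType, _ConvertToArrowSchema, _ConvertToIceberg, visit, visit_pyarrow):

def test_timestamptz_round_trip() -> None:
    iceberg_type = TimestamptzType()
    pyarrow_type = visit(iceberg_type, _ConvertToArrowSchema())
    assert visit_pyarrow(pyarrow_type, _ConvertToIceberg()) == iceberg_type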

Contributor (author):

Thank you for your explanation. I refactored the code to make it consistent with the primitive conversion in the Iceberg-to-PyArrow direction. Currently, it only allows UTC as the timezone and us as the unit.

I also added some round-trip tests for these types.

Contributor:

This still seems suspicious to me. While it's now correct, how does PyArrow read Parquet files that have other time or timestamp representations?

Contributor (author):

Thank you for your review and for bringing up your concerns. I'd like to understand better what you find suspicious about PyArrow's ability to read Parquet files with different time or timestamp representations.

From what I understand, Iceberg's TimeType, TimestampType, and TimestamptzType require us and UTC, and my current conversion ensures that no data is lost.

In this case, PyArrow can support reading non-UTC timezones and s, ms, and us precision, but it does not support nanosecond precision, since the final requested type during projection will be us and UTC:

    def visit_time(self, _: TimeType) -> pa.DataType:
        return pa.time64("us")

    def visit_timestamp(self, _: TimestampType) -> pa.DataType:
        return pa.timestamp(unit="us")

    def visit_timestampz(self, _: TimestamptzType) -> pa.DataType:
        return pa.timestamp(unit="us", tz="UTC")

I chose to restrict the precision to us and the timezone to UTC because the Iceberg specification requires all stored time/timestamp values to use this precision and timezone. Since the pyarrow_to_schema visitor is used to read an Iceberg table's data files, I believe we should only support us and UTC.

However, I am not entirely sure about this. Supporting other precisions and timezones here would need more discussion and modification. How about creating another PR, if needed, to address these concerns?

Thank you again for your feedback, and please let me know if you have any further questions or concerns.

Contributor:

If there is an Iceberg schema in the file, I think we can assume that it is written according to the spec:
[screenshot of the Iceberg spec table omitted]

With the current check, it is correct:

        elif pa.types.is_time(primitive):
            if isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
                return TimeType()

We could even simplify it:

        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
            return TimeType()

The tests are in place:

def test_pyarrow_time32_to_iceberg() -> None:
    pyarrow_type = pa.time32("ms")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[ms]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    pyarrow_type = pa.time32("s")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[s]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())


def test_pyarrow_time64_us_to_iceberg() -> None:
    pyarrow_type = pa.time64("us")
    converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    assert converted_iceberg_type == TimeType()
    assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pyarrow_type


def test_pyarrow_time64_ns_to_iceberg() -> None:
    pyarrow_type = pa.time64("ns")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())

I think we can resolve this issue

            if isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
                return TimeType()
        elif pa.types.is_timestamp(primitive):
            primitive = cast(pa.TimestampType, primitive)
            if primitive.unit == "us":
                if primitive.tz == "UTC" or primitive.tz == "+00:00":
                    return TimestamptzType()
                elif primitive.tz is None:
                    return TimestampType()
        elif pa.types.is_binary(primitive):
            return BinaryType()
        elif pa.types.is_fixed_size_binary(primitive):
            primitive = cast(pa.FixedSizeBinaryType, primitive)
            return FixedType(primitive.byte_width)

        raise TypeError(f"Unsupported type: {primitive}")
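
# Illustrative outcomes of the branches above (not part of the diff):
#   visit_pyarrow(pa.timestamp("us", tz="UTC"), _ConvertToIceberg())   # -> TimestamptzType()
#   visit_pyarrow(pa.timestamp("us"), _ConvertToIceberg())             # -> TimestampType()
#   visit_pyarrow(pa.timestamp("ms"), _ConvertToIceberg())             # raises TypeError: Unsupported type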


def _file_to_table(
    fs: FileSystem,
    task: FileScanTask,
@@ -507,11 +709,8 @@ def _file_to_table(
    schema_raw = None
    if metadata := physical_schema.metadata:
        schema_raw = metadata.get(ICEBERG_SCHEMA)
-    if schema_raw is None:
-        raise ValueError(
-            "Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505"
-        )
-    file_schema = Schema.parse_raw(schema_raw)
+    # TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema
Contributor:

Should we create an issue for this, and still raise an exception?

JonasJ-ap (author), Apr 27, 2023:

I created the issue: #7451.

But I am not sure what the proper way to raise an exception is in this case. Based on my understanding, name mapping is also needed if a portion of the Parquet fields is missing field IDs. However, in that case, pyarrow_to_schema can still generate a valid Iceberg schema for the rest of the Parquet fields, so it seems we should not raise an exception.

Should we only raise an exception when no field ID exists anywhere in the data file? I think we can also log warning messages when a PyArrow field is missing a field ID. What do you think?
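
A minimal sketch of that warning idea (hypothetical helper, not part of this PR), built on the _get_field_id helper from the diff:

import logging

logger = logging.getLogger(__name__)

def _warn_on_missing_field_ids(schema: pa.Schema) -> None:
    # Fields without field-id metadata are dropped by _ConvertToIceberg,
    # so log them instead of failing the whole read.
    for field in schema:
        if field.metadata is None or _get_field_id(field) is None:
            logger.warning("Parquet field %s has no field id and will be skipped", field.name)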

+    file_schema = Schema.parse_raw(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema)

    pyarrow_filter = None
    if bound_row_filter is not AlwaysTrue():