Python: Infer Iceberg schema from the Parquet file #6997

Merged: 47 commits merged into master from python_infer_schema_from_parquet on May 2, 2023
Commits (all by JonasJ-ap):
566ab9b  parquet schema visitor init draft (Mar 3, 2023)
0b4aec0  add unit test for visitor (Mar 4, 2023)
f542007  use singledispatch to refactor the visitor (Mar 4, 2023)
8bf01a8  refactor null-check (Mar 4, 2023)
59be7e6  add null check for get field id (Mar 4, 2023)
2505752  fix format (Mar 4, 2023)
7c04fad  fix schema to pyarrow conversion and add round test (Mar 8, 2023)
af42d2b  fix pyarrow test due to schema to pyarrow change (Mar 8, 2023)
b5417a8  nit fix (Mar 8, 2023)
0a355e0  nit and typing fix (Mar 8, 2023)
b742b63  switch from range to enumerate (Mar 8, 2023)
d1c1e37  separate pyarrow test into different files (Mar 9, 2023)
991150a  add unit tests for new visitor (Mar 9, 2023)
e53f1c9  make conversions consistent and add tests (Mar 9, 2023)
9f1e5ff  refactor the control flow (Mar 9, 2023)
d78b1f0  add round-trip tests for types (Mar 9, 2023)
35e076d  limit the scope of metadata fetching (Mar 9, 2023)
396990e  resolve conflict (Mar 9, 2023)
fe9e3a7  remove unused fixture (Mar 9, 2023)
a222a57  extract str to constants (Mar 11, 2023)
13b1361  substitute string with constants (Mar 11, 2023)
b18ddd3  accommodate the fact that pa.struct is iterable (Mar 11, 2023)
93956e9  attach PYTHON before the name (Mar 11, 2023)
f07259c  fix doc typo (Mar 11, 2023)
3162919  Revert "attach PYTHON before the name" (Mar 11, 2023)
40d39a6  rollback to be more flexible about the key (Mar 12, 2023)
736332c  rollback to use id as the end key name (Mar 12, 2023)
8dbb5ad  enumerate schema directly (Mar 12, 2023)
be83eb0  remove redundant field initialization (Mar 12, 2023)
e7871e6  fix nit (Mar 13, 2023)
3b51c7f  change to field_id (Mar 13, 2023)
6da6bae  restrict the flexibility (Mar 13, 2023)
35b04d2  rebase (Mar 23, 2023)
d3ae307  add integration test for schema conversion from pyarrow to iceberg (Mar 23, 2023)
1304481  Merge branch 'master' into python_infer_schema_from_parquet (Apr 6, 2023)
b561273  fix merge issue (Apr 6, 2023)
52138cc  Merge branch 'master' into python_infer_schema_from_parquet (Apr 18, 2023)
9c26acc  rollback refactor to pyarrow test (Apr 18, 2023)
6c28d51  clean test (Apr 18, 2023)
5fffa63  revise nit, fix visitor signature, update visit function (Apr 19, 2023)
615b700  split get_field_id_and_doc to two methods (Apr 19, 2023)
ccd4606  simplify field callbacks (Apr 21, 2023)
4c1c3cd  let timezone accept 00:00 offset too (Apr 21, 2023)
0f17563  add test for 0 offset (Apr 21, 2023)
878054d  Merge branch 'master' into python_infer_schema_from_parquet (Apr 27, 2023)
ed45a98  incorporate PR comment (Apr 27, 2023)
cd6eeb5  simplify primitive conversion (Apr 27, 2023)
219 changes: 207 additions & 12 deletions python/pyiceberg/io/pyarrow.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# pylint: disable=redefined-outer-name,arguments-renamed
+# pylint: disable=redefined-outer-name,arguments-renamed,fixme
"""FileIO implementation for reading and writing table files that uses pyarrow.fs

This file contains a FileIO implementation that relies on the filesystem interface provided
@@ -26,19 +26,23 @@

import multiprocessing
import os
-from functools import lru_cache
+from abc import ABC, abstractmethod
+from functools import lru_cache, singledispatch
from multiprocessing.pool import ThreadPool
from multiprocessing.sharedctypes import Synchronized
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generic,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import urlparse

@@ -122,6 +126,12 @@
ONE_MEGABYTE = 1024 * 1024
BUFFER_SIZE = "buffer-size"
ICEBERG_SCHEMA = b"iceberg.schema"
FIELD_ID = "field_id"
DOC = "doc"
PYARROW_FIELD_ID_KEYS = [b"PARQUET:field_id", b"field_id"]
PYARROW_FIELD_DOC_KEYS = [b"PARQUET:field_doc", b"field_doc", b"doc"]

T = TypeVar("T")


class PyArrowFile(InputFile, OutputFile):
@@ -358,14 +368,17 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:
            name=field.name,
            type=field_result,
            nullable=field.optional,
-            metadata={"doc": field.doc, "id": str(field.field_id)} if field.doc else {},
+            metadata={DOC: field.doc, FIELD_ID: str(field.field_id)} if field.doc else {FIELD_ID: str(field.field_id)},
        )

-    def list(self, _: ListType, element_result: pa.DataType) -> pa.DataType:
-        return pa.list_(value_type=element_result)
+    def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType:
+        element_field = self.field(list_type.element_field, element_result)
+        return pa.list_(value_type=element_field)

-    def map(self, _: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
-        return pa.map_(key_type=key_result, item_type=value_result)
+    def map(self, map_type: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
+        key_field = self.field(map_type.key_field, key_result)
+        value_field = self.field(map_type.value_field, value_result)
+        return pa.map_(key_type=key_field, item_type=value_field)

    def visit_fixed(self, fixed_type: FixedType) -> pa.DataType:
        return pa.binary(len(fixed_type))
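
Illustrative aside, not part of the diff: with this change, the element, key, and value fields of converted list and map types carry their Iceberg field IDs as Arrow field metadata instead of being bare types. A minimal sketch of the resulting structure:

import pyarrow as pa

# An element field annotated the way the visitor above would annotate it.
element_field = pa.field("element", pa.string(), nullable=False, metadata={"field_id": "2"})
arrow_list = pa.list_(value_type=element_field)
print(arrow_list.value_field.metadata)  # {b'field_id': b'2'}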
@@ -486,6 +499,190 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
    return boolean_expression_visit(expr, _ConvertToArrowExpression())


def pyarrow_to_schema(schema: pa.Schema) -> Schema:
    visitor = _ConvertToIceberg()
    return visit_pyarrow(schema, visitor)
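
A hedged usage sketch of the new entry point (the Parquet path is hypothetical; the file is assumed to carry PARQUET:field_id metadata, as Iceberg-written files do):

import pyarrow.parquet as pq

arrow_schema = pq.read_schema("/tmp/00000-0-data.parquet")  # hypothetical file
iceberg_schema = pyarrow_to_schema(arrow_schema)
print(iceberg_schema)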


@singledispatch
def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisitor[T]) -> T:
    """A generic function for applying a pyarrow schema visitor to any point within a schema.

    The function traverses the schema in post-order fashion.

    Args:
        obj (Union[pa.DataType, pa.Schema]): An instance of a pa.Schema or a pa.DataType.
        visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyArrowSchemaVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unrecognized object type.
    """
    raise NotImplementedError("Cannot visit non-type: %s" % obj)


@visit_pyarrow.register(pa.Schema)
def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    struct_results: List[Optional[T]] = []
    for field in obj:
        visitor.before_field(field)
        struct_result = visit_pyarrow(field.type, visitor)
        visitor.after_field(field)
        struct_results.append(struct_result)

    return visitor.schema(obj, struct_results)


@visit_pyarrow.register(pa.StructType)
def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    struct_results: List[Optional[T]] = []
    for field in obj:
        visitor.before_field(field)
        struct_result = visit_pyarrow(field.type, visitor)
        visitor.after_field(field)
        struct_results.append(struct_result)

    return visitor.struct(obj, struct_results)


@visit_pyarrow.register(pa.ListType)
def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    visitor.before_field(obj.value_field)
    list_result = visit_pyarrow(obj.value_field.type, visitor)
    visitor.after_field(obj.value_field)
    return visitor.list(obj, list_result)


@visit_pyarrow.register(pa.MapType)
def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    visitor.before_field(obj.key_field)
    key_result = visit_pyarrow(obj.key_field.type, visitor)
    visitor.after_field(obj.key_field)
    visitor.before_field(obj.item_field)
    value_result = visit_pyarrow(obj.item_field.type, visitor)
    visitor.after_field(obj.item_field)
    return visitor.map(obj, key_result, value_result)


@visit_pyarrow.register(pa.DataType)
def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
    if pa.types.is_nested(obj):
        raise TypeError(f"Expected primitive type, got: {type(obj)}")
    return visitor.primitive(obj)


class PyArrowSchemaVisitor(Generic[T], ABC):
    def before_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately before visiting a field."""

    def after_field(self, field: pa.Field) -> None:
        """Override this method to perform an action immediately after visiting a field."""

    @abstractmethod
    def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> Optional[T]:
        """Visit a schema."""

    @abstractmethod
    def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) -> Optional[T]:
        """Visit a struct."""

    @abstractmethod
    def list(self, list_type: pa.ListType, element_result: Optional[T]) -> Optional[T]:
        """Visit a list."""

    @abstractmethod
    def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: Optional[T]) -> Optional[T]:
        """Visit a map."""

    @abstractmethod
    def primitive(self, primitive: pa.DataType) -> Optional[T]:
        """Visit a primitive type."""


def _get_field_id(field: pa.Field) -> Optional[int]:
    if field.metadata:
        for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
            if field_id_str := field.metadata.get(pyarrow_field_id_key):
                return int(field_id_str.decode())
    return None


def _get_field_doc(field: pa.Field) -> Optional[str]:
    if field.metadata:
        for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
            if doc_str := field.metadata.get(pyarrow_doc_key):
                return doc_str.decode()
    return None
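
A quick sketch of the lookup order (keys per PYARROW_FIELD_ID_KEYS above): either metadata spelling resolves to the same ID.

import pyarrow as pa

f1 = pa.field("id", pa.int64(), metadata={b"PARQUET:field_id": b"1"})
f2 = pa.field("id", pa.int64(), metadata={b"field_id": b"1"})
assert _get_field_id(f1) == _get_field_id(f2) == 1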


class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
    def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]:
        fields = []
        for i, field in enumerate(arrow_fields):
            field_id = _get_field_id(field)
            field_doc = _get_field_doc(field)
            field_type = field_results[i]
            if field_type is not None and field_id is not None:
                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc))
        return fields

    def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema:
        return Schema(*self._convert_fields(schema, field_results))

    def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType:
        return StructType(*self._convert_fields(struct, field_results))

    def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]:
        element_field = list_type.value_field
        element_id = _get_field_id(element_field)
        if element_result is not None and element_id is not None:
            return ListType(element_id, element_result, element_required=not element_field.nullable)
        return None

    def map(
        self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType]
    ) -> Optional[IcebergType]:
        key_field = map_type.key_field
        key_id = _get_field_id(key_field)
        value_field = map_type.item_field
        value_id = _get_field_id(value_field)
        if key_result is not None and value_result is not None and key_id is not None and value_id is not None:
            return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)
        return None

    def primitive(self, primitive: pa.DataType) -> IcebergType:
        if pa.types.is_boolean(primitive):
            return BooleanType()
        elif pa.types.is_int32(primitive):
            return IntegerType()
        elif pa.types.is_int64(primitive):
            return LongType()
        elif pa.types.is_float32(primitive):
            return FloatType()
        elif pa.types.is_float64(primitive):
            return DoubleType()
        elif isinstance(primitive, pa.Decimal128Type):
            primitive = cast(pa.Decimal128Type, primitive)
            return DecimalType(primitive.precision, primitive.scale)
        elif pa.types.is_string(primitive):
            return StringType()
        elif pa.types.is_date32(primitive):
            return DateType()
        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
            return TimeType()
        elif pa.types.is_timestamp(primitive):
            primitive = cast(pa.TimestampType, primitive)
            if primitive.unit == "us":
                if primitive.tz == "UTC" or primitive.tz == "+00:00":
                    return TimestamptzType()
                elif primitive.tz is None:
                    return TimestampType()
        elif pa.types.is_binary(primitive):
            return BinaryType()
        elif pa.types.is_fixed_size_binary(primitive):
            primitive = cast(pa.FixedSizeBinaryType, primitive)
            return FixedType(primitive.byte_width)

        raise TypeError(f"Unsupported type: {primitive}")
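
A hedged sketch of the timestamp rules above (see the "let timezone accept 00:00 offset too" commit): only microsecond precision maps, and the zone decides timestamp vs. timestamptz.

import pyarrow as pa

converter = _ConvertToIceberg()
assert converter.primitive(pa.timestamp("us", tz="UTC")) == TimestamptzType()
assert converter.primitive(pa.timestamp("us", tz="+00:00")) == TimestamptzType()
assert converter.primitive(pa.timestamp("us")) == TimestampType()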


def _file_to_table(
    fs: FileSystem,
    task: FileScanTask,
@@ -507,11 +704,9 @@ def _file_to_table(
        schema_raw = None
        if metadata := physical_schema.metadata:
            schema_raw = metadata.get(ICEBERG_SCHEMA)
-        if schema_raw is None:
-            raise ValueError(
-                "Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505"
-            )
-        file_schema = Schema.parse_raw(schema_raw)
+        # TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema,
+        # see https://github.com/apache/iceberg/issues/7451
+        file_schema = Schema.parse_raw(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema)

        pyarrow_filter = None
        if bound_row_filter is not AlwaysTrue():
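
Net effect: scans no longer fail on Parquet files that lack the embedded iceberg.schema metadata key. A sketch of the new fallback path (file path hypothetical):

import pyarrow.parquet as pq

physical_schema = pq.read_schema("/tmp/example.parquet")  # hypothetical file
metadata = physical_schema.metadata or {}
if metadata.get(ICEBERG_SCHEMA) is None:
    # No embedded Iceberg schema: infer one from the Parquet field IDs instead.
    print(pyarrow_to_schema(physical_schema))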