Python: Infer Iceberg schema from the Parquet file #6997

Merged: 47 commits into apache:master on May 2, 2023

Conversation

@JonasJ-ap (Contributor) commented Mar 3, 2023:

Problem Addressed:

This PR fixes #6505, #6647, and #7457.
It adds support for inferring the Iceberg schema from the Parquet schema when the Parquet file does not contain metadata holding the encoded Iceberg schema.
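For illustration, a minimal sketch of the inference path from the user's side (pyarrow_to_schema is the entry point discussed in this PR; the PARQUET:field_id metadata key is an assumption for the example):

import pyarrow as pa
from pyiceberg.io.pyarrow import pyarrow_to_schema

# A PyArrow schema as read back from a Parquet file: each field carries its
# Iceberg field ID in the field metadata (assumed key: PARQUET:field_id).
arrow_schema = pa.schema([
    pa.field("longCol", pa.int64(), nullable=False, metadata={"PARQUET:field_id": "1"}),
    pa.field("stringCol", pa.string(), metadata={"PARQUET:field_id": "2"}),
])

# Infer the Iceberg schema directly from the PyArrow schema instead of relying
# on the JSON-encoded Iceberg schema in the file's key-value metadata.
iceberg_schema = pyarrow_to_schema(arrow_schema)
print(iceberg_schema)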

Tests:

Working on unit tests.

Sample on AWS Athena (reproducing the bug following the procedure in #6505, then verifying the fix):

  1. Create a table on AWS Glue:
val type_frame = spark
  .range(0, 5, 1, 5)
  .withColumnRenamed("id", "longCol")
  .withColumn("intCol", expr("CAST(longCol AS INT)"))
  .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
  .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
  .withColumn("dateCol", date_add(current_date(), 1))
  .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
  .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
  .withColumn("booleanCol", expr("longCol > 5"))
  .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
  .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
  .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
  .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
  .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
  .withColumn("arrayCol", expr("ARRAY(longCol)"))
  .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))

type_frame.writeTo(s"demo.$DB_NAME.type_test_ref_unpartitioned2").tableProperty("format-version", "2").createOrReplace()
  2. Use AWS Athena to optimize the table:
OPTIMIZE type_test_ref_unpartitioned REWRITE DATA USING BIN_PACK;
  3. Run the code snippet that triggers the error:
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default", warehouse="s3://gluetestjonas/warehouse")
table = catalog.load_table("iceberg_ref.type_test_ref_unpartitioned")

df = table.scan().to_pandas()
print(df)

On the current master branch:

issue_6505 python3.10 parquet_schema.py
Traceback (most recent call last):
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/python_test/issue_6505/parquet_schema.py", line 6, in <module>
    df = table.scan().to_pandas()
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/python_test/iceberg/python/pyiceberg/table/__init__.py", line 409, in to_pandas
    return self.to_arrow().to_pandas(**kwargs)
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/python_test/iceberg/python/pyiceberg/table/__init__.py", line 404, in to_arrow
    return project_table(
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/python_test/iceberg/python/pyiceberg/io/pyarrow.py", line 559, in project_table
    for table in pool.starmap(
  File "/opt/homebrew/Cellar/[email protected]/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/opt/homebrew/Cellar/[email protected]/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/opt/homebrew/Cellar/[email protected]/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/opt/homebrew/Cellar/[email protected]/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/python_test/iceberg/python/pyiceberg/io/pyarrow.py", line 496, in _file_to_table
    raise ValueError(
ValueError: Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505

On this PR:

issue_6505 python3.10 parquet_schema.py
   longCol  intCol  floatCol  doubleCol     dateCol              timestampCol   stringCol  booleanCol                            binaryCol  byteCol decimalCol  shortCol       mapCol arrayCol                                 structCol
0        3       3       3.0        3.0  2023-03-04 2023-03-04 00:00:00+00:00  2023-03-04       False  b'\x00\x00\x00\x00\x00\x00\x00\x03'        3       3.00         3  [(3, 3.00)]      [3]  {'mapCol': [(3, 3.00)], 'arrayCol': [3]}
1        0       0       0.0        0.0  2023-03-04 2023-03-04 00:00:00+00:00  2023-03-04       False  b'\x00\x00\x00\x00\x00\x00\x00\x00'        0       0.00         0  [(0, 0.00)]      [0]  {'mapCol': [(0, 0.00)], 'arrayCol': [0]}
2        2       2       2.0        2.0  2023-03-04 2023-03-04 00:00:00+00:00  2023-03-04       False  b'\x00\x00\x00\x00\x00\x00\x00\x02'        2       2.00         2  [(2, 2.00)]      [2]  {'mapCol': [(2, 2.00)], 'arrayCol': [2]}
3        1       1       1.0        1.0  2023-03-04 2023-03-04 00:00:00+00:00  2023-03-04       False  b'\x00\x00\x00\x00\x00\x00\x00\x01'        1       1.00         1  [(1, 1.00)]      [1]  {'mapCol': [(1, 1.00)], 'arrayCol': [1]}
4        4       4       4.0        4.0  2023-03-04 2023-03-04 00:00:00+00:00  2023-03-04       False  b'\x00\x00\x00\x00\x00\x00\x00\x04'        4       4.00         4  [(4, 4.00)]      [4]  {'mapCol': [(4, 4.00)], 'arrayCol': [4]}

This indicates that the table can now be read normally.

.pyiceberg.yaml:

catalog:
  default:
    type: glue

@Fokko (Contributor) left a comment:

Thanks @JonasJ-ap for creating this PR, looks good! I've added some suggestions, let me know what you think!

@JonasJ-ap (Contributor, Author) left a comment:

@Fokko Thank you for reviewing this. I added my concerns about some details below.

@Fokko (Contributor) left a comment:

Thanks @JonasJ-ap for changing this to @singledispatch. I think we're almost there, added some more suggestions. Let me know what you think!

"Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505"
)
file_schema = Schema.parse_raw(schema_raw)
# TODO: if field_ids are not present, another fallback level should be implemented to look them up in the table schema
Contributor commented:

This is Iceberg's "name mapping" feature.

The basics:

  1. Name mapping is applied ONLY if there are no field IDs in the Parquet file
  2. If a field is present in the name mapping, the ID from the mapping is used. Multiple names can be mapped to the same ID
  3. If a field is not present in the name mapping, it is omitted.

For PyIceberg, the name mapping would probably be implemented by pre-processing the PyArrow schema to add the PYTHON:field_id properties. That way this code doesn't need to change.

As a result, if there is no field ID for any field, it should be omitted from the Iceberg schema.
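A minimal sketch of that pre-processing idea, assuming a flat name-to-ID mapping and the PARQUET:field_id metadata key (both are simplifications: the real name-mapping spec supports nested fields and multiple names per ID):

import pyarrow as pa

FIELD_ID_KEY = b"PARQUET:field_id"  # assumed metadata key

def apply_name_mapping(schema: pa.Schema, name_to_id: dict) -> pa.Schema:
    fields = []
    for field in schema:
        if field.name in name_to_id:
            # Rule 2 above: take the ID from the mapping.
            metadata = dict(field.metadata or {})
            metadata[FIELD_ID_KEY] = str(name_to_id[field.name]).encode()
            fields.append(field.with_metadata(metadata))
        # Rule 3 above: fields absent from the mapping are omitted.
    return pa.schema(fields)

The existing PyArrow-to-Iceberg conversion could then run unchanged on the returned schema.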

return TimeType()
elif pa.types.is_timestamp(primitive):
primitive = cast(pa.TimestampType, primitive)
if primitive.tz is not None:
Contributor commented:

@Fokko, can we handle non-UTC time zones, or should we fail if this is not UTC or +00:00?

Contributor commented:

We can handle non-UTC timezones.

return StringType()
elif pa.types.is_date(primitive):
return DateType()
elif pa.types.is_time(primitive):
Contributor commented:

Do we need to check time or timestamp precision?

Contributor commented:

@Fokko, I think this could be a problem. We should make sure there are good tests for these types.

The read code operates using PyIceberg schemas. So that is going to convert PyArrow to PyIceberg, prune columns, and then convert back to request a projection from PyArrow. If there is a lossy conversion, that will cause a problem. For example, if a time type is in millis but is converted without that information, then the requested type will be a timestamp in micros and that may cause a problem.

When looking at this, I found another problem. Rather than requesting a specific schema, the projected Iceberg schema is used to request top-level columns:

        arrow_table = pq.read_table(
            source=fout,
            schema=parquet_schema,
            pre_buffer=True,
            buffer_size=8 * ONE_MEGABYTE,
            filters=pyarrow_filter,
            columns=[col.name for col in file_project_schema.columns],
        )

I assume that's going to read full nested structures because the only information it has is the top-level column name. If that happens, I'm concerned that we're reading leaf columns that we don't actually need to. I think projection will correctly ignore them, but it would be nice to avoid reading them.
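To make the lossy-conversion concern concrete, a minimal illustration (assuming TimeType maps back to time64["us"], which matches the visitor code shown later in this thread):

import pyarrow as pa

# What the Parquet file actually holds: a millisecond-precision time column.
file_type = pa.time32("ms")

# If the unit is dropped when converting PyArrow -> Iceberg, TimeType() maps
# back to microseconds, so the projection requests a type the file doesn't hold.
requested_type = pa.time64("us")

assert file_type != requested_type  # the unit was lost in the round trip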

Contributor commented:

> The read code operates using PyIceberg schemas. So that is going to convert PyArrow to PyIceberg, prune columns, and then convert back to request a projection from PyArrow. If there is a lossy conversion, that will cause a problem. For example, if a time type is in millis but is converted without that information, then the requested type will be a timestamp in micros and that may cause a problem.

Good point, I think we should have round-trip tests here, where we go from Iceberg to PyArrow, and back to Iceberg.

> I assume that's going to read full nested structures because the only information it has is the top-level column name. If that happens, I'm concerned that we're reading leaf columns that we don't actually need to. I think projection will correctly ignore them, but it would be nice to avoid reading them.

I agree, I would have to look into the details of whether we prune nested fields properly. In the future I'd like to replace this with the custom evolution strategy: apache/arrow#33972 (comment)

@JonasJ-ap (Contributor, Author) commented:

Thank you for your explanation. I refactored the code to make it consistent with the primitive conversion in the Iceberg-to-PyArrow direction. Currently, it only allows UTC as the timezone and us as the unit.

I also added some round-trip tests for these types.

Contributor commented:

This still seems suspicious to me. While it's now correct, how does PyArrow read Parquet files that have other time or timestamp representations?

@JonasJ-ap (Contributor, Author) commented:

Thank you for your review and for bringing up your concerns. I'd like to understand better what you find suspicious about PyArrow's ability to read Parquet files with different time or timestamp representations.

From what I understand, Iceberg's TimeType, TimestampType, and TimestamptzType require us and UTC, and my current conversion ensures that no data is lost.

In this case, PyArrow can read non-UTC timezones and s, ms, and us precisions, but it cannot handle nanosecond precision, since the final requested type during projection will be us and UTC:

def visit_time(self, _: TimeType) -> pa.DataType:
    return pa.time64("us")

def visit_timestamp(self, _: TimestampType) -> pa.DataType:
    return pa.timestamp(unit="us")

def visit_timestampz(self, _: TimestamptzType) -> pa.DataType:
    return pa.timestamp(unit="us", tz="UTC")

I chose to restrict the precision to us and the timezone to UTC because the Iceberg specification requires all stored time/timestamp values to use this precision and timezone. Since the pyarrow_to_schema visitor is used to read an Iceberg table's data files, I believe we should only support us and UTC.

However, I am not entirely sure about this. Supporting other precisions and timezones here may need more discussion and modification. How about creating another PR to address these concerns if needed?

Thank you again for your feedback, and please let me know if you have any further questions or concerns.

Contributor commented:

If there is an Iceberg schema in the file, I think we can assume that it is written according to the spec:
(screenshot of the spec omitted)

With the current check, it is correct:

        elif pa.types.is_time(primitive):
            if isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
                return TimeType()

We could even simplify it:

        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
            return TimeType()

The tests are in place:

def test_pyarrow_time32_to_iceberg() -> None:
    pyarrow_type = pa.time32("ms")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[ms]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    pyarrow_type = pa.time32("s")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[s]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())


def test_pyarrow_time64_us_to_iceberg() -> None:
    pyarrow_type = pa.time64("us")
    converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    assert converted_iceberg_type == TimeType()
    assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pyarrow_type


def test_pyarrow_time64_ns_to_iceberg() -> None:
    pyarrow_type = pa.time64("ns")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())

I think we can resolve this issue.

Fokko added a commit to Fokko/iceberg that referenced this pull request Mar 7, 2023
Creates fragments based on the FileFormat.

Blocked by: apache#6997
@Fokko Fokko added this to the PyIceberg 0.4.0 release milestone Mar 7, 2023
@JonasJ-ap JonasJ-ap marked this pull request as ready for review March 10, 2023 21:19
@Fokko (Contributor) left a comment:

Some small improvements, but looks good in general 👍🏻 Thanks @JonasJ-ap for working on this!

@Fokko (Contributor) commented Mar 12, 2023:

@JonasJ-ap I think we're good. Thanks so much for this PR and the refactoring of the tests. Let's give @rdblue some time to hear if he has any final thoughts before merging.

key_field = map_type.key_field
key_id, _ = _get_field_id_and_doc(key_field)
value_field = map_type.item_field
value_id, _ = _get_field_id_and_doc(value_field)
Contributor commented:

Why combine the lookup into a single method when doc is ignored most of the time?

@JonasJ-ap (Contributor, Author) commented:

I separated it into _get_field_id and _get_field_doc.
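A sketch of what the split accessors might look like; the metadata keys are assumptions based on how Parquet field IDs surface through PyArrow:

from typing import Optional

import pyarrow as pa

def _get_field_id(field: pa.Field) -> Optional[int]:
    # Parquet field IDs surface in the PyArrow field metadata (assumed key).
    if field.metadata and (field_id := field.metadata.get(b"PARQUET:field_id")):
        return int(field_id)
    return None

def _get_field_doc(field: pa.Field) -> Optional[str]:
    # Field docs, when present, travel the same way (assumed key).
    if field.metadata and (doc := field.metadata.get(b"doc")):
        return doc.decode()
    return None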

return TimeType()
elif pa.types.is_timestamp(primitive):
primitive = cast(pa.TimestampType, primitive)
if primitive.unit == "us" and primitive.tz == "UTC":
Contributor commented:

Should we also accept a zero offset, like +00:00?
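A minimal sketch of the relaxed check being suggested, treating the zero offset as equivalent to UTC (the helper name is hypothetical):

import pyarrow as pa
from pyiceberg.types import TimestampType, TimestamptzType

def _convert_timestamp(primitive: pa.TimestampType):
    # Accept both the "UTC" zone name and the equivalent fixed offset "+00:00".
    if primitive.unit == "us":
        if primitive.tz in ("UTC", "+00:00"):
            return TimestamptzType()
        if primitive.tz is None:
            return TimestampType()
    raise TypeError(f"Unsupported type: {primitive}")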

@JonasJ-ap (Contributor, Author) commented:

@rdblue Thanks for your review and suggestions. I will try to update the PR by this Friday.

@Fokko (Contributor) left a comment:

Few nits, but looks good!


"Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505"
)
file_schema = Schema.parse_raw(schema_raw)
# TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema
Contributor commented:

Should we create an issue for this, and still raise an exception?

@JonasJ-ap (Contributor, Author) commented Apr 27, 2023:

I created the issue: #7451.

But I am not sure what the proper way to raise an exception is in this case. Based on my understanding, name mapping is also needed if a portion of the Parquet fields are missing field IDs. However, in that case, pyarrow_to_schema can still generate a valid Iceberg schema for the rest of the Parquet fields, so it seems we should not raise an exception.

Should we only raise an exception when no field ID exists in the whole data file? I think we could also log a warning when a PyArrow field is missing a field ID. What do you think?
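A sketch of that policy (the helper names and messages are illustrative; _get_field_id is the assumed accessor discussed earlier in this thread):

import logging
from typing import Optional

import pyarrow as pa

logger = logging.getLogger(__name__)

def _get_field_id(field: pa.Field) -> Optional[int]:
    # Assumed metadata key, as in the accessor sketch earlier in this thread.
    if field.metadata and (field_id := field.metadata.get(b"PARQUET:field_id")):
        return int(field_id)
    return None

def check_field_ids(schema: pa.Schema) -> None:
    # Raise only when no field in the file carries an ID; otherwise warn per
    # missing field, since the remaining fields still yield a valid schema.
    missing = [field.name for field in schema if _get_field_id(field) is None]
    if missing and len(missing) == len(schema):
        raise ValueError("Parquet file does not contain any field IDs")
    for name in missing:
        logger.warning("Field '%s' has no field ID and will be omitted", name)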

rdblue merged commit 6cf4d9e into apache:master on May 2, 2023
@rdblue (Contributor) commented May 2, 2023:

Thanks, @JonasJ-ap! Great to have this improvement in.

manisin pushed a commit to Snowflake-Labs/iceberg that referenced this pull request May 9, 2023
Fokko added a commit to apache/iceberg-python that referenced this pull request Sep 29, 2023
* Python: Add support for ORC

Creates fragments based on the FileFormat.

Blocked by: apache/iceberg#6997

* Revert

* TableScan add limit

* pyarrow limit number of rows fetched from files if limit is set

* add tests for scan limit

* python ci rebuild container if changes on python/dev/

* remove support for ORC

* remove unused imports

* increase sleep before running tests

* update python docs to include limit in table query

* docs fix format

---------

Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Daniel Rückert García <[email protected]>