
Sanitized special character column name before writing to parquet #590

Merged

Conversation

@kevinjqliu (Contributor) commented Apr 7, 2024

Fixes #584

Before this PR, PyIceberg allowed writing parquet files with special characters in column names. The Java Iceberg library does not allow this; instead, it transforms the special characters before writing to parquet, and applies the same transformation during reading.

For example, in the Java Iceberg library, an Iceberg column named TEST:A1B2.RAW.ABC-GG-1-A is transformed into TEST_x3AA1B2_x2ERAW_x2EABC_x2DGG_x2D1_x2DA, which is then used to write the parquet files.
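Judging from the example, each disallowed character is replaced with _x followed by its uppercase hex code point (':' is 0x3A, '.' is 0x2E, '-' is 0x2D). A minimal sketch of that rule (a hypothetical helper for illustration, not Iceberg's actual implementation; edge cases such as a leading digit are ignored here):

    import re

    # Replace every character outside [a-zA-Z0-9_] with "_x" plus its
    # uppercase hex code point, e.g. ':' -> '_x3A'.
    def sanitize(name: str) -> str:
        return re.sub(r"[^a-zA-Z0-9_]", lambda m: f"_x{ord(m.group(0)):X}", name)

    print(sanitize("TEST:A1B2.RAW.ABC-GG-1-A"))
    # TEST_x3AA1B2_x2ERAW_x2EABC_x2DGG_x2D1_x2DA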

apache/iceberg#10120 was opened as a feature request to allow writing parquet files with special characters in column names.

In the meantime, we want to mirror the Java Iceberg library's behavior:
#83 does the column name transformation during reading.
This PR does the column name transformation during writing.

@kevinjqliu force-pushed the kevinjqliu/special-character-column-parquet branch from d09b945 to 345827e on April 11, 2024 01:54
@kevinjqliu force-pushed the kevinjqliu/special-character-column-parquet branch from 345827e to d278ee5 on April 11, 2024 02:41
@kevinjqliu changed the title from "Kevinjqliu/special character column parquet" to "Sanitized special character column before writing to parquet" on Apr 11, 2024
@kevinjqliu changed the title from "Sanitized special character column before writing to parquet" to "Sanitized special character column name before writing to parquet" on Apr 11, 2024
@kevinjqliu marked this pull request as ready for review on April 11, 2024 02:56
@@ -1772,12 +1772,13 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
)

def write_parquet(task: WriteTask) -> DataFile:
    df = pa.Table.from_batches(task.record_batches)
    df = df.rename_columns(schema.column_names)
@kevinjqliu (Contributor, Author):
We need to change the schema (column names) of the Arrow table; if there's a better way to do this, please let me know.

Contributor:

Shall we extend the integration test to test the nested schema case? For example,

        pa.field('name', pa.string()),
        pa.field('address', pa.struct([
            pa.field('street', pa.string()),
            pa.field('city', pa.string()),
            pa.field('zip', pa.int32())
        ])),

Updated: I got

pyarrow.lib.ArrowInvalid: tried to rename a table of 4 columns but only 7 names were provided

when trying with the following dataset

    TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = {
        column_name_with_special_character: ['a', None, 'z'],
        'id': [1, 2, 3],
        'name': ['AB', 'CD', 'EF'],
        'address': [
            {'street': '123', 'city': 'SFO', 'zip': 12345},
            {'street': '456', 'city': 'SW', 'zip': 67890},
            {'street': '789', 'city': 'Random', 'zip': 10112}
        ]
    }
    pa_schema = pa.schema([
        pa.field(column_name_with_special_character, pa.string()),
        pa.field('id', pa.int32()),
        pa.field('name', pa.string()),
        pa.field('address', pa.struct([
            pa.field('street', pa.string()),
            pa.field('city', pa.string()),
            pa.field('zip', pa.int32())
        ]))
    ])

@kevinjqliu (Contributor, Author):

Good catch! I don't think rename_columns works well with nested schemas.
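A minimal standalone reproduction of that mismatch (a hypothetical snippet, not the actual integration test): rename_columns expects exactly one name per top-level column, while the 7 names in the error above come from expanding the nested address fields, so nested schemas can't be renamed this way.

    import pyarrow as pa

    # Two top-level columns, but four leaf fields once 'address' is expanded.
    table = pa.table({
        'id': [1, 2, 3],
        'address': [
            {'street': '123', 'city': 'SFO', 'zip': 12345},
            {'street': '456', 'city': 'SW', 'zip': 67890},
            {'street': '789', 'city': 'Random', 'zip': 10112},
        ],
    })

    # Passing leaf names raises:
    # pyarrow.lib.ArrowInvalid: tried to rename a table of 2 columns
    # but only 5 names were provided
    table.rename_columns(['id', 'address', 'street', 'city', 'zip'])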

@kevinjqliu requested a review from gwindes on April 11, 2024 03:14
@kevinjqliu (Contributor, Author):

cc @Fokko / @HonahX


@@ -1122,12 +1121,12 @@ def project_table(
return result


-def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table:
-    struct_array = visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema))
+def to_requested_schema(table: pa.Table, from_schema: Schema, to_schema: Schema) -> pa.Table:
@kevinjqliu (Contributor, Author):

I refactored the helper method.

@kevinjqliu (Contributor, Author):

I can pull this refactor into a separate PR if it helps with the review.

Contributor:

This is a public method, so we're breaking the API here. I'm not sure a refactor justifies the breaking change. Also, file_schema and requested_schema are more informative to me.

@kevinjqliu (Contributor, Author):

I didn't realize it's a public API. I've reverted the refactor.

@@ -1772,16 +1772,17 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
)

def write_parquet(task: WriteTask) -> DataFile:
    arrow_table = pa.Table.from_batches(task.record_batches)
    df = to_requested_schema(table=arrow_table, from_schema=iceberg_table_schema, to_schema=parquet_schema)
@kevinjqliu (Contributor, Author):

Similar to the read side (#83 and #597), this converts the Arrow table from the unsanitized Iceberg table schema to the sanitized parquet schema.
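For illustration, a sketch of that conversion at the schema level (assuming sanitize_column_names is importable from pyiceberg.schema, as used later in this diff):

    from pyiceberg.schema import Schema, sanitize_column_names
    from pyiceberg.types import NestedField, StringType

    # Unsanitized Iceberg schema with a special-character column name...
    iceberg_schema = Schema(NestedField(field_id=1, name="TEST:A1B2", field_type=StringType(), required=False))
    # ...and its sanitized counterpart, used for the parquet file.
    parquet_schema = sanitize_column_names(iceberg_schema)
    print(parquet_schema.column_names)  # ['TEST_x3AA1B2']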

@kevinjqliu (Contributor, Author):

Currently, we batch the incoming dataframe first (in _dataframe_to_data_files) and then transform the schema for each batch.

We could optimize by transforming first and then batching.

I want the schema transformation to happen as close to the parquet writing as possible, so I'm going with the first approach for now.

@kevinjqliu requested a review from HonahX on April 11, 2024 22:41
@HonahX (Contributor) left a comment:

LGTM! Sorry for the merge conflict; I took the integration test in #597.

Follow-up PR: as proposed in apache/iceberg#10120, shall we also add a configuration to allow writing parquet files with the original column names?

About the milestone: shall we add this to the 0.7.0 milestone? I think changing the column naming behavior of parquet generation may be too much for a patch release. Also, I hesitate to label it as a "bug", since Iceberg resolves fields by field ID and this does not violate the spec. Releasing it in 0.7.0 would also give us more time to develop the follow-up PR and to discuss apache/iceberg#10120 further. WDYT?

@kevinjqliu @Fokko


tbl.overwrite(arrow_table_with_special_character_column)
# PySpark toPandas() turns nested field into tuple by default, but returns the proper schema when Arrow is enabled
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
Contributor:

Shall we add this to the spark fixture in conftest.py? Since the fixture's scope is "session", if we change the config here, all tests that run before this line will not have the configuration, while all tests after it will. Moving it to the initialization ensures a consistent set of Spark configs throughout the integration tests. WDYT?

@kevinjqliu (Contributor, Author):

Good catch! I didn't know about the fixture scope behavior. Moved to conftest.
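A minimal sketch of that move (the fixture name and builder options here are assumptions; the real conftest.py fixture carries more configuration):

    import pytest
    from pyspark.sql import SparkSession

    @pytest.fixture(scope="session")
    def spark() -> SparkSession:
        # Set the config at session creation so every integration test,
        # regardless of ordering, sees the same Spark configuration.
        return (
            SparkSession.builder.appName("pyiceberg-integration-tests")
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .getOrCreate()
        )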

@kevinjqliu (Contributor, Author):

+1 to adding this to the 0.7.0 release. The original issue in #584 is already fixed by #597.

@@ -186,8 +185,6 @@ def test_inspect_entries(

    assert df_lhs == df_rhs, f"Difference in data_file column {df_column}: {df_lhs} != {df_rhs}"
elif column == 'readable_metrics':
    right = right.asDict(recursive=True)
@kevinjqliu (Contributor, Author):

We don't need this anymore: with spark.sql.execution.arrow.pyspark.enabled, the pandas DataFrame turns the tuple into a dict.

Contributor:

Nice!

@HonahX added this to the PyIceberg 0.7.0 release milestone on Apr 13, 2024
@kevinjqliu requested a review from Fokko on April 13, 2024 19:57
Comment on lines 1773 to 1774
    parquet_schema = sanitize_column_names(iceberg_table_schema)
    arrow_file_schema = parquet_schema.as_arrow()
Contributor:

Nit: I realize we already have many names, but this one might be confusing. parquet_schema is appropriate today since we only support parquet, but we might also support ORC and Avro later.

Suggested change:
-    parquet_schema = sanitize_column_names(iceberg_table_schema)
-    arrow_file_schema = parquet_schema.as_arrow()
+    arrow_file_schema = sanitize_column_names(iceberg_table_schema).as_arrow()

@@ -1780,16 +1781,17 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
)

def write_parquet(task: WriteTask) -> DataFile:
    arrow_table = pa.Table.from_batches(task.record_batches)
    df = to_requested_schema(requested_schema=parquet_schema, file_schema=iceberg_table_schema, table=arrow_table)
Contributor:

Do we know if from_arrays in the ArrowProjectionVisitor is a no-op?

The reason I'm asking is that we're introducing quite a bit of logic here, and I think the rewrites are only applicable to Avro: https://avro.apache.org/docs/1.8.1/spec.html#names

Contributor:

Quick check: judging by how long from_arrays takes, it doesn't seem to copy anything:

python3.9
Python 3.9.18 (main, Aug 24 2023, 18:16:58) 
[Clang 15.0.0 (clang-1500.1.0.2.5)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> numbers = pa.array(range(100000000))
>>> pa.Table.from_arrays([numbers], names=['abc'])
pyarrow.Table
abc: int64
----
abc: [[0,1,2,3,4,...,99999995,99999996,99999997,99999998,99999999]]
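A complementary check (a sketch; exact numbers vary across PyArrow versions): pa.total_allocated_bytes() barely moves across the call, consistent with from_arrays wrapping the existing buffers rather than copying them.

    import pyarrow as pa

    numbers = pa.array(range(1_000_000))
    before = pa.total_allocated_bytes()
    table = pa.Table.from_arrays([numbers], names=['abc'])
    # Near-zero growth: the new table reuses the array's buffers.
    print(pa.total_allocated_bytes() - before)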

Comment on lines +1780 to +1788
table_schema = task.schema
arrow_table = pa.Table.from_batches(task.record_batches)
# if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly
# otherwise use the original schema
if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema:
    file_schema = sanitized_schema
    arrow_table = to_requested_schema(requested_schema=file_schema, file_schema=table_schema, table=arrow_table)
else:
    file_schema = table_schema
@kevinjqliu (Contributor, Author):

@Fokko, WDYT of this? It only uses the transformed schema when necessary.

@kevinjqliu (Contributor, Author):

Another option could be to push this logic up to the caller of write_file, or maybe even into WriteTask.

Contributor:

My preference is to leave all this logic out when writing Parquet files. It doesn't affect Parquet files (since the characters are supported), and with Iceberg we resolve fields by ID, so names are unimportant. I would like to get @rdblue's opinion since he did the initial work (I also have a meeting with him tomorrow, and I can ping him there).

@kevinjqliu (Contributor, Author):

Sounds good! I opened apache/iceberg#10120 to track this.

Contributor:

I pinged Ryan and he's in favor of adding the aliases 👍

@Fokko merged commit 62b527e into apache:main on Apr 17, 2024
7 checks passed
@Fokko (Contributor) commented Apr 17, 2024:

Thanks for working on this, @kevinjqliu, and thanks @HonahX for the review!

@kevinjqliu deleted the kevinjqliu/special-character-column-parquet branch on April 17, 2024 16:11
Development

Successfully merging this pull request may close these issues.

[BUG] Valid column characters fail on to_arrow() or to_pandas() ArrowInvalid: No match for FieldRef.Name