Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests to check compatibility with fastparquet #9366

Merged
merged 23 commits into from
Oct 23, 2023

Conversation

mythrocks
Copy link
Collaborator

@mythrocks mythrocks commented Oct 2, 2023

This commit adds tests to check for compatibility between the Spark RAPIDS plugin and fastparquet.

The tests include the following scenarios:

  1. To check whether Parquet files written with fastparquet are read correctly with the Spark RAPIDS plugin.
  2. To check whether Parquet files written with the Spark RAPIDS plugin are read correctly with fastparquet.
  3. To check that files written with Apache Spark are read similarly with the Spark RAPIDS plugin, and fastparquet.

There are known limitations for these tests:

  1. Converting Pandas dataframes to Spark dataframes seems to fail with Spark versions preceding spark-3.4.x. This limits the Spark versions where the test may be run.
  2. Pandas and fastparquet have known limitations with certain datatypes. For instance:
    a. fastparquet does not support DECIMAL columns, as per documentation. These are treated as FLOAT.
    b. There seem to be limits to the ranges of Date and Timestamp rows, as compared to Apache Spark. For instance, dates like date(year=8543, month=12, day=31) is deemed out of range in Pandas/fastparquet. Such a date cannot be written via fastparquet, nor could they be read correctly via fastparquet.

@mythrocks mythrocks self-assigned this Oct 2, 2023
@mythrocks mythrocks added the test Only impacts tests label Oct 2, 2023
@mythrocks mythrocks marked this pull request as draft October 2, 2023 21:51
@mythrocks
Copy link
Collaborator Author

Build

1. Fixed dataframe size in test_read_fastparquet_single_column_tables.
2. Removed unnecessary test params for write confs.
3. Moved fastparquet import to top of test file.
4. Expanded on test failure description for timestamps.
@mythrocks
Copy link
Collaborator Author

Build

@pxLi
Copy link
Collaborator

pxLi commented Oct 6, 2023

To introduce new test dep, please also update

  1. https://github.com/NVIDIA/spark-rapids/blob/branch-23.10/integration_tests/README.md#dependencies
  2. dockerfiles under https://github.com/NVIDIA/spark-rapids/tree/branch-23.10/jenkins/ if need to cover in CI, OR
  3. include the requirement installation in the run_pyspark script

Also, I would recommend providing the strategy here to enable/disable the cases,
e.g. if not finding the required pkg skip the case or provide a flag to help others control if test against specific cases
better to be disabled as default so missing dep would not affect others
https://github.com/NVIDIA/spark-rapids/blob/branch-23.10/integration_tests/conftest.py#L42-L50

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My biggest concern here really is just follow on work. There are so many xfails. Are they things that we don't think will ever be fixed? If so then we might want to not test them at all. Are they things that we just have not had time to track down and file issues to fix? If so then we really should have a follow on issue to do that. Is it something else?

@mythrocks
Copy link
Collaborator Author

I'll reply to the individual comments in a moment. A lot of the xfails have to do with the inter-conversions between Pandas and Spark, and dtype inference.

  1. A Pandas dataframe containing <NA>/null, when converted to a Spark dataframe (via sparkSession.createDataFrame() fails to be constructed, because the Pandas dataframe type seems to be Struct, for some reason. This seems independent of fastparquet. Adding a schema changes the error, but not the fact that there is an error.
  2. Some dataframes simply change types during conversion. For instance, ARRAY<INT> when converted to Pandas turns into STRING.

@mythrocks
Copy link
Collaborator Author

mythrocks commented Oct 6, 2023

Classification of xfails:

For fixing in Spark RAPIDS / CUDF:

  1. [BUG] String columns written with fastparquet are read differently with Spark RAPIDS #9387, [BUG] String columns written with fastparquet seem to be read incorrectly via CUDF's Parquet reader rapidsai/cudf#14258: String columns written with fastparquet have extra null characters at the end of the last row.

For follow-up in Spark RAPIDS tests:

  1. [BUG] Fix STRUCT comparison between Pandas and Spark dataframes in fastparquet tests #9399: STRUCT<INT, FLOAT> columns, when read through fastparquet and converted to Spark dataframe for diffing, have a different notation from those read through the plugin:
    a. CPU: Row(a.first=-341142443, a.second=3.333994866005594e-37)
    b. GPU: Row(a=Row(first=-341142443, second=3.333994866005594e-37))
    Might need custom diff logic to convert between notations.

Problems in fastparquet:

  1. fastparquet reads DECIMAL columns in Parquet files as FLOAT columns, as per fastparquet documentation.
    This manifests as differences from Spark results.
  2. fastparquet reads DATE columns in Parquet files as TIMESTAMP.
  3. fastparquet has different date/timestamp validity limits, as compared to Apache Spark. For instance, year=8705 and year=705 are both read incorrectly (i.e. completely different values) by fastparquet.
  4. Any Pandas dataframe that contains None causes sparkSession.createDataFrame() to see "merge errors":
    >>> sql(" values (0), (1), (2), (null) as foo(i) ").write.mode('overwrite').parquet("/tmp/foobar")
    >>> ipdf = fastparquet.ParquetFile("/tmp/foobar").to_pandas()
    >>> ipdf.dtypes
     i    Int32
     dtype: object
    
    >>> spark.createDataFrame(ipdf)
    ...
    TypeError: field i: Can not merge type <class 'pyspark.sql.types.LongType'> and <class 'pyspark.sql.types.StructType'>
    Using an explicit schema doesn't improve matters much:
    >>> schema = StructType([StructField('i', IntegerType())])
    >>> spark.createDataFrame(ipdf, schema=schema)
    ...
    TypeError: field i: IntegerType() can not accept object <NA> in type <class 'pandas._libs.missing.NAType'>
    It's possible that this needs followup at our end.
  5. A Pandas dataframe (say, from fastparquet.ParquetFile().to_pandas()) containing ARRAY<INT>, when passed to sparkSession.createDataFrame() produces the following error:
    >>> sql(" values (array(1,2,3)), (array(4,5,6)) as foo(arr) ").write.mode("overwrite").parquet("/tmp/foobar")
    >>> pdf = fastparquet.ParquetFile("/tmp/foobar").to_pandas()
    >>> spark.createDataFrame(pdf)
    ...
    TypeError: Unable to infer the type of the field arr.
    Specifying a schema does not seem to help matters.
    >>> spark.createDataFrame(pdf, schema=StructType([StructField('arr', ArrayType(IntegerType(), True), True)]))
    ...
    TypeError: element in array field arr: IntegerType() can not accept object 1 in type <class 'numpy.int32'>
  6. Date/timestamp columns written by fastparquet in int96 cannot be rebased for correctness, to account for
    dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z, as Apache Spark does with
    spark.sql.legacy.parquet.int96RebaseModeInRead.
  7. fastparquet loses ARRAY<INT> type information when writing Pandas dataframe of type ARRAY<INT>. The column is written as STRING.
    >>> pdf = pandas.DataFrame({ 'arr': [ [1,2,3], [4,5,6] ] })
    >>> pdf
            arr
    0  [1, 2, 3]
    1  [4, 5, 6]
    
    >>> spark.createDataFrame(pdf).printSchema()
    root
    |-- arr: array (nullable = true)
    |    |-- element: long (containsNull = true)
    
    >>> fastparquet.write("/tmp/foobar", pdf)
    >>> spark.read.parquet("/tmp/foobar").printSchema()
    root
    |-- arr: string (nullable = true)

For fixing in Apache Spark (possibly).

  1. Date/timestamp columns written by fastparquet in int64 cannot be read by Spark or the plugin.

@mythrocks
Copy link
Collaborator Author

mythrocks commented Oct 10, 2023

  1. dockerfiles under https://github.com/NVIDIA/spark-rapids/tree/branch-23.10/jenkins/ if need to cover in CI, OR
  2. include the requirement installation in the run_pyspark script

I'm at a loss. I'm not sure where I've missed adding fastparquet as a dependency.

@pxLi
Copy link
Collaborator

pxLi commented Oct 10, 2023

FYI
to check which docker image or file would be used in pre-merge

https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/jenkins/Jenkinsfile-blossom.premerge#L34
https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/jenkins/Jenkinsfile-blossom.premerge#L135-L154 (run w/ modified dockerfile)

NOTE: pre-merge CI image build would run every 4 hours (there could be some gap after merge this PR and build the new pre-merge image for others), so to try not affect others' premerge CI you also need to manually trigger an ops/docker_image-manager with build after merge instantly. You can also allow me to do the merge to avoid potential blocking

@pxLi
Copy link
Collaborator

pxLi commented Oct 10, 2023

  1. dockerfiles under https://github.com/NVIDIA/spark-rapids/tree/branch-23.10/jenkins/ if need to cover in CI, OR
  2. include the requirement installation in the run_pyspark script

I'm at a loss. I'm not sure where I've missed adding fastparquet as a dependency.

I see your point, your dockerfile change did was not considered as modified files in pre-merge CI, let me take a look

UPDATE: fixed in #9415

@pxLi
Copy link
Collaborator

pxLi commented Oct 10, 2023

build

@pxLi
Copy link
Collaborator

pxLi commented Oct 10, 2023

git fetch --tags --force --progress -- https://github.com/NVIDIA/spark-rapids.git "+refs/pull/9366/merge"
git checkout 008afd78a21c79c0b271e34a40a48509e641bd84
BASE=$(git --no-pager log --oneline -1 | awk '{ print $NF }')
git --no-pager diff --name-only HEAD $(git merge-base HEAD) -- jenkins/Dockerfile-blossom.ubuntu

the dockerfile change detector works fine with the same merged commit locally.
Not sure why its failed to trigger the corresponding build stage in jenkins, triggered another run to check

UPDATE: fixed in #9415

@pxLi
Copy link
Collaborator

pxLi commented Oct 10, 2023

build

in current trigger


[2023-10-10T08:58:55.873Z] Collecting fastparquet

[2023-10-10T08:58:55.873Z]   Obtaining dependency information for fastparquet from https://files.pythonhosted.org/packages/d5/99/d6ed5914e30e794775d3bc645e952ba7b6855ca8db2dc41d6d5069e76abb/fastparquet-2023.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata

[2023-10-10T08:58:55.873Z]   Downloading fastparquet-2023.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)

the installed one

@pxLi
Copy link
Collaborator

pxLi commented Oct 10, 2023

FYI to check which docker image or file would be used in pre-merge

https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/jenkins/Jenkinsfile-blossom.premerge#L34 https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/jenkins/Jenkinsfile-blossom.premerge#L135-L154 (run w/ modified dockerfile)

NOTE: pre-merge CI image build would run every 4 hours (there could be some gap after merge this PR and build the new pre-merge image for others), so to try not affect others' premerge CI you also need to manually trigger an ops/docker_image-manager with build after merge instantly. You can also allow me to do the merge to avoid potential blocking

I still recommend adding the flag for people to choose if enable the cases. thanks

@mythrocks
Copy link
Collaborator Author

@pxLi: So as not to break anyone else's tests, I have modified the test to check whether fastparquet is available for import, per your earlier suggestion. I have verified that this skips the tests if fastparquet is not installed. The relevant commit is here:
fa356f8.

@mythrocks
Copy link
Collaborator Author

Not sure why the so many reviewers got added here. I'll lean on @revans2, @pxLi and @res-life for this one.

@mythrocks
Copy link
Collaborator Author

Build

Copy link
Collaborator

@pxLi pxLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, for CI update thanks!

@res-life
Copy link
Collaborator

For fixing in Apache Spark (possibly).
Date/timestamp columns written by fastparquet in int64 cannot be read by Spark or the plugin.

Spark does not support us timestamp, so I think it's impossible to fix.
https://github.com/apache/spark/blob/v3.4.0/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala#L27-L30

For fixing in fastparquet:
1 - 7 items

We also can not fix the fastparquet code.

@res-life
Copy link
Collaborator

LGTM

@res-life
Copy link
Collaborator

Did not find tests for map type.
We can file a follow-up.

Copy link
Collaborator

@NvTimLiu NvTimLiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mythrocks mythrocks merged commit a8cbfca into NVIDIA:branch-23.12 Oct 23, 2023
28 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
test Only impacts tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants