
fix(pyspark): run pre-execute hooks for to_delta #8848

Merged

Conversation

deepyaman
Contributor

@mark-druffel needs Spark-native export functionality. We had put off implementing the fast path while trying to figure out whether we could unify output options across backends, but we should probably add the fast path now and revisit once we settle on a better alternative.
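For context, the "fast path" here means letting Spark's own distributed writers produce the output instead of collecting results through pyarrow on the driver. A rough sketch of the difference (illustrative only; the function names are made up and this is not the merged implementation):

# Illustrative sketch, not the actual ibis code.
import pyarrow.parquet as pq


def to_parquet_generic(backend, expr, path):
    # Generic path shared by all backends: materialize the result as a
    # pyarrow Table on the driver, then write a single file.
    pq.write_table(backend.to_pyarrow(expr), path)


def to_parquet_spark_native(backend, expr, path):
    # Spark-native fast path: compile to a Spark DataFrame and let Spark's
    # writer do the work -- note that it produces a directory of part files.
    backend._session.sql(backend.compile(expr)).write.parquet(path)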

@deepyaman
Copy link
Contributor Author

deepyaman commented Apr 2, 2024

(Still in draft, because I need to replace the tests I xfailed with Spark-specific tests.) Ready!

@deepyaman deepyaman self-assigned this Apr 2, 2024
@deepyaman deepyaman force-pushed the feat/flink/native-parquet-and-csv-export branch from 0960a68 to 5026500 Compare April 3, 2024 04:45
@deepyaman deepyaman marked this pull request as ready for review April 3, 2024 04:45
@@ -311,6 +313,7 @@ def test_table_to_csv(tmp_path, backend, awards_players):
reason="cannot inline WriteOptions objects",
raises=DuckDBParserException,
)
@pytest.mark.never(["pyspark"], reason="backend writes a dir", raises=IsADirectoryError)
Member

Does this result in effectively no testing for PySpark to_csv?

Contributor Author

No; I added equivalent tests in ibis/backends/pyspark/tests/test_export.py.
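For readers wondering what those equivalent tests look like: a hypothetical shape (the fixture names and assertions here are assumptions, not the actual contents of ibis/backends/pyspark/tests/test_export.py) would check for Spark's directory-of-part-files output rather than a single CSV:

# Hypothetical PySpark-specific export test; fixtures (con, awards_players)
# and assertions are assumptions, not the real test_export.py contents.
def test_table_to_csv_writes_directory(con, awards_players, tmp_path):
    out = tmp_path / "awards_players_csv"

    # Spark's writer creates a directory of part-*.csv files, not one file,
    # which is why the generic single-file test is marked never for pyspark.
    con.to_csv(awards_players, out)

    assert out.is_dir()
    assert any(out.glob("part-*"))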

@jcrist
Member

jcrist commented Apr 3, 2024

Apologies for the drive-by comment (I'm on leave, so don't block merging this on me). IMO we do want to figure out a consistent story around the artifacts produced by to_parquet/to_csv across backends. Having one backend produce a directory and another produce a single file feels odd and defeats the portable nature of ibis code. See #8584 for an issue outlining the problem (options 4 or 5 seem best to me). I'd rather we get this "correct" now than have to deprecate and change the behavior later.
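To make the portability concern concrete, here is a small sketch of the discrepancy (illustrative; exact behavior depends on backend versions and on how #8584 is resolved):

import ibis

t = ibis.memtable({"x": [1, 2, 3]})

# DuckDB's to_csv writes a single file at the given path...
ibis.duckdb.connect().to_csv(t, "out_duckdb.csv")

# ...while a Spark-native writer creates a directory of part-*.csv files at
# the same path, so downstream code can't treat the two outputs alike.
# ibis.pyspark.connect(spark_session).to_csv(t, "out_pyspark.csv")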

@mark-druffel

mark-druffel commented Apr 11, 2024

@deepyaman Thanks again for opening this PR! We were able to write a workaround for our use case, so this isn't a rush for us, although it would be nice not to have to keep maintaining that workaround.

On a separate note, while implementing our workaround I tried to use compile as you did in your code. I'm guessing I'm just using it improperly, or maybe it's a version issue... I tried it a few different ways and always got the same error:

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

I'm just sharing this in case it's an actual issue. I'm using pyspark 3.3.4 & ibis 8.0.0. Below is a reproducible example.

import ibis
from ibis import _
from pyspark.sql import SparkSession
ibis.options.interactive = False

filepath = "penguins.parquet"
table_name = "penguins"
ibis.examples.penguins.fetch().to_parquet(path = filepath)
spark = SparkSession.builder.getOrCreate()
ispark = ibis.pyspark.connect(spark)
df = ispark.read_parquet(filepath, table_name = table_name)
expr = (df
 .filter(_.bill_length_mm < 37)
 .filter(_.bill_depth_mm > 17)
 .group_by(by = [_.sex, _.species])
 .agg(
     n = _.count(),
     avg_bill_length_mm = _.bill_length_mm.mean(),
     avg_bill_depth_mm = _.bill_depth_mm.mean()
 )
 .order_by(-_.n)
).as_table()
df = ispark._session.sql(ispark.compile(expr))

Error

AttributeError                            Traceback (most recent call last)
Cell In[31], line 23
     11 df = ispark.read_parquet(filepath, table_name = table_name)
     12 expr = (df
     13  .filter(_.bill_length_mm < 37)
     14  .filter(_.bill_depth_mm > 17)
   (...)
     21  .order_by(-_.n)
     22 ).as_table()
---> 23 df = ispark._session.sql(ispark.compile(expr))

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/pyspark/sql/session.py:1034, in SparkSession.sql(self, sqlQuery, **kwargs)
   1032     sqlQuery = formatter.format(sqlQuery, **kwargs)
   1033 try:
-> 1034     return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1035 finally:
   1036     if len(kwargs) > 0:

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1313, in JavaMember.__call__(self, *args)
   1312 def __call__(self, *args):
-> 1313     args_command, temp_args = self._build_args(*args)
   1315     command = proto.CALL_COMMAND_NAME +\
   1316         self.command_header +\
   1317         args_command +\
   1318         proto.END_COMMAND_PART
   1320     answer = self.gateway_client.send_command(command)

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1283, in JavaMember._build_args(self, *args)
   1279     new_args = args
   1280     temp_args = []
   1282 args_command = "".join(
-> 1283     [get_command_part(arg, self.pool) for arg in new_args])
   1285 return args_command, temp_args

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/java_gateway.py:1283, in <listcomp>(.0)
   1279     new_args = args
   1280     temp_args = []
   1282 args_command = "".join(
-> 1283     [get_command_part(arg, self.pool) for arg in new_args])
   1285 return args_command, temp_args

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/py4j/protocol.py:298, in get_command_part(parameter, python_proxy_pool)
    296         command_part += ";" + interface
    297 else:
--> 298     command_part = REFERENCE_TYPE + parameter._get_object_id()
    300 command_part += "\n"
    302 return command_part

File ~/miniconda3/envs/ibis_sparkDataset/lib/python3.10/site-packages/pyspark/sql/dataframe.py:1988, in DataFrame.__getattr__(self, name)
   1978 """Returns the :class:`Column` denoted by ``name``.
   1979 
   1980 .. versionadded:: 1.3.0
   (...)
   1985 [Row(age=2), Row(age=5)]
   1986 """
   1987 if name not in self.columns:
-> 1988     raise AttributeError(
   1989         "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)
   1990     )
   1991 jc = self._jdf.apply(name)
   1992 return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'
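A likely explanation, offered with the caveat that it is based on how the pre-SQLGlot PySpark backend behaved rather than on a check of the 8.0.0 source: in ibis 8.0, ispark.compile(expr) returns a PySpark DataFrame rather than a SQL string, so passing it to spark.sql() makes py4j try to serialize the DataFrame as a command argument and fail on _get_object_id. Under that assumption, the compiled object can be used directly, or the expression executed through ibis:

# Assumes ibis 8.x, where the PySpark backend's compile() yields a Spark
# DataFrame rather than SQL text (an assumption, not verified here).
sdf = ispark.compile(expr)  # already a pyspark.sql.DataFrame
sdf.show()

# Or skip compile entirely and let ibis execute the expression:
expr.execute()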

@cpcloud cpcloud added this to the 9.2 milestone Jun 13, 2024
@cpcloud
Member

cpcloud commented Jul 15, 2024

@deepyaman Is this PR still viable? @chloeh13q implemented the various to_*_dir methods recently, so maybe this can be closed?

@deepyaman
Contributor Author

@deepyaman Is this PR still viable? @chloeh13q implemented the various to_*_dir methods recently, so maybe this can be closed?

I'll double-check tomorrow!

@cpcloud cpcloud removed this from the 9.2 milestone Jul 16, 2024
@deepyaman
Contributor Author

@cpcloud @chloeh13q I think most things are covered now, but the changes to to_delta are still relevant. I can update this PR (including the title) to include just that, if it makes sense.

@cpcloud
Member

cpcloud commented Jul 18, 2024

@deepyaman Yeah, let's wrap up the to_delta bits and then merge this!

@cpcloud cpcloud added pyspark The Apache PySpark backend feature Features or general enhancements labels Jul 18, 2024
@cpcloud cpcloud added this to the 9.2 milestone Jul 18, 2024
@deepyaman deepyaman force-pushed the feat/flink/native-parquet-and-csv-export branch from 5026500 to 635d98c Compare July 18, 2024 13:06
@deepyaman deepyaman changed the title feat(pyspark): implement native CSV/Parquet export fix(pyspark): run pre-execute hooks for to_delta Jul 18, 2024
@deepyaman
Contributor Author

@deepyaman Yeah, let's wrap up the to_delta bits and then merge this!

Pared it down to just that change.
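For reference, the shape of the remaining change, sketched from the PR title rather than the merged diff (fe0466a), which may differ in its details: the PySpark backend's to_delta now runs the registered pre-execute hooks before compiling and writing, the same way execute does.

# Sketch based on the PR title, not the exact merged code.
def to_delta(self, expr, path, **kwargs):
    # Previously the hooks were skipped here, so to_delta could fail on
    # expressions that execute() handled fine (e.g. ones using memtables).
    self._run_pre_execute_hooks(expr)

    sdf = self._session.sql(self.compile(expr))
    sdf.write.format("delta").save(path, **kwargs)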

@cpcloud cpcloud enabled auto-merge (squash) July 18, 2024 13:20
@cpcloud cpcloud merged commit fe0466a into ibis-project:main Jul 18, 2024
83 checks passed