
Add pyspark dataframe_loader #2790

Merged
merged 5 commits into dagster-io:master on Aug 20, 2020

Conversation

DavidKatz-il
Contributor

Add dataframe_loader to support inputs.

@DavidKatz-il DavidKatz-il changed the title Add dataframe_loader Add pyspark dataframe_loader Aug 7, 2020
@DavidKatz-il DavidKatz-il changed the title Add pyspark dataframe_loader Add pyspark dataframe_loader Aug 7, 2020
@DavidKatz-il DavidKatz-il force-pushed the add-pyspark-dataframe-loader branch 2 times, most recently from 33bc81b to 0ef8287 Compare August 10, 2020 21:22
@natekupp
Contributor

hey @sryza can you take a look at this one too?

@sryza
Contributor

sryza commented Aug 11, 2020

Thanks for your contribution @DavidKatz-il ! I'm impressed with your thoroughness.

An important consideration is that Dagster should be able to support any version of Spark. I'm concerned that wrapping every individual option of every Spark input format in Dagster config will tie us too closely to specific Spark versions: Spark might add or remove an option, and then it becomes unclear which Spark version Dagster's configuration should reflect.

Although it doesn't give us the same advantage of having all the options nicely displayed in the Dagit config editor, I think a safer approach would be to use permissive dictionaries for the set of options within each input format. E.g.

'jdbc': {},
'orc': {},

This allows Spark itself to remain as the arbiter of what config options are available.
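As a rough illustration of that division of labor (the helper and reader interface below are hypothetical stand-ins, not Dagster's actual API): the loader validates only which format was selected, and forwards the option dict to Spark untouched, so Spark stays the arbiter of what is valid.

```python
# Hedged sketch of the "permissive" idea: the config schema checks only
# the format selector; every option inside it is handed to Spark's
# reader verbatim. `load_dataframe` is a made-up name for illustration.

def load_dataframe(reader, config):
    # config looks like {'csv': {'path': '/data/in.csv', 'sep': '|'}}
    ((file_type, options),) = config.items()
    opts = dict(options)
    path = opts.pop('path')  # every format still needs a path
    return reader.format(file_type).options(**opts).load(path)
```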

@DavidKatz-il
Contributor Author

Hey @sryza, thanks for reviewing.
Right now the user can add options that we didn't configure, because we are using Permissive.
So are you suggesting that we use an empty Permissive?
e.g.

'jdbc': Permissive(),
'orc': Permissive(),
...

If so, what about options that are required, e.g. 'path'?
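For what it's worth, the two concerns can coexist: if Permissive accepts a dict of declared fields (e.g. `Permissive({'path': Field(String)})`), the required keys can be spelled out while every extra key still passes through. A plain-Python stand-in for that validation rule, with hypothetical names:

```python
# Stand-in for "Permissive with required keys": unknown options pass
# through untouched, but missing required ones fail fast. The function
# name and rule here are illustrative, not Dagster's actual API.

def check_options(options, required=('path',)):
    missing = [key for key in required if key not in options]
    if missing:
        raise ValueError('missing required option(s): %s' % ', '.join(missing))
    return dict(options)  # extra keys are allowed through unchanged
```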

@sryza
Contributor

sryza commented Aug 12, 2020

Thought about it a little bit more, and I think you're right. As long as we use Permissive, it's helpful to include the detailed config.

I just launched a build for the PR.

@flvndh
Contributor

flvndh commented Aug 12, 2020

Great contribution @DavidKatz-il! What do you think of adding support for Apache Iceberg and Delta Lake as well?

@DavidKatz-il
Contributor Author

Hi @flvndh, does it require packages outside pyspark?

@flvndh
Contributor

flvndh commented Aug 12, 2020

@DavidKatz-il just JAR packages: org.apache.iceberg:iceberg-spark3-runtime:0.9.0 and io.delta:delta-core_2.12:0.7.0.
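For reference, one common way to attach those coordinates is Spark's `spark.jars.packages` setting at session construction; a small sketch (the session line is commented out since it needs pyspark and a running JVM):

```python
# Building the spark.jars.packages value for the two extra formats.
# The coordinates are the ones quoted above; the commented lines show
# where they would be applied when creating the SparkSession.
ICEBERG = "org.apache.iceberg:iceberg-spark3-runtime:0.9.0"
DELTA = "io.delta:delta-core_2.12:0.7.0"
packages = ",".join([ICEBERG, DELTA])
# spark = (SparkSession.builder
#          .config("spark.jars.packages", packages)
#          .getOrCreate())
```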

@DavidKatz-il
Contributor Author

@sryza what do you think?

@sryza
Contributor

sryza commented Aug 12, 2020

This change looks ready to go in. I think adding support for Iceberg and Delta Lake would be cool. @DavidKatz-il - if you're interested in adding those to this PR, then great. If you'd like me to merge this as-is, also great.

@DavidKatz-il
Contributor Author

I added the file_type 'other'.

e.g.

'other': {
    'path': 'path_to_file',
    'format': 'delta',
    ...
}
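A hedged sketch of how such an 'other' entry could map onto Spark's generic reader; the helper name is made up for illustration, and only the `format`/`options`/`load` chain reflects Spark's actual reader API:

```python
# Sketch: the 'other' escape hatch pops 'path' and 'format' out of the
# config entry and hands every remaining key to Spark as a reader
# option. `load_other` is a hypothetical helper, not code from this PR.

def load_other(reader, config):
    opts = dict(config)       # e.g. {'path': ..., 'format': 'delta', ...}
    fmt = opts.pop('format')
    path = opts.pop('path')
    return reader.format(fmt).options(**opts).load(path)
```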

@@ -66,3 +74,28 @@ def return_df(_):
assert result.success
actual = read(temp_path)
assert sorted(df.collect()) == sorted(actual.collect())


@pytest.mark.parametrize(
Contributor

Would you mind adding tests for the "other" cases for input and output?

@sryza
Contributor

sryza commented Aug 18, 2020

@schrockn brought up the question of whether we should avoid the "other" escape hatch in favor of making it easy to compose type materializers: https://dagster.phacility.com/D4188.

@DavidKatz-il - it could take some time for us to reach some resolution on that, but I still think the core of this PR, which adds support for the native Spark formats, is useful. If you want to take out the "other" option for now, I can merge the rest of it. Thanks for bearing with us on the thrash here.

EDIT: it looks like we actually got to a resolution on that issue already ^^, and the escape hatch you added here seems like a reasonable direction. It would still be good to have tests for it.

'compression': 'gzip',
}
file_type: dict(
{'mode': 'overwrite', 'compression': 'gzip',}, **options,
Contributor

I'm noticing a syntax error here in the Python 2.7 tests: https://buildkite.com/dagster/dagster/builds/14227#9a8db1d9-518e-4491-87a8-d247ca90946c

Contributor Author

@DavidKatz-il DavidKatz-il Aug 20, 2020


The comma at the end of the line caused this error on Python 2.7.
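To make the failure concrete: Python 2.7 rejects a trailing comma after `**kwargs` in a call, while the same call without it parses on both 2 and 3. A minimal reproduction of the merge the snippet above performs:

```python
# Python 2.7 raises SyntaxError on a trailing comma after **kwargs:
#     dict({'mode': 'overwrite'}, **options,)   # fails to parse on 2.7
# Dropping the comma is valid on Python 2 and 3 alike:
options = {'sep': '|'}
merged = dict({'mode': 'overwrite', 'compression': 'gzip'}, **options)
# merged now holds the defaults plus the user-supplied option
```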

@sryza
Contributor

sryza commented Aug 20, 2020

This looks good. Merging. Thanks for your contribution @DavidKatz-il !

@sryza sryza merged commit acfcbba into dagster-io:master Aug 20, 2020
@DavidKatz-il DavidKatz-il deleted the add-pyspark-dataframe-loader branch August 20, 2020 16:25
4 participants