
Databricks integration #2458

Closed
sd2k opened this issue May 14, 2020 · 10 comments · Fixed by #2468
Labels
area: integrations (Related to general integrations, including requests for a new integration)
type: project (Big Projects)
Milestone

Comments

@sd2k
Contributor

sd2k commented May 14, 2020

I've been following Dagster for a month or so as we're looking to revamp our data pipelines at $company. We'll be using Spark for the majority of our ETLs, but using Databricks to manage our infrastructure rather than manually managing Spark clusters.

I noticed that an EMR launcher for launching PySpark solids was added a week or so ago. I haven't used EMR much, but I think Databricks has a similar workflow where jobs are submitted using their Jobs API (which I've used in the past through Airflow).
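For context, submitting a one-off run through the Databricks Jobs API looks roughly like the sketch below. This is only an illustration of the workflow mentioned above: the host, token, cluster spec, and DBFS paths are placeholders, not anything from an actual integration.

```python
# Rough sketch of a one-off PySpark run submitted via the Databricks Jobs API
# (POST /api/2.0/jobs/runs/submit). Host, token, cluster spec, and DBFS paths
# are placeholders.
import requests

HOST = "https://<databricks-instance>"
TOKEN = "<api-token>"

payload = {
    "run_name": "example run",
    "new_cluster": {
        "spark_version": "6.5.x-scala2.11",  # placeholder runtime version
        "node_type_id": "i3.xlarge",
        "num_workers": 2,
    },
    "spark_python_task": {
        "python_file": "dbfs:/scripts/main.py",
        "parameters": ["--some-arg", "value"],
    },
}

resp = requests.post(
    f"{HOST}/api/2.0/jobs/runs/submit",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=payload,
)
resp.raise_for_status()
run_id = resp.json()["run_id"]  # poll /api/2.0/jobs/runs/get with this id for status
print(run_id)
```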

Does this sound like a good candidate for a dagster-databricks integration library? And if so, are there any plans to support Databricks already, or would you be accepting contributions there?

Thanks!

@schrockn
Member

yes please! that would be fantastic

@mgasner mgasner added the area: integrations and type: project (Big Projects) labels May 14, 2020
@sd2k
Contributor Author

sd2k commented May 16, 2020

OK cool. I've made a start on this and have an external step launcher working, but I think I'll need to add a new system storage for DBFS (the Databricks filesystem) at the very least - I'm not using AWS, so I can't rely on the S3 storage, unfortunately. I'll also have to add an Azure storage system later, but that can be a separate PR :)
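For reference, moving small files in and out of DBFS over its REST API looks roughly like this. It's only a sketch: host, token, and paths are placeholders, and a real system storage would need the streaming create/add-block/close calls, larger payloads, and proper error handling.

```python
# Sketch of small-file reads/writes against the DBFS REST API
# (/api/2.0/dbfs/put and /api/2.0/dbfs/read). The put endpoint takes
# base64-encoded contents and is only suitable for small payloads.
import base64
import requests

HOST = "https://<databricks-instance>"
HEADERS = {"Authorization": "Bearer <api-token>"}


def dbfs_put(path: str, data: bytes) -> None:
    resp = requests.post(
        f"{HOST}/api/2.0/dbfs/put",
        headers=HEADERS,
        json={
            "path": path,
            "contents": base64.b64encode(data).decode(),
            "overwrite": True,
        },
    )
    resp.raise_for_status()


def dbfs_read(path: str, length: int = 1024 * 1024) -> bytes:
    resp = requests.get(
        f"{HOST}/api/2.0/dbfs/read",
        headers=HEADERS,
        params={"path": path, "offset": 0, "length": length},
    )
    resp.raise_for_status()
    return base64.b64decode(resp.json()["data"])
```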


I've noticed something weird happening when trying to unpickle the events pickled by my main function (the entrypoint used by the remote PySpark). It looks like when the event objects are pickled and unpickled they end up as the parent namedtuple classes:

# print(events) on remote PySpark:
[DagsterEvent(event_type_value='STEP_START', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=None, message='Started execution of step "make_df_solid.compute".'),
 DagsterEvent(event_type_value='STEP_OUTPUT', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=StepOutputData(step_output_handle=StepOutputHandle(step_key='make_df_solid.compute', output_name='result'), intermediate_materialization=None, type_check_data=TypeCheckData(success=True, label='result', description=None, metadata_entries=[])), message='Yielded output "result" of type "PySparkDataFrame". (Type check passed).'),
 DagsterEvent(event_type_value='STEP_SUCCESS', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=StepSuccessData(duration_ms=285.62570000030973), message='Finished execution of step "make_df_solid.compute" in 285ms.')]

# contents of events.pkl, downloaded from DBFS, and unpickled locally:
In [1]: import pickle

In [2]: events = pickle.load(open("events.pkl", "rb"))

In [3]: events
Out[3]:
[_DagsterEvent(event_type_value='STEP_START', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=_SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=None, message='Started execution of step "make_df_solid.compute".'),
 _DagsterEvent(event_type_value='STEP_OUTPUT', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=_SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=_StepOutputData(step_output_handle=_StepOutputHandle(step_key='make_df_solid.compute', output_name='result'), intermediate_materialization=None, type_check_data=_TypeCheckData(success=True, label='result', description=None, metadata_entries=[])), message='Yielded output "result" of type "PySparkDataFrame". (Type check passed).'),
 _DagsterEvent(event_type_value='STEP_SUCCESS', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=_SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=_StepSuccessData(duration_ms=285.62570000030973), message='Finished execution of step "make_df_solid.compute" in 285ms.')]

I've worked around it by using dagster.serdes.{de}serialize_value, but I can't tell why it happens here and not in the local_external_step_launcher. The Python versions are close if not identical (3.7.5 locally, 3.7.3 remotely). Has this behaviour been seen before?
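For reference, the workaround is roughly the following sketch (the file paths are placeholders): the events are written as serialized strings with dagster.serdes instead of pickles, so the class identity survives the round trip.

```python
# Sketch of the workaround: serialize the events with dagster.serdes instead
# of pickling them. File paths are placeholders.
from dagster.serdes import deserialize_value, serialize_value

# On the Databricks/PySpark side, after collecting `events` (the list above):
with open("/dbfs/tmp/events.json", "w") as f:
    f.write(serialize_value(events))

# Back on the local side, after downloading the file from DBFS:
with open("events.json") as f:
    events = deserialize_value(f.read())
```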

@schrockn
Member

Wow, that's really strange. I've never seen that behavior before. Have you been able to get a repro in a more controlled environment, or does it only happen in this setup, where the pickling happens in a remote execution environment that differs from the local one?

@schrockn
Member

import pickle
from collections import namedtuple

from dagster.serdes import whitelist_for_serdes


@whitelist_for_serdes
class ADataClass(namedtuple('_ADataClass', 'some_data')):
    pass

inst = ADataClass('foo')
print(inst)
print(pickle.loads(pickle.dumps(inst)))

Works as expected

$ python repro_pickle.py
ADataClass(some_data='foo')
ADataClass(some_data='foo')

@sd2k
Contributor Author

sd2k commented May 16, 2020

Ah, it looks like it happens as soon as PySpark is imported; if you add import pyspark to the top of your repro script and rerun it, the behaviour occurs:

$ PYENV_VERSION=dagster37 pyenv exec python repro_pickle.py
ADataClass(some_data='foo')
_ADataClass(some_data='foo')

A bit of digging implicates this: https://jira.apache.org/jira/browse/SPARK-22674. I can't tell how the remote EMR executor manages to work, since that must also import pyspark, but that seems to be the issue.

@schrockn
Member

That pyspark bug is wild. Can you just use dagster.serdes instead?

@natekupp
Contributor

hey @sd2k - let me know when you start looking at implementing the system storage for DBFS, happy to help. You can check out this diff for an example of what's needed to add a new system storage: https://dagster.phacility.com/D2259

@sd2k
Contributor Author

sd2k commented May 17, 2020

> That pyspark bug is wild. Can you just use dagster.serdes instead?

Yep, I can do that - works fine.

> hey @sd2k - let me know when you start looking at implementing the system storage for DBFS, happy to help. You can check out this diff for an example of what's needed to add a new system storage: https://dagster.phacility.com/D2259

Cheers! I'll take a look today and let you know if I have any questions.

@sd2k
Contributor Author

sd2k commented May 17, 2020

Hmm it does look like a much bigger undertaking than I expected 😄 I'm a bit unsure whether this is the best way to proceed!

The workflow I think Databricks recommends is not to use the DBFS root, but instead to either mount an object storage account or access the object store directly (e.g. S3 or Azure). Either way, some configuration needs to run prior to the pipeline run, to mount the object store or to set credentials for API calls.

Mounting the object store lets you access it via DBFS using both Spark APIs and local filesystem APIs, which is convenient for interactive/notebook use but not much different when running jobs. I get the feeling that a DBFSSystemStorage is the wrong approach, and that instead a Databricks job should require either an Azure or AWS storage account resource to handle persistence. To read/write data inside Spark we could then just rely on the SparkDataFrameS3StoragePlugin (and an equivalent Azure version).
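For illustration, the "config prior to the pipeline run" piece looks roughly like this on the Databricks side. It's only a sketch: bucket names, mount points, and secret scope/key names are placeholders, and dbutils, spark, and sc are the globals the Databricks runtime provides.

```python
# Sketch of the two access patterns described above, run on the cluster before
# the step executes. Bucket, mount point, and secret scope/key names are
# placeholders; dbutils, spark, and sc are the Databricks-provided globals.

# Option 1: mount the object store so it is reachable through DBFS at /mnt/...
dbutils.fs.mount(
    source="s3a://my-bucket",
    mount_point="/mnt/my-bucket",
)
df = spark.read.parquet("/mnt/my-bucket/path/to/data")

# Option 2: skip the mount and set credentials for direct s3a:// access.
sc._jsc.hadoopConfiguration().set(
    "fs.s3a.access.key", dbutils.secrets.get("my-scope", "aws-access-key")
)
sc._jsc.hadoopConfiguration().set(
    "fs.s3a.secret.key", dbutils.secrets.get("my-scope", "aws-secret-key")
)
df = spark.read.parquet("s3a://my-bucket/path/to/data")
```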

If that sounds reasonable then I need to work on an Azure storage system!

@sd2k
Contributor Author

sd2k commented May 17, 2020

Another question is how to handle 'Delta Lake' storage. For my purposes the basic idea is that Spark DataFrames would be saved using df.write.format('delta').save('<path>') rather than df.write.parquet('<path>'). My hunch is that it would currently require another TypeStoragePlugin and another system storage indicating that the data was stored in a Delta Lake, but the actual content of the new subclasses should be minimal since a Delta Lake is just a wrapper around parquet files on a filesystem anyway.
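For illustration, the read/write difference between the two formats is small, which is why the new subclasses should stay thin. The sketch below uses a placeholder path and a stand-in DataFrame, and assumes Delta Lake is available on the cluster (as it is on Databricks).

```python
# Sketch comparing the parquet and Delta Lake write/read calls. The path is a
# placeholder; the delta format needs the Delta Lake libraries on the cluster.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)  # stand-in for a real DataFrame

# Plain parquet, as in the existing plugin path mentioned above:
df.write.mode("overwrite").parquet("s3a://my-bucket/intermediates/result")
parquet_df = spark.read.parquet("s3a://my-bucket/intermediates/result")

# Delta Lake: same call shape, different format.
df.write.format("delta").mode("overwrite").save("s3a://my-bucket/intermediates/result")
delta_df = spark.read.format("delta").load("s3a://my-bucket/intermediates/result")
```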

I also need to look into the lakehouse package and see if there's any overlap there, but I imagine it complements rather than overlaps!

@natekupp natekupp added this to the 0.8.0 milestone Jun 1, 2020
@schrockn schrockn modified the milestones: 0.8.0, 0.8.blog Jun 7, 2020