Add dagster_databricks package for Databricks integration (#2468)
* Add dagster-databricks package

This package is closely modeled on the dagster_aws.emr subpackage and
provides the databricks_pyspark_step_launcher resource and the
DatabricksRunJobSolidDefinition solid for running Databricks jobs.
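
For orientation, a rough sketch (not code from this commit) of how a solid opts in to the
step launcher by requiring the 'pyspark_step_launcher' resource key, mirroring the
dagster_aws.emr pattern and the simple_pyspark example updated below; the solid body and
mode name here are illustrative assumptions:

    from dagster import ModeDefinition, solid
    from dagster_databricks import databricks_pyspark_step_launcher
    from dagster_pyspark import pyspark_resource

    @solid(required_resource_keys={'pyspark', 'pyspark_step_launcher'})
    def count_rows(context):
        # Executed on the Databricks cluster; the pyspark resource is backed
        # by the cluster's Spark instance when launched via the step launcher.
        return context.resources.pyspark.spark_session.range(100).count()

    databricks_mode = ModeDefinition(
        name='databricks',
        resource_defs={
            'pyspark_step_launcher': databricks_pyspark_step_launcher,
            'pyspark': pyspark_resource,
        },
    )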

* Reference Databricks docs in dagster-databricks configs module

* Move build_pyspark_zip into dagster_pyspark utils module
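
A minimal sketch of calling the relocated helper, assuming it keeps a
build_pyspark_zip(zip_file, path) signature (the argument order is an assumption):

    from dagster_pyspark.utils import build_pyspark_zip

    # Zip the local pipeline package so the step launcher can ship it to the cluster.
    build_pyspark_zip('/tmp/pyspark.zip', '.')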

* Fix style/minor issues in dagster-databricks

Specifically:

- triple single quotes instead of triple double quotes for docstrings
- single quotes instead of double quotes everywhere else
- one-line docstrings where possible; start text on the same line everywhere else
- rename 'is_terminal' to 'has_terminated'
- use 'databricks_run_id' instead of 'run_id' for clarity
- make DatabricksJobRunner.client a property (see the sketch after this list)
- remove unnecessary blank lines
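
The client change means callers now access the API client as an attribute rather than a
method call. A minimal sketch; the client class name and constructor arguments are
assumptions, not code from this commit:

    class DatabricksJobRunner:
        def __init__(self, host, token):
            self._host = host
            self._token = token

        @property
        def client(self):
            # Build the underlying API client on demand; callers use
            # runner.client rather than runner.client().
            return DatabricksClient(host=self._host, token=self._token)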

* Add references to Databricks storage docs in 'main' script

* Add comment explaining global vars in databricks_step_main.py

* Fix Python 2 issues in dagster-databricks

* Check invariants when setting up storage in Databricks job

* Fix dependencies in dagster-databricks/tox.ini

* Move 'secret_scope' field into inner credentials object to simplify Databricks storage

* isort dagster-databricks

* Add pylint to tox.ini for dagster_databricks

* Install dagster-databricks in 'make install_dev_python_modules'

* Reference GitHub issue for better storage setup in databricks_step_main.py

* Uncomment dagster-azure related config

* Replace assert_called_once with call_count for Python 3.5 compat

* Fix lint errors in databricks.py

* Improve handling of libraries by including required libs by default

* Fix version to match other dagster libraries

* Specify supported_pythons to exclude Python 3.8 from dagster-databricks tests on buildkite

See #1960.

* Add README for dagster-databricks

* Install dagster-databricks in dagster-examples tox.ini

* Update snapshot test for dagster example using databricks

* Add API docs for dagster_databricks

* Add coveragerc for dagster-databricks
Ben Sully authored Jun 9, 2020

1 parent f8c89ee commit 19146d4
Showing 36 changed files with 2,318 additions and 75 deletions.
5 changes: 5 additions & 0 deletions .buildkite/pipeline.py
@@ -258,6 +258,11 @@ def postgres_extra_cmds_fn(_):
         supported_pythons=[SupportedPython.V3_6, SupportedPython.V3_7],
         extra_cmds_fn=dask_extra_cmds_fn,
     ),
+    ModuleBuildSpec(
+        'python_modules/libraries/dagster-databricks',
+        # See: https://github.com/dagster-io/dagster/issues/1960
+        supported_pythons=SupportedPythonsNo38,
+    ),
     ModuleBuildSpec('python_modules/libraries/dagster-flyte', supported_pythons=SupportedPython3s,),
     ModuleBuildSpec(
         'python_modules/libraries/dagster-gcp',
1 change: 1 addition & 0 deletions Makefile
@@ -44,6 +44,7 @@ install_dev_python_modules:
     -e python_modules/libraries/dagster-bash \
     -e python_modules/libraries/dagster-celery \
     -e python_modules/libraries/dagster-cron \
+    -e python_modules/libraries/dagster-databricks \
     -e python_modules/libraries/dagster-datadog \
     -e python_modules/libraries/dagster-dbt \
     -e python_modules/libraries/dagster-gcp \
4 changes: 4 additions & 0 deletions docs/next/public/sitemap.xml
@@ -49,6 +49,10 @@
   <loc>https://docs.dagster.io/docs/apidocs/libraries/dagster_dask</loc>
 </url>
 
+<url>
+  <loc>https://docs.dagster.io/docs/apidocs/libraries/dagster_databricks</loc>
+</url>
+
 <url>
   <loc>https://docs.dagster.io/docs/apidocs/libraries/dagster_datadog</loc>
 </url>
4 changes: 4 additions & 0 deletions docs/next/src/treeOfContents.json
@@ -164,6 +164,10 @@
"name": "Datadog (dagster_datadog)",
"path": "/docs/apidocs/libraries/dagster_datadog"
},
{
"name": "Databricks (dagster_databricks)",
"path": "/docs/apidocs/libraries/dagster_databricks"
},
{
"name": "DBT (dagster_dbt)",
"path": "/docs/apidocs/libraries/dagster_dbt"
26 changes: 26 additions & 0 deletions docs/sections/api/apidocs/libraries/dagster_databricks.rst
@@ -0,0 +1,26 @@
Databricks (dagster_databricks)
-------------------------------

The ``dagster_databricks`` package provides two main pieces of functionality:

- a resource, ``databricks_pyspark_step_launcher``, which will execute a solid within a Databricks
  context on a cluster, such that the ``pyspark`` resource uses the cluster's Spark instance; and
- a solid, ``DatabricksRunJobSolidDefinition``, which submits an externally defined, configurable
  job to Databricks using the 'Run Now' API.

See the 'simple_pyspark' Dagster example for an end-to-end example of using the resource.

Note that either S3 or Azure Data Lake Storage config **must** be specified for solids to
succeed: the credentials for this storage must be stored as a Databricks Secret, and the
secret scope and key names must be provided in the resource config so that the Databricks
cluster can access storage.

.. currentmodule:: dagster_databricks

.. autodata:: dagster_databricks.databricks_pyspark_step_launcher
:annotation: ResourceDefinition

.. autoclass:: dagster_databricks.DatabricksRunJobSolidDefinition

.. autoclass:: dagster_databricks.DatabricksJobRunner

.. autoclass:: dagster_databricks.DatabricksError
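
For context, a hedged sketch of running the simple_pyspark pipeline against Databricks via
the preset added in this commit (assumes DATABRICKS_TOKEN and the storage secrets described
above are configured):

    from dagster import execute_pipeline
    from dagster_examples.simple_pyspark.pipelines import simple_pyspark_sfo_weather_pipeline

    result = execute_pipeline(simple_pyspark_sfo_weather_pipeline, preset='prod_databricks')
    assert result.success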
32 changes: 32 additions & 0 deletions examples/dagster_examples/simple_pyspark/environments/prod_databricks.yaml
@@ -0,0 +1,32 @@
resources:
  pyspark_step_launcher:
    config:
      run_config:
        cluster:
          new:
            nodes:
              node_types:
                node_type_id: Standard_DS3_v2
              size:
                num_workers: 1
            spark_version: 6.5.x-scala2.11
        run_name: dagster-tests
      databricks_host: uksouth.azuredatabricks.net
      databricks_token:
        env: DATABRICKS_TOKEN
      local_pipeline_package_path: .
      staging_prefix: /dagster-databricks-tests
      storage:
        s3:
          secret_scope: dagster-databricks-tests
          access_key_key: aws-access-key
          secret_key_key: aws-secret-key
solids:
  make_weather_samples:
    inputs:
      file_path: s3://dagster-databricks-tests/sfo_q2_weather_fixed_header.txt
storage:
  s3:
    config:
      s3_bucket: dagster-databricks-tests
      s3_prefix: simple-pyspark
34 changes: 27 additions & 7 deletions examples/dagster_examples/simple_pyspark/pipelines.py
@@ -1,6 +1,7 @@
 '''Pipeline definitions for the simple_pyspark example.'''
 from dagster_aws.emr import emr_pyspark_step_launcher
 from dagster_aws.s3 import s3_plus_default_storage_defs, s3_resource
+from dagster_databricks import databricks_pyspark_step_launcher
 from dagster_pyspark import pyspark_resource
 
 from dagster import ModeDefinition, PresetDefinition, pipeline
@@ -18,8 +19,8 @@
 )
 
 
-prod_mode = ModeDefinition(
-    name='prod',
+prod_emr_mode = ModeDefinition(
+    name='prod_emr',
     resource_defs={
         'pyspark_step_launcher': emr_pyspark_step_launcher,
         'pyspark': pyspark_resource,
@@ -29,8 +30,19 @@
 )
 
 
+prod_databricks_mode = ModeDefinition(
+    name='prod_databricks',
+    resource_defs={
+        'pyspark_step_launcher': databricks_pyspark_step_launcher,
+        'pyspark': pyspark_resource,
+        's3': s3_resource,
+    },
+    system_storage_defs=s3_plus_default_storage_defs,
+)
+
+
 @pipeline(
-    mode_defs=[local_mode, prod_mode],
+    mode_defs=[local_mode, prod_emr_mode, prod_databricks_mode],
     preset_defs=[
         PresetDefinition.from_pkg_resources(
             name='local',
@@ -41,18 +53,26 @@
         ],
     ),
     PresetDefinition.from_pkg_resources(
-        name='prod',
-        mode='prod',
+        name='prod_emr',
+        mode='prod_emr',
         pkg_resource_defs=[
-            ('dagster_examples.simple_pyspark.environments', 'prod.yaml'),
+            ('dagster_examples.simple_pyspark.environments', 'prod_emr.yaml'),
             ('dagster_examples.simple_pyspark.environments', 's3_storage.yaml'),
         ],
     ),
+    PresetDefinition.from_pkg_resources(
+        name='prod_databricks',
+        mode='prod_databricks',
+        pkg_resource_defs=[
+            ('dagster_examples.simple_pyspark.environments', 'prod_databricks.yaml'),
+            ('dagster_examples.simple_pyspark.environments', 's3_storage.yaml'),
+        ],
+    ),
     ],
 )
 def simple_pyspark_sfo_weather_pipeline():
     '''Computes some basic statistics over weather data from SFO airport'''
-    make_daily_temperature_high_diffs(make_daily_temperature_highs(make_weather_samples()),)
+    make_daily_temperature_high_diffs(make_daily_temperature_highs(make_weather_samples()))
 
 
 def define_simple_pyspark_sfo_weather_pipeline():