Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dask dataframe #2758

Merged
merged 4 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from dagster.core.utils import check_dagster_package_version

from .data_frame import DataFrame
from .executor import dask_executor
from .version import __version__

check_dagster_package_version('dagster-dask', __version__)

__all__ = ['dask_executor']
__all__ = [
'DataFrame',
'dask_executor',
]
898 changes: 898 additions & 0 deletions python_modules/libraries/dagster-dask/dagster_dask/data_frame.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
num1,num2
1,2
3,4
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"num1":1,"num2":2}
{"num1":3,"num2":4}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import shutil

import dask.dataframe as dd
import pytest
from dagster_dask import DataFrame
from dask.dataframe.utils import assert_eq

from dagster import InputDefinition, OutputDefinition, execute_solid, file_relative_path, solid
from dagster.utils.test import get_temp_dir


def create_dask_df():
path = file_relative_path(__file__, 'num.csv')
return dd.read_csv(path)


@pytest.mark.parametrize(
'file_type,read,kwargs',
[
pytest.param('csv', dd.read_csv, {'index': False}, id='csv'),
pytest.param('parquet', dd.read_parquet, {'write_index': False}, id='parquet'),
pytest.param('json', dd.read_json, {}, id='json'),
],
)
def test_dataframe_outputs(file_type, read, kwargs):
df = create_dask_df()

@solid(output_defs=[OutputDefinition(dagster_type=DataFrame, name='output_df')])
def return_df(_):
return df

with get_temp_dir() as temp_path:
shutil.rmtree(temp_path)
result = execute_solid(
return_df,
run_config={
'solids': {
'return_df': {
'outputs': [{'output_df': {file_type: {'path': temp_path, **kwargs}}}]
}
}
},
)
assert result.success
actual = read(f"{temp_path}/*")
assert assert_eq(actual, df)


@pytest.mark.parametrize(
'file_type',
[
pytest.param('csv', id='csv'),
pytest.param('parquet', id='parquet'),
pytest.param('json', id='json'),
],
)
def test_dataframe_inputs(file_type):
@solid(input_defs=[InputDefinition(dagster_type=DataFrame, name='input_df')])
def return_df(_, input_df):
return input_df

file_name = file_relative_path(__file__, f"num.{file_type}")
result = execute_solid(
return_df,
run_config={
'solids': {'return_df': {'inputs': {'input_df': {file_type: {'path': file_name}}}}}
},
)
assert result.success
assert assert_eq(result.output_value(), create_dask_df())
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dagster_pandas as dagster_pd
from dagster_dask import dask_executor
from dagster_dask import DataFrame, dask_executor

from dagster import (
InputDefinition,
Expand Down Expand Up @@ -88,3 +88,35 @@ def test_pandas_dask():
)

assert result.success


@solid(input_defs=[InputDefinition('df', DataFrame)])
def dask_solid(_, df): # pylint: disable=unused-argument
pass


@pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [dask_executor])])
def dask_pipeline():
return dask_solid()


def test_dask():
run_config = {
'solids': {
'dask_solid': {
'inputs': {'df': {'csv': {'path': file_relative_path(__file__, 'ex*.csv')}}}
}
}
}

result = execute_pipeline(
ReconstructablePipeline.for_file(__file__, dask_pipeline.name),
run_config={
'storage': {'filesystem': {}},
'execution': {'dask': {'config': {'cluster': {'local': {'timeout': 30}}}}},
**run_config,
},
instance=DagsterInstance.local_temp(),
)

assert result.success
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-dask/dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# we need `pyarrow` for testing read/write parquet files.
pyarrow
2 changes: 1 addition & 1 deletion python_modules/libraries/dagster-dask/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def get_version():
'bokeh',
'dagster',
'dagster_graphql',
'dask>=1.2.2',
'dask[dataframe]>=1.2.2',
'distributed>=1.28.1',
],
extras_require={
Expand Down
1 change: 1 addition & 0 deletions python_modules/libraries/dagster-dask/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ deps =
-e ../dagster-aws
-e ../dagster-pandas
-e .
-r dev-requirements.txt
extras = yarn,pbs,kube
usedevelop = true
whitelist_externals =
Expand Down