Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Running edits with no tests #8

Merged
merged 32 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4541258
create_json_input
abegong May 23, 2018
0ee9ca7
Get json_input working
abegong May 23, 2018
bbfc298
Add read_table_input. Make arg names more sensible
abegong May 23, 2018
9175c33
Add json dependencies. Load the file, not the path
abegong May 23, 2018
0408353
Implement horrible, horrible SQL syntax hack.
abegong May 23, 2018
47cb3d8
Implement truncate_and_insert_table_output
abegong May 23, 2018
ea442a4
create_json_input
abegong May 23, 2018
06f3709
Get json_input working
abegong May 23, 2018
8cc0d0d
Add read_table_input. Make arg names more sensible
abegong May 23, 2018
9354181
Add json dependencies. Load the file, not the path
abegong May 23, 2018
a20088a
Implement horrible, horrible SQL syntax hack.
abegong May 23, 2018
9b903e2
Implement truncate_and_insert_table_output
abegong May 23, 2018
818fbe9
Add null_output to dagster_pd
abegong May 28, 2018
add8a40
Merge branch 'running_edits_with_no_tests' of github.com:schrockn/dag…
abegong May 28, 2018
5b695b7
Merge branch 'master' into running_edits_with_no_tests
abegong May 28, 2018
6dc8f11
Decruft
abegong May 28, 2018
b9908e0
Add extra_inputs to create_templated_sql_transform_solid
abegong May 31, 2018
070d326
create_json_input
abegong May 23, 2018
bedd218
Get json_input working
abegong May 23, 2018
ca587ac
Add read_table_input. Make arg names more sensible
abegong May 23, 2018
7ef3893
Add json dependencies. Load the file, not the path
abegong May 23, 2018
c61977f
Implement horrible, horrible SQL syntax hack.
abegong May 23, 2018
1c7d7d2
Implement truncate_and_insert_table_output
Jun 1, 2018
c513c00
Add null_output to dagster_pd
abegong May 28, 2018
2e5e292
create_json_input
abegong May 23, 2018
d3dcba6
Get json_input working
abegong May 23, 2018
117fdd1
Add read_table_input. Make arg names more sensible
abegong May 23, 2018
ffa3a75
Add json dependencies. Load the file, not the path
abegong May 23, 2018
7db955e
Implement truncate_and_insert_table_output
abegong May 23, 2018
2dcd6d2
Add extra_inputs to create_templated_sql_transform_solid
abegong May 31, 2018
fc50b03
Merge branch 'running_edits_with_no_tests' of github.com:schrockn/dag…
Jun 1, 2018
3a28885
minor cleanup
Jun 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"-s"
],
"python.unitTest.pyTestEnabled": true,
"python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python",
"python.pythonPath": "/Users/schrockn/code/venvs/dagster/bin/python",
"python.linting.lintOnSave": true,
"python.linting.pep8Enabled": false,
"python.linting.mypyEnabled": false,
Expand Down
17 changes: 16 additions & 1 deletion dagster/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import (absolute_import, division, print_function, unicode_literals)
from builtins import * # pylint: disable=W0622,W0401

import json

from dagster import check
from dagster.core import types
from dagster.core.execution import DagsterExecutionContext

from .definitions import InputDefinition, Solid
from .definitions import (InputDefinition, create_dagster_single_file_input)
from .graph import DagsterPipeline


Expand All @@ -19,3 +22,15 @@ def input_definition(**kwargs):
def file_input_definition(argument_def_dict=None, **kwargs):
check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict')
return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs)


def create_json_input(name):
check.str_param(name, 'name')

def load_file(context, path):
check.inst_param(context, 'context', DagsterExecutionContext)
check.str_param(path, 'path')
with open(path) as ff:
return json.load(ff)

return create_dagster_single_file_input(name, load_file)
17 changes: 16 additions & 1 deletion dagster/pandas_kernel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster import check
from dagster.utils import has_context_argument

from dagster.core import create_json_input
from dagster.core.definitions import Solid
from dagster.core.execution import DagsterExecutionContext
from dagster.core.errors import (DagsterUserCodeExecutionError, DagsterInvariantViolationError)
Expand All @@ -14,7 +15,9 @@
create_dagster_pd_csv_output,
create_dagster_pd_dependency_input,
create_dagster_pd_parquet_output,
create_dagster_pd_read_table_input,
)
from dagster.core import (create_json_input)


def solid(**kwargs):
Expand Down Expand Up @@ -83,7 +86,7 @@ def dataframe_solid(*args, name, inputs, transform_fn=None, **kwargs):
return Solid(
name=name,
inputs=inputs,
outputs=[csv_output(), parquet_output()],
outputs=[csv_output(), parquet_output(), null_output()],
transform_fn=_dependency_transform_wrapper(name, transform_fn),
**kwargs
)
Expand All @@ -103,5 +106,17 @@ def csv_output():
return create_dagster_pd_csv_output()


def read_table_input(name, delimiter=',', **read_table_kwargs):
return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)


def json_input(name):
return create_json_input(name)


def parquet_output():
return create_dagster_pd_parquet_output()


def null_output():
return create_dagster_pd_csv_output()
21 changes: 19 additions & 2 deletions dagster/pandas_kernel/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ def check_path(context, path):

return create_dagster_single_file_input(name, check_path)

def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs):

def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
check.str_param(name, 'name')
check.str_param(delimiter, 'delimiter')

def check_path(context, path):
check.inst_param(context, 'context', DagsterExecutionContext)
check.str_param(path, 'path')
df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs)
df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs)
context.metric('rows', df.shape[0])
return df

Expand Down Expand Up @@ -97,3 +98,19 @@ def output_fn_inst(df, context, arg_dict):
return OutputDefinition(
name='PARQUET', output_fn=output_fn_inst, argument_def_dict={'path': types.PATH}
)


def create_dagster_pd_null_output():
def output_fn_inst(context, arg_dict):
# check.inst_param(df, 'df', pd.DataFrame)
check.inst_param(context, 'context', DagsterExecutionContext)
check.dict_param(arg_dict, 'arg_dict')
# path = check.str_elem(arg_dict, 'path')

# df.to_csv(path, index=False)

return OutputDefinition(
name='None',
output_fn=output_fn_inst,
# argument_def_dict={'path': types.PATH}
)
22 changes: 21 additions & 1 deletion dagster/sqlalchemy_kernel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ def output_fn(sql_expr, context, arg_dict):
)


def truncate_and_insert_table_output():
def output_fn(sql_expr, context, arg_dict):
check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression)
check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
check.dict_param(arg_dict, 'arg_dict')

output_table_name = check.str_elem(arg_dict, 'table_name')
total_sql = '''TRUNCATE TABLE {output_table_name};
INSERT INTO {output_table_name} ({sql_text})'''.format(
output_table_name=output_table_name, sql_text=sql_expr.sql_text
)
context.engine.connect().execute(total_sql)

return OutputDefinition(
name='TRUNCATE_AND_INSERT',
output_fn=output_fn,
argument_def_dict={'table_name': types.STRING},
)


def _table_input_fn(context, arg_dict):

check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
Expand Down Expand Up @@ -126,7 +146,7 @@ def create_sql_solid(name, inputs, sql_text):
name,
inputs=inputs,
transform_fn=create_sql_transform(sql_text),
outputs=[create_table_output()],
outputs=[create_table_output(), truncate_and_insert_table_output()],
)


Expand Down
5 changes: 4 additions & 1 deletion dagster/sqlalchemy_kernel/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ def execute_sql_text_on_context(context, sql_text):
finally:
cursor.close()
else:
context.engine.connect().execute(sql_text)
connection = context.engine.connect()
transaction = connection.begin()
connection.execute(sql_text)
transaction.commit()
12 changes: 10 additions & 2 deletions dagster/sqlalchemy_kernel/templated.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import jinja2

import dagster
Expand All @@ -16,7 +17,14 @@ def _create_table_input(name, depends_on=None):
)


def create_templated_sql_transform_solid(name, sql, table_arguments, output, dependencies=None):
def create_templated_sql_transform_solid(
name,
sql,
table_arguments,
output,
dependencies=None,
extra_inputs=[],
):
'''
Create a solid that is a templated sql statement. This assumes that the sql statement
is creating or modifying a table, and that that table will be used downstream in the pipeline
Expand Down Expand Up @@ -93,7 +101,7 @@ def create_templated_sql_transform_solid(name, sql, table_arguments, output, dep
dep_inputs = [_create_table_input(dep.name, depends_on=dep) for dep in dependencies]
return Solid(
name=name,
inputs=table_inputs + dep_inputs,
inputs=table_inputs + dep_inputs + extra_inputs,
transform_fn=_create_templated_sql_transform_with_output(sql, output),
outputs=[],
)
Expand Down