diff --git a/.vscode/settings.json b/.vscode/settings.json
index 7c57ac3e73386..84360ef17f9fc 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -10,7 +10,7 @@
         "-s"
     ],
     "python.unitTest.pyTestEnabled": true,
-    "python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python",
+    "python.pythonPath": "/Users/schrockn/code/venvs/dagster/bin/python",
     "python.linting.lintOnSave": true,
     "python.linting.pep8Enabled": false,
     "python.linting.mypyEnabled": false,
diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index e6e5136a4b8c0..1b0934dbc790e 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -1,10 +1,13 @@
 from __future__ import (absolute_import, division, print_function, unicode_literals)
 from builtins import *  # pylint: disable=W0622,W0401
 
+import json
+
 from dagster import check
 from dagster.core import types
+from dagster.core.execution import DagsterExecutionContext
 
-from .definitions import InputDefinition, Solid
+from .definitions import (InputDefinition, create_dagster_single_file_input)
 
 from .graph import DagsterPipeline
 
@@ -19,3 +22,15 @@ def input_definition(**kwargs):
 def file_input_definition(argument_def_dict=None, **kwargs):
     check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict')
     return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs)
+
+
+def create_json_input(name):
+    check.str_param(name, 'name')
+
+    def load_file(context, path):
+        check.inst_param(context, 'context', DagsterExecutionContext)
+        check.str_param(path, 'path')
+        with open(path) as ff:
+            return json.load(ff)
+
+    return create_dagster_single_file_input(name, load_file)
diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 648fe71a0e11c..ed856ab6d5260 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -6,6 +6,7 @@
 from dagster import check
 from dagster.utils import has_context_argument
 
+from dagster.core import create_json_input
 from dagster.core.definitions import Solid
 from dagster.core.execution import DagsterExecutionContext
 from dagster.core.errors import (DagsterUserCodeExecutionError, DagsterInvariantViolationError)
@@ -14,7 +15,9 @@
     create_dagster_pd_csv_output,
     create_dagster_pd_dependency_input,
     create_dagster_pd_parquet_output,
+    create_dagster_pd_null_output,
+    create_dagster_pd_read_table_input,
 )
 
 
 def solid(**kwargs):
@@ -83,7 +86,7 @@ def dataframe_solid(*args, name, inputs, transform_fn=None, **kwargs):
     return Solid(
         name=name,
         inputs=inputs,
-        outputs=[csv_output(), parquet_output()],
+        outputs=[csv_output(), parquet_output(), null_output()],
         transform_fn=_dependency_transform_wrapper(name, transform_fn),
         **kwargs
     )
@@ -103,5 +106,17 @@ def csv_output():
     return create_dagster_pd_csv_output()
 
 
+def read_table_input(name, delimiter=',', **read_table_kwargs):
+    return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
+
+
+def json_input(name):
+    return create_json_input(name)
+
+
 def parquet_output():
     return create_dagster_pd_parquet_output()
+
+
+def null_output():
+    return create_dagster_pd_null_output()
diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py
index 1caedf0ba2925..71d26eb66d787 100644
--- a/dagster/pandas_kernel/definitions.py
+++ b/dagster/pandas_kernel/definitions.py
@@ -57,14 +57,15 @@ def check_path(context, path):
     return create_dagster_single_file_input(name, check_path)
 
 
-def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs):
+
+def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
     check.str_param(name, 'name')
     check.str_param(delimiter, 'delimiter')
 
     def check_path(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs)
+        df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs)
         context.metric('rows', df.shape[0])
         return df
 
@@ -97,3 +98,16 @@ def output_fn_inst(df, context, arg_dict):
     return OutputDefinition(
         name='PARQUET', output_fn=output_fn_inst, argument_def_dict={'path': types.PATH}
     )
+
+
+def create_dagster_pd_null_output():
+    def output_fn_inst(df, context, arg_dict):
+        # A null output: validates its arguments but intentionally materializes nothing.
+        check.inst_param(df, 'df', pd.DataFrame)
+        check.inst_param(context, 'context', DagsterExecutionContext)
+        check.dict_param(arg_dict, 'arg_dict')
+
+    return OutputDefinition(
+        name='None',
+        output_fn=output_fn_inst,
+    )
diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index c0db2e2fea6c6..fee2939db7233 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -70,6 +70,26 @@ def output_fn(sql_expr, context, arg_dict):
     )
 
 
+def truncate_and_insert_table_output():
+    def output_fn(sql_expr, context, arg_dict):
+        check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression)
+        check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
+        check.dict_param(arg_dict, 'arg_dict')
+
+        output_table_name = check.str_elem(arg_dict, 'table_name')
+        total_sql = '''TRUNCATE TABLE {output_table_name};
+        INSERT INTO {output_table_name} ({sql_text})'''.format(
+            output_table_name=output_table_name, sql_text=sql_expr.sql_text
+        )
+        context.engine.connect().execute(total_sql)
+
+    return OutputDefinition(
+        name='TRUNCATE_AND_INSERT',
+        output_fn=output_fn,
+        argument_def_dict={'table_name': types.STRING},
+    )
+
+
 def _table_input_fn(context, arg_dict):
     check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
 
@@ -126,7 +146,7 @@ def create_sql_solid(name, inputs, sql_text):
         name,
         inputs=inputs,
         transform_fn=create_sql_transform(sql_text),
-        outputs=[create_table_output()],
+        outputs=[create_table_output(), truncate_and_insert_table_output()],
     )
 
 
diff --git a/dagster/sqlalchemy_kernel/common.py b/dagster/sqlalchemy_kernel/common.py
index 43e3aa71712fd..d02c64bbba35f 100644
--- a/dagster/sqlalchemy_kernel/common.py
+++ b/dagster/sqlalchemy_kernel/common.py
@@ -35,4 +35,7 @@ def execute_sql_text_on_context(context, sql_text):
         finally:
             cursor.close()
     else:
-        context.engine.connect().execute(sql_text)
+        connection = context.engine.connect()
+        transaction = connection.begin()
+        connection.execute(sql_text)
+        transaction.commit()
diff --git a/dagster/sqlalchemy_kernel/templated.py b/dagster/sqlalchemy_kernel/templated.py
index 73adda1485511..4b1011494a933 100644
--- a/dagster/sqlalchemy_kernel/templated.py
+++ b/dagster/sqlalchemy_kernel/templated.py
@@ -1,3 +1,4 @@
+import logging
 import jinja2
 
 import dagster
@@ -16,7 +17,14 @@ def _create_table_input(name, depends_on=None):
     )
 
 
-def create_templated_sql_transform_solid(name, sql, table_arguments, output, dependencies=None):
+def create_templated_sql_transform_solid(
+    name,
+    sql,
+    table_arguments,
+    output,
+    dependencies=None,
+    extra_inputs=[],
+):
     '''
     Create a solid that is a templated sql statement. This assumes that the sql statement
     is creating or modifying a table, and that that table will be used downstream in the pipeline
@@ -93,7 +101,7 @@ def create_templated_sql_transform_solid(name, sql, table_arguments, output, dep
         dep_inputs = [_create_table_input(dep.name, depends_on=dep) for dep in dependencies]
     return Solid(
         name=name,
-        inputs=table_inputs + dep_inputs,
+        inputs=table_inputs + dep_inputs + extra_inputs,
         transform_fn=_create_templated_sql_transform_with_output(sql, output),
         outputs=[],
    )
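
Usage sketch (not part of the diff above): the snippet only exercises names this change introduces or touches -- dagster_pd.json_input (backed by the new create_json_input) and dagster_pd.dataframe_solid, which now also carries the 'None' output. It assumes, since the wrapper is not shown here, that a transform function receives each input as a keyword argument named after its InputDefinition; the input name and columns below are made up.

    import pandas as pd

    import dagster.pandas_kernel as dagster_pd


    def sum_transform(num_json):
        # 'num_json' is whatever json.load() returned for the input of the same name,
        # e.g. {"num1": [1, 3], "num2": [2, 4]}; build a DataFrame from it and transform.
        df = pd.DataFrame(num_json)
        df['sum'] = df['num1'] + df['num2']
        return df


    sum_solid = dagster_pd.dataframe_solid(
        name='sum',
        inputs=[dagster_pd.json_input('num_json')],
        transform_fn=sum_transform,
    )

Because the solid now declares CSV, PARQUET, and None outputs, a caller can run the transform without materializing the resulting dataframe at all.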