From 4541258c3beb37e48a0cfbc76038367162ee377f Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 09:52:27 -0700 Subject: [PATCH 01/29] create_json_input --- dagster/core/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 9370bba033b51..97dfba9e78770 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -19,3 +19,17 @@ def input_definition(**kwargs): def file_input_definition(argument_def_dict=None, **kwargs): check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict') return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs) + + +def create_json_input(name): + check.str_param(name, 'name') + + #Note: I don't understand the function of check_path. + def check_path(context, path): + check.inst_param(context, 'context', DagsterExecutionContext) + check.str_param(path, 'path') + json_obj = json.loads(path) + # context.metric('rows', df.shape[0]) + return json_obj + + return create_dagster_json_input(name, check_path) \ No newline at end of file From 0ee9ca746e0f1c20d794660bd4281e276cefeae7 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:05:38 -0700 Subject: [PATCH 02/29] Get json_input working --- dagster/core/__init__.py | 4 ++-- dagster/pandas_kernel/__init__.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 97dfba9e78770..49a5ddc541289 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -4,7 +4,7 @@ from dagster import check from dagster.core import types -from .definitions import InputDefinition +from .definitions import InputDefinition, create_dagster_single_file_input from .graph import DagsterPipeline @@ -32,4 +32,4 @@ def check_path(context, path): # context.metric('rows', df.shape[0]) return json_obj - return create_dagster_json_input(name, check_path) \ No newline at end of file + return create_dagster_single_file_input(name, check_path) \ No newline at end of file diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 79ed93b555a8c..76af4d13c8879 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -14,6 +14,9 @@ create_dagster_pd_dependency_input, create_dagster_pd_parquet_output, ) +from dagster.core import ( + create_json_input +) def solid(**kwargs): @@ -95,10 +98,11 @@ def single_path_arg(input_name, path): def csv_input(name, delimiter=',', **read_csv_kwargs): return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs) - def csv_output(): return create_dagster_pd_csv_output() +def json_input(name): + return create_json_input(name) def parquet_output(): return create_dagster_pd_parquet_output() From bbfc29869015c44f77e58c37e33531db234a1b36 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:11:41 -0700 Subject: [PATCH 03/29] Add read_table_input. Make arg names more sensible --- dagster/pandas_kernel/__init__.py | 4 ++++ dagster/pandas_kernel/definitions.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 76af4d13c8879..3fbfb2965abc3 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -13,6 +13,7 @@ create_dagster_pd_csv_output, create_dagster_pd_dependency_input, create_dagster_pd_parquet_output, + create_dagster_pd_read_table_input, ) from dagster.core import ( create_json_input @@ -101,6 +102,9 @@ def csv_input(name, delimiter=',', **read_csv_kwargs): def csv_output(): return create_dagster_pd_csv_output() +def read_table_input(name, delimiter=',', **read_table_kwargs): + return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs) + def json_input(name): return create_json_input(name) diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py index 1caedf0ba2925..635a9a833d112 100644 --- a/dagster/pandas_kernel/definitions.py +++ b/dagster/pandas_kernel/definitions.py @@ -57,14 +57,14 @@ def check_path(context, path): return create_dagster_single_file_input(name, check_path) -def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs): +def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs): check.str_param(name, 'name') check.str_param(delimiter, 'delimiter') def check_path(context, path): check.inst_param(context, 'context', DagsterExecutionContext) check.str_param(path, 'path') - df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs) + df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs) context.metric('rows', df.shape[0]) return df From 9175c33dabe7da84c5564057b95efa10c65076de Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:16:43 -0700 Subject: [PATCH 04/29] Add json dependencies. Load the file, not the path --- dagster/core/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 49a5ddc541289..e6bf91e223368 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -7,6 +7,8 @@ from .definitions import InputDefinition, create_dagster_single_file_input from .graph import DagsterPipeline +from dagster.core.execution import DagsterExecutionContext +import json def pipeline(**kwargs): return DagsterPipeline(**kwargs) @@ -28,7 +30,7 @@ def create_json_input(name): def check_path(context, path): check.inst_param(context, 'context', DagsterExecutionContext) check.str_param(path, 'path') - json_obj = json.loads(path) + json_obj = json.load(open(path)) # context.metric('rows', df.shape[0]) return json_obj From 04083534a5a5a912f1b2c6a3875937d0395160d1 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 14:11:35 -0700 Subject: [PATCH 05/29] Implement horrible, horrible SQL syntax hack. --- dagster/sqlalchemy_kernel/__init__.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index 2a3adfabf22f8..e48ee454a0703 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -21,9 +21,24 @@ class DagsterSqlExpression: def __init__(self, sql_text): self.sql_text = check.str_param(sql_text, 'sql_text') + # @property + # def from_target(self): + # return f'({self.sql_text})' + + #FIXME: This is the worst hack I've implemented in a long time. + #Without it, you get errors like this: + """ +E sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")" +E LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table)) +E ^ +E [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405) +""" @property def from_target(self): - return f'({self.sql_text})' + if ' ' in self.sql_text: + return f'({self.sql_text})' + else: + return f'{self.sql_text}' def create_table_output(): From 47cb3d8ae3acbc4e3407d65bc4c252e87654386b Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 14:43:01 -0700 Subject: [PATCH 06/29] Implement truncate_and_insert_table_output --- .vscode/settings.json | 2 +- dagster/core/__init__.py | 1 + dagster/sqlalchemy_kernel/__init__.py | 23 ++++++++++++++++++++++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 7c57ac3e73386..ae136e8796638 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,7 +10,7 @@ "-s" ], "python.unitTest.pyTestEnabled": true, - "python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python", + "python.pythonPath": "/Users/abe/anaconda2/envs/py36/bin/python", "python.linting.lintOnSave": true, "python.linting.pep8Enabled": false, "python.linting.mypyEnabled": false, diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index e6bf91e223368..b09627e15a256 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -10,6 +10,7 @@ from dagster.core.execution import DagsterExecutionContext import json + def pipeline(**kwargs): return DagsterPipeline(**kwargs) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index e48ee454a0703..158a90fa4aa04 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -33,6 +33,7 @@ def __init__(self, sql_text): E ^ E [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405) """ + @property def from_target(self): if ' ' in self.sql_text: @@ -60,6 +61,26 @@ def output_fn(sql_expr, context, arg_dict): ) +def truncate_and_insert_table_output(): + def output_fn(sql_expr, context, arg_dict): + check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression) + check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext) + check.dict_param(arg_dict, 'arg_dict') + + output_table_name = check.str_elem(arg_dict, 'table_name') + total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format( + output_table_name=output_table_name, sql_text=sql_expr.sql_text + ) + print(total_sql) + context.engine.connect().execute(total_sql) + + return OutputDefinition( + name='TRUNCATE_AND_INSERT', + output_fn=output_fn, + argument_def_dict={'table_name': types.STRING}, + ) + + def _table_input_fn(context, arg_dict): check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext) @@ -116,5 +137,5 @@ def create_sql_solid(name, inputs, sql_text): name, inputs=inputs, transform_fn=create_sql_transform(sql_text), - outputs=[create_table_output()], + outputs=[create_table_output(), truncate_and_insert_table_output()], ) From ea442a48569e513b5981bd444d4d902c37be4413 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 09:52:27 -0700 Subject: [PATCH 07/29] create_json_input --- dagster/core/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 9370bba033b51..97dfba9e78770 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -19,3 +19,17 @@ def input_definition(**kwargs): def file_input_definition(argument_def_dict=None, **kwargs): check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict') return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs) + + +def create_json_input(name): + check.str_param(name, 'name') + + #Note: I don't understand the function of check_path. + def check_path(context, path): + check.inst_param(context, 'context', DagsterExecutionContext) + check.str_param(path, 'path') + json_obj = json.loads(path) + # context.metric('rows', df.shape[0]) + return json_obj + + return create_dagster_json_input(name, check_path) \ No newline at end of file From 06f37097c60268c95b6372329788cc723c0dace7 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:05:38 -0700 Subject: [PATCH 08/29] Get json_input working --- dagster/core/__init__.py | 4 ++-- dagster/pandas_kernel/__init__.py | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 97dfba9e78770..49a5ddc541289 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -4,7 +4,7 @@ from dagster import check from dagster.core import types -from .definitions import InputDefinition +from .definitions import InputDefinition, create_dagster_single_file_input from .graph import DagsterPipeline @@ -32,4 +32,4 @@ def check_path(context, path): # context.metric('rows', df.shape[0]) return json_obj - return create_dagster_json_input(name, check_path) \ No newline at end of file + return create_dagster_single_file_input(name, check_path) \ No newline at end of file diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 6cd6b772e0ddc..149f6e2e85954 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -15,6 +15,9 @@ create_dagster_pd_dependency_input, create_dagster_pd_parquet_output, ) +from dagster.core import ( + create_json_input +) def solid(**kwargs): @@ -98,10 +101,11 @@ def single_path_arg(input_name, path): def csv_input(name, delimiter=',', **read_csv_kwargs): return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs) - def csv_output(): return create_dagster_pd_csv_output() +def json_input(name): + return create_json_input(name) def parquet_output(): return create_dagster_pd_parquet_output() From 8cc0d0d20219a945bddaa8529dbfd695ccd61422 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:11:41 -0700 Subject: [PATCH 09/29] Add read_table_input. Make arg names more sensible --- dagster/pandas_kernel/__init__.py | 4 ++++ dagster/pandas_kernel/definitions.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 149f6e2e85954..90f933a9c682f 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -14,6 +14,7 @@ create_dagster_pd_csv_output, create_dagster_pd_dependency_input, create_dagster_pd_parquet_output, + create_dagster_pd_read_table_input, ) from dagster.core import ( create_json_input @@ -104,6 +105,9 @@ def csv_input(name, delimiter=',', **read_csv_kwargs): def csv_output(): return create_dagster_pd_csv_output() +def read_table_input(name, delimiter=',', **read_table_kwargs): + return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs) + def json_input(name): return create_json_input(name) diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py index 1caedf0ba2925..635a9a833d112 100644 --- a/dagster/pandas_kernel/definitions.py +++ b/dagster/pandas_kernel/definitions.py @@ -57,14 +57,14 @@ def check_path(context, path): return create_dagster_single_file_input(name, check_path) -def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs): +def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs): check.str_param(name, 'name') check.str_param(delimiter, 'delimiter') def check_path(context, path): check.inst_param(context, 'context', DagsterExecutionContext) check.str_param(path, 'path') - df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs) + df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs) context.metric('rows', df.shape[0]) return df From 935418168a86ac6b567ff687ff3c8622b03a5666 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:16:43 -0700 Subject: [PATCH 10/29] Add json dependencies. Load the file, not the path --- dagster/core/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 49a5ddc541289..e6bf91e223368 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -7,6 +7,8 @@ from .definitions import InputDefinition, create_dagster_single_file_input from .graph import DagsterPipeline +from dagster.core.execution import DagsterExecutionContext +import json def pipeline(**kwargs): return DagsterPipeline(**kwargs) @@ -28,7 +30,7 @@ def create_json_input(name): def check_path(context, path): check.inst_param(context, 'context', DagsterExecutionContext) check.str_param(path, 'path') - json_obj = json.loads(path) + json_obj = json.load(open(path)) # context.metric('rows', df.shape[0]) return json_obj From a20088aa701c1d58abcdb0c88b9327cf34b53613 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 14:11:35 -0700 Subject: [PATCH 11/29] Implement horrible, horrible SQL syntax hack. --- dagster/sqlalchemy_kernel/__init__.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index 405f92a81bc79..7412920c6e924 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -24,9 +24,24 @@ class DagsterSqlExpression: def __init__(self, sql_text): self.sql_text = check.str_param(sql_text, 'sql_text') + # @property + # def from_target(self): + # return f'({self.sql_text})' + + #FIXME: This is the worst hack I've implemented in a long time. + #Without it, you get errors like this: + """ +E sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")" +E LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table)) +E ^ +E [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405) +""" @property def from_target(self): - return f'({self.sql_text})' + if ' ' in self.sql_text: + return f'({self.sql_text})' + else: + return f'{self.sql_text}' def create_table_output(): From 9b903e2a45be73be87a385fe8e243adf09c8b74d Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 14:43:01 -0700 Subject: [PATCH 12/29] Implement truncate_and_insert_table_output --- .vscode/settings.json | 2 +- dagster/core/__init__.py | 1 + dagster/sqlalchemy_kernel/__init__.py | 23 ++++++++++++++++++++++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 7c57ac3e73386..ae136e8796638 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,7 +10,7 @@ "-s" ], "python.unitTest.pyTestEnabled": true, - "python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python", + "python.pythonPath": "/Users/abe/anaconda2/envs/py36/bin/python", "python.linting.lintOnSave": true, "python.linting.pep8Enabled": false, "python.linting.mypyEnabled": false, diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index e6bf91e223368..b09627e15a256 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -10,6 +10,7 @@ from dagster.core.execution import DagsterExecutionContext import json + def pipeline(**kwargs): return DagsterPipeline(**kwargs) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index 7412920c6e924..e51ee81e0c5bc 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -36,6 +36,7 @@ def __init__(self, sql_text): E ^ E [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405) """ + @property def from_target(self): if ' ' in self.sql_text: @@ -63,6 +64,26 @@ def output_fn(sql_expr, context, arg_dict): ) +def truncate_and_insert_table_output(): + def output_fn(sql_expr, context, arg_dict): + check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression) + check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext) + check.dict_param(arg_dict, 'arg_dict') + + output_table_name = check.str_elem(arg_dict, 'table_name') + total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format( + output_table_name=output_table_name, sql_text=sql_expr.sql_text + ) + print(total_sql) + context.engine.connect().execute(total_sql) + + return OutputDefinition( + name='TRUNCATE_AND_INSERT', + output_fn=output_fn, + argument_def_dict={'table_name': types.STRING}, + ) + + def _table_input_fn(context, arg_dict): check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext) @@ -119,7 +140,7 @@ def create_sql_solid(name, inputs, sql_text): name, inputs=inputs, transform_fn=create_sql_transform(sql_text), - outputs=[create_table_output()], + outputs=[create_table_output(), truncate_and_insert_table_output()], ) From 818fbe95065719590fd53d2a9d7f2d33eed7fd4b Mon Sep 17 00:00:00 2001 From: Abe Date: Mon, 28 May 2018 08:11:12 -0700 Subject: [PATCH 13/29] Add null_output to dagster_pd --- dagster/pandas_kernel/__init__.py | 14 ++++++++++---- dagster/pandas_kernel/definitions.py | 17 +++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 90f933a9c682f..994c07864712a 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -16,9 +16,7 @@ create_dagster_pd_parquet_output, create_dagster_pd_read_table_input, ) -from dagster.core import ( - create_json_input -) +from dagster.core import (create_json_input) def solid(**kwargs): @@ -87,7 +85,7 @@ def dataframe_solid(*args, name, inputs, transform_fn=None, **kwargs): return Solid( name=name, inputs=inputs, - outputs=[csv_output(), parquet_output()], + outputs=[csv_output(), parquet_output(), null_output()], transform_fn=_dependency_transform_wrapper(name, transform_fn), **kwargs ) @@ -102,14 +100,22 @@ def single_path_arg(input_name, path): def csv_input(name, delimiter=',', **read_csv_kwargs): return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs) + def csv_output(): return create_dagster_pd_csv_output() + def read_table_input(name, delimiter=',', **read_table_kwargs): return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs) + def json_input(name): return create_json_input(name) + def parquet_output(): return create_dagster_pd_parquet_output() + + +def null_output(): + return create_dagster_pd_csv_output() diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py index 635a9a833d112..71d26eb66d787 100644 --- a/dagster/pandas_kernel/definitions.py +++ b/dagster/pandas_kernel/definitions.py @@ -57,6 +57,7 @@ def check_path(context, path): return create_dagster_single_file_input(name, check_path) + def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs): check.str_param(name, 'name') check.str_param(delimiter, 'delimiter') @@ -97,3 +98,19 @@ def output_fn_inst(df, context, arg_dict): return OutputDefinition( name='PARQUET', output_fn=output_fn_inst, argument_def_dict={'path': types.PATH} ) + + +def create_dagster_pd_null_output(): + def output_fn_inst(context, arg_dict): + # check.inst_param(df, 'df', pd.DataFrame) + check.inst_param(context, 'context', DagsterExecutionContext) + check.dict_param(arg_dict, 'arg_dict') + # path = check.str_elem(arg_dict, 'path') + + # df.to_csv(path, index=False) + + return OutputDefinition( + name='None', + output_fn=output_fn_inst, + # argument_def_dict={'path': types.PATH} + ) From 6dc8f111a114b11d46b581885702d5b865ba0673 Mon Sep 17 00:00:00 2001 From: Abe Date: Mon, 28 May 2018 14:36:44 -0700 Subject: [PATCH 14/29] Decruft --- dagster/sqlalchemy_kernel/__init__.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index 4c2e1c3678156..744c9868868f6 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -32,19 +32,6 @@ def __init__(self, subquery_text): def query_text(self): return self._subquery_text - # @property - # def from_target(self): - # return f'({self.sql_text})' - - #FIXME: This is the worst hack I've implemented in a long time. - #Without it, you get errors like this: - """ -E sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")" -E LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table)) -E ^ -E [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405) -""" - @property def from_target(self): return f'({self._subquery_text})' From b9908e01d12b31890bbe5a8e995343ec6ed3d33b Mon Sep 17 00:00:00 2001 From: Abe Date: Thu, 31 May 2018 15:45:04 -0700 Subject: [PATCH 15/29] Add extra_inputs to create_templated_sql_transform_solid --- dagster/sqlalchemy_kernel/common.py | 7 ++++++- dagster/sqlalchemy_kernel/templated.py | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/dagster/sqlalchemy_kernel/common.py b/dagster/sqlalchemy_kernel/common.py index 43e3aa71712fd..705ec30695e46 100644 --- a/dagster/sqlalchemy_kernel/common.py +++ b/dagster/sqlalchemy_kernel/common.py @@ -35,4 +35,9 @@ def execute_sql_text_on_context(context, sql_text): finally: cursor.close() else: - context.engine.connect().execute(sql_text) + # context.engine.connect().execute(sql_text) + connection = context.engine.connect() + transaction = connection.begin() + connection.execute(sql_text) + transaction.commit() + # cursor.commit() diff --git a/dagster/sqlalchemy_kernel/templated.py b/dagster/sqlalchemy_kernel/templated.py index 73adda1485511..0d5f5becd39d6 100644 --- a/dagster/sqlalchemy_kernel/templated.py +++ b/dagster/sqlalchemy_kernel/templated.py @@ -1,3 +1,4 @@ +import logging import jinja2 import dagster @@ -16,7 +17,14 @@ def _create_table_input(name, depends_on=None): ) -def create_templated_sql_transform_solid(name, sql, table_arguments, output, dependencies=None): +def create_templated_sql_transform_solid( + name, + sql, + table_arguments, + output, + dependencies=None, + extra_inputs=[], +): ''' Create a solid that is a templated sql statement. This assumes that the sql statement is creating or modifying a table, and that that table will be used downstream in the pipeline @@ -93,7 +101,7 @@ def create_templated_sql_transform_solid(name, sql, table_arguments, output, dep dep_inputs = [_create_table_input(dep.name, depends_on=dep) for dep in dependencies] return Solid( name=name, - inputs=table_inputs + dep_inputs, + inputs=table_inputs + dep_inputs + extra_inputs, transform_fn=_create_templated_sql_transform_with_output(sql, output), outputs=[], ) @@ -107,6 +115,8 @@ def _render_template_string(template_text, **kwargs): def _create_templated_sql_transform_with_output(sql, output_table): def do_transform(context, **kwargs): rendered_sql = _render_template_string(sql, **kwargs) + # logging.info(rendered_sql) + print(rendered_sql[:1000]) execute_sql_text_on_context(context, rendered_sql) return kwargs[output_table] From 070d3265ad03d15b9989e6c90b477334778f45c2 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 09:52:27 -0700 Subject: [PATCH 16/29] create_json_input --- dagster/core/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index e6e5136a4b8c0..491253b812868 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -19,3 +19,17 @@ def input_definition(**kwargs): def file_input_definition(argument_def_dict=None, **kwargs): check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict') return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs) + + +def create_json_input(name): + check.str_param(name, 'name') + + #Note: I don't understand the function of check_path. + def check_path(context, path): + check.inst_param(context, 'context', DagsterExecutionContext) + check.str_param(path, 'path') + json_obj = json.loads(path) + # context.metric('rows', df.shape[0]) + return json_obj + + return create_dagster_json_input(name, check_path) \ No newline at end of file From bedd21885d9eb9b31def468db233feae214f284c Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:05:38 -0700 Subject: [PATCH 17/29] Get json_input working --- dagster/core/__init__.py | 6 ++++-- dagster/pandas_kernel/__init__.py | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 491253b812868..d89549d4123e9 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -3,8 +3,9 @@ from dagster import check from dagster.core import types +from dagster.core.execution import DagsterExecutionContext -from .definitions import InputDefinition, Solid +from .definitions import (InputDefinition, Solid, create_dagster_single_file_input) from .graph import DagsterPipeline @@ -22,6 +23,7 @@ def file_input_definition(argument_def_dict=None, **kwargs): def create_json_input(name): + import json check.str_param(name, 'name') #Note: I don't understand the function of check_path. @@ -32,4 +34,4 @@ def check_path(context, path): # context.metric('rows', df.shape[0]) return json_obj - return create_dagster_json_input(name, check_path) \ No newline at end of file + return create_dagster_single_file_input(name, check_path) \ No newline at end of file diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 648fe71a0e11c..6bf469f245178 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -15,6 +15,9 @@ create_dagster_pd_dependency_input, create_dagster_pd_parquet_output, ) +from dagster.core import ( + create_json_input +) def solid(**kwargs): @@ -98,10 +101,11 @@ def single_path_arg(input_name, path): def csv_input(name, delimiter=',', **read_csv_kwargs): return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs) - def csv_output(): return create_dagster_pd_csv_output() +def json_input(name): + return create_json_input(name) def parquet_output(): return create_dagster_pd_parquet_output() From ca587ac9076bfadef3c088b2f0c3bd5045b42d85 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:11:41 -0700 Subject: [PATCH 18/29] Add read_table_input. Make arg names more sensible --- dagster/pandas_kernel/__init__.py | 4 ++++ dagster/pandas_kernel/definitions.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 6bf469f245178..72bd71ebd7f74 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -14,6 +14,7 @@ create_dagster_pd_csv_output, create_dagster_pd_dependency_input, create_dagster_pd_parquet_output, + create_dagster_pd_read_table_input, ) from dagster.core import ( create_json_input @@ -104,6 +105,9 @@ def csv_input(name, delimiter=',', **read_csv_kwargs): def csv_output(): return create_dagster_pd_csv_output() +def read_table_input(name, delimiter=',', **read_table_kwargs): + return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs) + def json_input(name): return create_json_input(name) diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py index 1caedf0ba2925..635a9a833d112 100644 --- a/dagster/pandas_kernel/definitions.py +++ b/dagster/pandas_kernel/definitions.py @@ -57,14 +57,14 @@ def check_path(context, path): return create_dagster_single_file_input(name, check_path) -def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs): +def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs): check.str_param(name, 'name') check.str_param(delimiter, 'delimiter') def check_path(context, path): check.inst_param(context, 'context', DagsterExecutionContext) check.str_param(path, 'path') - df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs) + df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs) context.metric('rows', df.shape[0]) return df From 7ef3893cc536dc8bd7f6eafa6792cab8833f3997 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:16:43 -0700 Subject: [PATCH 19/29] Add json dependencies. Load the file, not the path --- dagster/core/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index d89549d4123e9..cf7ced7314406 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -8,6 +8,8 @@ from .definitions import (InputDefinition, Solid, create_dagster_single_file_input) from .graph import DagsterPipeline +from dagster.core.execution import DagsterExecutionContext +import json def pipeline(**kwargs): return DagsterPipeline(**kwargs) @@ -30,7 +32,7 @@ def create_json_input(name): def check_path(context, path): check.inst_param(context, 'context', DagsterExecutionContext) check.str_param(path, 'path') - json_obj = json.loads(path) + json_obj = json.load(open(path)) # context.metric('rows', df.shape[0]) return json_obj From c61977f78025f97b328bbcde1db44a8421dd5887 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 14:11:35 -0700 Subject: [PATCH 20/29] Implement horrible, horrible SQL syntax hack. --- dagster/sqlalchemy_kernel/__init__.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index c0db2e2fea6c6..b1c90e8d862b5 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -32,6 +32,19 @@ def __init__(self, subquery_text): def query_text(self): return self._subquery_text + # @property + # def from_target(self): + # return f'({self.sql_text})' + + #FIXME: This is the worst hack I've implemented in a long time. + #Without it, you get errors like this: + """ +E sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")" +E LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table)) +E ^ +E [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405) +""" + @property def from_target(self): return f'({self._subquery_text})' From 1c7d7d24420fc7a49321e2c3e782ec73b56e60ff Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Fri, 1 Jun 2018 14:47:46 -0700 Subject: [PATCH 21/29] Implement truncate_and_insert_table_output --- .vscode/settings.json | 2 +- dagster/core/__init__.py | 1 + dagster/sqlalchemy_kernel/__init__.py | 35 ++++++++++++++++----------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 7c57ac3e73386..84360ef17f9fc 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,7 +10,7 @@ "-s" ], "python.unitTest.pyTestEnabled": true, - "python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python", + "python.pythonPath": "/Users/schrockn/code/venvs/dagster/bin/python", "python.linting.lintOnSave": true, "python.linting.pep8Enabled": false, "python.linting.mypyEnabled": false, diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index cf7ced7314406..e00be887d752e 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -11,6 +11,7 @@ from dagster.core.execution import DagsterExecutionContext import json + def pipeline(**kwargs): return DagsterPipeline(**kwargs) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index b1c90e8d862b5..744c9868868f6 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -32,19 +32,6 @@ def __init__(self, subquery_text): def query_text(self): return self._subquery_text - # @property - # def from_target(self): - # return f'({self.sql_text})' - - #FIXME: This is the worst hack I've implemented in a long time. - #Without it, you get errors like this: - """ -E sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")" -E LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table)) -E ^ -E [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405) -""" - @property def from_target(self): return f'({self._subquery_text})' @@ -83,6 +70,26 @@ def output_fn(sql_expr, context, arg_dict): ) +def truncate_and_insert_table_output(): + def output_fn(sql_expr, context, arg_dict): + check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression) + check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext) + check.dict_param(arg_dict, 'arg_dict') + + output_table_name = check.str_elem(arg_dict, 'table_name') + total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format( + output_table_name=output_table_name, sql_text=sql_expr.sql_text + ) + print(total_sql) + context.engine.connect().execute(total_sql) + + return OutputDefinition( + name='TRUNCATE_AND_INSERT', + output_fn=output_fn, + argument_def_dict={'table_name': types.STRING}, + ) + + def _table_input_fn(context, arg_dict): check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext) @@ -139,7 +146,7 @@ def create_sql_solid(name, inputs, sql_text): name, inputs=inputs, transform_fn=create_sql_transform(sql_text), - outputs=[create_table_output()], + outputs=[create_table_output(), truncate_and_insert_table_output()], ) From c513c00f7339d6111b75bfdd4f2dbc11182458d2 Mon Sep 17 00:00:00 2001 From: Abe Date: Mon, 28 May 2018 08:11:12 -0700 Subject: [PATCH 22/29] Add null_output to dagster_pd --- dagster/pandas_kernel/__init__.py | 14 ++++++++++---- dagster/pandas_kernel/definitions.py | 17 +++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 72bd71ebd7f74..547a0a39908cf 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -16,9 +16,7 @@ create_dagster_pd_parquet_output, create_dagster_pd_read_table_input, ) -from dagster.core import ( - create_json_input -) +from dagster.core import (create_json_input) def solid(**kwargs): @@ -87,7 +85,7 @@ def dataframe_solid(*args, name, inputs, transform_fn=None, **kwargs): return Solid( name=name, inputs=inputs, - outputs=[csv_output(), parquet_output()], + outputs=[csv_output(), parquet_output(), null_output()], transform_fn=_dependency_transform_wrapper(name, transform_fn), **kwargs ) @@ -102,14 +100,22 @@ def single_path_arg(input_name, path): def csv_input(name, delimiter=',', **read_csv_kwargs): return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs) + def csv_output(): return create_dagster_pd_csv_output() + def read_table_input(name, delimiter=',', **read_table_kwargs): return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs) + def json_input(name): return create_json_input(name) + def parquet_output(): return create_dagster_pd_parquet_output() + + +def null_output(): + return create_dagster_pd_csv_output() diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py index 635a9a833d112..71d26eb66d787 100644 --- a/dagster/pandas_kernel/definitions.py +++ b/dagster/pandas_kernel/definitions.py @@ -57,6 +57,7 @@ def check_path(context, path): return create_dagster_single_file_input(name, check_path) + def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs): check.str_param(name, 'name') check.str_param(delimiter, 'delimiter') @@ -97,3 +98,19 @@ def output_fn_inst(df, context, arg_dict): return OutputDefinition( name='PARQUET', output_fn=output_fn_inst, argument_def_dict={'path': types.PATH} ) + + +def create_dagster_pd_null_output(): + def output_fn_inst(context, arg_dict): + # check.inst_param(df, 'df', pd.DataFrame) + check.inst_param(context, 'context', DagsterExecutionContext) + check.dict_param(arg_dict, 'arg_dict') + # path = check.str_elem(arg_dict, 'path') + + # df.to_csv(path, index=False) + + return OutputDefinition( + name='None', + output_fn=output_fn_inst, + # argument_def_dict={'path': types.PATH} + ) From 2e5e292248dde97924bda052170050b93650a096 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 09:52:27 -0700 Subject: [PATCH 23/29] create_json_input --- dagster/core/__init__.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index e00be887d752e..d3950a4a358d8 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -1,6 +1,8 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from builtins import * # pylint: disable=W0622,W0401 +import json + from dagster import check from dagster.core import types from dagster.core.execution import DagsterExecutionContext @@ -8,9 +10,6 @@ from .definitions import (InputDefinition, Solid, create_dagster_single_file_input) from .graph import DagsterPipeline -from dagster.core.execution import DagsterExecutionContext -import json - def pipeline(**kwargs): return DagsterPipeline(**kwargs) @@ -26,7 +25,6 @@ def file_input_definition(argument_def_dict=None, **kwargs): def create_json_input(name): - import json check.str_param(name, 'name') #Note: I don't understand the function of check_path. @@ -37,4 +35,4 @@ def check_path(context, path): # context.metric('rows', df.shape[0]) return json_obj - return create_dagster_single_file_input(name, check_path) \ No newline at end of file + return create_dagster_single_file_input(name, check_path) From d3dcba6a93800fae37d10584cc0ed55c736587c2 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:05:38 -0700 Subject: [PATCH 24/29] Get json_input working --- dagster/core/__init__.py | 2 +- dagster/pandas_kernel/__init__.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index d3950a4a358d8..2608826e83d96 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -7,7 +7,7 @@ from dagster.core import types from dagster.core.execution import DagsterExecutionContext -from .definitions import (InputDefinition, Solid, create_dagster_single_file_input) +from .definitions import (InputDefinition, create_dagster_single_file_input) from .graph import DagsterPipeline diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 547a0a39908cf..296b0d1987c1a 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -6,6 +6,7 @@ from dagster import check from dagster.utils import has_context_argument +from dagster.core import create_json_input from dagster.core.definitions import Solid from dagster.core.execution import DagsterExecutionContext from dagster.core.errors import (DagsterUserCodeExecutionError, DagsterInvariantViolationError) @@ -16,7 +17,6 @@ create_dagster_pd_parquet_output, create_dagster_pd_read_table_input, ) -from dagster.core import (create_json_input) def solid(**kwargs): @@ -105,6 +105,10 @@ def csv_output(): return create_dagster_pd_csv_output() +def json_input(name): + return create_json_input(name) + + def read_table_input(name, delimiter=',', **read_table_kwargs): return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs) From 117fdd1f29cdfa3923bbcae3d49028c77c00a9d7 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:11:41 -0700 Subject: [PATCH 25/29] Add read_table_input. Make arg names more sensible --- dagster/pandas_kernel/__init__.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py index 296b0d1987c1a..4e0e3168b589e 100644 --- a/dagster/pandas_kernel/__init__.py +++ b/dagster/pandas_kernel/__init__.py @@ -105,10 +105,6 @@ def csv_output(): return create_dagster_pd_csv_output() -def json_input(name): - return create_json_input(name) - - def read_table_input(name, delimiter=',', **read_table_kwargs): return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs) From ffa3a756ec7e36ead9b9047d032e39e131b1035c Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 10:16:43 -0700 Subject: [PATCH 26/29] Add json dependencies. Load the file, not the path --- dagster/core/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 2608826e83d96..b394595534a03 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -10,6 +10,8 @@ from .definitions import (InputDefinition, create_dagster_single_file_input) from .graph import DagsterPipeline +from dagster.core.execution import DagsterExecutionContext +import json def pipeline(**kwargs): return DagsterPipeline(**kwargs) From 7db955e2d1ac818f2cb28db14c8c83c00beed8b2 Mon Sep 17 00:00:00 2001 From: Abe Date: Wed, 23 May 2018 14:43:01 -0700 Subject: [PATCH 27/29] Implement truncate_and_insert_table_output --- dagster/core/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index b394595534a03..00b6892f67fa3 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -13,6 +13,7 @@ from dagster.core.execution import DagsterExecutionContext import json + def pipeline(**kwargs): return DagsterPipeline(**kwargs) From 2dcd6d2acca7c6a23cc805a7cc9d8bfda7b8ecb4 Mon Sep 17 00:00:00 2001 From: Abe Date: Thu, 31 May 2018 15:45:04 -0700 Subject: [PATCH 28/29] Add extra_inputs to create_templated_sql_transform_solid --- dagster/sqlalchemy_kernel/common.py | 7 ++++++- dagster/sqlalchemy_kernel/templated.py | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/dagster/sqlalchemy_kernel/common.py b/dagster/sqlalchemy_kernel/common.py index 43e3aa71712fd..705ec30695e46 100644 --- a/dagster/sqlalchemy_kernel/common.py +++ b/dagster/sqlalchemy_kernel/common.py @@ -35,4 +35,9 @@ def execute_sql_text_on_context(context, sql_text): finally: cursor.close() else: - context.engine.connect().execute(sql_text) + # context.engine.connect().execute(sql_text) + connection = context.engine.connect() + transaction = connection.begin() + connection.execute(sql_text) + transaction.commit() + # cursor.commit() diff --git a/dagster/sqlalchemy_kernel/templated.py b/dagster/sqlalchemy_kernel/templated.py index 73adda1485511..0d5f5becd39d6 100644 --- a/dagster/sqlalchemy_kernel/templated.py +++ b/dagster/sqlalchemy_kernel/templated.py @@ -1,3 +1,4 @@ +import logging import jinja2 import dagster @@ -16,7 +17,14 @@ def _create_table_input(name, depends_on=None): ) -def create_templated_sql_transform_solid(name, sql, table_arguments, output, dependencies=None): +def create_templated_sql_transform_solid( + name, + sql, + table_arguments, + output, + dependencies=None, + extra_inputs=[], +): ''' Create a solid that is a templated sql statement. This assumes that the sql statement is creating or modifying a table, and that that table will be used downstream in the pipeline @@ -93,7 +101,7 @@ def create_templated_sql_transform_solid(name, sql, table_arguments, output, dep dep_inputs = [_create_table_input(dep.name, depends_on=dep) for dep in dependencies] return Solid( name=name, - inputs=table_inputs + dep_inputs, + inputs=table_inputs + dep_inputs + extra_inputs, transform_fn=_create_templated_sql_transform_with_output(sql, output), outputs=[], ) @@ -107,6 +115,8 @@ def _render_template_string(template_text, **kwargs): def _create_templated_sql_transform_with_output(sql, output_table): def do_transform(context, **kwargs): rendered_sql = _render_template_string(sql, **kwargs) + # logging.info(rendered_sql) + print(rendered_sql[:1000]) execute_sql_text_on_context(context, rendered_sql) return kwargs[output_table] From 3a2888566cc7a12a2cfd3fd61b6a4abe064450ba Mon Sep 17 00:00:00 2001 From: Nick Schrock Date: Fri, 1 Jun 2018 15:00:26 -0700 Subject: [PATCH 29/29] minor cleanup --- dagster/core/__init__.py | 13 ++++--------- dagster/sqlalchemy_kernel/__init__.py | 4 ++-- dagster/sqlalchemy_kernel/common.py | 2 -- dagster/sqlalchemy_kernel/templated.py | 2 -- 4 files changed, 6 insertions(+), 15 deletions(-) diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py index 00b6892f67fa3..1b0934dbc790e 100644 --- a/dagster/core/__init__.py +++ b/dagster/core/__init__.py @@ -10,9 +10,6 @@ from .definitions import (InputDefinition, create_dagster_single_file_input) from .graph import DagsterPipeline -from dagster.core.execution import DagsterExecutionContext -import json - def pipeline(**kwargs): return DagsterPipeline(**kwargs) @@ -30,12 +27,10 @@ def file_input_definition(argument_def_dict=None, **kwargs): def create_json_input(name): check.str_param(name, 'name') - #Note: I don't understand the function of check_path. - def check_path(context, path): + def load_file(context, path): check.inst_param(context, 'context', DagsterExecutionContext) check.str_param(path, 'path') - json_obj = json.load(open(path)) - # context.metric('rows', df.shape[0]) - return json_obj + with open(path) as ff: + return json.load(ff) - return create_dagster_single_file_input(name, check_path) + return create_dagster_single_file_input(name, load_file) diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py index 744c9868868f6..fee2939db7233 100644 --- a/dagster/sqlalchemy_kernel/__init__.py +++ b/dagster/sqlalchemy_kernel/__init__.py @@ -77,10 +77,10 @@ def output_fn(sql_expr, context, arg_dict): check.dict_param(arg_dict, 'arg_dict') output_table_name = check.str_elem(arg_dict, 'table_name') - total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format( + total_sql = '''TRUNCATE TABLE {output_table_name}; + INSERT INTO {output_table_name} ({sql_text})'''.format( output_table_name=output_table_name, sql_text=sql_expr.sql_text ) - print(total_sql) context.engine.connect().execute(total_sql) return OutputDefinition( diff --git a/dagster/sqlalchemy_kernel/common.py b/dagster/sqlalchemy_kernel/common.py index 705ec30695e46..d02c64bbba35f 100644 --- a/dagster/sqlalchemy_kernel/common.py +++ b/dagster/sqlalchemy_kernel/common.py @@ -35,9 +35,7 @@ def execute_sql_text_on_context(context, sql_text): finally: cursor.close() else: - # context.engine.connect().execute(sql_text) connection = context.engine.connect() transaction = connection.begin() connection.execute(sql_text) transaction.commit() - # cursor.commit() diff --git a/dagster/sqlalchemy_kernel/templated.py b/dagster/sqlalchemy_kernel/templated.py index 0d5f5becd39d6..4b1011494a933 100644 --- a/dagster/sqlalchemy_kernel/templated.py +++ b/dagster/sqlalchemy_kernel/templated.py @@ -115,8 +115,6 @@ def _render_template_string(template_text, **kwargs): def _create_templated_sql_transform_with_output(sql, output_table): def do_transform(context, **kwargs): rendered_sql = _render_template_string(sql, **kwargs) - # logging.info(rendered_sql) - print(rendered_sql[:1000]) execute_sql_text_on_context(context, rendered_sql) return kwargs[output_table]