From 22b4d84c835e102c0e441dff005d73be4da19462 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 09:52:27 -0700
Subject: [PATCH 01/29] create_json_input

---
 dagster/core/__init__.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 9370bba033b51..97dfba9e78770 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -19,3 +19,17 @@ def input_definition(**kwargs):
 def file_input_definition(argument_def_dict=None, **kwargs):
     check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict')
     return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs)
+
+
+def create_json_input(name):
+    check.str_param(name, 'name')
+
+    #Note: I don't understand the function of check_path.
+    def check_path(context, path):
+        check.inst_param(context, 'context', DagsterExecutionContext)
+        check.str_param(path, 'path')
+        json_obj = json.loads(path)
+        # context.metric('rows', df.shape[0])
+        return json_obj
+
+    return create_dagster_json_input(name, check_path)
\ No newline at end of file

From 4b3b03a7116c3d8f27a992c7fbbe1f665a4de3e3 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:05:38 -0700
Subject: [PATCH 02/29] Get json_input working

---
 dagster/core/__init__.py          | 4 ++--
 dagster/pandas_kernel/__init__.py | 6 +++++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 97dfba9e78770..49a5ddc541289 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -4,7 +4,7 @@
 from dagster import check
 from dagster.core import types
 
-from .definitions import InputDefinition
+from .definitions import InputDefinition, create_dagster_single_file_input
 from .graph import DagsterPipeline
 
 
@@ -32,4 +32,4 @@ def check_path(context, path):
         # context.metric('rows', df.shape[0])
         return json_obj
 
-    return create_dagster_json_input(name, check_path)
\ No newline at end of file
+    return create_dagster_single_file_input(name, check_path)
\ No newline at end of file
diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 79ed93b555a8c..76af4d13c8879 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -14,6 +14,9 @@
     create_dagster_pd_dependency_input,
     create_dagster_pd_parquet_output,
 )
+from dagster.core import (
+    create_json_input
+)
 
 
 def solid(**kwargs):
@@ -95,10 +98,11 @@ def single_path_arg(input_name, path):
 def csv_input(name, delimiter=',', **read_csv_kwargs):
     return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs)
 
-
 def csv_output():
     return create_dagster_pd_csv_output()
 
+def json_input(name):
+    return create_json_input(name)
 
 def parquet_output():
     return create_dagster_pd_parquet_output()

From 7a0a51da0afc25139a705f630e66bd3b7c6f729e Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:11:41 -0700
Subject: [PATCH 03/29] Add read_table_input. Make arg names more sensible

---
 dagster/pandas_kernel/__init__.py    | 4 ++++
 dagster/pandas_kernel/definitions.py | 4 ++--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 76af4d13c8879..3fbfb2965abc3 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -13,6 +13,7 @@
     create_dagster_pd_csv_output,
     create_dagster_pd_dependency_input,
     create_dagster_pd_parquet_output,
+    create_dagster_pd_read_table_input,
 )
 from dagster.core import (
     create_json_input
@@ -101,6 +102,9 @@ def csv_input(name, delimiter=',', **read_csv_kwargs):
 def csv_output():
     return create_dagster_pd_csv_output()
 
+def read_table_input(name, delimiter=',', **read_table_kwargs):
+    return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
+
 def json_input(name):
     return create_json_input(name)
 
diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py
index 1caedf0ba2925..635a9a833d112 100644
--- a/dagster/pandas_kernel/definitions.py
+++ b/dagster/pandas_kernel/definitions.py
@@ -57,14 +57,14 @@ def check_path(context, path):
 
     return create_dagster_single_file_input(name, check_path)
 
-def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs):
+def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
     check.str_param(name, 'name')
     check.str_param(delimiter, 'delimiter')
 
     def check_path(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs)
+        df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs)
         context.metric('rows', df.shape[0])
         return df
 

From 9a47cd662510b24cdec94cdfb832785983988581 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:16:43 -0700
Subject: [PATCH 04/29] Add json dependencies. Load the file, not the path

---
 dagster/core/__init__.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 49a5ddc541289..e6bf91e223368 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -7,6 +7,8 @@
 from .definitions import InputDefinition, create_dagster_single_file_input
 from .graph import DagsterPipeline
 
+from dagster.core.execution import DagsterExecutionContext
+import json
 
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
@@ -28,7 +30,7 @@ def create_json_input(name):
     def check_path(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        json_obj = json.loads(path)
+        json_obj = json.load(open(path))
         # context.metric('rows', df.shape[0])
         return json_obj
 

From 28aec64a82d966827f4cadffe526001136033317 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 14:11:35 -0700
Subject: [PATCH 05/29] Implement horrible, horrible SQL syntax hack.

---
 dagster/sqlalchemy_kernel/__init__.py | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index 2a3adfabf22f8..e48ee454a0703 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -21,9 +21,24 @@ class DagsterSqlExpression:
     def __init__(self, sql_text):
         self.sql_text = check.str_param(sql_text, 'sql_text')
 
+    # @property
+    # def from_target(self):
+    #     return f'({self.sql_text})'
+
+    #FIXME: This is the worst hack I've implemented in a long time.
+    #Without it, you get errors like this:
+    """
+E       sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")"
+E       LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))
+E                                                                            ^
+E        [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405)
+"""
     @property
     def from_target(self):
-        return f'({self.sql_text})'
+        if ' ' in self.sql_text:
+            return f'({self.sql_text})'
+        else:
+            return f'{self.sql_text}'
 
 
 def create_table_output():

From 13c0489777152ead91c5585b3834dfe2740819cf Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 14:43:01 -0700
Subject: [PATCH 06/29] Implement truncate_and_insert_table_output

---
 .vscode/settings.json                 |  2 +-
 dagster/core/__init__.py              |  1 +
 dagster/sqlalchemy_kernel/__init__.py | 23 ++++++++++++++++++++++-
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index 7c57ac3e73386..ae136e8796638 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -10,7 +10,7 @@
     "-s"
   ],
   "python.unitTest.pyTestEnabled": true,
-  "python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python",
+  "python.pythonPath": "/Users/abe/anaconda2/envs/py36/bin/python",
   "python.linting.lintOnSave": true,
   "python.linting.pep8Enabled": false,
   "python.linting.mypyEnabled": false,
diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index e6bf91e223368..b09627e15a256 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -10,6 +10,7 @@
 from dagster.core.execution import DagsterExecutionContext
 import json
 
+
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
 
diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index e48ee454a0703..158a90fa4aa04 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -33,6 +33,7 @@ def __init__(self, sql_text):
 E                                                                            ^
 E        [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405)
 """
+
     @property
     def from_target(self):
         if ' ' in self.sql_text:
@@ -60,6 +61,26 @@ def output_fn(sql_expr, context, arg_dict):
     )
 
 
+def truncate_and_insert_table_output():
+    def output_fn(sql_expr, context, arg_dict):
+        check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression)
+        check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
+        check.dict_param(arg_dict, 'arg_dict')
+
+        output_table_name = check.str_elem(arg_dict, 'table_name')
+        total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format(
+            output_table_name=output_table_name, sql_text=sql_expr.sql_text
+        )
+        print(total_sql)
+        context.engine.connect().execute(total_sql)
+
+    return OutputDefinition(
+        name='TRUNCATE_AND_INSERT',
+        output_fn=output_fn,
+        argument_def_dict={'table_name': types.STRING},
+    )
+
+
 def _table_input_fn(context, arg_dict):
 
     check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
@@ -116,5 +137,5 @@ def create_sql_solid(name, inputs, sql_text):
         name,
         inputs=inputs,
         transform_fn=create_sql_transform(sql_text),
-        outputs=[create_table_output()],
+        outputs=[create_table_output(), truncate_and_insert_table_output()],
     )

From aaa57e09997ea5d5db17ede82c5b47d14c19f21c Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 09:52:27 -0700
Subject: [PATCH 07/29] create_json_input

---
 dagster/core/__init__.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 9370bba033b51..97dfba9e78770 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -19,3 +19,17 @@ def input_definition(**kwargs):
 def file_input_definition(argument_def_dict=None, **kwargs):
     check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict')
     return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs)
+
+
+def create_json_input(name):
+    check.str_param(name, 'name')
+
+    #Note: I don't understand the function of check_path.
+    def check_path(context, path):
+        check.inst_param(context, 'context', DagsterExecutionContext)
+        check.str_param(path, 'path')
+        json_obj = json.loads(path)
+        # context.metric('rows', df.shape[0])
+        return json_obj
+
+    return create_dagster_json_input(name, check_path)
\ No newline at end of file

From d1e09744f84ab8ef41b73522b8d1272d40183d07 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:05:38 -0700
Subject: [PATCH 08/29] Get json_input working

---
 dagster/core/__init__.py          | 4 ++--
 dagster/pandas_kernel/__init__.py | 6 +++++-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 97dfba9e78770..49a5ddc541289 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -4,7 +4,7 @@
 from dagster import check
 from dagster.core import types
 
-from .definitions import InputDefinition
+from .definitions import InputDefinition, create_dagster_single_file_input
 from .graph import DagsterPipeline
 
 
@@ -32,4 +32,4 @@ def check_path(context, path):
         # context.metric('rows', df.shape[0])
         return json_obj
 
-    return create_dagster_json_input(name, check_path)
\ No newline at end of file
+    return create_dagster_single_file_input(name, check_path)
\ No newline at end of file
diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 6cd6b772e0ddc..149f6e2e85954 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -15,6 +15,9 @@
     create_dagster_pd_dependency_input,
     create_dagster_pd_parquet_output,
 )
+from dagster.core import (
+    create_json_input
+)
 
 
 def solid(**kwargs):
@@ -98,10 +101,11 @@ def single_path_arg(input_name, path):
 def csv_input(name, delimiter=',', **read_csv_kwargs):
     return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs)
 
-
 def csv_output():
     return create_dagster_pd_csv_output()
 
+def json_input(name):
+    return create_json_input(name)
 
 def parquet_output():
     return create_dagster_pd_parquet_output()

From 66035ec236ba7e50df4e7d9ef6745319752f27ce Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:11:41 -0700
Subject: [PATCH 09/29] Add read_table_input. Make arg names more sensible

---
 dagster/pandas_kernel/__init__.py    | 4 ++++
 dagster/pandas_kernel/definitions.py | 4 ++--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 149f6e2e85954..90f933a9c682f 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -14,6 +14,7 @@
     create_dagster_pd_csv_output,
     create_dagster_pd_dependency_input,
     create_dagster_pd_parquet_output,
+    create_dagster_pd_read_table_input,
 )
 from dagster.core import (
     create_json_input
@@ -104,6 +105,9 @@ def csv_input(name, delimiter=',', **read_csv_kwargs):
 def csv_output():
     return create_dagster_pd_csv_output()
 
+def read_table_input(name, delimiter=',', **read_table_kwargs):
+    return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
+
 def json_input(name):
     return create_json_input(name)
 
diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py
index 1caedf0ba2925..635a9a833d112 100644
--- a/dagster/pandas_kernel/definitions.py
+++ b/dagster/pandas_kernel/definitions.py
@@ -57,14 +57,14 @@ def check_path(context, path):
 
     return create_dagster_single_file_input(name, check_path)
 
-def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs):
+def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
     check.str_param(name, 'name')
     check.str_param(delimiter, 'delimiter')
 
     def check_path(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs)
+        df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs)
         context.metric('rows', df.shape[0])
         return df
 

From 1f7938f6338ff2f606dfb95b93e65530517acd75 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:16:43 -0700
Subject: [PATCH 10/29] Add json dependencies. Load the file, not the path

---
 dagster/core/__init__.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 49a5ddc541289..e6bf91e223368 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -7,6 +7,8 @@
 from .definitions import InputDefinition, create_dagster_single_file_input
 from .graph import DagsterPipeline
 
+from dagster.core.execution import DagsterExecutionContext
+import json
 
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
@@ -28,7 +30,7 @@ def create_json_input(name):
     def check_path(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        json_obj = json.loads(path)
+        json_obj = json.load(open(path))
         # context.metric('rows', df.shape[0])
         return json_obj
 

From 479d937117886f7ab9bb601da534be225230aeb1 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 14:11:35 -0700
Subject: [PATCH 11/29] Implement horrible, horrible SQL syntax hack.

---
 dagster/sqlalchemy_kernel/__init__.py | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index 405f92a81bc79..7412920c6e924 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -24,9 +24,24 @@ class DagsterSqlExpression:
     def __init__(self, sql_text):
         self.sql_text = check.str_param(sql_text, 'sql_text')
 
+    # @property
+    # def from_target(self):
+    #     return f'({self.sql_text})'
+
+    #FIXME: This is the worst hack I've implemented in a long time.
+    #Without it, you get errors like this:
+    """
+E       sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")"
+E       LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))
+E                                                                            ^
+E        [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405)
+"""
     @property
     def from_target(self):
-        return f'({self.sql_text})'
+        if ' ' in self.sql_text:
+            return f'({self.sql_text})'
+        else:
+            return f'{self.sql_text}'
 
 
 def create_table_output():

From 83c7f1ab9375d39b04a9b23e2a5e32af84c6128e Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 14:43:01 -0700
Subject: [PATCH 12/29] Implement truncate_and_insert_table_output

---
 .vscode/settings.json                 |  2 +-
 dagster/core/__init__.py              |  1 +
 dagster/sqlalchemy_kernel/__init__.py | 23 ++++++++++++++++++++++-
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index 7c57ac3e73386..ae136e8796638 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -10,7 +10,7 @@
     "-s"
   ],
   "python.unitTest.pyTestEnabled": true,
-  "python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python",
+  "python.pythonPath": "/Users/abe/anaconda2/envs/py36/bin/python",
   "python.linting.lintOnSave": true,
   "python.linting.pep8Enabled": false,
   "python.linting.mypyEnabled": false,
diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index e6bf91e223368..b09627e15a256 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -10,6 +10,7 @@
 from dagster.core.execution import DagsterExecutionContext
 import json
 
+
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
 
diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index 7412920c6e924..e51ee81e0c5bc 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -36,6 +36,7 @@ def __init__(self, sql_text):
 E                                                                            ^
 E        [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405)
 """
+
     @property
     def from_target(self):
         if ' ' in self.sql_text:
@@ -63,6 +64,26 @@ def output_fn(sql_expr, context, arg_dict):
     )
 
 
+def truncate_and_insert_table_output():
+    def output_fn(sql_expr, context, arg_dict):
+        check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression)
+        check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
+        check.dict_param(arg_dict, 'arg_dict')
+
+        output_table_name = check.str_elem(arg_dict, 'table_name')
+        total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format(
+            output_table_name=output_table_name, sql_text=sql_expr.sql_text
+        )
+        print(total_sql)
+        context.engine.connect().execute(total_sql)
+
+    return OutputDefinition(
+        name='TRUNCATE_AND_INSERT',
+        output_fn=output_fn,
+        argument_def_dict={'table_name': types.STRING},
+    )
+
+
 def _table_input_fn(context, arg_dict):
 
     check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
@@ -119,7 +140,7 @@ def create_sql_solid(name, inputs, sql_text):
         name,
         inputs=inputs,
         transform_fn=create_sql_transform(sql_text),
-        outputs=[create_table_output()],
+        outputs=[create_table_output(), truncate_and_insert_table_output()],
     )
 
 

From 2cd750ee1a28edf8bd0752d6778dd953b4f7b42e Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Mon, 28 May 2018 08:11:12 -0700
Subject: [PATCH 13/29] Add null_output to dagster_pd

---
 dagster/pandas_kernel/__init__.py    | 14 ++++++++++----
 dagster/pandas_kernel/definitions.py | 17 +++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 90f933a9c682f..994c07864712a 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -16,9 +16,7 @@
     create_dagster_pd_parquet_output,
     create_dagster_pd_read_table_input,
 )
-from dagster.core import (
-    create_json_input
-)
+from dagster.core import (create_json_input)
 
 
 def solid(**kwargs):
@@ -87,7 +85,7 @@ def dataframe_solid(*args, name, inputs, transform_fn=None, **kwargs):
     return Solid(
         name=name,
         inputs=inputs,
-        outputs=[csv_output(), parquet_output()],
+        outputs=[csv_output(), parquet_output(), null_output()],
         transform_fn=_dependency_transform_wrapper(name, transform_fn),
         **kwargs
     )
@@ -102,14 +100,22 @@ def single_path_arg(input_name, path):
 def csv_input(name, delimiter=',', **read_csv_kwargs):
     return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs)
 
+
 def csv_output():
     return create_dagster_pd_csv_output()
 
+
 def read_table_input(name, delimiter=',', **read_table_kwargs):
     return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
 
+
 def json_input(name):
     return create_json_input(name)
 
+
 def parquet_output():
     return create_dagster_pd_parquet_output()
+
+
+def null_output():
+    return create_dagster_pd_csv_output()
diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py
index 635a9a833d112..71d26eb66d787 100644
--- a/dagster/pandas_kernel/definitions.py
+++ b/dagster/pandas_kernel/definitions.py
@@ -57,6 +57,7 @@ def check_path(context, path):
 
     return create_dagster_single_file_input(name, check_path)
 
+
 def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
     check.str_param(name, 'name')
     check.str_param(delimiter, 'delimiter')
@@ -97,3 +98,19 @@ def output_fn_inst(df, context, arg_dict):
     return OutputDefinition(
         name='PARQUET', output_fn=output_fn_inst, argument_def_dict={'path': types.PATH}
     )
+
+
+def create_dagster_pd_null_output():
+    def output_fn_inst(context, arg_dict):
+        # check.inst_param(df, 'df', pd.DataFrame)
+        check.inst_param(context, 'context', DagsterExecutionContext)
+        check.dict_param(arg_dict, 'arg_dict')
+        # path = check.str_elem(arg_dict, 'path')
+
+        # df.to_csv(path, index=False)
+
+    return OutputDefinition(
+        name='None',
+        output_fn=output_fn_inst,
+        # argument_def_dict={'path': types.PATH}
+    )

From 95d5298a0e85d239812cc32fecf46e0f5e9c580d Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Mon, 28 May 2018 14:36:44 -0700
Subject: [PATCH 14/29] Decruft

---
 dagster/sqlalchemy_kernel/__init__.py | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index 4c2e1c3678156..744c9868868f6 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -32,19 +32,6 @@ def __init__(self, subquery_text):
     def query_text(self):
         return self._subquery_text
 
-    # @property
-    # def from_target(self):
-    #     return f'({self.sql_text})'
-
-    #FIXME: This is the worst hack I've implemented in a long time.
-    #Without it, you get errors like this:
-    """
-E       sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")"
-E       LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))
-E                                                                            ^
-E        [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405)
-"""
-
     @property
     def from_target(self):
         return f'({self._subquery_text})'

From 1b84c4531c8a7af6da2f944dafe6b394c3052379 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Thu, 31 May 2018 15:45:04 -0700
Subject: [PATCH 15/29] Add extra_inputs to
 create_templated_sql_transform_solid

---
 dagster/sqlalchemy_kernel/common.py    |  7 ++++++-
 dagster/sqlalchemy_kernel/templated.py | 14 ++++++++++++--
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/dagster/sqlalchemy_kernel/common.py b/dagster/sqlalchemy_kernel/common.py
index 43e3aa71712fd..705ec30695e46 100644
--- a/dagster/sqlalchemy_kernel/common.py
+++ b/dagster/sqlalchemy_kernel/common.py
@@ -35,4 +35,9 @@ def execute_sql_text_on_context(context, sql_text):
         finally:
             cursor.close()
     else:
-        context.engine.connect().execute(sql_text)
+        # context.engine.connect().execute(sql_text)
+        connection = context.engine.connect()
+        transaction = connection.begin()
+        connection.execute(sql_text)
+        transaction.commit()
+        # cursor.commit()
diff --git a/dagster/sqlalchemy_kernel/templated.py b/dagster/sqlalchemy_kernel/templated.py
index 73adda1485511..0d5f5becd39d6 100644
--- a/dagster/sqlalchemy_kernel/templated.py
+++ b/dagster/sqlalchemy_kernel/templated.py
@@ -1,3 +1,4 @@
+import logging
 import jinja2
 
 import dagster
@@ -16,7 +17,14 @@ def _create_table_input(name, depends_on=None):
     )
 
 
-def create_templated_sql_transform_solid(name, sql, table_arguments, output, dependencies=None):
+def create_templated_sql_transform_solid(
+    name,
+    sql,
+    table_arguments,
+    output,
+    dependencies=None,
+    extra_inputs=[],
+):
     '''
     Create a solid that is a templated sql statement. This assumes that the sql statement
     is creating or modifying a table, and that that table will be used downstream in the pipeline
@@ -93,7 +101,7 @@ def create_templated_sql_transform_solid(name, sql, table_arguments, output, dep
     dep_inputs = [_create_table_input(dep.name, depends_on=dep) for dep in dependencies]
     return Solid(
         name=name,
-        inputs=table_inputs + dep_inputs,
+        inputs=table_inputs + dep_inputs + extra_inputs,
         transform_fn=_create_templated_sql_transform_with_output(sql, output),
         outputs=[],
     )
@@ -107,6 +115,8 @@ def _render_template_string(template_text, **kwargs):
 def _create_templated_sql_transform_with_output(sql, output_table):
     def do_transform(context, **kwargs):
         rendered_sql = _render_template_string(sql, **kwargs)
+        # logging.info(rendered_sql)
+        print(rendered_sql[:1000])
         execute_sql_text_on_context(context, rendered_sql)
         return kwargs[output_table]
 

From 73ae07c8a98860baf893b109affbdd827a1b56a6 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 09:52:27 -0700
Subject: [PATCH 16/29] create_json_input

---
 dagster/core/__init__.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index e6e5136a4b8c0..491253b812868 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -19,3 +19,17 @@ def input_definition(**kwargs):
 def file_input_definition(argument_def_dict=None, **kwargs):
     check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict')
     return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs)
+
+
+def create_json_input(name):
+    check.str_param(name, 'name')
+
+    #Note: I don't understand the function of check_path.
+    def check_path(context, path):
+        check.inst_param(context, 'context', DagsterExecutionContext)
+        check.str_param(path, 'path')
+        json_obj = json.loads(path)
+        # context.metric('rows', df.shape[0])
+        return json_obj
+
+    return create_dagster_json_input(name, check_path)
\ No newline at end of file

From f314433d2997ccbdb6b79b883ab6ff59962436cc Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:05:38 -0700
Subject: [PATCH 17/29] Get json_input working

---
 dagster/core/__init__.py          | 6 ++++--
 dagster/pandas_kernel/__init__.py | 6 +++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 491253b812868..d89549d4123e9 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -3,8 +3,9 @@
 
 from dagster import check
 from dagster.core import types
+from dagster.core.execution import DagsterExecutionContext
 
-from .definitions import InputDefinition, Solid
+from .definitions import (InputDefinition, Solid, create_dagster_single_file_input)
 from .graph import DagsterPipeline
 
 
@@ -22,6 +23,7 @@ def file_input_definition(argument_def_dict=None, **kwargs):
 
 
 def create_json_input(name):
+    import json
     check.str_param(name, 'name')
 
     #Note: I don't understand the function of check_path.
@@ -32,4 +34,4 @@ def check_path(context, path):
         # context.metric('rows', df.shape[0])
         return json_obj
 
-    return create_dagster_json_input(name, check_path)
\ No newline at end of file
+    return create_dagster_single_file_input(name, check_path)
\ No newline at end of file
diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 648fe71a0e11c..6bf469f245178 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -15,6 +15,9 @@
     create_dagster_pd_dependency_input,
     create_dagster_pd_parquet_output,
 )
+from dagster.core import (
+    create_json_input
+)
 
 
 def solid(**kwargs):
@@ -98,10 +101,11 @@ def single_path_arg(input_name, path):
 def csv_input(name, delimiter=',', **read_csv_kwargs):
     return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs)
 
-
 def csv_output():
     return create_dagster_pd_csv_output()
 
+def json_input(name):
+    return create_json_input(name)
 
 def parquet_output():
     return create_dagster_pd_parquet_output()

From c47f407f96e386b8e75f79e4a837b7c1f74e0c3e Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:11:41 -0700
Subject: [PATCH 18/29] Add read_table_input. Make arg names more sensible

---
 dagster/pandas_kernel/__init__.py    | 4 ++++
 dagster/pandas_kernel/definitions.py | 4 ++--
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 6bf469f245178..72bd71ebd7f74 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -14,6 +14,7 @@
     create_dagster_pd_csv_output,
     create_dagster_pd_dependency_input,
     create_dagster_pd_parquet_output,
+    create_dagster_pd_read_table_input,
 )
 from dagster.core import (
     create_json_input
@@ -104,6 +105,9 @@ def csv_input(name, delimiter=',', **read_csv_kwargs):
 def csv_output():
     return create_dagster_pd_csv_output()
 
+def read_table_input(name, delimiter=',', **read_table_kwargs):
+    return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
+
 def json_input(name):
     return create_json_input(name)
 
diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py
index 1caedf0ba2925..635a9a833d112 100644
--- a/dagster/pandas_kernel/definitions.py
+++ b/dagster/pandas_kernel/definitions.py
@@ -57,14 +57,14 @@ def check_path(context, path):
 
     return create_dagster_single_file_input(name, check_path)
 
-def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs):
+def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
     check.str_param(name, 'name')
     check.str_param(delimiter, 'delimiter')
 
     def check_path(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs)
+        df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs)
         context.metric('rows', df.shape[0])
         return df
 

From 6a29c3dff5b2b673d5cdac8ca9d13de93421df47 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:16:43 -0700
Subject: [PATCH 19/29] Add json dependencies. Load the file, not the path

---
 dagster/core/__init__.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index d89549d4123e9..cf7ced7314406 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -8,6 +8,8 @@
 from .definitions import (InputDefinition, Solid, create_dagster_single_file_input)
 from .graph import DagsterPipeline
 
+from dagster.core.execution import DagsterExecutionContext
+import json
 
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
@@ -30,7 +32,7 @@ def create_json_input(name):
     def check_path(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        json_obj = json.loads(path)
+        json_obj = json.load(open(path))
         # context.metric('rows', df.shape[0])
         return json_obj
 

From ac5a5c1f625ddcd7c07310657438c5c2c8205720 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 14:11:35 -0700
Subject: [PATCH 20/29] Implement horrible, horrible SQL syntax hack.

---
 dagster/sqlalchemy_kernel/__init__.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index c0db2e2fea6c6..b1c90e8d862b5 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -32,6 +32,19 @@ def __init__(self, subquery_text):
     def query_text(self):
         return self._subquery_text
 
+    # @property
+    # def from_target(self):
+    #     return f'({self.sql_text})'
+
+    #FIXME: This is the worst hack I've implemented in a long time.
+    #Without it, you get errors like this:
+    """
+E       sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")"
+E       LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))
+E                                                                            ^
+E        [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405)
+"""
+
     @property
     def from_target(self):
         return f'({self._subquery_text})'

From b7b4d3d44f8eb13033aac590a62385d2144c5e28 Mon Sep 17 00:00:00 2001
From: Nick Schrock <schrockn@gmail.com>
Date: Fri, 1 Jun 2018 14:47:46 -0700
Subject: [PATCH 21/29] Implement truncate_and_insert_table_output

---
 .vscode/settings.json                 |  2 +-
 dagster/core/__init__.py              |  1 +
 dagster/sqlalchemy_kernel/__init__.py | 35 ++++++++++++++++-----------
 3 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index 7c57ac3e73386..84360ef17f9fc 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -10,7 +10,7 @@
     "-s"
   ],
   "python.unitTest.pyTestEnabled": true,
-  "python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python",
+  "python.pythonPath": "/Users/schrockn/code/venvs/dagster/bin/python",
   "python.linting.lintOnSave": true,
   "python.linting.pep8Enabled": false,
   "python.linting.mypyEnabled": false,
diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index cf7ced7314406..e00be887d752e 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -11,6 +11,7 @@
 from dagster.core.execution import DagsterExecutionContext
 import json
 
+
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
 
diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index b1c90e8d862b5..744c9868868f6 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -32,19 +32,6 @@ def __init__(self, subquery_text):
     def query_text(self):
         return self._subquery_text
 
-    # @property
-    # def from_target(self):
-    #     return f'({self.sql_text})'
-
-    #FIXME: This is the worst hack I've implemented in a long time.
-    #Without it, you get errors like this:
-    """
-E       sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) syntax error at or near ")"
-E       LINE 1: ...CT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))
-E                                                                            ^
-E        [SQL: 'CREATE TABLE abe_temp.sum_sq_table AS SELECT num1, num2, sum, sum * sum as sum_sq from (SELECT num1, num2, num1 + num2 as sum FROM (abe_temp.num_table))'] (Background on this error at: http://sqlalche.me/e/f405)
-"""
-
     @property
     def from_target(self):
         return f'({self._subquery_text})'
@@ -83,6 +70,26 @@ def output_fn(sql_expr, context, arg_dict):
     )
 
 
+def truncate_and_insert_table_output():
+    def output_fn(sql_expr, context, arg_dict):
+        check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression)
+        check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
+        check.dict_param(arg_dict, 'arg_dict')
+
+        output_table_name = check.str_elem(arg_dict, 'table_name')
+        total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format(
+            output_table_name=output_table_name, sql_text=sql_expr.sql_text
+        )
+        print(total_sql)
+        context.engine.connect().execute(total_sql)
+
+    return OutputDefinition(
+        name='TRUNCATE_AND_INSERT',
+        output_fn=output_fn,
+        argument_def_dict={'table_name': types.STRING},
+    )
+
+
 def _table_input_fn(context, arg_dict):
 
     check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
@@ -139,7 +146,7 @@ def create_sql_solid(name, inputs, sql_text):
         name,
         inputs=inputs,
         transform_fn=create_sql_transform(sql_text),
-        outputs=[create_table_output()],
+        outputs=[create_table_output(), truncate_and_insert_table_output()],
     )
 
 

From 969d38c77a578bf47b607394308e597e17a3270a Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Mon, 28 May 2018 08:11:12 -0700
Subject: [PATCH 22/29] Add null_output to dagster_pd

---
 dagster/pandas_kernel/__init__.py    | 14 ++++++++++----
 dagster/pandas_kernel/definitions.py | 17 +++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 72bd71ebd7f74..547a0a39908cf 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -16,9 +16,7 @@
     create_dagster_pd_parquet_output,
     create_dagster_pd_read_table_input,
 )
-from dagster.core import (
-    create_json_input
-)
+from dagster.core import (create_json_input)
 
 
 def solid(**kwargs):
@@ -87,7 +85,7 @@ def dataframe_solid(*args, name, inputs, transform_fn=None, **kwargs):
     return Solid(
         name=name,
         inputs=inputs,
-        outputs=[csv_output(), parquet_output()],
+        outputs=[csv_output(), parquet_output(), null_output()],
         transform_fn=_dependency_transform_wrapper(name, transform_fn),
         **kwargs
     )
@@ -102,14 +100,22 @@ def single_path_arg(input_name, path):
 def csv_input(name, delimiter=',', **read_csv_kwargs):
     return create_dagster_pd_csv_input(name, delimiter, **read_csv_kwargs)
 
+
 def csv_output():
     return create_dagster_pd_csv_output()
 
+
 def read_table_input(name, delimiter=',', **read_table_kwargs):
     return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
 
+
 def json_input(name):
     return create_json_input(name)
 
+
 def parquet_output():
     return create_dagster_pd_parquet_output()
+
+
+def null_output():
+    return create_dagster_pd_csv_output()
diff --git a/dagster/pandas_kernel/definitions.py b/dagster/pandas_kernel/definitions.py
index 635a9a833d112..71d26eb66d787 100644
--- a/dagster/pandas_kernel/definitions.py
+++ b/dagster/pandas_kernel/definitions.py
@@ -57,6 +57,7 @@ def check_path(context, path):
 
     return create_dagster_single_file_input(name, check_path)
 
+
 def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
     check.str_param(name, 'name')
     check.str_param(delimiter, 'delimiter')
@@ -97,3 +98,19 @@ def output_fn_inst(df, context, arg_dict):
     return OutputDefinition(
         name='PARQUET', output_fn=output_fn_inst, argument_def_dict={'path': types.PATH}
     )
+
+
+def create_dagster_pd_null_output():
+    def output_fn_inst(context, arg_dict):
+        # check.inst_param(df, 'df', pd.DataFrame)
+        check.inst_param(context, 'context', DagsterExecutionContext)
+        check.dict_param(arg_dict, 'arg_dict')
+        # path = check.str_elem(arg_dict, 'path')
+
+        # df.to_csv(path, index=False)
+
+    return OutputDefinition(
+        name='None',
+        output_fn=output_fn_inst,
+        # argument_def_dict={'path': types.PATH}
+    )

From 21afcc720b70a380ed4576c704ffa3cdb81d8c97 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 09:52:27 -0700
Subject: [PATCH 23/29] create_json_input

---
 dagster/core/__init__.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index e00be887d752e..d3950a4a358d8 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -1,6 +1,8 @@
 from __future__ import (absolute_import, division, print_function, unicode_literals)
 from builtins import *  # pylint: disable=W0622,W0401
 
+import json
+
 from dagster import check
 from dagster.core import types
 from dagster.core.execution import DagsterExecutionContext
@@ -8,9 +10,6 @@
 from .definitions import (InputDefinition, Solid, create_dagster_single_file_input)
 from .graph import DagsterPipeline
 
-from dagster.core.execution import DagsterExecutionContext
-import json
-
 
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
@@ -26,7 +25,6 @@ def file_input_definition(argument_def_dict=None, **kwargs):
 
 
 def create_json_input(name):
-    import json
     check.str_param(name, 'name')
 
     #Note: I don't understand the function of check_path.
@@ -37,4 +35,4 @@ def check_path(context, path):
         # context.metric('rows', df.shape[0])
         return json_obj
 
-    return create_dagster_single_file_input(name, check_path)
\ No newline at end of file
+    return create_dagster_single_file_input(name, check_path)

From e772892a411b8f44b5c4e7ce1fabeaa96e02b54a Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:05:38 -0700
Subject: [PATCH 24/29] Get json_input working

---
 dagster/core/__init__.py          | 2 +-
 dagster/pandas_kernel/__init__.py | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index d3950a4a358d8..2608826e83d96 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -7,7 +7,7 @@
 from dagster.core import types
 from dagster.core.execution import DagsterExecutionContext
 
-from .definitions import (InputDefinition, Solid, create_dagster_single_file_input)
+from .definitions import (InputDefinition, create_dagster_single_file_input)
 from .graph import DagsterPipeline
 
 
diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 547a0a39908cf..296b0d1987c1a 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -6,6 +6,7 @@
 from dagster import check
 from dagster.utils import has_context_argument
 
+from dagster.core import create_json_input
 from dagster.core.definitions import Solid
 from dagster.core.execution import DagsterExecutionContext
 from dagster.core.errors import (DagsterUserCodeExecutionError, DagsterInvariantViolationError)
@@ -16,7 +17,6 @@
     create_dagster_pd_parquet_output,
     create_dagster_pd_read_table_input,
 )
-from dagster.core import (create_json_input)
 
 
 def solid(**kwargs):
@@ -105,6 +105,10 @@ def csv_output():
     return create_dagster_pd_csv_output()
 
 
+def json_input(name):
+    return create_json_input(name)
+
+
 def read_table_input(name, delimiter=',', **read_table_kwargs):
     return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
 

From da12d4a88e4366584c6f820e07cebdf02f708edd Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:11:41 -0700
Subject: [PATCH 25/29] Add read_table_input. Make arg names more sensible

---
 dagster/pandas_kernel/__init__.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/dagster/pandas_kernel/__init__.py b/dagster/pandas_kernel/__init__.py
index 296b0d1987c1a..4e0e3168b589e 100644
--- a/dagster/pandas_kernel/__init__.py
+++ b/dagster/pandas_kernel/__init__.py
@@ -105,10 +105,6 @@ def csv_output():
     return create_dagster_pd_csv_output()
 
 
-def json_input(name):
-    return create_json_input(name)
-
-
 def read_table_input(name, delimiter=',', **read_table_kwargs):
     return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)
 

From f38fa9d9f30d215dbf4332fd22ff4e9e70180c1e Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 10:16:43 -0700
Subject: [PATCH 26/29] Add json dependencies. Load the file, not the path

---
 dagster/core/__init__.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 2608826e83d96..b394595534a03 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -10,6 +10,8 @@
 from .definitions import (InputDefinition, create_dagster_single_file_input)
 from .graph import DagsterPipeline
 
+from dagster.core.execution import DagsterExecutionContext
+import json
 
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)

From 997ac6a4c9fcf2565797f3a2044308aeda5ff682 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Wed, 23 May 2018 14:43:01 -0700
Subject: [PATCH 27/29] Implement truncate_and_insert_table_output

---
 dagster/core/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index b394595534a03..00b6892f67fa3 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -13,6 +13,7 @@
 from dagster.core.execution import DagsterExecutionContext
 import json
 
+
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
 

From 17b2434768b2dfc1b1c0ffb666ef5df34e3af0c9 Mon Sep 17 00:00:00 2001
From: Abe <abegong@gmail.com>
Date: Thu, 31 May 2018 15:45:04 -0700
Subject: [PATCH 28/29] Add extra_inputs to
 create_templated_sql_transform_solid

---
 dagster/sqlalchemy_kernel/common.py    |  7 ++++++-
 dagster/sqlalchemy_kernel/templated.py | 14 ++++++++++++--
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/dagster/sqlalchemy_kernel/common.py b/dagster/sqlalchemy_kernel/common.py
index 43e3aa71712fd..705ec30695e46 100644
--- a/dagster/sqlalchemy_kernel/common.py
+++ b/dagster/sqlalchemy_kernel/common.py
@@ -35,4 +35,9 @@ def execute_sql_text_on_context(context, sql_text):
         finally:
             cursor.close()
     else:
-        context.engine.connect().execute(sql_text)
+        # context.engine.connect().execute(sql_text)
+        connection = context.engine.connect()
+        transaction = connection.begin()
+        connection.execute(sql_text)
+        transaction.commit()
+        # cursor.commit()
diff --git a/dagster/sqlalchemy_kernel/templated.py b/dagster/sqlalchemy_kernel/templated.py
index 73adda1485511..0d5f5becd39d6 100644
--- a/dagster/sqlalchemy_kernel/templated.py
+++ b/dagster/sqlalchemy_kernel/templated.py
@@ -1,3 +1,4 @@
+import logging
 import jinja2
 
 import dagster
@@ -16,7 +17,14 @@ def _create_table_input(name, depends_on=None):
     )
 
 
-def create_templated_sql_transform_solid(name, sql, table_arguments, output, dependencies=None):
+def create_templated_sql_transform_solid(
+    name,
+    sql,
+    table_arguments,
+    output,
+    dependencies=None,
+    extra_inputs=[],
+):
     '''
     Create a solid that is a templated sql statement. This assumes that the sql statement
     is creating or modifying a table, and that that table will be used downstream in the pipeline
@@ -93,7 +101,7 @@ def create_templated_sql_transform_solid(name, sql, table_arguments, output, dep
     dep_inputs = [_create_table_input(dep.name, depends_on=dep) for dep in dependencies]
     return Solid(
         name=name,
-        inputs=table_inputs + dep_inputs,
+        inputs=table_inputs + dep_inputs + extra_inputs,
         transform_fn=_create_templated_sql_transform_with_output(sql, output),
         outputs=[],
     )
@@ -107,6 +115,8 @@ def _render_template_string(template_text, **kwargs):
 def _create_templated_sql_transform_with_output(sql, output_table):
     def do_transform(context, **kwargs):
         rendered_sql = _render_template_string(sql, **kwargs)
+        # logging.info(rendered_sql)
+        print(rendered_sql[:1000])
         execute_sql_text_on_context(context, rendered_sql)
         return kwargs[output_table]
 

From 323b439d8f40cac1f973bd2a42747ead97107b5f Mon Sep 17 00:00:00 2001
From: Nick Schrock <schrockn@gmail.com>
Date: Fri, 1 Jun 2018 15:00:26 -0700
Subject: [PATCH 29/29] minor cleanup

---
 dagster/core/__init__.py               | 13 ++++---------
 dagster/sqlalchemy_kernel/__init__.py  |  4 ++--
 dagster/sqlalchemy_kernel/common.py    |  2 --
 dagster/sqlalchemy_kernel/templated.py |  2 --
 4 files changed, 6 insertions(+), 15 deletions(-)

diff --git a/dagster/core/__init__.py b/dagster/core/__init__.py
index 00b6892f67fa3..1b0934dbc790e 100644
--- a/dagster/core/__init__.py
+++ b/dagster/core/__init__.py
@@ -10,9 +10,6 @@
 from .definitions import (InputDefinition, create_dagster_single_file_input)
 from .graph import DagsterPipeline
 
-from dagster.core.execution import DagsterExecutionContext
-import json
-
 
 def pipeline(**kwargs):
     return DagsterPipeline(**kwargs)
@@ -30,12 +27,10 @@ def file_input_definition(argument_def_dict=None, **kwargs):
 def create_json_input(name):
     check.str_param(name, 'name')
 
-    #Note: I don't understand the function of check_path.
-    def check_path(context, path):
+    def load_file(context, path):
         check.inst_param(context, 'context', DagsterExecutionContext)
         check.str_param(path, 'path')
-        json_obj = json.load(open(path))
-        # context.metric('rows', df.shape[0])
-        return json_obj
+        with open(path) as ff:
+            return json.load(ff)
 
-    return create_dagster_single_file_input(name, check_path)
+    return create_dagster_single_file_input(name, load_file)
diff --git a/dagster/sqlalchemy_kernel/__init__.py b/dagster/sqlalchemy_kernel/__init__.py
index 744c9868868f6..fee2939db7233 100644
--- a/dagster/sqlalchemy_kernel/__init__.py
+++ b/dagster/sqlalchemy_kernel/__init__.py
@@ -77,10 +77,10 @@ def output_fn(sql_expr, context, arg_dict):
         check.dict_param(arg_dict, 'arg_dict')
 
         output_table_name = check.str_elem(arg_dict, 'table_name')
-        total_sql = '''TRUNCATE TABLE {output_table_name}; INSERT INTO {output_table_name} ({sql_text})'''.format(
+        total_sql = '''TRUNCATE TABLE {output_table_name};
+                       INSERT INTO {output_table_name} ({sql_text})'''.format(
             output_table_name=output_table_name, sql_text=sql_expr.sql_text
         )
-        print(total_sql)
         context.engine.connect().execute(total_sql)
 
     return OutputDefinition(
diff --git a/dagster/sqlalchemy_kernel/common.py b/dagster/sqlalchemy_kernel/common.py
index 705ec30695e46..d02c64bbba35f 100644
--- a/dagster/sqlalchemy_kernel/common.py
+++ b/dagster/sqlalchemy_kernel/common.py
@@ -35,9 +35,7 @@ def execute_sql_text_on_context(context, sql_text):
         finally:
             cursor.close()
     else:
-        # context.engine.connect().execute(sql_text)
         connection = context.engine.connect()
         transaction = connection.begin()
         connection.execute(sql_text)
         transaction.commit()
-        # cursor.commit()
diff --git a/dagster/sqlalchemy_kernel/templated.py b/dagster/sqlalchemy_kernel/templated.py
index 0d5f5becd39d6..4b1011494a933 100644
--- a/dagster/sqlalchemy_kernel/templated.py
+++ b/dagster/sqlalchemy_kernel/templated.py
@@ -115,8 +115,6 @@ def _render_template_string(template_text, **kwargs):
 def _create_templated_sql_transform_with_output(sql, output_table):
     def do_transform(context, **kwargs):
         rendered_sql = _render_template_string(sql, **kwargs)
-        # logging.info(rendered_sql)
-        print(rendered_sql[:1000])
         execute_sql_text_on_context(context, rendered_sql)
         return kwargs[output_table]