Skip to content

Commit

Permalink
Merge pull request #8 from schrockn/running_edits_with_no_tests
Browse files Browse the repository at this point in the history
Running edits with no tests
  • Loading branch information
Nicholas Schrock authored Jun 1, 2018
2 parents f53762a + 323b439 commit e5fa0a0
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"-s"
],
"python.unitTest.pyTestEnabled": true,
"python.pythonPath": "/Users/schrockn/code/venvs/new_env/bin/python",
"python.pythonPath": "/Users/schrockn/code/venvs/dagster/bin/python",
"python.linting.lintOnSave": true,
"python.linting.pep8Enabled": false,
"python.linting.mypyEnabled": false,
Expand Down
17 changes: 16 additions & 1 deletion dagster/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import (absolute_import, division, print_function, unicode_literals)
from builtins import * # pylint: disable=W0622,W0401

import json

from dagster import check
from dagster.core import types
from dagster.core.execution import DagsterExecutionContext

from .definitions import InputDefinition, Solid
from .definitions import (InputDefinition, create_dagster_single_file_input)
from .graph import DagsterPipeline


Expand All @@ -19,3 +22,15 @@ def input_definition(**kwargs):
def file_input_definition(argument_def_dict=None, **kwargs):
check.param_invariant(argument_def_dict is None, 'Should not provide argument_def_dict')
return InputDefinition(argument_def_dict={'path': types.PATH}, **kwargs)


def create_json_input(name):
check.str_param(name, 'name')

def load_file(context, path):
check.inst_param(context, 'context', DagsterExecutionContext)
check.str_param(path, 'path')
with open(path) as ff:
return json.load(ff)

return create_dagster_single_file_input(name, load_file)
17 changes: 16 additions & 1 deletion dagster/pandas_kernel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster import check
from dagster.utils import has_context_argument

from dagster.core import create_json_input
from dagster.core.definitions import Solid
from dagster.core.execution import DagsterExecutionContext
from dagster.core.errors import (DagsterUserCodeExecutionError, DagsterInvariantViolationError)
Expand All @@ -14,7 +15,9 @@
create_dagster_pd_csv_output,
create_dagster_pd_dependency_input,
create_dagster_pd_parquet_output,
create_dagster_pd_read_table_input,
)
from dagster.core import (create_json_input)


def solid(**kwargs):
Expand Down Expand Up @@ -83,7 +86,7 @@ def dataframe_solid(*args, name, inputs, transform_fn=None, **kwargs):
return Solid(
name=name,
inputs=inputs,
outputs=[csv_output(), parquet_output()],
outputs=[csv_output(), parquet_output(), null_output()],
transform_fn=_dependency_transform_wrapper(name, transform_fn),
**kwargs
)
Expand All @@ -103,5 +106,17 @@ def csv_output():
return create_dagster_pd_csv_output()


def read_table_input(name, delimiter=',', **read_table_kwargs):
return create_dagster_pd_read_table_input(name, delimiter, **read_table_kwargs)


def json_input(name):
return create_json_input(name)


def parquet_output():
return create_dagster_pd_parquet_output()


def null_output():
return create_dagster_pd_csv_output()
21 changes: 19 additions & 2 deletions dagster/pandas_kernel/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ def check_path(context, path):

return create_dagster_single_file_input(name, check_path)

def create_dagster_pd_read_table_input(name, delimiter=',', **read_csv_kwargs):

def create_dagster_pd_read_table_input(name, delimiter=',', **read_table_kwargs):
check.str_param(name, 'name')
check.str_param(delimiter, 'delimiter')

def check_path(context, path):
check.inst_param(context, 'context', DagsterExecutionContext)
check.str_param(path, 'path')
df = pd.read_table(path, delimiter=delimiter, **read_csv_kwargs)
df = pd.read_table(path, delimiter=delimiter, **read_table_kwargs)
context.metric('rows', df.shape[0])
return df

Expand Down Expand Up @@ -97,3 +98,19 @@ def output_fn_inst(df, context, arg_dict):
return OutputDefinition(
name='PARQUET', output_fn=output_fn_inst, argument_def_dict={'path': types.PATH}
)


def create_dagster_pd_null_output():
def output_fn_inst(context, arg_dict):
# check.inst_param(df, 'df', pd.DataFrame)
check.inst_param(context, 'context', DagsterExecutionContext)
check.dict_param(arg_dict, 'arg_dict')
# path = check.str_elem(arg_dict, 'path')

# df.to_csv(path, index=False)

return OutputDefinition(
name='None',
output_fn=output_fn_inst,
# argument_def_dict={'path': types.PATH}
)
22 changes: 21 additions & 1 deletion dagster/sqlalchemy_kernel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ def output_fn(sql_expr, context, arg_dict):
)


def truncate_and_insert_table_output():
def output_fn(sql_expr, context, arg_dict):
check.inst_param(sql_expr, 'sql_expr', DagsterSqlExpression)
check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
check.dict_param(arg_dict, 'arg_dict')

output_table_name = check.str_elem(arg_dict, 'table_name')
total_sql = '''TRUNCATE TABLE {output_table_name};
INSERT INTO {output_table_name} ({sql_text})'''.format(
output_table_name=output_table_name, sql_text=sql_expr.sql_text
)
context.engine.connect().execute(total_sql)

return OutputDefinition(
name='TRUNCATE_AND_INSERT',
output_fn=output_fn,
argument_def_dict={'table_name': types.STRING},
)


def _table_input_fn(context, arg_dict):

check.inst_param(context, 'context', DagsterSqlAlchemyExecutionContext)
Expand Down Expand Up @@ -126,7 +146,7 @@ def create_sql_solid(name, inputs, sql_text):
name,
inputs=inputs,
transform_fn=create_sql_transform(sql_text),
outputs=[create_table_output()],
outputs=[create_table_output(), truncate_and_insert_table_output()],
)


Expand Down
5 changes: 4 additions & 1 deletion dagster/sqlalchemy_kernel/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ def execute_sql_text_on_context(context, sql_text):
finally:
cursor.close()
else:
context.engine.connect().execute(sql_text)
connection = context.engine.connect()
transaction = connection.begin()
connection.execute(sql_text)
transaction.commit()
12 changes: 10 additions & 2 deletions dagster/sqlalchemy_kernel/templated.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import jinja2

import dagster
Expand All @@ -16,7 +17,14 @@ def _create_table_input(name, depends_on=None):
)


def create_templated_sql_transform_solid(name, sql, table_arguments, output, dependencies=None):
def create_templated_sql_transform_solid(
name,
sql,
table_arguments,
output,
dependencies=None,
extra_inputs=[],
):
'''
Create a solid that is a templated sql statement. This assumes that the sql statement
is creating or modifying a table, and that that table will be used downstream in the pipeline
Expand Down Expand Up @@ -93,7 +101,7 @@ def create_templated_sql_transform_solid(name, sql, table_arguments, output, dep
dep_inputs = [_create_table_input(dep.name, depends_on=dep) for dep in dependencies]
return Solid(
name=name,
inputs=table_inputs + dep_inputs,
inputs=table_inputs + dep_inputs + extra_inputs,
transform_fn=_create_templated_sql_transform_with_output(sql, output),
outputs=[],
)
Expand Down

0 comments on commit e5fa0a0

Please sign in to comment.