Skip to content

Commit

Permalink
fix GE and pandera tests
Browse files Browse the repository at this point in the history
Signed-off-by: Niels Bantilan <[email protected]>
  • Loading branch information
cosmicBboy committed Apr 11, 2023
1 parent 369ce04 commit 2769ded
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
2 changes: 1 addition & 1 deletion flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ def dispatch_execute(
except Exception as exc:
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise TypeError(msg) from exc
raise type(exc)(msg) from exc

# TODO: Logger should auto inject the current context information to indicate if the task is running within
# a workflow or a subworkflow etc
Expand Down
5 changes: 3 additions & 2 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,9 @@ def __call__(self, *args, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromis
self.compile()
try:
return flyte_entity_call_handler(self, *args, **input_kwargs)
except TypeError as exc:
raise TypeError(f"Encountered error while executing workflow '{self.name}':\n {exc}") from exc
except Exception as exc:
exc.args = (f"Encountered error while executing workflow '{self.name}':\n {exc}", *exc.args[1:])
raise exc

def execute(self, **kwargs):
raise Exception("Should not be called")
Expand Down
17 changes: 15 additions & 2 deletions plugins/flytekit-pandera/tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ def invalid_wf() -> pandera.typing.DataFrame[OutSchema]:
def wf_with_df_input(df: pandera.typing.DataFrame[InSchema]) -> pandera.typing.DataFrame[OutSchema]:
return transform2(df=transform1(df=df))

with pytest.raises(pandera.errors.SchemaError, match="^expected series 'col2' to have type float64, got object"):
with pytest.raises(
pandera.errors.SchemaError,
match=(
"^Encountered error while executing workflow 'test_plugin.wf_with_df_input':\n"
" expected series 'col2' to have type float64, got object"
)
):
wf_with_df_input(df=invalid_df)

# raise error when executing workflow with invalid output
Expand All @@ -67,7 +73,14 @@ def transform2_noop(df: pandera.typing.DataFrame[IntermediateSchema]) -> pandera
def wf_invalid_output(df: pandera.typing.DataFrame[InSchema]) -> pandera.typing.DataFrame[OutSchema]:
return transform2_noop(df=transform1(df=df))

with pytest.raises(TypeError, match="^Failed to convert return value"):
with pytest.raises(
TypeError,
match=(
"^Encountered error while executing workflow 'test_plugin.wf_invalid_output':\n"
" Error encountered while executing 'wf_invalid_output':\n"
" Failed to convert outputs of task"
),
):
wf_invalid_output(df=valid_df)


Expand Down

0 comments on commit 2769ded

Please sign in to comment.