Skip to content

Commit

Permalink
Split execution test into with output and with error
Browse files Browse the repository at this point in the history
Signed-off-by: Bernhard Stadlbauer <[email protected]>
  • Loading branch information
Bernhard Stadlbauer authored and bstadlbauer committed Jan 29, 2022
1 parent adea5c0 commit 1adbbbb
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions tests/flytekit/unit/models/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,52 @@
)


def test_execution_closure():
def test_execution_closure_with_output():
test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC)
test_timedelta = datetime.timedelta(seconds=10)
test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/")

obj = _execution.ExecutionClosure(
phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED,
started_at=test_datetime,
duration=test_timedelta,
outputs=test_outputs,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
assert obj.duration == test_timedelta
assert obj.outputs == test_outputs
obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.outputs == test_outputs


def test_execution_closure_with_error():
test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC)
test_timedelta = datetime.timedelta(seconds=10)
test_error = _core_exec.ExecutionError(
code="foo", message="bar", error_uri="http://foobar", kind=_core_exec.ExecutionError.ErrorKind.USER
)
test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/")

obj = _execution.ExecutionClosure(
phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED,
started_at=test_datetime,
duration=test_timedelta,
error=test_error,
outputs=test_outputs,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
assert obj.duration == test_timedelta
assert obj.error == test_error
assert obj.outputs == test_outputs
obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
# assert obj2.error == test_error # FIXME: This won't work for some reason?
assert obj2.outputs == test_outputs
assert obj2.error == test_error


def test_execution_metadata():
Expand Down

0 comments on commit 1adbbbb

Please sign in to comment.