Skip to content

Commit

Permalink
Additional tests (#14)
Browse files Browse the repository at this point in the history
- [X] Retryable Dynamic Nodes
- [X] RunToCompletion Workflow
  • Loading branch information
EngHabu authored Jun 15, 2020
1 parent 66affef commit eeafd5b
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 6 deletions.
47 changes: 43 additions & 4 deletions flytetools/flytetester/app/workflows/failing_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
import time
from datetime import timedelta

from flytekit.sdk.tasks import outputs, python_task
from flytekit.sdk.tasks import (
python_task,
dynamic_task,
inputs,
outputs,
)
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class
from flytekit.common.exceptions.base import FlyteRecoverableException
from flytekit.models.core.workflow import WorkflowMetadata

@outputs(answer=Types.Integer)
@python_task
@python_task(cpu_request="40m")
def divider(wf_params, answer):
answer.set(1 / 0)

Expand All @@ -21,7 +27,7 @@ class DivideByZeroWf(object):


@outputs(answer=Types.Integer)
@python_task(timeout=timedelta(1))
@python_task(timeout=timedelta(1), cpu_request="10m")
def oversleeper(wf_params, answer):
time.sleep(10)
answer.set(1)
Expand All @@ -32,11 +38,44 @@ class SleeperWf(object):
zzz = oversleeper()


@python_task(retries=2)
@python_task(retries=2, cpu_request="20m")
def retryer(wf_params):
raise FlyteRecoverableException('This task is supposed to fail')


@workflow_class
class RetrysWf(object):
retried_task = retryer()


@dynamic_task(retries=2, cpu_request="40m")
def retryable_dynamic_node(wf_params):
yield divider()


@inputs(in_str=Types.String)
@python_task()
def echo(wf_params, in_str):
wf_params.logging.warn(in_str)


@workflow_class
class FailingDynamicNodeWF(object):
should_fail_and_retry = retryable_dynamic_node()


@workflow_class(on_failure=WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
class RunToCompletionWF(object):
div_by_zero = divider()
echo_n = echo(in_str="should never run")

echo_n_2 = echo(in_str="should run first")
echo_n_2_2 = echo(in_str="should run next")
div_by_zero_2 = divider()

should_not_run = divider()

echo_n_2 >> echo_n_2_2 >> div_by_zero_2
div_by_zero >> echo_n
echo_n_2 >> should_not_run
div_by_zero >> should_not_run
2 changes: 2 additions & 0 deletions flytetools/flytetester/end2end/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config re
flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.work.WorkflowWithIO --b hello_world
flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.DivideByZeroWf
flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.RetrysWf
flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.FailingDynamicNodeWF
flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.RunToCompletionWF

# Make sure workflow does everything correctly
flytekit_venv python end2end/validator.py
61 changes: 60 additions & 1 deletion flytetools/flytetester/end2end/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
'app.workflows.failing_workflows.DivideByZeroWf',
'app.workflows.work.WorkflowWithIO',
'app.workflows.failing_workflows.RetrysWf',
'app.workflows.failing_workflows.FailingDynamicNodeWF',
'app.workflows.failing_workflows.RunToCompletionWF',
]

# This tells Python where admin is, and also to hit Minio instead of the real S3
Expand Down Expand Up @@ -126,14 +128,71 @@ def retrys_wf_validator(execution, node_execution_list, task_execution_list):
return None # come back and check later

assert len(task_execution_list) == 3
print('Done validating app-workflows-failing-workflows-retrys-wf!')
print('Done validating app.workflows.failing_workflows.RetrysWf!')
return True


def retrys_dynamic_wf_validator(execution, node_execution_list, task_execution_list):
"""
Validation logic for app.workflows.failing_workflows.FailingDynamicNodeWF
This workflow should always fail, but the dynamic node should retry twice.
:param flytekit.models.execution.Execution execution:
:param list[flytekit.models.node_execution.NodeExecution] node_execution_list:
:param list[flytekit.models.admin.task_execution.TaskExecution] task_execution_list:
:rtype: option[bool]
"""
phase = execution.closure.phase
if not phase == _WorkflowExecutionPhase.FAILED:
# If not failed, fail the test if the execution is in an unacceptable state
if phase == _WorkflowExecutionPhase.ABORTED or phase == _WorkflowExecutionPhase.SUCCEEDED or \
phase == _WorkflowExecutionPhase.TIMED_OUT:
return False
elif phase == _WorkflowExecutionPhase.RUNNING:
return None # come back and check later
else:
return False

print('FailingDynamicNodeWF finished with {} task(s)'.format(len(task_execution_list)))
assert len(task_execution_list) == 3
print('Done validating app.workflows.failing_workflows.FailingDynamicNodeWF!')
return True


def run_to_completion_wf_validator(execution, node_execution_list, task_execution_list):
"""
Validation logic for app.workflows.failing_workflows.RunToCompletionWF
This workflow should always fail, but the dynamic node should retry twice.
:param flytekit.models.execution.Execution execution:
:param list[flytekit.models.node_execution.NodeExecution] node_execution_list:
:param list[flytekit.models.admin.task_execution.TaskExecution] task_execution_list:
:rtype: option[bool]
"""
phase = execution.closure.phase
if not phase == _WorkflowExecutionPhase.FAILED:
# If not failed, fail the test if the execution is in an unacceptable state
if phase == _WorkflowExecutionPhase.ABORTED or phase == _WorkflowExecutionPhase.SUCCEEDED or \
phase == _WorkflowExecutionPhase.TIMED_OUT:
return False
elif phase == _WorkflowExecutionPhase.RUNNING:
return None # come back and check later
else:
print('Got unexpected phase [{}]'.format(phase))
return False

print('RunToCompletionWF finished with {} task(s)'.format(len(task_execution_list)))
assert len(task_execution_list) == 4
print('Done validating app.workflows.failing_workflows.RunToCompletionWF!')
return True


validators = {
'app.workflows.work.WorkflowWithIO': workflow_with_io_validator,
'app.workflows.failing_workflows.DivideByZeroWf': failing_workflows_divide_by_zero_wf_validator,
'app.workflows.failing_workflows.RetrysWf': retrys_wf_validator,
'app.workflows.failing_workflows.FailingDynamicNodeWF': retrys_dynamic_wf_validator,
'app.workflows.failing_workflows.RunToCompletionWF': run_to_completion_wf_validator,
}


Expand Down
2 changes: 1 addition & 1 deletion flytetools/flytetester/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
flytekit[sidecar,schema]==0.8.1
flytekit[sidecar,schema]==0.9.1
statsd
opencv-python==3.4.4.19
k8s-proto>=0.0.2
Expand Down

0 comments on commit eeafd5b

Please sign in to comment.