diff --git a/flytetools/flytetester/app/workflows/failing_workflows.py b/flytetools/flytetester/app/workflows/failing_workflows.py index ee83adf1f5..af04667b0a 100644 --- a/flytetools/flytetester/app/workflows/failing_workflows.py +++ b/flytetools/flytetester/app/workflows/failing_workflows.py @@ -4,13 +4,19 @@ import time from datetime import timedelta -from flytekit.sdk.tasks import outputs, python_task +from flytekit.sdk.tasks import ( + python_task, + dynamic_task, + inputs, + outputs, +) from flytekit.sdk.types import Types from flytekit.sdk.workflow import workflow_class from flytekit.common.exceptions.base import FlyteRecoverableException +from flytekit.models.core.workflow import WorkflowMetadata @outputs(answer=Types.Integer) -@python_task +@python_task(cpu_request="40m") def divider(wf_params, answer): answer.set(1 / 0) @@ -21,7 +27,7 @@ class DivideByZeroWf(object): @outputs(answer=Types.Integer) -@python_task(timeout=timedelta(1)) +@python_task(timeout=timedelta(1), cpu_request="10m") def oversleeper(wf_params, answer): time.sleep(10) answer.set(1) @@ -32,7 +38,7 @@ class SleeperWf(object): zzz = oversleeper() -@python_task(retries=2) +@python_task(retries=2, cpu_request="20m") def retryer(wf_params): raise FlyteRecoverableException('This task is supposed to fail') @@ -40,3 +46,36 @@ def retryer(wf_params): @workflow_class class RetrysWf(object): retried_task = retryer() + + +@dynamic_task(retries=2, cpu_request="40m") +def retryable_dynamic_node(wf_params): + yield divider() + + +@inputs(in_str=Types.String) +@python_task() +def echo(wf_params, in_str): + wf_params.logging.warn(in_str) + + +@workflow_class +class FailingDynamicNodeWF(object): + should_fail_and_retry = retryable_dynamic_node() + + +@workflow_class(on_failure=WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE) +class RunToCompletionWF(object): + div_by_zero = divider() + echo_n = echo(in_str="should never run") + + echo_n_2 = echo(in_str="should run first") + echo_n_2_2 = echo(in_str="should run next") + div_by_zero_2 = divider() + + should_not_run = divider() + + echo_n_2 >> echo_n_2_2 >> div_by_zero_2 + div_by_zero >> echo_n + echo_n_2 >> should_not_run + div_by_zero >> should_not_run diff --git a/flytetools/flytetester/end2end/run.sh b/flytetools/flytetester/end2end/run.sh index 7085870734..0dac75e4a2 100755 --- a/flytetools/flytetester/end2end/run.sh +++ b/flytetools/flytetester/end2end/run.sh @@ -15,6 +15,8 @@ flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config re flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.work.WorkflowWithIO --b hello_world flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.DivideByZeroWf flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.RetrysWf +flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.FailingDynamicNodeWF +flytekit_venv pyflyte -p flytetester -d development -c end2end/end2end.config lp execute app.workflows.failing_workflows.RunToCompletionWF # Make sure workflow does everything correctly flytekit_venv python end2end/validator.py diff --git a/flytetools/flytetester/end2end/validator.py b/flytetools/flytetester/end2end/validator.py index 5c33936107..7eb348e17d 100644 --- a/flytetools/flytetester/end2end/validator.py +++ b/flytetools/flytetester/end2end/validator.py @@ -24,6 +24,8 @@ 'app.workflows.failing_workflows.DivideByZeroWf', 'app.workflows.work.WorkflowWithIO', 'app.workflows.failing_workflows.RetrysWf', + 'app.workflows.failing_workflows.FailingDynamicNodeWF', + 'app.workflows.failing_workflows.RunToCompletionWF', ] # This tells Python where admin is, and also to hit Minio instead of the real S3 @@ -126,7 +128,62 @@ def retrys_wf_validator(execution, node_execution_list, task_execution_list): return None # come back and check later assert len(task_execution_list) == 3 - print('Done validating app-workflows-failing-workflows-retrys-wf!') + print('Done validating app.workflows.failing_workflows.RetrysWf!') + return True + + +def retrys_dynamic_wf_validator(execution, node_execution_list, task_execution_list): + """ + Validation logic for app.workflows.failing_workflows.FailingDynamicNodeWF + This workflow should always fail, but the dynamic node should retry twice. + + :param flytekit.models.execution.Execution execution: + :param list[flytekit.models.node_execution.NodeExecution] node_execution_list: + :param list[flytekit.models.admin.task_execution.TaskExecution] task_execution_list: + :rtype: option[bool] + """ + phase = execution.closure.phase + if not phase == _WorkflowExecutionPhase.FAILED: + # If not failed, fail the test if the execution is in an unacceptable state + if phase == _WorkflowExecutionPhase.ABORTED or phase == _WorkflowExecutionPhase.SUCCEEDED or \ + phase == _WorkflowExecutionPhase.TIMED_OUT: + return False + elif phase == _WorkflowExecutionPhase.RUNNING: + return None # come back and check later + else: + return False + + print('FailingDynamicNodeWF finished with {} task(s)'.format(len(task_execution_list))) + assert len(task_execution_list) == 3 + print('Done validating app.workflows.failing_workflows.FailingDynamicNodeWF!') + return True + + +def run_to_completion_wf_validator(execution, node_execution_list, task_execution_list): + """ + Validation logic for app.workflows.failing_workflows.RunToCompletionWF + This workflow should always fail, but the dynamic node should retry twice. + + :param flytekit.models.execution.Execution execution: + :param list[flytekit.models.node_execution.NodeExecution] node_execution_list: + :param list[flytekit.models.admin.task_execution.TaskExecution] task_execution_list: + :rtype: option[bool] + """ + phase = execution.closure.phase + if not phase == _WorkflowExecutionPhase.FAILED: + # If not failed, fail the test if the execution is in an unacceptable state + if phase == _WorkflowExecutionPhase.ABORTED or phase == _WorkflowExecutionPhase.SUCCEEDED or \ + phase == _WorkflowExecutionPhase.TIMED_OUT: + return False + elif phase == _WorkflowExecutionPhase.RUNNING: + return None # come back and check later + else: + print('Got unexpected phase [{}]'.format(phase)) + return False + + print('RunToCompletionWF finished with {} task(s)'.format(len(task_execution_list))) + assert len(task_execution_list) == 4 + print('Done validating app.workflows.failing_workflows.RunToCompletionWF!') return True @@ -134,6 +191,8 @@ def retrys_wf_validator(execution, node_execution_list, task_execution_list): 'app.workflows.work.WorkflowWithIO': workflow_with_io_validator, 'app.workflows.failing_workflows.DivideByZeroWf': failing_workflows_divide_by_zero_wf_validator, 'app.workflows.failing_workflows.RetrysWf': retrys_wf_validator, + 'app.workflows.failing_workflows.FailingDynamicNodeWF': retrys_dynamic_wf_validator, + 'app.workflows.failing_workflows.RunToCompletionWF': run_to_completion_wf_validator, } diff --git a/flytetools/flytetester/requirements.txt b/flytetools/flytetester/requirements.txt index aba45816f8..83566fc5bb 100644 --- a/flytetools/flytetester/requirements.txt +++ b/flytetools/flytetester/requirements.txt @@ -1,4 +1,4 @@ -flytekit[sidecar,schema]==0.8.1 +flytekit[sidecar,schema]==0.9.1 statsd opencv-python==3.4.4.19 k8s-proto>=0.0.2