diff --git a/boilerplate/flyte/end2end/Makefile b/boilerplate/flyte/end2end/Makefile index 7f080e95..61ee99bc 100644 --- a/boilerplate/flyte/end2end/Makefile +++ b/boilerplate/flyte/end2end/Makefile @@ -5,8 +5,7 @@ .PHONY: end2end_execute end2end_execute: - # TODO: These arguments could come from environment variables - ./boilerplate/flyte/end2end/end2end.sh ./boilerplate/flyte/end2end/functional-test.config --return_non_zero_on_failure + ./boilerplate/flyte/end2end/end2end.sh ./boilerplate/flyte/end2end/functional-test-config.yaml --return_non_zero_on_failure .PHONY: k8s_integration_execute k8s_integration_execute: diff --git a/boilerplate/flyte/end2end/end2end.sh b/boilerplate/flyte/end2end/end2end.sh index 736e3bb3..acc9d012 100755 --- a/boilerplate/flyte/end2end/end2end.sh +++ b/boilerplate/flyte/end2end/end2end.sh @@ -9,8 +9,9 @@ set -e CONFIG_FILE=$1; shift EXTRA_FLAGS=( "$@" ) -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" +# By default only execute `core` tests +PRIORITIES="${PRIORITIES:-P0}" LATEST_VERSION=$(curl --silent "https://api.github.com/repos/flyteorg/flytesnacks/releases/latest" | jq -r .tag_name) -python ./boilerplate/flyte/end2end/run-tests.py $LATEST_VERSION P0,P1 $CONFIG_FILE ${EXTRA_FLAGS[@]} +python ./boilerplate/flyte/end2end/run-tests.py $LATEST_VERSION $PRIORITIES $CONFIG_FILE ${EXTRA_FLAGS[@]} diff --git a/boilerplate/flyte/end2end/functional-test-config.yaml b/boilerplate/flyte/end2end/functional-test-config.yaml new file mode 100644 index 00000000..6d06b707 --- /dev/null +++ b/boilerplate/flyte/end2end/functional-test-config.yaml @@ -0,0 +1,5 @@ +admin: + # For GRPC endpoints you might want to use dns:///flyte.myexample.com + endpoint: localhost:30081 + authType: Pkce + insecure: true diff --git a/boilerplate/flyte/end2end/functional-test.config b/boilerplate/flyte/end2end/functional-test.config deleted file mode 100644 index f2a867ad..00000000 --- a/boilerplate/flyte/end2end/functional-test.config +++ /dev/null 
@@ -1,3 +0,0 @@ -[platform] -url=127.0.0.1:30081 -insecure=True diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py old mode 100755 new mode 100644 index 6e172f70..15b35e1d --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import click +import datetime import json import sys import time @@ -10,110 +11,163 @@ from flytekit.remote import FlyteRemote from flytekit.models.core.execution import WorkflowExecutionPhase from flytekit.configuration import Config, ImageConfig, SerializationSettings +from flytekit.remote.executions import FlyteWorkflowExecution WAIT_TIME = 10 -MAX_ATTEMPTS = 60 +MAX_ATTEMPTS = 200 # This dictionary maps the names found in the flytesnacks manifest to a list of workflow names and # inputs. This is so we can progressively cover all priorities in the original flytesnacks manifest, # starting with "core". FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = { + "lite": [ + ("basics.hello_world.my_wf", {}), + ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), + ], "core": [ - ("core.control_flow.chain_tasks.chain_tasks_wf", {}), - ("core.control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), - ("core.control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), + ("basics.deck.wf", {}), + # The chain_workflows example in flytesnacks expects to be running in a sandbox. + # ("control_flow.chain_entities.chain_workflows_wf", {}), + ("control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), + ("control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), # Workflows that use nested executions cannot be launched via flyteremote. # This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482. 
- # ("core.control_flow.run_conditions.multiplier", {"my_input": 0.5}), - # ("core.control_flow.run_conditions.multiplier_2", {"my_input": 10}), - # ("core.control_flow.run_conditions.multiplier_3", {"my_input": 5}), - # ("core.control_flow.run_conditions.basic_boolean_wf", {"seed": 5}), - # ("core.control_flow.run_conditions.bool_input_wf", {"b": True}), - # ("core.control_flow.run_conditions.nested_conditions", {"my_input": 0.4}), - # ("core.control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}), - # ("core.control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}), - ("core.control_flow.subworkflows.parent_wf", {"a": 3}), - ("core.control_flow.subworkflows.nested_parent_wf", {"a": 3}), - ("core.flyte_basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}), + # ("control_flow.run_conditions.multiplier", {"my_input": 0.5}), + # ("control_flow.run_conditions.multiplier_2", {"my_input": 10}), + # ("control_flow.run_conditions.multiplier_3", {"my_input": 5}), + # ("control_flow.run_conditions.basic_boolean_wf", {"seed": 5}), + # ("control_flow.run_conditions.bool_input_wf", {"b": True}), + # ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}), + # ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}), + # ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}), + ("control_flow.subworkflows.parent_wf", {"a": 3}), + ("control_flow.subworkflows.nested_parent_wf", {"a": 3}), + ("basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}), # TODO: enable new files and folders workflows - # ("core.flyte_basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}), - # ("core.flyte_basics.folders.download_and_rotate", {}), - ("core.flyte_basics.hello_world.my_wf", {}), - ("core.flyte_basics.lp.my_wf", {"val": 4}), - ("core.flyte_basics.lp.go_greet", {"day_of_week": "5", "number": 3, 
"am": True}), - ("core.flyte_basics.named_outputs.my_wf", {}), + # ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}), + # ("basics.folders.download_and_rotate", {}), + ("basics.hello_world.my_wf", {}), + ("basics.lp.my_wf", {"val": 4}), + ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), + ("basics.named_outputs.my_wf", {}), # # Getting a 403 for the wikipedia image - # # ("core.flyte_basics.reference_task.wf", {}), - ("core.type_system.custom_objects.wf", {"x": 10, "y": 20}), + # # ("basics.reference_task.wf", {}), + ("type_system.custom_objects.wf", {"x": 10, "y": 20}), # Enums are not supported in flyteremote - # ("core.type_system.enums.enum_wf", {"c": "red"}), - ("core.type_system.schema.df_wf", {"a": 42}), - ("core.type_system.typed_schema.wf", {}), - ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), + # ("type_system.enums.enum_wf", {"c": "red"}), + ("type_system.schema.df_wf", {"a": 42}), + ("type_system.typed_schema.wf", {}), + #("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), + ], + "integrations-k8s-spark": [ + ("k8s_spark.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}), + ], + "integrations-kfpytorch": [ + ("kfpytorch.pytorch_mnist.pytorch_training_wf", {}), + ], + "integrations-kftensorflow": [ + ("kftensorflow.tf_mnist.mnist_tensorflow_workflow", {}), + ], + # "integrations-pod": [ + # ("pod.pod.pod_workflow", {}), + # ], + "integrations-pandera_examples": [ + ("pandera_examples.basic_schema_example.process_data", {}), + # TODO: investigate type mismatch float -> numpy.float64 + # ("pandera_examples.validating_and_testing_ml_pipelines.pipeline", {"data_random_state": 42, "model_random_state": 99}), + ], + "integrations-modin_examples": [ + ("modin_examples.knn_classifier.pipeline", {}), + ], + "integrations-papermilltasks": [ + ("papermilltasks.simple.nb_to_python_wf", {"f": 
3.1415926535}), + ], + "integrations-greatexpectations": [ + ("greatexpectations.task_example.simple_wf", {}), + ("greatexpectations.task_example.file_wf", {}), + ("greatexpectations.task_example.schema_wf", {}), + ("greatexpectations.task_example.runtime_wf", {}), ], } -def run_launch_plan(remote, version, workflow_name, inputs): +def execute_workflow(remote, version, workflow_name, inputs): print(f"Fetching workflow={workflow_name} and version={version}") - lp = remote.fetch_workflow(name=workflow_name, version=version) - return remote.execute(lp, inputs=inputs, wait=False) + wf = remote.fetch_workflow(name=workflow_name, version=version) + return remote.execute(wf, inputs=inputs, wait=False) + +def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool: + for executions in executions_by_wfgroup.values(): + if not all([execution.is_done for execution in executions]): + return False + return True +def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]): + try: + for executions in executions_by_wfgroup.values(): + for execution in executions: + print(f"About to sync execution_id={execution.id.name}") + remote.sync(execution) + except: + print("GOT TO THE EXCEPT") + print("COUNT THIS!") -def schedule_workflow_group( + +def report_executions(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]): + for executions in executions_by_wfgroup.values(): + for execution in executions: + print(execution) + +def schedule_workflow_groups( tag: str, - workflow_group: str, + workflow_groups: List[str], remote: FlyteRemote, terminate_workflow_on_failure: bool, -) -> bool: +) -> Dict[str, bool]: """ - Schedule all workflows executions and return True if all executions succeed, otherwise + Schedule workflow executions for all workflow groups and return, per workflow group, True if all of its executions succeed, otherwise return False.
""" - workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(workflow_group, []) - - launch_plans = [ - run_launch_plan(remote, tag, workflow[0], workflow[1]) for workflow in workflows - ] + executions_by_wfgroup = {} + # Schedule executions for each workflow group, + for wf_group in workflow_groups: + workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, []) + executions_by_wfgroup[wf_group] = [ + execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows + ] - # Wait for all launch plans to finish + # Wait for all executions to finish attempt = 0 while attempt == 0 or ( - not all([lp.is_done for lp in launch_plans]) and attempt < MAX_ATTEMPTS + not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS ): attempt += 1 print( f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s" ) time.sleep(WAIT_TIME) - # Need to sync to refresh status of executions - for lp in launch_plans: - print(f"About to sync execution_id={lp.id.name}") - remote.sync(lp) - - # Report result of each launch plan - for lp in launch_plans: - print(lp) - - # Collect all failing launch plans - non_succeeded_lps = [ - lp - for lp in launch_plans - if lp.closure.phase != WorkflowExecutionPhase.SUCCEEDED - ] - - if len(non_succeeded_lps) == 0: - print("All executions succeeded.") - return True - - print("Failed executions:") - # Report failing cases - for lp in non_succeeded_lps: - print(f" workflow={lp.spec.launch_plan.name}, execution_id={lp.id.name}") - if terminate_workflow_on_failure: - remote.terminate(lp, "aborting execution scheduled in functional test") - return False + sync_executions(remote, executions_by_wfgroup) + + + report_executions(executions_by_wfgroup) + + results = {} + for wf_group, executions in executions_by_wfgroup.items(): + non_succeeded_executions = [] + for execution in executions: + if execution.closure.phase != WorkflowExecutionPhase.SUCCEEDED: + non_succeeded_executions.append(execution) + # Report 
failing cases + if len(non_succeeded_executions) != 0: + print(f"Failed executions for {wf_group}:") + for execution in non_succeeded_executions: + print(f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}") + if terminate_workflow_on_failure: + remote.terminate(execution, "aborting execution scheduled in functional test") + # A workflow group succeeds iff all of its executions succeed + results[wf_group] = len(non_succeeded_executions) == 0 + return results def valid(workflow_group): @@ -139,14 +193,16 @@ def run( # For a given release tag and priority, this function filters the workflow groups from the flytesnacks # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ]. manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \ - f"{flytesnacks_release_tag}/cookbook/flyte_tests_manifest.json" + f"{flytesnacks_release_tag}/flyte_tests_manifest.json" r = requests.get(manifest_url) parsed_manifest = r.json() + workflow_groups = [] + workflow_groups = ["lite"] if "lite" in priorities else [ + group["name"] for group in parsed_manifest if group["priority"] in priorities + ] - workflow_groups = [ - group["name"] for group in parsed_manifest if group["priority"] in priorities - ] results = [] + valid_workgroups = [] for workflow_group in workflow_groups: if not valid(workflow_group): results.append( @@ -157,19 +213,17 @@ def run( } ) continue + valid_workgroups.append(workflow_group) - try: - workflows_succeeded = schedule_workflow_group( - flytesnacks_release_tag, - workflow_group, - remote, - terminate_workflow_on_failure, - ) - except Exception: - print(traceback.format_exc()) - workflows_succeeded = False + results_by_wfgroup = schedule_workflow_groups( + flytesnacks_release_tag, + valid_workgroups, + remote, + terminate_workflow_on_failure + ) - if workflows_succeeded: + for workflow_group, succeeded in results_by_wfgroup.items(): + if succeeded: background_color = "green" 
status = "passing" else: diff --git a/boilerplate/flyte/golang_dockerfile/Dockerfile.GoTemplate b/boilerplate/flyte/golang_dockerfile/Dockerfile.GoTemplate index 20f66861..a51f8e1b 100644 --- a/boilerplate/flyte/golang_dockerfile/Dockerfile.GoTemplate +++ b/boilerplate/flyte/golang_dockerfile/Dockerfile.GoTemplate @@ -3,7 +3,7 @@ # # TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst -FROM golang:1.18-alpine3.15 as builder +FROM golang:1.17.1-alpine3.14 as builder RUN apk add git openssh-client make curl # COPY only the go mod files for efficient caching diff --git a/boilerplate/flyte/golang_support_tools/go.mod b/boilerplate/flyte/golang_support_tools/go.mod index a1f1bd49..dbf94f41 100644 --- a/boilerplate/flyte/golang_support_tools/go.mod +++ b/boilerplate/flyte/golang_support_tools/go.mod @@ -1,6 +1,6 @@ module github.com/flyteorg/boilerplate -go 1.18 +go 1.17 require ( github.com/EngHabu/mockery v0.0.0-20220405200825-3f76291311cf diff --git a/boilerplate/flyte/welcome_bot/Readme.rst b/boilerplate/flyte/welcome_bot/Readme.rst deleted file mode 100644 index ea187811..00000000 --- a/boilerplate/flyte/welcome_bot/Readme.rst +++ /dev/null @@ -1,8 +0,0 @@ -Config File -- Welcome Bot -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Provides a ``config.yml`` file. - -**To Enable:** - -Add ``flyte/config.yml`` to your ``boilerplate/update.cfg`` file. \ No newline at end of file diff --git a/boilerplate/flyte/welcome_bot/config.yml b/boilerplate/flyte/welcome_bot/config.yml deleted file mode 100644 index 73da252e..00000000 --- a/boilerplate/flyte/welcome_bot/config.yml +++ /dev/null @@ -1,16 +0,0 @@ -# Comment to be posted on PRs from first-time contributors in your repository -newPRWelcomeComment: | - Thank you for opening this pull request! 🙌 - - These tips will help get your PR across the finish line: - - - Most of the repos have a PR template; if not, fill it out to the best of your knowledge. 
- - Sign off your commits (Reference: [DCO Guide](https://github.com/src-d/guide/blob/master/developer-community/fix-DCO.md)). - -# Comment to be posted to on pull requests merged by a first time user -firstPRMergeComment: > - Congrats on merging your first pull request! 🎉 - -# Comment to be posted on first-time issues -newIssueWelcomeComment: > - Thank you for opening your first issue here! 🛠 diff --git a/boilerplate/flyte/welcome_bot/update.sh b/boilerplate/flyte/welcome_bot/update.sh deleted file mode 100755 index 2db64ac3..00000000 --- a/boilerplate/flyte/welcome_bot/update.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash - -# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES. -# ONLY EDIT THIS FILE FROM WITHIN THE 'FLYTEORG/BOILERPLATE' REPOSITORY: -# -# TO OPT OUT OF UPDATES, SEE https://github.com/flyteorg/boilerplate/blob/master/Readme.rst - -set -e - -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" - -# Clone the config.yml file -echo " - copying ${DIR}/config.yml to the root directory." -cp "${DIR}"/config.yml "${DIR}"/../../../.github/config.yml -