Skip to content

Commit

Permalink
Update boilerplate version (flyteorg#42)
Browse files Browse the repository at this point in the history
Signed-off-by: Flyte-Bot <[email protected]>
Co-authored-by: eapolinario <[email protected]>
  • Loading branch information
flyte-bot and eapolinario authored Jul 15, 2023
1 parent 10f7420 commit 5a38c21
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 150 deletions.
3 changes: 1 addition & 2 deletions boilerplate/flyte/end2end/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

.PHONY: end2end_execute
end2end_execute:
# TODO: These arguments could come from environment variables
./boilerplate/flyte/end2end/end2end.sh ./boilerplate/flyte/end2end/functional-test.config --return_non_zero_on_failure
./boilerplate/flyte/end2end/end2end.sh ./boilerplate/flyte/end2end/functional-test-config.yaml --return_non_zero_on_failure

.PHONY: k8s_integration_execute
k8s_integration_execute:
Expand Down
5 changes: 3 additions & 2 deletions boilerplate/flyte/end2end/end2end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ set -e
CONFIG_FILE=$1; shift
EXTRA_FLAGS=( "$@" )

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
# By default only execute `core` tests
PRIORITIES="${PRIORITIES:-P0}"

LATEST_VERSION=$(curl --silent "https://api.github.com/repos/flyteorg/flytesnacks/releases/latest" | jq -r .tag_name)

python ./boilerplate/flyte/end2end/run-tests.py $LATEST_VERSION P0,P1 $CONFIG_FILE ${EXTRA_FLAGS[@]}
python ./boilerplate/flyte/end2end/run-tests.py $LATEST_VERSION $PRIORITIES $CONFIG_FILE ${EXTRA_FLAGS[@]}
5 changes: 5 additions & 0 deletions boilerplate/flyte/end2end/functional-test-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
admin:
# For GRPC endpoints you might want to use dns:///flyte.myexample.com
endpoint: localhost:30081
authType: Pkce
insecure: true
3 changes: 0 additions & 3 deletions boilerplate/flyte/end2end/functional-test.config

This file was deleted.

222 changes: 138 additions & 84 deletions boilerplate/flyte/end2end/run-tests.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import click
import datetime
import json
import sys
import time
Expand All @@ -10,110 +11,163 @@
from flytekit.remote import FlyteRemote
from flytekit.models.core.execution import WorkflowExecutionPhase
from flytekit.configuration import Config, ImageConfig, SerializationSettings
from flytekit.remote.executions import FlyteWorkflowExecution


WAIT_TIME = 10
MAX_ATTEMPTS = 60
MAX_ATTEMPTS = 200

# This dictionary maps the names found in the flytesnacks manifest to a list of workflow names and
# inputs. This is so we can progressively cover all priorities in the original flytesnacks manifest,
# starting with "core".
FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = {
"lite": [
("basics.hello_world.my_wf", {}),
("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}),
],
"core": [
("core.control_flow.chain_tasks.chain_tasks_wf", {}),
("core.control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}),
("core.control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}),
("basics.deck.wf", {}),
# The chain_workflows example in flytesnacks expects to be running in a sandbox.
# ("control_flow.chain_entities.chain_workflows_wf", {}),
("control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}),
("control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}),
# Workflows that use nested executions cannot be launched via flyteremote.
# This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482.
# ("core.control_flow.run_conditions.multiplier", {"my_input": 0.5}),
# ("core.control_flow.run_conditions.multiplier_2", {"my_input": 10}),
# ("core.control_flow.run_conditions.multiplier_3", {"my_input": 5}),
# ("core.control_flow.run_conditions.basic_boolean_wf", {"seed": 5}),
# ("core.control_flow.run_conditions.bool_input_wf", {"b": True}),
# ("core.control_flow.run_conditions.nested_conditions", {"my_input": 0.4}),
# ("core.control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}),
# ("core.control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}),
("core.control_flow.subworkflows.parent_wf", {"a": 3}),
("core.control_flow.subworkflows.nested_parent_wf", {"a": 3}),
("core.flyte_basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}),
# Getting a 403 for the wikipedia image
# ("core.flyte_basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}),
("core.flyte_basics.folders.download_and_rotate", {}),
("core.flyte_basics.hello_world.my_wf", {}),
("core.flyte_basics.lp.my_wf", {"val": 4}),
("core.flyte_basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}),
("core.flyte_basics.named_outputs.my_wf", {}),
# ("control_flow.run_conditions.multiplier", {"my_input": 0.5}),
# ("control_flow.run_conditions.multiplier_2", {"my_input": 10}),
# ("control_flow.run_conditions.multiplier_3", {"my_input": 5}),
# ("control_flow.run_conditions.basic_boolean_wf", {"seed": 5}),
# ("control_flow.run_conditions.bool_input_wf", {"b": True}),
# ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}),
# ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}),
# ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}),
("control_flow.subworkflows.parent_wf", {"a": 3}),
("control_flow.subworkflows.nested_parent_wf", {"a": 3}),
("basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}),
# TODO: enable new files and folders workflows
# ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}),
# ("basics.folders.download_and_rotate", {}),
("basics.hello_world.my_wf", {}),
("basics.lp.my_wf", {"val": 4}),
("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}),
("basics.named_outputs.my_wf", {}),
# # Getting a 403 for the wikipedia image
# # ("core.flyte_basics.reference_task.wf", {}),
("core.type_system.custom_objects.wf", {"x": 10, "y": 20}),
# # ("basics.reference_task.wf", {}),
("type_system.custom_objects.wf", {"x": 10, "y": 20}),
# Enums are not supported in flyteremote
# ("core.type_system.enums.enum_wf", {"c": "red"}),
("core.type_system.schema.df_wf", {"a": 42}),
("core.type_system.typed_schema.wf", {}),
("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
# ("type_system.enums.enum_wf", {"c": "red"}),
("type_system.schema.df_wf", {"a": 42}),
("type_system.typed_schema.wf", {}),
#("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
],
"integrations-k8s-spark": [
("k8s_spark.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}),
],
"integrations-kfpytorch": [
("kfpytorch.pytorch_mnist.pytorch_training_wf", {}),
],
"integrations-kftensorflow": [
("kftensorflow.tf_mnist.mnist_tensorflow_workflow", {}),
],
# "integrations-pod": [
# ("pod.pod.pod_workflow", {}),
# ],
"integrations-pandera_examples": [
("pandera_examples.basic_schema_example.process_data", {}),
# TODO: investigate type mismatch float -> numpy.float64
# ("pandera_examples.validating_and_testing_ml_pipelines.pipeline", {"data_random_state": 42, "model_random_state": 99}),
],
"integrations-modin_examples": [
("modin_examples.knn_classifier.pipeline", {}),
],
"integrations-papermilltasks": [
("papermilltasks.simple.nb_to_python_wf", {"f": 3.1415926535}),
],
"integrations-greatexpectations": [
("greatexpectations.task_example.simple_wf", {}),
("greatexpectations.task_example.file_wf", {}),
("greatexpectations.task_example.schema_wf", {}),
("greatexpectations.task_example.runtime_wf", {}),
],
}


def run_launch_plan(remote, version, workflow_name, inputs):
def execute_workflow(remote, version, workflow_name, inputs):
print(f"Fetching workflow={workflow_name} and version={version}")
lp = remote.fetch_workflow(name=workflow_name, version=version)
return remote.execute(lp, inputs=inputs, wait=False)
wf = remote.fetch_workflow(name=workflow_name, version=version)
return remote.execute(wf, inputs=inputs, wait=False)

def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool:
for executions in executions_by_wfgroup.values():
if not all([execution.is_done for execution in executions]):
return False
return True

def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
try:
for executions in executions_by_wfgroup.values():
for execution in executions:
print(f"About to sync execution_id={execution.id.name}")
remote.sync(execution)
except:
print("GOT TO THE EXCEPT")
print("COUNT THIS!")

def schedule_workflow_group(

def report_executions(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
for executions in executions_by_wfgroup.values():
for execution in executions:
print(execution)

def schedule_workflow_groups(
tag: str,
workflow_group: str,
workflow_groups: List[str],
remote: FlyteRemote,
terminate_workflow_on_failure: bool,
) -> bool:
) -> Dict[str, bool]:
"""
Schedule all workflows executions and return True if all executions succeed, otherwise
Schedule workflows executions for all workflow gropus and return True if all executions succeed, otherwise
return False.
"""
workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(workflow_group, [])

launch_plans = [
run_launch_plan(remote, tag, workflow[0], workflow[1]) for workflow in workflows
]
executions_by_wfgroup = {}
# Schedule executions for each workflow group,
for wf_group in workflow_groups:
workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, [])
executions_by_wfgroup[wf_group] = [
execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows
]

# Wait for all launch plans to finish
# Wait for all executions to finish
attempt = 0
while attempt == 0 or (
not all([lp.is_complete for lp in launch_plans]) and attempt < MAX_ATTEMPTS
not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS
):
attempt += 1
print(
f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s"
)
time.sleep(WAIT_TIME)
# Need to sync to refresh status of executions
for lp in launch_plans:
print(f"About to sync execution_id={lp.id.name}")
remote.sync(lp)

# Report result of each launch plan
for lp in launch_plans:
print(lp)

# Collect all failing launch plans
non_succeeded_lps = [
lp
for lp in launch_plans
if lp.closure.phase != WorkflowExecutionPhase.SUCCEEDED
]

if len(non_succeeded_lps) == 0:
print("All executions succeeded.")
return True

print("Failed executions:")
# Report failing cases
for lp in non_succeeded_lps:
print(f" workflow={lp.spec.launch_plan.name}, execution_id={lp.id.name}")
if terminate_workflow_on_failure:
remote.terminate(lp, "aborting execution scheduled in functional test")
return False
sync_executions(remote, executions_by_wfgroup)


report_executions(executions_by_wfgroup)

results = {}
for wf_group, executions in executions_by_wfgroup.items():
non_succeeded_executions = []
for execution in executions:
if execution.closure.phase != WorkflowExecutionPhase.SUCCEEDED:
non_succeeded_executions.append(execution)
# Report failing cases
if len(non_succeeded_executions) != 0:
print(f"Failed executions for {wf_group}:")
for execution in non_succeeded_executions:
print(f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}")
if terminate_workflow_on_failure:
remote.terminate(execution, "aborting execution scheduled in functional test")
# A workflow group succeeds iff all of its executions succeed
results[wf_group] = len(non_succeeded_executions) == 0
return results


def valid(workflow_group):
Expand All @@ -139,14 +193,16 @@ def run(
# For a given release tag and priority, this function filters the workflow groups from the flytesnacks
# manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ].
manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \
f"{flytesnacks_release_tag}/cookbook/flyte_tests_manifest.json"
f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
r = requests.get(manifest_url)
parsed_manifest = r.json()
workflow_groups = []
workflow_groups = ["lite"] if "lite" in priorities else [
group["name"] for group in parsed_manifest if group["priority"] in priorities
]

workflow_groups = [
group["name"] for group in parsed_manifest if group["priority"] in priorities
]
results = []
valid_workgroups = []
for workflow_group in workflow_groups:
if not valid(workflow_group):
results.append(
Expand All @@ -157,19 +213,17 @@ def run(
}
)
continue
valid_workgroups.append(workflow_group)

try:
workflows_succeeded = schedule_workflow_group(
flytesnacks_release_tag,
workflow_group,
remote,
terminate_workflow_on_failure,
)
except Exception:
print(traceback.format_exc())
workflows_succeeded = False
results_by_wfgroup = schedule_workflow_groups(
flytesnacks_release_tag,
valid_workgroups,
remote,
terminate_workflow_on_failure
)

if workflows_succeeded:
for workflow_group, succeeded in results_by_wfgroup.items():
if succeeded:
background_color = "green"
status = "passing"
else:
Expand Down
16 changes: 7 additions & 9 deletions boilerplate/flyte/golang_support_tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/flyteorg/boilerplate
go 1.17

require (
github.com/EngHabu/mockery v0.0.0-20220405200825-3f76291311cf
github.com/alvaroloes/enumer v1.1.2
github.com/flyteorg/flytestdlib v0.4.16
github.com/golangci/golangci-lint v1.38.0
github.com/pseudomuto/protoc-gen-doc v1.4.1
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5
)

require (
Expand Down Expand Up @@ -152,7 +152,7 @@ require (
github.com/spf13/viper v1.7.1 // indirect
github.com/ssgreg/nlreturn/v2 v2.1.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/stretchr/testify v1.7.1 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tdakkota/asciicheck v0.0.0-20200416200610-e657995f937b // indirect
github.com/tetafro/godot v1.4.4 // indirect
Expand All @@ -163,15 +163,15 @@ require (
github.com/ultraware/whitespace v0.0.4 // indirect
github.com/uudashr/gocognit v1.0.1 // indirect
go.opencensus.io v0.22.6 // indirect
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect
golang.org/x/oauth2 v0.0.0-20210126194326-f9ce19ea3013 // indirect
golang.org/x/sys v0.0.0-20210423082822-04245dca01da // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
golang.org/x/tools v0.1.0 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/api v0.38.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand All @@ -191,6 +191,4 @@ require (
mvdan.cc/unparam v0.0.0-20210104141923-aac4ce9116a7 // indirect
)

replace github.com/vektra/mockery => github.com/enghabu/mockery v0.0.0-20191009061720-9d0c8670c2f0

replace github.com/pseudomuto/protoc-gen-doc => github.com/flyteorg/protoc-gen-doc v1.4.2
Loading

0 comments on commit 5a38c21

Please sign in to comment.