
Commit 8e1f887
Merge remote-tracking branch 'origin' into prepare-monorepo
Signed-off-by: Eduardo Apolinario <[email protected]>
eapolinario committed Sep 7, 2023
2 parents 05f7a1d + cda745c commit 8e1f887
Showing 5 changed files with 45 additions and 54 deletions.
.github/workflows/single-binary.yml (18 changes: 8 additions & 10 deletions)
@@ -190,18 +190,16 @@ jobs:
       - name: Register specific tests
         run: |
           for f in \
-            basics/basics/basic_workflow.py \
-            basics/basics/deck.py \
             basics/basics/hello_world.py \
-            basics/basics/lp.py \
+            basics/basics/workflow.py \
             basics/basics/named_outputs.py \
-            control_flow/control_flow/chain_entities.py \
-            control_flow/control_flow/dynamics.py \
-            control_flow/control_flow/map_task.py \
-            control_flow/control_flow/subworkflows.py \
-            type_system/type_system/custom_objects.py \
-            type_system/type_system/schema.py \
-            type_system/type_system/typed_schema.py ;
+            advanced_composition/advanced_composition/chain_entities.py \
+            advanced_composition/advanced_composition/dynamics.py \
+            advanced_composition/advanced_composition/map_task.py \
+            advanced_composition/advanced_composition/subworkflows.py \
+            data_types_and_io/data_types_and_io/custom_objects.py \
+            data_types_and_io/data_types_and_io/schema.py \
+            data_types_and_io/data_types_and_io/typed_schema.py ;
           do
             pyflyte --config ./boilerplate/flyte/end2end/functional-test-config.yaml \
               register \
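The step above feeds each example file to pyflyte register; the rest of the command is folded out of view in this hunk. As a rough Python equivalent of the shell loop, with the file list and config path taken from the diff and everything else an assumption:

# Hypothetical driver for the registration step; the real CI step invokes
# pyflyte directly from the shell, and its remaining flags are not visible here.
import subprocess

EXAMPLE_FILES = [
    "basics/basics/hello_world.py",
    "advanced_composition/advanced_composition/map_task.py",
    "data_types_and_io/data_types_and_io/typed_schema.py",
]

for f in EXAMPLE_FILES:
    # pyflyte is flytekit's CLI; --config points it at the test deployment.
    subprocess.run(
        [
            "pyflyte",
            "--config", "./boilerplate/flyte/end2end/functional-test-config.yaml",
            "register",
            f,
        ],
        check=True,
    )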
boilerplate/flyte/end2end/run-tests.py (75 changes: 34 additions & 41 deletions)
@@ -1,19 +1,19 @@
 #!/usr/bin/env python3
 
-import click
 import datetime
 import json
 import sys
 import time
 import traceback
+from typing import Dict, List, Mapping, Tuple
 
+import click
 import requests
-from typing import List, Mapping, Tuple, Dict
-from flytekit.remote import FlyteRemote
-from flytekit.configuration import Config
 from flytekit.models.core.execution import WorkflowExecutionPhase
+from flytekit.configuration import Config, ImageConfig, SerializationSettings
+from flytekit.remote import FlyteRemote
 from flytekit.remote.executions import FlyteWorkflowExecution
 
 
 WAIT_TIME = 10
 MAX_ATTEMPTS = 200
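Taken together, these two constants bound the polling loop in schedule_workflow_groups further down: status is checked at most 200 times with a 10-second sleep between checks, so a workflow group gets roughly 200 × 10 s = 2,000 s (about 33 minutes) to finish before the run gives up.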

@@ -22,15 +22,14 @@
 # starting with "core".
 FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = {
     "lite": [
-        ("basics.hello_world.my_wf", {}),
-        ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}),
+        ("basics.hello_world.hello_world_wf", {}),
     ],
     "core": [
-        ("basics.deck.wf", {}),
+        # ("development_lifecycle.decks.image_renderer_wf", {}),
         # The chain_workflows example in flytesnacks expects to be running in a sandbox.
-        ("control_flow.chain_entities.chain_workflows_wf", {}),
-        ("control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}),
-        ("control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}),
+        ("advanced_composition.chain_entities.chain_workflows_wf", {}),
+        ("advanced_composition.dynamics.wf", {"s1": "Pear", "s2": "Earth"}),
+        ("advanced_composition.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}),
         # Workflows that use nested executions cannot be launched via flyteremote.
         # This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482.
         # ("control_flow.run_conditions.multiplier", {"my_input": 0.5}),
@@ -41,24 +40,22 @@
# ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}),
# ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}),
# ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}),
("control_flow.subworkflows.parent_wf", {"a": 3}),
("control_flow.subworkflows.nested_parent_wf", {"a": 3}),
("basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}),
("advanced_composition.subworkflows.parent_workflow", {"my_input1": "hello"}),
("advanced_composition.subworkflows.nested_parent_wf", {"a": 3}),
("basics.workflow.simple_wf", {"x": [1, 2, 3], "y": [1, 2, 3]}),
# TODO: enable new files and folders workflows
# ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}),
# ("basics.folders.download_and_rotate", {}),
("basics.hello_world.my_wf", {}),
("basics.lp.my_wf", {"val": 4}),
("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}),
("basics.named_outputs.my_wf", {}),
("basics.hello_world.hello_world_wf", {}),
("basics.named_outputs.simple_wf_with_named_outputs", {}),
# # Getting a 403 for the wikipedia image
# # ("basics.reference_task.wf", {}),
("type_system.custom_objects.wf", {"x": 10, "y": 20}),
("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}),
# Enums are not supported in flyteremote
# ("type_system.enums.enum_wf", {"c": "red"}),
("type_system.schema.df_wf", {"a": 42}),
("type_system.typed_schema.wf", {}),
#("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
("data_types_and_io.schema.df_wf", {"a": 42}),
("data_types_and_io.typed_schema.wf", {}),
# ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
],
"integrations-k8s-spark": [
("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}),
@@ -97,12 +94,14 @@ def execute_workflow(remote, version, workflow_name, inputs):
     wf = remote.fetch_workflow(name=workflow_name, version=version)
     return remote.execute(wf, inputs=inputs, wait=False)
 
+
 def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool:
     for executions in executions_by_wfgroup.values():
         if not all([execution.is_done for execution in executions]):
             return False
     return True
 
+
 def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
     try:
         for executions in executions_by_wfgroup.values():
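execute_workflow above is the bridge between the FLYTESNACKS_WORKFLOW_GROUPS table and flyteremote: each (name, inputs) tuple is fetched by name and launched without waiting, and executions_finished later polls the resulting FlyteWorkflowExecution objects. A minimal sketch of launching a single entry, assuming a reachable Flyte deployment; the config path, project, domain, and version string are placeholders:

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

# Placeholder identifiers; substitute values for a real deployment.
remote = FlyteRemote(
    config=Config.auto(config_file="./functional-test-config.yaml"),
    default_project="flytesnacks",
    default_domain="development",
)

name, inputs = ("basics.hello_world.hello_world_wf", {})
wf = remote.fetch_workflow(name=name, version="v1.0.0")  # placeholder version
execution = remote.execute(wf, inputs=inputs, wait=False)
print(execution.is_done)  # False until the remote run completes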
@@ -120,6 +119,7 @@ def report_executions(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
         for execution in executions:
             print(execution)
 
+
 def schedule_workflow_groups(
     tag: str,
     workflow_groups: List[str],
@@ -140,17 +140,12 @@

     # Wait for all executions to finish
     attempt = 0
-    while attempt == 0 or (
-        not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS
-    ):
+    while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS):
         attempt += 1
-        print(
-            f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s"
-        )
+        print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s")
         time.sleep(WAIT_TIME)
         sync_executions(remote, executions_by_wfgroup)
 
-
     report_executions(executions_by_wfgroup)
 
     results = {}
@@ -193,14 +188,17 @@ def run(

     # For a given release tag and priority, this function filters the workflow groups from the flytesnacks
     # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ].
-    manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \
-                   f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
+    manifest_url = (
+        "https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
+    )
     r = requests.get(manifest_url)
     parsed_manifest = r.json()
     workflow_groups = []
-    workflow_groups = ["lite"] if "lite" in priorities else [
-        group["name"] for group in parsed_manifest if group["priority"] in priorities
-    ]
+    workflow_groups = (
+        ["lite"]
+        if "lite" in priorities
+        else [group["name"] for group in parsed_manifest if group["priority"] in priorities]
+    )
 
     results = []
     valid_workgroups = []
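The manifest fetched above is a JSON list of group entries, and the comprehension keeps the names of groups whose priority was requested. A small worked example, with the manifest shape inferred from the two fields the code reads (the real flyte_tests_manifest.json may carry more keys):

# Illustrative manifest excerpt; shape inferred from group["name"] and
# group["priority"] in the code above.
parsed_manifest = [
    {"name": "core", "priority": "P0"},
    {"name": "integrations-k8s-spark", "priority": "P1"},
]

priorities = ["P0"]
workflow_groups = (
    ["lite"]
    if "lite" in priorities
    else [group["name"] for group in parsed_manifest if group["priority"] in priorities]
)
print(workflow_groups)  # ['core'], matching the example in the comment above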
@@ -217,10 +215,7 @@
             valid_workgroups.append(workflow_group)
 
     results_by_wfgroup = schedule_workflow_groups(
-        flytesnacks_release_tag,
-        valid_workgroups,
-        remote,
-        terminate_workflow_on_failure
+        flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure
     )
 
     for workflow_group, succeeded in results_by_wfgroup.items():
@@ -274,9 +269,7 @@ def cli(
     terminate_workflow_on_failure,
 ):
     print(f"return_non_zero_on_failure={return_non_zero_on_failure}")
-    results = run(
-        flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure
-    )
+    results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure)
 
     # Write a json object in its own line describing the result of this run to stdout
     print(f"Result of run:\n{json.dumps(results)}")
rsts/concepts/schedules.rst (2 changes: 1 addition & 1 deletion)
@@ -5,7 +5,7 @@ Schedules

 .. tags:: Basic, Glossary
 
-Workflows can be run automatically using :ref:`schedules <cookbook:launchplan_schedules>` associated with launch plans.
+Workflows can be run automatically using :ref:`schedules <cookbook:scheduling_launch_plan>` associated with launch plans.
 
 Only one launch plan version for a given {Project, Domain, Name} combination can be active, which means only one schedule can be active for a launch plan. This is because a single active schedule can exist across all versions of the launch plan.
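For readers following the updated cross-reference, a scheduled launch plan in flytekit looks roughly like the sketch below; the workflow and the cron expression are illustrative, and the linked cookbook page remains the authoritative example:

from flytekit import CronSchedule, LaunchPlan, task, workflow

@task
def say_hello() -> str:
    return "hello"

@workflow
def greeting_wf() -> str:
    return say_hello()

# Only one schedule per launch plan can be active: activating a newer
# version deactivates the schedule attached to the previous one.
hourly_greeting = LaunchPlan.get_or_create(
    workflow=greeting_wf,
    name="hourly_greeting",
    schedule=CronSchedule(schedule="0 * * * *"),  # top of every hour
)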

rsts/concepts/tasks.rst (2 changes: 1 addition & 1 deletion)
@@ -115,4 +115,4 @@ Caching/Memoization
 ^^^^^^^^^^^^^^^^^^^
 
 Flyte supports memoization of task outputs to ensure that identical invocations of a task are not executed repeatedly, thereby saving compute resources and execution time. For example, if you wish to run the same piece of code multiple times, you can re-use the output instead of re-computing it.
-For more information on memoization, refer to the :std:doc:`Caching Example <cookbook:auto_examples/basics/task_cache>`.
+For more information on memoization, refer to the :std:doc:`Caching Example <cookbook:auto_examples/development_lifecycle/task_cache>`.
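On the flytekit side, memoization is the cache flag on the task decorator; a minimal sketch (the cache_version string is arbitrary, and bumping it invalidates previously cached results):

from flytekit import task

@task(cache=True, cache_version="1.0")
def square(a: int) -> int:
    # A repeat call with the same input is served from the cache
    # instead of re-running the task.
    return a * a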
rsts/deployment/configuration/notifications.rst (2 changes: 1 addition & 1 deletion)
@@ -39,7 +39,7 @@ For example
     )
-See detailed usage examples in the :std:doc:`User Guide <cookbook:auto_examples/deployment/lp_notifications>`
+See detailed usage examples in the :std:doc:`User Guide <cookbook:auto_examples/productionizing/lp_notifications>`
 
 Notifications can be combined with schedules to automatically alert you when a scheduled job succeeds or fails.
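As a sketch of that combination in flytekit (the recipient address and cron expression are placeholders; the linked User Guide page carries the full, authoritative example):

from flytekit import CronSchedule, Email, LaunchPlan, WorkflowExecutionPhase, task, workflow

@task
def step() -> int:
    return 1

@workflow
def my_wf() -> int:
    return step()

# Nightly run that emails on terminal phases; address and schedule are placeholders.
nightly_lp = LaunchPlan.get_or_create(
    workflow=my_wf,
    name="my_wf_nightly",
    schedule=CronSchedule(schedule="0 2 * * *"),
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED, WorkflowExecutionPhase.SUCCEEDED],
            recipients_email=["[email protected]"],
        )
    ],
)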

