diff --git a/.github/workflows/single-binary.yml b/.github/workflows/single-binary.yml index 8465c17b51..94743ea35f 100644 --- a/.github/workflows/single-binary.yml +++ b/.github/workflows/single-binary.yml @@ -190,18 +190,16 @@ jobs: - name: Register specific tests run: | for f in \ - basics/basics/basic_workflow.py \ - basics/basics/deck.py \ basics/basics/hello_world.py \ - basics/basics/lp.py \ + basics/basics/workflow.py \ basics/basics/named_outputs.py \ - control_flow/control_flow/chain_entities.py \ - control_flow/control_flow/dynamics.py \ - control_flow/control_flow/map_task.py \ - control_flow/control_flow/subworkflows.py \ - type_system/type_system/custom_objects.py \ - type_system/type_system/schema.py \ - type_system/type_system/typed_schema.py ; + advanced_composition/advanced_composition/chain_entities.py \ + advanced_composition/advanced_composition/dynamics.py \ + advanced_composition/advanced_composition/map_task.py \ + advanced_composition/advanced_composition/subworkflows.py \ + data_types_and_io/data_types_and_io/custom_objects.py \ + data_types_and_io/data_types_and_io/schema.py \ + data_types_and_io/data_types_and_io/typed_schema.py ; do pyflyte --config ./boilerplate/flyte/end2end/functional-test-config.yaml \ register \ diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py index 6427681774..eb2b28d8d3 100644 --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 -import click import datetime import json import sys import time import traceback +from typing import Dict, List, Mapping, Tuple + +import click import requests -from typing import List, Mapping, Tuple, Dict -from flytekit.remote import FlyteRemote +from flytekit.configuration import Config from flytekit.models.core.execution import WorkflowExecutionPhase -from flytekit.configuration import Config, ImageConfig, SerializationSettings +from flytekit.remote import FlyteRemote from flytekit.remote.executions import FlyteWorkflowExecution - WAIT_TIME = 10 MAX_ATTEMPTS = 200 @@ -22,15 +22,14 @@ # starting with "core". FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = { "lite": [ - ("basics.hello_world.my_wf", {}), - ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), + ("basics.hello_world.hello_world_wf", {}), ], "core": [ - ("basics.deck.wf", {}), + # ("development_lifecycle.decks.image_renderer_wf", {}), # The chain_workflows example in flytesnacks expects to be running in a sandbox. - ("control_flow.chain_entities.chain_workflows_wf", {}), - ("control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), - ("control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), + ("advanced_composition.chain_entities.chain_workflows_wf", {}), + ("advanced_composition.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), + ("advanced_composition.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), # Workflows that use nested executions cannot be launched via flyteremote. # This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482. # ("control_flow.run_conditions.multiplier", {"my_input": 0.5}), @@ -41,24 +40,22 @@ # ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}), # ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}), # ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}), - ("control_flow.subworkflows.parent_wf", {"a": 3}), - ("control_flow.subworkflows.nested_parent_wf", {"a": 3}), - ("basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}), + ("advanced_composition.subworkflows.parent_workflow", {"my_input1": "hello"}), + ("advanced_composition.subworkflows.nested_parent_wf", {"a": 3}), + ("basics.workflow.simple_wf", {"x": [1, 2, 3], "y": [1, 2, 3]}), # TODO: enable new files and folders workflows # ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}), # ("basics.folders.download_and_rotate", {}), - ("basics.hello_world.my_wf", {}), - ("basics.lp.my_wf", {"val": 4}), - ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), - ("basics.named_outputs.my_wf", {}), + ("basics.hello_world.hello_world_wf", {}), + ("basics.named_outputs.simple_wf_with_named_outputs", {}), # # Getting a 403 for the wikipedia image # # ("basics.reference_task.wf", {}), - ("type_system.custom_objects.wf", {"x": 10, "y": 20}), + ("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}), # Enums are not supported in flyteremote # ("type_system.enums.enum_wf", {"c": "red"}), - ("type_system.schema.df_wf", {"a": 42}), - ("type_system.typed_schema.wf", {}), - #("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), + ("data_types_and_io.schema.df_wf", {"a": 42}), + ("data_types_and_io.typed_schema.wf", {}), + # ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), ], "integrations-k8s-spark": [ ("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}), @@ -97,12 +94,14 @@ def execute_workflow(remote, version, workflow_name, inputs): wf = remote.fetch_workflow(name=workflow_name, version=version) return remote.execute(wf, inputs=inputs, wait=False) + def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool: for executions in executions_by_wfgroup.values(): if not all([execution.is_done for execution in executions]): return False return True + def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]): try: for executions in executions_by_wfgroup.values(): @@ -120,6 +119,7 @@ def report_executions(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecuti for execution in executions: print(execution) + def schedule_workflow_groups( tag: str, workflow_groups: List[str], @@ -140,17 +140,12 @@ def schedule_workflow_groups( # Wait for all executions to finish attempt = 0 - while attempt == 0 or ( - not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS - ): + while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS): attempt += 1 - print( - f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s" - ) + print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s") time.sleep(WAIT_TIME) sync_executions(remote, executions_by_wfgroup) - report_executions(executions_by_wfgroup) results = {} @@ -193,14 +188,17 @@ def run( # For a given release tag and priority, this function filters the workflow groups from the flytesnacks # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ]. - manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \ - f"{flytesnacks_release_tag}/flyte_tests_manifest.json" + manifest_url = ( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json" + ) r = requests.get(manifest_url) parsed_manifest = r.json() workflow_groups = [] - workflow_groups = ["lite"] if "lite" in priorities else [ - group["name"] for group in parsed_manifest if group["priority"] in priorities - ] + workflow_groups = ( + ["lite"] + if "lite" in priorities + else [group["name"] for group in parsed_manifest if group["priority"] in priorities] + ) results = [] valid_workgroups = [] @@ -217,10 +215,7 @@ def run( valid_workgroups.append(workflow_group) results_by_wfgroup = schedule_workflow_groups( - flytesnacks_release_tag, - valid_workgroups, - remote, - terminate_workflow_on_failure + flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure ) for workflow_group, succeeded in results_by_wfgroup.items(): @@ -274,9 +269,7 @@ def cli( terminate_workflow_on_failure, ): print(f"return_non_zero_on_failure={return_non_zero_on_failure}") - results = run( - flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure - ) + results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure) # Write a json object in its own line describing the result of this run to stdout print(f"Result of run:\n{json.dumps(results)}") diff --git a/rsts/concepts/schedules.rst b/rsts/concepts/schedules.rst index 6341e1ae19..34644b217b 100644 --- a/rsts/concepts/schedules.rst +++ b/rsts/concepts/schedules.rst @@ -5,7 +5,7 @@ Schedules .. tags:: Basic, Glossary -Workflows can be run automatically using :ref:`schedules ` associated with launch plans. +Workflows can be run automatically using :ref:`schedules ` associated with launch plans. Only one launch plan version for a given {Project, Domain, Name} combination can be active, which means only one schedule can be active for a launch plan. This is because a single active schedule can exist across all versions of the launch plan. diff --git a/rsts/concepts/tasks.rst b/rsts/concepts/tasks.rst index 42ca8ca77f..1ca43d5ea8 100644 --- a/rsts/concepts/tasks.rst +++ b/rsts/concepts/tasks.rst @@ -115,4 +115,4 @@ Caching/Memoization ^^^^^^^^^^^^^^^^^^^ Flyte supports memoization of task outputs to ensure that identical invocations of a task are not executed repeatedly, thereby saving compute resources and execution time. For example, if you wish to run the same piece of code multiple times, you can re-use the output instead of re-computing it. -For more information on memoization, refer to the :std:doc:`Caching Example `. +For more information on memoization, refer to the :std:doc:`Caching Example `. diff --git a/rsts/deployment/configuration/notifications.rst b/rsts/deployment/configuration/notifications.rst index 11c60904af..e01dd1a0ca 100644 --- a/rsts/deployment/configuration/notifications.rst +++ b/rsts/deployment/configuration/notifications.rst @@ -39,7 +39,7 @@ For example ) -See detailed usage examples in the :std:doc:`User Guide ` +See detailed usage examples in the :std:doc:`User Guide ` Notifications can be combined with schedules to automatically alert you when a scheduled job succeeds or fails.