From 4f673e3de5087c03cc32800c9af7fbde5d3908b6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 18 Oct 2021 21:46:18 +0800 Subject: [PATCH 01/23] Create default lauchplan Signed-off-by: Kevin Su --- flytekit/clients/raw.py | 54 ++++++++++--------- flytekit/remote/remote.py | 22 +++++++- .../integration/remote/test_remote.py | 9 ++++ 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index c3c0833295..df288f27cf 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -125,33 +125,35 @@ def handler(*args, **kwargs): """ max_retries = 3 max_wait_time = 1000 - try: - for i in range(max_retries): - try: - return fn(*args, **kwargs) - except _RpcError as e: - if e.code() == _GrpcStatusCode.UNAUTHENTICATED: - # Always retry auth errors. - if i == (max_retries - 1): - # Exit the loop and wrap the authentication error. - raise _user_exceptions.FlyteAuthenticationException(_six.text_type(e)) - cli_logger.error(f"Unauthenticated RPC error {e}, refreshing credentials and retrying\n") - refresh_handler_fn = _get_refresh_handler(_creds_config.AUTH_MODE.get()) - refresh_handler_fn(args[0]) + + for i in range(max_retries): + try: + return fn(*args, **kwargs) + except _RpcError as e: + if e.code() == _GrpcStatusCode.UNAUTHENTICATED: + # Always retry auth errors. + if i == (max_retries - 1): + # Exit the loop and wrap the authentication error. + raise _user_exceptions.FlyteAuthenticationException(_six.text_type(e)) + cli_logger.error(f"Unauthenticated RPC error {e}, refreshing credentials and retrying\n") + refresh_handler_fn = _get_refresh_handler(_creds_config.AUTH_MODE.get()) + refresh_handler_fn(args[0]) + # There are two cases that we should throw error immediately + # 1. Entity already exists when we register workflow + # 2. Entity not found when we fetch workflow + elif e.code() == _GrpcStatusCode.ALREADY_EXISTS: + raise _user_exceptions.FlyteEntityAlreadyExistsException(_six.text_type(e)) + elif e.code() == _GrpcStatusCode.NOT_FOUND: + raise _user_exceptions.FlyteEntityNotExistException(_six.text_type(e)) + else: + # No more retries if retry=False or max_retries reached. + if (retry is False) or i == (max_retries - 1): + raise else: - # No more retries if retry=False or max_retries reached. - if (retry is False) or i == (max_retries - 1): - raise - else: - # Retry: Start with 200ms wait-time and exponentially back-off up to 1 second. - wait_time = min(200 * (2 ** i), max_wait_time) - cli_logger.error(f"Non-auth RPC error {e}, sleeping {wait_time}ms and retrying") - time.sleep(wait_time / 1000) - except _RpcError as e: - if e.code() == _GrpcStatusCode.ALREADY_EXISTS: - raise _user_exceptions.FlyteEntityAlreadyExistsException(_six.text_type(e)) - else: - raise + # Retry: Start with 200ms wait-time and exponentially back-off up to 1 second. + wait_time = min(200 * (2 ** i), max_wait_time) + cli_logger.error(f"Non-auth RPC error {e}, sleeping {wait_time}ms and retrying") + time.sleep(wait_time / 1000) return handler diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 41a128caf9..04067b4c96 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -15,9 +15,11 @@ from flytekit.clients.friendly import SynchronousFlyteClient from flytekit.common import utils as common_utils +from flytekit.common.exceptions.user import FlyteEntityNotExistException from flytekit.configuration import platform as platform_config from flytekit.configuration import sdk as sdk_config from flytekit.configuration import set_flyte_config_file +from flytekit.core import context_manager from flytekit.core.interface import Interface from flytekit.loggers import remote_logger from flytekit.models import filters as filter_models @@ -884,11 +886,29 @@ def _( """Execute an @workflow-decorated function.""" resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) resolved_identifiers_dict = asdict(resolved_identifiers) + task_identifiers_dict = None + for node in entity.nodes: + try: + print(resolved_identifiers.version) + task_identifiers_dict = deepcopy(resolved_identifiers_dict) + task_identifiers_dict["name"] = node.flyte_entity.name + self.fetch_task(**task_identifiers_dict) + except FlyteEntityNotExistException: + self.register(node.flyte_entity, **task_identifiers_dict) + try: flyte_workflow: FlyteWorkflow = self.fetch_workflow(**resolved_identifiers_dict) - except Exception: + except FlyteEntityNotExistException: flyte_workflow: FlyteWorkflow = self.register(entity, **resolved_identifiers_dict) flyte_workflow.guessed_python_interface = entity.python_interface + + ctx = context_manager.FlyteContext.current_context() + try: + self.fetch_launch_plan(**resolved_identifiers_dict) + except FlyteEntityNotExistException: + default_lp = LaunchPlan.get_default_launch_plan(ctx, entity) + self.register(default_lp, **resolved_identifiers_dict) + return self.execute( flyte_workflow, inputs, diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 2b61f220c6..0eeb93572b 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -282,3 +282,12 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re output_obj = joblib.load(joblib_output.path) assert execution.outputs["o0"].extension() == "joblib" assert output_obj == input_obj + + +def test_execute_with_default_launch_plan(flyteclient, flyte_remote_env): + from mock_flyte_repo.workflows.basic.list_float_wf import my_wf + + remote = FlyteRemote.from_config(PROJECT, "development") + xs: typing.List[float] = [42.24, 999.1, 0.0001] + execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) + assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" From 516384e20602c86d1c5ad6fdab295ac69d46035b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 18 Oct 2021 23:00:30 +0800 Subject: [PATCH 02/23] Update comment Signed-off-by: Kevin Su --- flytekit/clients/raw.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index df288f27cf..184610deb9 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -139,8 +139,8 @@ def handler(*args, **kwargs): refresh_handler_fn = _get_refresh_handler(_creds_config.AUTH_MODE.get()) refresh_handler_fn(args[0]) # There are two cases that we should throw error immediately - # 1. Entity already exists when we register workflow - # 2. Entity not found when we fetch workflow + # 1. Entity already exists when we register entity + # 2. Entity not found when we fetch entity elif e.code() == _GrpcStatusCode.ALREADY_EXISTS: raise _user_exceptions.FlyteEntityAlreadyExistsException(_six.text_type(e)) elif e.code() == _GrpcStatusCode.NOT_FOUND: From 17ca892721d235e3f7b236eb29e4ee9c829a16b5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 18 Oct 2021 23:10:34 +0800 Subject: [PATCH 03/23] Added test Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 0eeb93572b..16967b7aa7 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -9,7 +9,7 @@ import pytest from flytekit import kwtypes -from flytekit.common.exceptions.user import FlyteAssertion +from flytekit.common.exceptions.user import FlyteAssertion, FlyteEntityNotExistException from flytekit.core.launch_plan import LaunchPlan from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task from flytekit.remote.remote import FlyteRemote @@ -291,3 +291,9 @@ def test_execute_with_default_launch_plan(flyteclient, flyte_remote_env): xs: typing.List[float] = [42.24, 999.1, 0.0001] execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" + + +def test_fetch_not_exist_launch_plan(flyteclient): + remote = FlyteRemote.from_config(PROJECT, "development") + with pytest.raises(FlyteEntityNotExistException): + remote.fetch_launch_plan(name="workflows.basic.list_float_wf.my_wf", version=f"v{VERSION}") From b511e8e4686f426469614ea8172eb7827c35bb0b Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 18 Oct 2021 23:20:48 +0800 Subject: [PATCH 04/23] Fixed lint Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 16967b7aa7..71cbd69ca4 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -289,7 +289,7 @@ def test_execute_with_default_launch_plan(flyteclient, flyte_remote_env): remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" From 8a15ab652ec21dbb4cdbffe63169b4a56a743034 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 18 Oct 2021 23:50:33 +0800 Subject: [PATCH 05/23] Fixed lint Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 71cbd69ca4..71ee545f6a 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -4,6 +4,7 @@ import pathlib import time import typing +import uuid import joblib import pytest @@ -287,13 +288,15 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re def test_execute_with_default_launch_plan(flyteclient, flyte_remote_env): from mock_flyte_repo.workflows.basic.list_float_wf import my_wf + my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) + version = uuid.uuid4().hex[:30] + str(int(time.time())) + execution = remote.execute(my_wf, version=version, inputs={"xs": xs}, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" def test_fetch_not_exist_launch_plan(flyteclient): remote = FlyteRemote.from_config(PROJECT, "development") with pytest.raises(FlyteEntityNotExistException): - remote.fetch_launch_plan(name="workflows.basic.list_float_wf.my_wf", version=f"v{VERSION}") + remote.fetch_launch_plan(name="workflows.basic.list_float_wf.fake_wf", version=f"v{VERSION}") From a492ccfef5de951da5d6dc0bd6ac972403903d9a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 19 Oct 2021 00:35:52 +0800 Subject: [PATCH 06/23] Fixed test Signed-off-by: Kevin Su --- flytekit/remote/remote.py | 1 - tests/flytekit/integration/remote/test_remote.py | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 04067b4c96..47f9645605 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -889,7 +889,6 @@ def _( task_identifiers_dict = None for node in entity.nodes: try: - print(resolved_identifiers.version) task_identifiers_dict = deepcopy(resolved_identifiers_dict) task_identifiers_dict["name"] = node.flyte_entity.name self.fetch_task(**task_identifiers_dict) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 71ee545f6a..7f708d8f02 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -285,14 +285,13 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re assert output_obj == input_obj -def test_execute_with_default_launch_plan(flyteclient, flyte_remote_env): +def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): from mock_flyte_repo.workflows.basic.list_float_wf import my_wf my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] - version = uuid.uuid4().hex[:30] + str(int(time.time())) - execution = remote.execute(my_wf, version=version, inputs={"xs": xs}, wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" From abebc0cc0b8dcdbb398829c165783ed34423f1a7 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Oct 2021 02:28:43 +0800 Subject: [PATCH 07/23] Register subworkflow, launchplan node Signed-off-by: Kevin Su --- flytekit/clients/raw.py | 4 ++-- flytekit/remote/remote.py | 30 +++++++++++++++++++++--------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index 184610deb9..25c3b75b9b 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -142,9 +142,9 @@ def handler(*args, **kwargs): # 1. Entity already exists when we register entity # 2. Entity not found when we fetch entity elif e.code() == _GrpcStatusCode.ALREADY_EXISTS: - raise _user_exceptions.FlyteEntityAlreadyExistsException(_six.text_type(e)) + raise _user_exceptions.FlyteEntityAlreadyExistsException(e) elif e.code() == _GrpcStatusCode.NOT_FOUND: - raise _user_exceptions.FlyteEntityNotExistException(_six.text_type(e)) + raise _user_exceptions.FlyteEntityNotExistException(e) else: # No more retries if retry=False or max_retries reached. if (retry is False) or i == (max_retries - 1): diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 47f9645605..ab8ef4d5f8 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -1,6 +1,7 @@ """Module defining main Flyte backend entrypoint.""" from __future__ import annotations +import logging import os import time import typing @@ -15,7 +16,7 @@ from flytekit.clients.friendly import SynchronousFlyteClient from flytekit.common import utils as common_utils -from flytekit.common.exceptions.user import FlyteEntityNotExistException +from flytekit.common.exceptions.user import FlyteEntityAlreadyExistsException, FlyteEntityNotExistException from flytekit.configuration import platform as platform_config from flytekit.configuration import sdk as sdk_config from flytekit.configuration import set_flyte_config_file @@ -606,6 +607,22 @@ def _( ) return self.fetch_launch_plan(**resolved_identifiers) + def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: dict): + # Try to register all the entity in WorkflowBase including LaunchPlan, PythonTask, or subworkflow. + node_identifiers_dict = deepcopy(identifiers_dict) + try: + for node in entity.nodes: + node_identifiers_dict["name"] = node.flyte_entity.name + if isinstance(node.flyte_entity, WorkflowBase): + self._register_entity_if_not_exists(node.flyte_entity, identifiers_dict) + self.register(node.flyte_entity, **node_identifiers_dict) + elif isinstance(node.flyte_entity, PythonTask) or isinstance(node.flyte_entity, LaunchPlan): + self.register(node.flyte_entity, **node_identifiers_dict) + else: + raise TypeError(f"We don't support registering this kind of entity: {node.flyte_entity.name}") + except FlyteEntityAlreadyExistsException: + logging.info(f"{entity.name} already exists") + #################### # Execute Entities # #################### @@ -886,18 +903,12 @@ def _( """Execute an @workflow-decorated function.""" resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) resolved_identifiers_dict = asdict(resolved_identifiers) - task_identifiers_dict = None - for node in entity.nodes: - try: - task_identifiers_dict = deepcopy(resolved_identifiers_dict) - task_identifiers_dict["name"] = node.flyte_entity.name - self.fetch_task(**task_identifiers_dict) - except FlyteEntityNotExistException: - self.register(node.flyte_entity, **task_identifiers_dict) + self._register_entity_if_not_exists(entity, resolved_identifiers_dict) try: flyte_workflow: FlyteWorkflow = self.fetch_workflow(**resolved_identifiers_dict) except FlyteEntityNotExistException: + logging.info("Try to register FlyteWorkflow because it wasn't found in Flyte Admin!") flyte_workflow: FlyteWorkflow = self.register(entity, **resolved_identifiers_dict) flyte_workflow.guessed_python_interface = entity.python_interface @@ -905,6 +916,7 @@ def _( try: self.fetch_launch_plan(**resolved_identifiers_dict) except FlyteEntityNotExistException: + logging.info("Try to register default launch plan because it wasn't found in Flyte Admin!") default_lp = LaunchPlan.get_default_launch_plan(ctx, entity) self.register(default_lp, **resolved_identifiers_dict) From 3b3f9e5877b110e53f7929af6ceb886b7efe0b95 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Oct 2021 19:43:47 +0800 Subject: [PATCH 08/23] Fixed lint Signed-off-by: Kevin Su --- flytekit/remote/remote.py | 11 ++++++----- tests/flytekit/integration/remote/test_remote.py | 13 +++++++++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index ab8ef4d5f8..abddac533f 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -610,8 +610,9 @@ def _( def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: dict): # Try to register all the entity in WorkflowBase including LaunchPlan, PythonTask, or subworkflow. node_identifiers_dict = deepcopy(identifiers_dict) - try: - for node in entity.nodes: + + for node in entity.nodes: + try: node_identifiers_dict["name"] = node.flyte_entity.name if isinstance(node.flyte_entity, WorkflowBase): self._register_entity_if_not_exists(node.flyte_entity, identifiers_dict) @@ -619,9 +620,9 @@ def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: elif isinstance(node.flyte_entity, PythonTask) or isinstance(node.flyte_entity, LaunchPlan): self.register(node.flyte_entity, **node_identifiers_dict) else: - raise TypeError(f"We don't support registering this kind of entity: {node.flyte_entity.name}") - except FlyteEntityAlreadyExistsException: - logging.info(f"{entity.name} already exists") + raise NotImplementedError(f"We don't support registering this kind of entity: {node.flyte_entity}") + except FlyteEntityAlreadyExistsException: + logging.info(f"{entity.name} already exists") #################### # Execute Entities # diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 7f708d8f02..cae559f70d 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -235,10 +235,11 @@ def test_execute_python_workflow_dict_of_string_to_string(flyteclient, flyte_wor def test_execute_python_workflow_list_of_floats(flyteclient, flyte_workflows_register, flyte_remote_env): """Test execution of a @workflow-decorated python function and launchplan that are already registered.""" - from mock_flyte_repo.workflows.basic.list_float_wf import my_wf + from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") + concat_list._name = concat_list.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] @@ -285,13 +286,17 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re assert output_obj == input_obj -def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): - from mock_flyte_repo.workflows.basic.list_float_wf import my_wf +def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register): + from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf + # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") + concat_list._name = concat_list.name.replace("mock_flyte_repo.", "") + remote = FlyteRemote.from_config(PROJECT, "development") + version = uuid.uuid4().hex[:30] + str(int(time.time())) xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, version=version, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" From 3a0ec625cf7e27c5caeaf56e2a092fcd43b28161 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 20 Oct 2021 20:17:40 +0800 Subject: [PATCH 09/23] Fixed test Signed-off-by: Kevin Su --- flytekit/core/type_engine.py | 2 +- flytekit/remote/remote.py | 7 +++---- tests/flytekit/integration/remote/test_remote.py | 14 ++++++++------ 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 39a7be762a..925f79c770 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -543,7 +543,7 @@ def guess_python_type(cls, flyte_type: LiteralType) -> type: except ValueError: logger.debug(f"Skipping transformer {transformer.name} for {flyte_type}") - # Because the dataclass transformer is handled explicity in the get_transformer code, we have to handle it + # Because the dataclass transformer is handled explicitly in the get_transformer code, we have to handle it # separately here too. try: return cls._DATACLASS_TRANSFORMER.guess_python_type(literal_type=flyte_type) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index abddac533f..04af0d39de 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -607,15 +607,14 @@ def _( ) return self.fetch_launch_plan(**resolved_identifiers) - def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: dict): + def _register_entity_if_not_exists(self, entity: WorkflowBase, resolved_identifiers_dict: dict): # Try to register all the entity in WorkflowBase including LaunchPlan, PythonTask, or subworkflow. - node_identifiers_dict = deepcopy(identifiers_dict) - + node_identifiers_dict = deepcopy(resolved_identifiers_dict) for node in entity.nodes: try: node_identifiers_dict["name"] = node.flyte_entity.name if isinstance(node.flyte_entity, WorkflowBase): - self._register_entity_if_not_exists(node.flyte_entity, identifiers_dict) + self._register_entity_if_not_exists(node.flyte_entity, node_identifiers_dict) self.register(node.flyte_entity, **node_identifiers_dict) elif isinstance(node.flyte_entity, PythonTask) or isinstance(node.flyte_entity, LaunchPlan): self.register(node.flyte_entity, **node_identifiers_dict) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index cae559f70d..6ea4daea58 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -2,6 +2,7 @@ import json import os import pathlib +import subprocess import time import typing import uuid @@ -18,6 +19,9 @@ PROJECT = "flytesnacks" VERSION = os.getpid() +command = "cd " + os.path.dirname(os.path.abspath(__file__)) + "; git rev-parse HEAD;" +IMAGE_TAG = subprocess.run(command, capture_output=True, shell=True).stdout.decode("ascii").strip() +IMAGE_NAME = "flytecookbook:workflows-" + IMAGE_TAG @pytest.fixture(scope="session") @@ -38,7 +42,7 @@ def flyte_remote_env(docker_services): os.environ["FLYTE_INTERNAL_PROJECT"] = PROJECT os.environ["FLYTE_INTERNAL_DOMAIN"] = "development" os.environ["FLYTE_INTERNAL_VERSION"] = f"v{VERSION}" - os.environ["FLYTE_INTERNAL_IMAGE"] = "default:tag" + os.environ["FLYTE_INTERNAL_IMAGE"] = IMAGE_NAME os.environ["FLYTE_CLOUD_PROVIDER"] = "aws" os.environ["FLYTE_AWS_ENDPOINT"] = f"http://localhost:{docker_services.port_for('backend', 30084)}" os.environ["FLYTE_AWS_ACCESS_KEY_ID"] = "minio" @@ -235,11 +239,10 @@ def test_execute_python_workflow_dict_of_string_to_string(flyteclient, flyte_wor def test_execute_python_workflow_list_of_floats(flyteclient, flyte_workflows_register, flyte_remote_env): """Test execution of a @workflow-decorated python function and launchplan that are already registered.""" - from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf + from mock_flyte_repo.workflows.basic.list_float_wf import my_wf # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") - concat_list._name = concat_list.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] @@ -286,12 +289,11 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re assert output_obj == input_obj -def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register): - from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf +def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): + from mock_flyte_repo.workflows.basic.list_float_wf import my_wf # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") - concat_list._name = concat_list.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") version = uuid.uuid4().hex[:30] + str(int(time.time())) From 004185fc773a50311cda2106d9eb478bd22a2bf5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 02:54:35 +0800 Subject: [PATCH 10/23] Fixed tests Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 6ea4daea58..adcc3980f9 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -32,7 +32,7 @@ def flyte_workflows_source_dir(): @pytest.fixture(scope="session") def flyte_workflows_register(docker_compose): docker_compose.execute( - f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{VERSION} " + f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{IMAGE_TAG} " "backend make -C workflows register" ) From 8958430ed67d3aea23f3cddd9adef458328954a8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 03:20:12 +0800 Subject: [PATCH 11/23] Fixed tests Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index adcc3980f9..930bfb03fb 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -19,9 +19,7 @@ PROJECT = "flytesnacks" VERSION = os.getpid() -command = "cd " + os.path.dirname(os.path.abspath(__file__)) + "; git rev-parse HEAD;" -IMAGE_TAG = subprocess.run(command, capture_output=True, shell=True).stdout.decode("ascii").strip() -IMAGE_NAME = "flytecookbook:workflows-" + IMAGE_TAG +IMAGE_NAME = "flytecookbook:workflows-" + str(VERSION) @pytest.fixture(scope="session") @@ -32,7 +30,7 @@ def flyte_workflows_source_dir(): @pytest.fixture(scope="session") def flyte_workflows_register(docker_compose): docker_compose.execute( - f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{IMAGE_TAG} " + f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{VERSION} " "backend make -C workflows register" ) From d48699032c22607f200b8a77c577c6e780408a91 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 11:23:55 +0800 Subject: [PATCH 12/23] Fixed tests Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 930bfb03fb..0ce7ec496b 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -19,7 +19,7 @@ PROJECT = "flytesnacks" VERSION = os.getpid() -IMAGE_NAME = "flytecookbook:workflows-" + str(VERSION) +IMAGE_NAME = "flytecookbook:workflows-v" + str(VERSION) @pytest.fixture(scope="session") From 64a8ddaa2e7dfc70bd0ee858ba48eb0b97a96029 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 14:04:45 +0800 Subject: [PATCH 13/23] Fixed tests Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 0ce7ec496b..f387a0a611 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -2,7 +2,6 @@ import json import os import pathlib -import subprocess import time import typing import uuid @@ -14,12 +13,13 @@ from flytekit.common.exceptions.user import FlyteAssertion, FlyteEntityNotExistException from flytekit.core.launch_plan import LaunchPlan from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task +from flytekit.remote import FlyteTask from flytekit.remote.remote import FlyteRemote from flytekit.types.schema import FlyteSchema PROJECT = "flytesnacks" VERSION = os.getpid() -IMAGE_NAME = "flytecookbook:workflows-v" + str(VERSION) +IMAGE_NAME = f"flytecookbook:workflows-v{VERSION}" @pytest.fixture(scope="session") @@ -28,7 +28,7 @@ def flyte_workflows_source_dir(): @pytest.fixture(scope="session") -def flyte_workflows_register(docker_compose): +def flyte_workflows_registers(docker_compose): docker_compose.execute( f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{VERSION} " "backend make -C workflows register" @@ -156,6 +156,8 @@ def test_fetch_execute_workflow(flyteclient, flyte_workflows_register): def test_fetch_execute_task(flyteclient, flyte_workflows_register): remote = FlyteRemote.from_config(PROJECT, "development") flyte_task = remote.fetch_task(name="workflows.basic.basic_workflow.t1", version=f"v{VERSION}") + print(typing.cast(FlyteTask, flyte_task).container.image) + assert typing.cast(FlyteTask, flyte_task).container.image == IMAGE_NAME execution = remote.execute(flyte_task, {"a": 10}, wait=True) assert execution.outputs["t1_int_output"] == 12 assert execution.outputs["c"] == "world" From 8aaedda8a824f0c131ee0c4fe88f810dfd9bed82 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 14:40:41 +0800 Subject: [PATCH 14/23] Fixed tests Signed-off-by: Kevin Su --- flytekit/remote/remote.py | 3 ++- tests/flytekit/integration/remote/test_remote.py | 14 ++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 04af0d39de..9905b3fefd 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -205,7 +205,6 @@ def __init__( raise user_exceptions.FlyteAssertion("Cannot find flyte admin url in config file.") self._client = SynchronousFlyteClient(flyte_admin_url, insecure=insecure, credentials=grpc_credentials) - # read config files, env vars, host, ssl options for admin client self._flyte_admin_url = flyte_admin_url self._insecure = insecure @@ -217,6 +216,7 @@ def __init__( self._labels = labels self._annotations = annotations self._raw_output_data_config = raw_output_data_config + self._grpc_credentials = grpc_credentials # Save the file access object locally, but also make it available for use from the context. FlyteContextManager.with_context(FlyteContextManager.current_context().with_file_access(file_access).build()) @@ -302,6 +302,7 @@ def with_overrides( ): """Create a copy of the remote object, overriding the specified attributes.""" new_remote = deepcopy(self) + # new_remote = FlyteRemote(self._flyte_admin_url, insecure, default_project, default_domain) if default_project: new_remote._default_project = default_project if default_domain: diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index f387a0a611..61c1daa1dd 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -11,6 +11,7 @@ from flytekit import kwtypes from flytekit.common.exceptions.user import FlyteAssertion, FlyteEntityNotExistException +from flytekit.core.context_manager import Image, ImageConfig from flytekit.core.launch_plan import LaunchPlan from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task from flytekit.remote import FlyteTask @@ -28,7 +29,7 @@ def flyte_workflows_source_dir(): @pytest.fixture(scope="session") -def flyte_workflows_registers(docker_compose): +def flyte_workflows_register(docker_compose): docker_compose.execute( f"exec -w /flyteorg/src -e SANDBOX=1 -e PROJECT={PROJECT} -e VERSION=v{VERSION} " "backend make -C workflows register" @@ -157,7 +158,7 @@ def test_fetch_execute_task(flyteclient, flyte_workflows_register): remote = FlyteRemote.from_config(PROJECT, "development") flyte_task = remote.fetch_task(name="workflows.basic.basic_workflow.t1", version=f"v{VERSION}") print(typing.cast(FlyteTask, flyte_task).container.image) - assert typing.cast(FlyteTask, flyte_task).container.image == IMAGE_NAME + assert typing.cast(FlyteTask, flyte_task).container.image == "test" execution = remote.execute(flyte_task, {"a": 10}, wait=True) assert execution.outputs["t1_int_output"] == 12 assert execution.outputs["c"] == "world" @@ -243,14 +244,15 @@ def test_execute_python_workflow_list_of_floats(flyteclient, flyte_workflows_reg # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") - remote = FlyteRemote.from_config(PROJECT, "development") + + version = uuid.uuid4().hex[:30] + str(int(time.time())) xs: typing.List[float] = [42.24, 999.1, 0.0001] execution = remote.execute(my_wf, inputs={"xs": xs}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" launch_plan = LaunchPlan.get_or_create(workflow=my_wf, name=my_wf.name) - execution = remote.execute(launch_plan, inputs={"xs": [-1.1, 0.12345]}, version=f"v{VERSION}", wait=True) + execution = remote.execute(launch_plan, inputs={"xs": [-1.1, 0.12345]}, version=version, wait=True) assert execution.outputs["o0"] == "[-1.1, 0.12345]" @@ -290,12 +292,12 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): - from mock_flyte_repo.workflows.basic.list_float_wf import my_wf + from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") - remote = FlyteRemote.from_config(PROJECT, "development") + version = uuid.uuid4().hex[:30] + str(int(time.time())) xs: typing.List[float] = [42.24, 999.1, 0.0001] execution = remote.execute(my_wf, inputs={"xs": xs}, version=version, wait=True) From 3a332536ef91f2e2490a0168f678913dcd37137a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 18:16:54 +0800 Subject: [PATCH 15/23] Fix tests Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 61c1daa1dd..386df08ded 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -157,8 +157,6 @@ def test_fetch_execute_workflow(flyteclient, flyte_workflows_register): def test_fetch_execute_task(flyteclient, flyte_workflows_register): remote = FlyteRemote.from_config(PROJECT, "development") flyte_task = remote.fetch_task(name="workflows.basic.basic_workflow.t1", version=f"v{VERSION}") - print(typing.cast(FlyteTask, flyte_task).container.image) - assert typing.cast(FlyteTask, flyte_task).container.image == "test" execution = remote.execute(flyte_task, {"a": 10}, wait=True) assert execution.outputs["t1_int_output"] == 12 assert execution.outputs["c"] == "world" @@ -185,7 +183,7 @@ def test_execute_python_workflow_and_launch_plan(flyteclient, flyte_workflows_re my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") - execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, version=f"v{VERSION}", wait=True) + execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, wait=True) assert execution.outputs["o0"] == 12 assert execution.outputs["o1"] == "xyzworld" @@ -246,13 +244,12 @@ def test_execute_python_workflow_list_of_floats(flyteclient, flyte_workflows_reg my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") - version = uuid.uuid4().hex[:30] + str(int(time.time())) xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, version=f"v{VERSION}", wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" launch_plan = LaunchPlan.get_or_create(workflow=my_wf, name=my_wf.name) - execution = remote.execute(launch_plan, inputs={"xs": [-1.1, 0.12345]}, version=version, wait=True) + execution = remote.execute(launch_plan, inputs={"xs": [-1.1, 0.12345]}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == "[-1.1, 0.12345]" @@ -292,15 +289,14 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): - from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf + from mock_flyte_repo.workflows.basic.list_float_wf import my_wf # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") - version = uuid.uuid4().hex[:30] + str(int(time.time())) xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, version=version, wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" From dbb375a571d66f9b257e3abb4a521effaa58a479 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 19:04:49 +0800 Subject: [PATCH 16/23] Fix tests Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 386df08ded..7cdd2f98a3 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -4,23 +4,19 @@ import pathlib import time import typing -import uuid import joblib import pytest from flytekit import kwtypes from flytekit.common.exceptions.user import FlyteAssertion, FlyteEntityNotExistException -from flytekit.core.context_manager import Image, ImageConfig from flytekit.core.launch_plan import LaunchPlan from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task -from flytekit.remote import FlyteTask from flytekit.remote.remote import FlyteRemote from flytekit.types.schema import FlyteSchema PROJECT = "flytesnacks" VERSION = os.getpid() -IMAGE_NAME = f"flytecookbook:workflows-v{VERSION}" @pytest.fixture(scope="session") @@ -41,7 +37,7 @@ def flyte_remote_env(docker_services): os.environ["FLYTE_INTERNAL_PROJECT"] = PROJECT os.environ["FLYTE_INTERNAL_DOMAIN"] = "development" os.environ["FLYTE_INTERNAL_VERSION"] = f"v{VERSION}" - os.environ["FLYTE_INTERNAL_IMAGE"] = IMAGE_NAME + os.environ["FLYTE_INTERNAL_IMAGE"] = "default:tag" os.environ["FLYTE_CLOUD_PROVIDER"] = "aws" os.environ["FLYTE_AWS_ENDPOINT"] = f"http://localhost:{docker_services.port_for('backend', 30084)}" os.environ["FLYTE_AWS_ACCESS_KEY_ID"] = "minio" @@ -187,7 +183,7 @@ def test_execute_python_workflow_and_launch_plan(flyteclient, flyte_workflows_re assert execution.outputs["o0"] == 12 assert execution.outputs["o1"] == "xyzworld" - launch_plan = LaunchPlan.get_or_create(workflow=my_wf, name=my_wf.name) + launch_plan = LaunchPlan.get_or_create(workflow=my_wf, version=f"v{VERSION}", name=my_wf.name) execution = remote.execute(launch_plan, inputs={"a": 14, "b": "foobar"}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == 16 assert execution.outputs["o1"] == "foobarworld" @@ -245,7 +241,7 @@ def test_execute_python_workflow_list_of_floats(flyteclient, flyte_workflows_reg remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" launch_plan = LaunchPlan.get_or_create(workflow=my_wf, name=my_wf.name) @@ -296,7 +292,7 @@ def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" From dd5c39b3d43dc5d99c92cde9ff0e1a0f16a53566 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 21:53:06 +0800 Subject: [PATCH 17/23] Fix tests Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 7cdd2f98a3..dece9f4aca 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -179,11 +179,11 @@ def test_execute_python_workflow_and_launch_plan(flyteclient, flyte_workflows_re my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") - execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, wait=True) + execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == 12 assert execution.outputs["o1"] == "xyzworld" - launch_plan = LaunchPlan.get_or_create(workflow=my_wf, version=f"v{VERSION}", name=my_wf.name) + launch_plan = LaunchPlan.get_or_create(workflow=my_wf, name=my_wf.name) execution = remote.execute(launch_plan, inputs={"a": 14, "b": "foobar"}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == 16 assert execution.outputs["o1"] == "foobarworld" From 7cb61a2090284f98c66495bf7c448fafbbeeb25d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 22 Oct 2021 22:39:02 +0800 Subject: [PATCH 18/23] Fix tests Signed-off-by: Kevin Su --- flytekit/remote/remote.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 9905b3fefd..f447c56add 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -216,7 +216,6 @@ def __init__( self._labels = labels self._annotations = annotations self._raw_output_data_config = raw_output_data_config - self._grpc_credentials = grpc_credentials # Save the file access object locally, but also make it available for use from the context. FlyteContextManager.with_context(FlyteContextManager.current_context().with_file_access(file_access).build()) @@ -302,7 +301,6 @@ def with_overrides( ): """Create a copy of the remote object, overriding the specified attributes.""" new_remote = deepcopy(self) - # new_remote = FlyteRemote(self._flyte_admin_url, insecure, default_project, default_domain) if default_project: new_remote._default_project = default_project if default_domain: @@ -623,6 +621,8 @@ def _register_entity_if_not_exists(self, entity: WorkflowBase, resolved_identifi raise NotImplementedError(f"We don't support registering this kind of entity: {node.flyte_entity}") except FlyteEntityAlreadyExistsException: logging.info(f"{entity.name} already exists") + except Exception as e: + logging.info(f"Failed to register Flyte entity {entity.name} with error v{e}") #################### # Execute Entities # From bab4af410ee8079703d4c05c3111c20716bcfa5e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 23 Oct 2021 03:31:55 +0800 Subject: [PATCH 19/23] Fix tests Signed-off-by: Kevin Su --- flytekit/remote/remote.py | 4 ++-- .../flytekit/integration/remote/test_remote.py | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index f447c56add..a5be6f8ae0 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -17,6 +17,7 @@ from flytekit.clients.friendly import SynchronousFlyteClient from flytekit.common import utils as common_utils from flytekit.common.exceptions.user import FlyteEntityAlreadyExistsException, FlyteEntityNotExistException +from flytekit.configuration import internal from flytekit.configuration import platform as platform_config from flytekit.configuration import sdk as sdk_config from flytekit.configuration import set_flyte_config_file @@ -522,6 +523,7 @@ def _serialize( domain or self.default_domain, version or self.version, self.image_config, + env={internal.IMAGE.env_var: self.image_config.default_image.full}, ), entity=entity, ) @@ -621,8 +623,6 @@ def _register_entity_if_not_exists(self, entity: WorkflowBase, resolved_identifi raise NotImplementedError(f"We don't support registering this kind of entity: {node.flyte_entity}") except FlyteEntityAlreadyExistsException: logging.info(f"{entity.name} already exists") - except Exception as e: - logging.info(f"Failed to register Flyte entity {entity.name} with error v{e}") #################### # Execute Entities # diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index dece9f4aca..c00564a5f3 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -285,15 +285,23 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): - from mock_flyte_repo.workflows.basic.list_float_wf import my_wf + from mock_flyte_repo.workflows.basic.subworkflows import parent_wf # make sure the task name is the same as the name used during registration - my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") + parent_wf._name = parent_wf.name.replace("mock_flyte_repo.", "") + remote = FlyteRemote.from_config(PROJECT, "development") + execution = remote.execute(parent_wf, {"a": 101}, version=f"v{VERSION}", wait=True) + # check node execution inputs and outputs + assert execution.node_executions["n0"].inputs == {"a": 101} + assert execution.node_executions["n0"].outputs == {"t1_int_output": 103, "c": "world"} + assert execution.node_executions["n1"].inputs == {"a": 103} + assert execution.node_executions["n1"].outputs == {"o0": "world", "o1": "world"} - xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, version=f"v{VERSION}", wait=True) - assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]" + # check subworkflow task execution inputs and outputs + subworkflow_node_executions = execution.node_executions["n1"].subworkflow_node_executions + subworkflow_node_executions["n1-0-n0"].inputs == {"a": 103} + subworkflow_node_executions["n1-0-n1"].outputs == {"t1_int_output": 107, "c": "world"} def test_fetch_not_exist_launch_plan(flyteclient): From 4482254029ca159c867037f2d31dade5b1c65d25 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sat, 23 Oct 2021 04:58:29 +0800 Subject: [PATCH 20/23] Fix tests Signed-off-by: Kevin Su --- .../mock_flyte_repo/workflows/requirements.in | 2 +- .../mock_flyte_repo/workflows/requirements.txt | 18 +++--------------- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in index 8a4e3c9353..d0e0c5ba1a 100644 --- a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in +++ b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in @@ -1,4 +1,4 @@ -flytekit>=0.17.0b0 +flytekit>=0.23.1 joblib wheel matplotlib diff --git a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt index bb746a6e9b..8149272e9b 100644 --- a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt +++ b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt @@ -2,22 +2,18 @@ # This file is autogenerated by pip-compile with python 3.8 # To update, run: # -# make tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt +# pip-compile tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in # attrs==21.2.0 # via scantree certifi==2021.10.8 # via requests -cffi==1.15.0 - # via cryptography charset-normalizer==2.0.7 # via requests click==7.1.2 # via flytekit croniter==1.0.15 # via flytekit -cryptography==35.0.0 - # via secretstorage cycler==0.10.0 # via matplotlib dataclasses-json==0.5.6 @@ -36,7 +32,7 @@ docstring-parser==0.12 # via flytekit flyteidl==0.21.6 # via flytekit -flytekit==0.23.0 +flytekit==0.23.1 # via -r tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in grpcio==1.41.0 # via flytekit @@ -44,10 +40,6 @@ idna==3.3 # via requests importlib-metadata==4.8.1 # via keyring -jeepney==0.7.1 - # via - # keyring - # secretstorage joblib==1.1.0 # via -r tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in keyring==23.2.1 @@ -61,7 +53,7 @@ marshmallow==3.14.0 # marshmallow-jsonschema marshmallow-enum==1.5.1 # via dataclasses-json -marshmallow-jsonschema==0.12.0 +marshmallow-jsonschema==0.13.0 # via flytekit matplotlib==3.4.3 # via -r tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in @@ -91,8 +83,6 @@ py==1.10.0 # via retry pyarrow==3.0.0 # via flytekit -pycparser==2.20 - # via cffi pyparsing==2.4.7 # via matplotlib python-dateutil==2.8.1 @@ -121,8 +111,6 @@ retry==0.9.2 # via flytekit scantree==0.0.1 # via dirhash -secretstorage==3.3.1 - # via keyring six==1.16.0 # via # cycler From f804b48265b6825fede2683c6741af225af83642 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Oct 2021 14:21:15 +0800 Subject: [PATCH 21/23] Fixed test Signed-off-by: Kevin Su --- tests/flytekit/integration/remote/test_remote.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index c00564a5f3..e33178ade0 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -179,7 +179,7 @@ def test_execute_python_workflow_and_launch_plan(flyteclient, flyte_workflows_re my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") - execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, version=f"v{VERSION}", wait=True) + execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, wait=True) assert execution.outputs["o0"] == 12 assert execution.outputs["o1"] == "xyzworld" From fb90ca205089e256121234a18632338c596c7429 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 25 Oct 2021 18:12:29 +0800 Subject: [PATCH 22/23] Fixed test Signed-off-by: Kevin Su --- flytekit/remote/remote.py | 2 ++ .../mock_flyte_repo/workflows/requirements.in | 2 +- .../mock_flyte_repo/workflows/requirements.txt | 18 +++++++++++++++--- .../flytekit/integration/remote/test_remote.py | 2 +- 4 files changed, 19 insertions(+), 5 deletions(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index a5be6f8ae0..d9ee248492 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -623,6 +623,8 @@ def _register_entity_if_not_exists(self, entity: WorkflowBase, resolved_identifi raise NotImplementedError(f"We don't support registering this kind of entity: {node.flyte_entity}") except FlyteEntityAlreadyExistsException: logging.info(f"{entity.name} already exists") + except Exception as e: + logging.info(f"Failed to register entity {entity.name} with error {e}") #################### # Execute Entities # diff --git a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in index d0e0c5ba1a..8a4e3c9353 100644 --- a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in +++ b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in @@ -1,4 +1,4 @@ -flytekit>=0.23.1 +flytekit>=0.17.0b0 joblib wheel matplotlib diff --git a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt index 8149272e9b..bb746a6e9b 100644 --- a/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt +++ b/tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt @@ -2,18 +2,22 @@ # This file is autogenerated by pip-compile with python 3.8 # To update, run: # -# pip-compile tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in +# make tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.txt # attrs==21.2.0 # via scantree certifi==2021.10.8 # via requests +cffi==1.15.0 + # via cryptography charset-normalizer==2.0.7 # via requests click==7.1.2 # via flytekit croniter==1.0.15 # via flytekit +cryptography==35.0.0 + # via secretstorage cycler==0.10.0 # via matplotlib dataclasses-json==0.5.6 @@ -32,7 +36,7 @@ docstring-parser==0.12 # via flytekit flyteidl==0.21.6 # via flytekit -flytekit==0.23.1 +flytekit==0.23.0 # via -r tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in grpcio==1.41.0 # via flytekit @@ -40,6 +44,10 @@ idna==3.3 # via requests importlib-metadata==4.8.1 # via keyring +jeepney==0.7.1 + # via + # keyring + # secretstorage joblib==1.1.0 # via -r tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in keyring==23.2.1 @@ -53,7 +61,7 @@ marshmallow==3.14.0 # marshmallow-jsonschema marshmallow-enum==1.5.1 # via dataclasses-json -marshmallow-jsonschema==0.13.0 +marshmallow-jsonschema==0.12.0 # via flytekit matplotlib==3.4.3 # via -r tests/flytekit/integration/remote/mock_flyte_repo/workflows/requirements.in @@ -83,6 +91,8 @@ py==1.10.0 # via retry pyarrow==3.0.0 # via flytekit +pycparser==2.20 + # via cffi pyparsing==2.4.7 # via matplotlib python-dateutil==2.8.1 @@ -111,6 +121,8 @@ retry==0.9.2 # via flytekit scantree==0.0.1 # via dirhash +secretstorage==3.3.1 + # via keyring six==1.16.0 # via # cycler diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index e33178ade0..c00564a5f3 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -179,7 +179,7 @@ def test_execute_python_workflow_and_launch_plan(flyteclient, flyte_workflows_re my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") - execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, wait=True) + execution = remote.execute(my_wf, inputs={"a": 10, "b": "xyz"}, version=f"v{VERSION}", wait=True) assert execution.outputs["o0"] == 12 assert execution.outputs["o1"] == "xyzworld" From 8093ff6cc6e679235715ee930f7b508f78ba6ad2 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 27 Oct 2021 17:22:09 +0800 Subject: [PATCH 23/23] Add link Signed-off-by: Kevin Su --- flytekit/remote/remote.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index d9ee248492..d9fcd4a0ed 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -523,6 +523,7 @@ def _serialize( domain or self.default_domain, version or self.version, self.image_config, + # https://github.com/flyteorg/flyte/issues/1359 env={internal.IMAGE.env_var: self.image_config.default_image.full}, ), entity=entity, @@ -907,11 +908,11 @@ def _( resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) resolved_identifiers_dict = asdict(resolved_identifiers) - self._register_entity_if_not_exists(entity, resolved_identifiers_dict) try: flyte_workflow: FlyteWorkflow = self.fetch_workflow(**resolved_identifiers_dict) except FlyteEntityNotExistException: logging.info("Try to register FlyteWorkflow because it wasn't found in Flyte Admin!") + self._register_entity_if_not_exists(entity, resolved_identifiers_dict) flyte_workflow: FlyteWorkflow = self.register(entity, **resolved_identifiers_dict) flyte_workflow.guessed_python_interface = entity.python_interface