diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index 5a2c00d5a5..4e0abd03c1 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -140,9 +140,9 @@ def handler(*args, **kwargs): # 1. Entity already exists when we register entity # 2. Entity not found when we fetch entity elif e.code() == _GrpcStatusCode.ALREADY_EXISTS: - raise _user_exceptions.FlyteEntityAlreadyExistsException(_six.text_type(e)) + raise _user_exceptions.FlyteEntityAlreadyExistsException(e) elif e.code() == _GrpcStatusCode.NOT_FOUND: - raise _user_exceptions.FlyteEntityNotExistException(_six.text_type(e)) + raise _user_exceptions.FlyteEntityNotExistException(e) else: # No more retries if retry=False or max_retries reached. if (retry is False) or i == (max_retries - 1): diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 47f9645605..ab8ef4d5f8 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -1,6 +1,7 @@ """Module defining main Flyte backend entrypoint.""" from __future__ import annotations +import logging import os import time import typing @@ -15,7 +16,7 @@ from flytekit.clients.friendly import SynchronousFlyteClient from flytekit.common import utils as common_utils -from flytekit.common.exceptions.user import FlyteEntityNotExistException +from flytekit.common.exceptions.user import FlyteEntityAlreadyExistsException, FlyteEntityNotExistException from flytekit.configuration import platform as platform_config from flytekit.configuration import sdk as sdk_config from flytekit.configuration import set_flyte_config_file @@ -606,6 +607,22 @@ def _( ) return self.fetch_launch_plan(**resolved_identifiers) + def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: dict): + # Try to register all the entity in WorkflowBase including LaunchPlan, PythonTask, or subworkflow. + node_identifiers_dict = deepcopy(identifiers_dict) + try: + for node in entity.nodes: + node_identifiers_dict["name"] = node.flyte_entity.name + if isinstance(node.flyte_entity, WorkflowBase): + self._register_entity_if_not_exists(node.flyte_entity, identifiers_dict) + self.register(node.flyte_entity, **node_identifiers_dict) + elif isinstance(node.flyte_entity, PythonTask) or isinstance(node.flyte_entity, LaunchPlan): + self.register(node.flyte_entity, **node_identifiers_dict) + else: + raise TypeError(f"We don't support registering this kind of entity: {node.flyte_entity.name}") + except FlyteEntityAlreadyExistsException: + logging.info(f"{entity.name} already exists") + #################### # Execute Entities # #################### @@ -886,18 +903,12 @@ def _( """Execute an @workflow-decorated function.""" resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) resolved_identifiers_dict = asdict(resolved_identifiers) - task_identifiers_dict = None - for node in entity.nodes: - try: - task_identifiers_dict = deepcopy(resolved_identifiers_dict) - task_identifiers_dict["name"] = node.flyte_entity.name - self.fetch_task(**task_identifiers_dict) - except FlyteEntityNotExistException: - self.register(node.flyte_entity, **task_identifiers_dict) + self._register_entity_if_not_exists(entity, resolved_identifiers_dict) try: flyte_workflow: FlyteWorkflow = self.fetch_workflow(**resolved_identifiers_dict) except FlyteEntityNotExistException: + logging.info("Try to register FlyteWorkflow because it wasn't found in Flyte Admin!") flyte_workflow: FlyteWorkflow = self.register(entity, **resolved_identifiers_dict) flyte_workflow.guessed_python_interface = entity.python_interface @@ -905,6 +916,7 @@ def _( try: self.fetch_launch_plan(**resolved_identifiers_dict) except FlyteEntityNotExistException: + logging.info("Try to register default launch plan because it wasn't found in Flyte Admin!") default_lp = LaunchPlan.get_default_launch_plan(ctx, entity) self.register(default_lp, **resolved_identifiers_dict)