Skip to content

Commit

Permalink
Register subworkflow, launchplan node
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Oct 19, 2021
1 parent 5aa98e6 commit c043255
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
4 changes: 2 additions & 2 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ def handler(*args, **kwargs):
# 1. Entity already exists when we register entity
# 2. Entity not found when we fetch entity
elif e.code() == _GrpcStatusCode.ALREADY_EXISTS:
raise _user_exceptions.FlyteEntityAlreadyExistsException(_six.text_type(e))
raise _user_exceptions.FlyteEntityAlreadyExistsException(e)
elif e.code() == _GrpcStatusCode.NOT_FOUND:
raise _user_exceptions.FlyteEntityNotExistException(_six.text_type(e))
raise _user_exceptions.FlyteEntityNotExistException(e)
else:
# No more retries if retry=False or max_retries reached.
if (retry is False) or i == (max_retries - 1):
Expand Down
30 changes: 21 additions & 9 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Module defining main Flyte backend entrypoint."""
from __future__ import annotations

import logging
import os
import time
import typing
Expand All @@ -15,7 +16,7 @@

from flytekit.clients.friendly import SynchronousFlyteClient
from flytekit.common import utils as common_utils
from flytekit.common.exceptions.user import FlyteEntityNotExistException
from flytekit.common.exceptions.user import FlyteEntityAlreadyExistsException, FlyteEntityNotExistException
from flytekit.configuration import platform as platform_config
from flytekit.configuration import sdk as sdk_config
from flytekit.configuration import set_flyte_config_file
Expand Down Expand Up @@ -606,6 +607,22 @@ def _(
)
return self.fetch_launch_plan(**resolved_identifiers)

def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: dict):
# Try to register all the entity in WorkflowBase including LaunchPlan, PythonTask, or subworkflow.
node_identifiers_dict = deepcopy(identifiers_dict)
try:
for node in entity.nodes:
node_identifiers_dict["name"] = node.flyte_entity.name
if isinstance(node.flyte_entity, WorkflowBase):
self._register_entity_if_not_exists(node.flyte_entity, identifiers_dict)
self.register(node.flyte_entity, **node_identifiers_dict)
elif isinstance(node.flyte_entity, PythonTask) or isinstance(node.flyte_entity, LaunchPlan):
self.register(node.flyte_entity, **node_identifiers_dict)
else:
raise TypeError(f"We don't support registering this kind of entity: {node.flyte_entity.name}")
except FlyteEntityAlreadyExistsException:
logging.info(f"{entity.name} already exists")

####################
# Execute Entities #
####################
Expand Down Expand Up @@ -886,25 +903,20 @@ def _(
"""Execute an @workflow-decorated function."""
resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version)
resolved_identifiers_dict = asdict(resolved_identifiers)
task_identifiers_dict = None
for node in entity.nodes:
try:
task_identifiers_dict = deepcopy(resolved_identifiers_dict)
task_identifiers_dict["name"] = node.flyte_entity.name
self.fetch_task(**task_identifiers_dict)
except FlyteEntityNotExistException:
self.register(node.flyte_entity, **task_identifiers_dict)

self._register_entity_if_not_exists(entity, resolved_identifiers_dict)
try:
flyte_workflow: FlyteWorkflow = self.fetch_workflow(**resolved_identifiers_dict)
except FlyteEntityNotExistException:
logging.info("Try to register FlyteWorkflow because it wasn't found in Flyte Admin!")
flyte_workflow: FlyteWorkflow = self.register(entity, **resolved_identifiers_dict)
flyte_workflow.guessed_python_interface = entity.python_interface

ctx = context_manager.FlyteContext.current_context()
try:
self.fetch_launch_plan(**resolved_identifiers_dict)
except FlyteEntityNotExistException:
logging.info("Try to register default launch plan because it wasn't found in Flyte Admin!")
default_lp = LaunchPlan.get_default_launch_plan(ctx, entity)
self.register(default_lp, **resolved_identifiers_dict)

Expand Down

0 comments on commit c043255

Please sign in to comment.