Skip to content

Commit

Permalink
Fixed lint
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Oct 20, 2021
1 parent c043255 commit e4a125d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
11 changes: 6 additions & 5 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,18 +610,19 @@ def _(
def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: dict):
# Try to register all the entity in WorkflowBase including LaunchPlan, PythonTask, or subworkflow.
node_identifiers_dict = deepcopy(identifiers_dict)
try:
for node in entity.nodes:

for node in entity.nodes:
try:
node_identifiers_dict["name"] = node.flyte_entity.name
if isinstance(node.flyte_entity, WorkflowBase):
self._register_entity_if_not_exists(node.flyte_entity, identifiers_dict)
self.register(node.flyte_entity, **node_identifiers_dict)
elif isinstance(node.flyte_entity, PythonTask) or isinstance(node.flyte_entity, LaunchPlan):
self.register(node.flyte_entity, **node_identifiers_dict)
else:
raise TypeError(f"We don't support registering this kind of entity: {node.flyte_entity.name}")
except FlyteEntityAlreadyExistsException:
logging.info(f"{entity.name} already exists")
raise NotImplementedError(f"We don't support registering this kind of entity: {node.flyte_entity}")
except FlyteEntityAlreadyExistsException:
logging.info(f"{entity.name} already exists")

####################
# Execute Entities #
Expand Down
13 changes: 9 additions & 4 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,11 @@ def test_execute_python_workflow_dict_of_string_to_string(flyteclient, flyte_wor

def test_execute_python_workflow_list_of_floats(flyteclient, flyte_workflows_register, flyte_remote_env):
"""Test execution of a @workflow-decorated python function and launchplan that are already registered."""
from mock_flyte_repo.workflows.basic.list_float_wf import my_wf
from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf

# make sure the task name is the same as the name used during registration
my_wf._name = my_wf.name.replace("mock_flyte_repo.", "")
concat_list._name = concat_list.name.replace("mock_flyte_repo.", "")

remote = FlyteRemote.from_config(PROJECT, "development")
xs: typing.List[float] = [42.24, 999.1, 0.0001]
Expand Down Expand Up @@ -285,13 +286,17 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re
assert output_obj == input_obj


def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env):
from mock_flyte_repo.workflows.basic.list_float_wf import my_wf
def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register):
from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf

# make sure the task name is the same as the name used during registration
my_wf._name = my_wf.name.replace("mock_flyte_repo.", "")
concat_list._name = concat_list.name.replace("mock_flyte_repo.", "")

remote = FlyteRemote.from_config(PROJECT, "development")
version = uuid.uuid4().hex[:30] + str(int(time.time()))
xs: typing.List[float] = [42.24, 999.1, 0.0001]
execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True)
execution = remote.execute(my_wf, inputs={"xs": xs}, version=version, wait=True)
assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]"


Expand Down

0 comments on commit e4a125d

Please sign in to comment.