diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index ab8ef4d5f8..abddac533f 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -610,8 +610,9 @@ def _( def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: dict): # Try to register all the entity in WorkflowBase including LaunchPlan, PythonTask, or subworkflow. node_identifiers_dict = deepcopy(identifiers_dict) - try: - for node in entity.nodes: + + for node in entity.nodes: + try: node_identifiers_dict["name"] = node.flyte_entity.name if isinstance(node.flyte_entity, WorkflowBase): self._register_entity_if_not_exists(node.flyte_entity, identifiers_dict) @@ -619,9 +620,9 @@ def _register_entity_if_not_exists(self, entity: WorkflowBase, identifiers_dict: elif isinstance(node.flyte_entity, PythonTask) or isinstance(node.flyte_entity, LaunchPlan): self.register(node.flyte_entity, **node_identifiers_dict) else: - raise TypeError(f"We don't support registering this kind of entity: {node.flyte_entity.name}") - except FlyteEntityAlreadyExistsException: - logging.info(f"{entity.name} already exists") + raise NotImplementedError(f"We don't support registering this kind of entity: {node.flyte_entity}") + except FlyteEntityAlreadyExistsException: + logging.info(f"{entity.name} already exists") #################### # Execute Entities # diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 7f708d8f02..cae559f70d 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -235,10 +235,11 @@ def test_execute_python_workflow_dict_of_string_to_string(flyteclient, flyte_wor def test_execute_python_workflow_list_of_floats(flyteclient, flyte_workflows_register, flyte_remote_env): """Test execution of a @workflow-decorated python function and launchplan that are already registered.""" - from mock_flyte_repo.workflows.basic.list_float_wf import my_wf + from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") + concat_list._name = concat_list.name.replace("mock_flyte_repo.", "") remote = FlyteRemote.from_config(PROJECT, "development") xs: typing.List[float] = [42.24, 999.1, 0.0001] @@ -285,13 +286,17 @@ def test_execute_joblib_workflow(flyteclient, flyte_workflows_register, flyte_re assert output_obj == input_obj -def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): - from mock_flyte_repo.workflows.basic.list_float_wf import my_wf +def test_execute_with_default_launch_plan(flyteclient, flyte_workflows_register): + from mock_flyte_repo.workflows.basic.list_float_wf import concat_list, my_wf + # make sure the task name is the same as the name used during registration my_wf._name = my_wf.name.replace("mock_flyte_repo.", "") + concat_list._name = concat_list.name.replace("mock_flyte_repo.", "") + remote = FlyteRemote.from_config(PROJECT, "development") + version = uuid.uuid4().hex[:30] + str(int(time.time())) xs: typing.List[float] = [42.24, 999.1, 0.0001] - execution = remote.execute(my_wf, inputs={"xs": xs}, wait=True) + execution = remote.execute(my_wf, inputs={"xs": xs}, version=version, wait=True) assert execution.outputs["o0"] == "[42.24, 999.1, 0.0001]"