diff --git a/RELEASE.md b/RELEASE.md index be53347946..fe4bc45e46 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -18,6 +18,7 @@ * Changed the path of where pipeline tests generated with `kedro pipeline create` from `/src/tests/pipelines/` to `/tests/pipelines/`. * Updated ``.gitignore`` to prevent pushing Mlflow local runs folder to a remote forge when using mlflow and git. * Fixed error handling message for malformed yaml/json files in OmegaConfigLoader. +* Fixed a bug in `node`-creation allowing self-dependencies when using transcoding, that is datasets named like `name@format`. ## Breaking changes to the API * Methods `_is_project` and `_find_kedro_project` have been moved to `kedro.utils`. We recommend not using private methods in your code, but if you do, please update your code to use the new location. diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py index b82a979ffa..6189ba9869 100644 --- a/kedro/framework/context/context.py +++ b/kedro/framework/context/context.py @@ -15,7 +15,7 @@ from kedro.config import AbstractConfigLoader, MissingConfigException from kedro.framework.project import settings from kedro.io import DataCatalog -from kedro.pipeline.pipeline import _transcode_split +from kedro.pipeline._transcoding import _transcode_split def _is_relative_path(path_string: str) -> bool: diff --git a/kedro/pipeline/_transcoding.py b/kedro/pipeline/_transcoding.py new file mode 100644 index 0000000000..71f0dac342 --- /dev/null +++ b/kedro/pipeline/_transcoding.py @@ -0,0 +1,38 @@ +from typing import Tuple + +TRANSCODING_SEPARATOR = "@" + + +def _transcode_split(element: str) -> Tuple[str, str]: + """Split the name by the transcoding separator. + If the transcoding part is missing, empty string will be put in. + + Returns: + Node input/output name before the transcoding separator, if present. + Raises: + ValueError: Raised if more than one transcoding separator + is present in the name. + """ + split_name = element.split(TRANSCODING_SEPARATOR) + + if len(split_name) > 2: # noqa: PLR2004 + raise ValueError( + f"Expected maximum 1 transcoding separator, found {len(split_name) - 1} " + f"instead: '{element}'." + ) + if len(split_name) == 1: + split_name.append("") + + return tuple(split_name) # type: ignore + + +def _strip_transcoding(element: str) -> str: + """Strip out the transcoding separator and anything that follows. + + Returns: + Node input/output name before the transcoding separator, if present. + Raises: + ValueError: Raised if more than one transcoding separator + is present in the name. + """ + return _transcode_split(element)[0] diff --git a/kedro/pipeline/modular_pipeline.py b/kedro/pipeline/modular_pipeline.py index 9ac2d6a1d1..c600ca877c 100644 --- a/kedro/pipeline/modular_pipeline.py +++ b/kedro/pipeline/modular_pipeline.py @@ -6,12 +6,9 @@ from typing import AbstractSet, Iterable from kedro.pipeline.node import Node -from kedro.pipeline.pipeline import ( - TRANSCODING_SEPARATOR, - Pipeline, - _strip_transcoding, - _transcode_split, -) +from kedro.pipeline.pipeline import Pipeline + +from ._transcoding import TRANSCODING_SEPARATOR, _strip_transcoding, _transcode_split class ModularPipelineError(Exception): diff --git a/kedro/pipeline/node.py b/kedro/pipeline/node.py index ba0d43cddf..4ef4ff6574 100644 --- a/kedro/pipeline/node.py +++ b/kedro/pipeline/node.py @@ -13,6 +13,8 @@ from more_itertools import spy, unzip +from ._transcoding import _strip_transcoding + class Node: """``Node`` is an auxiliary class facilitating the operations required to @@ -528,11 +530,13 @@ def _validate_unique_outputs(self) -> None: ) def _validate_inputs_dif_than_outputs(self) -> None: - common_in_out = set(self.inputs).intersection(set(self.outputs)) + common_in_out = set(map(_strip_transcoding, self.inputs)).intersection( + set(map(_strip_transcoding, self.outputs)) + ) if common_in_out: raise ValueError( f"Failed to create node {self}.\n" - f"A node cannot have the same inputs and outputs: " + f"A node cannot have the same inputs and outputs even if they are transcoded: " f"{common_in_out}" ) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 5a0d63b4cf..56029bbe35 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -15,42 +15,7 @@ import kedro from kedro.pipeline.node import Node, _to_list -TRANSCODING_SEPARATOR = "@" - - -def _transcode_split(element: str) -> tuple[str, str]: - """Split the name by the transcoding separator. - If the transcoding part is missing, empty string will be put in. - - Returns: - Node input/output name before the transcoding separator, if present. - Raises: - ValueError: Raised if more than one transcoding separator - is present in the name. - """ - split_name = element.split(TRANSCODING_SEPARATOR) - - if len(split_name) > 2: # noqa: PLR2004 - raise ValueError( - f"Expected maximum 1 transcoding separator, found {len(split_name) - 1} " - f"instead: '{element}'." - ) - if len(split_name) == 1: - split_name.append("") - - return tuple(split_name) # type: ignore - - -def _strip_transcoding(element: str) -> str: - """Strip out the transcoding separator and anything that follows. - - Returns: - Node input/output name before the transcoding separator, if present. - Raises: - ValueError: Raised if more than one transcoding separator - is present in the name. - """ - return _transcode_split(element)[0] +from ._transcoding import _strip_transcoding class OutputNotUniqueError(Exception): diff --git a/tests/pipeline/test_node.py b/tests/pipeline/test_node.py index 3dcf3db790..8798faa273 100644 --- a/tests/pipeline/test_node.py +++ b/tests/pipeline/test_node.py @@ -244,6 +244,10 @@ def input_same_as_output_node(): return biconcat, ["A", "B"], {"a": "A"} +def transcoded_input_same_as_output_node(): + return identity, "A@excel", {"a": "A@csv"} + + def duplicate_output_dict_node(): return identity, "A", {"a": "A", "b": "A"} @@ -269,7 +273,11 @@ def bad_output_variable_name(): (no_input_or_output_node, r"it must have some 'inputs' or 'outputs'"), ( input_same_as_output_node, - r"A node cannot have the same inputs and outputs: {\'A\'}", + r"A node cannot have the same inputs and outputs even if they are transcoded: {\'A\'}", + ), + ( + transcoded_input_same_as_output_node, + r"A node cannot have the same inputs and outputs even if they are transcoded: {\'A\'}", ), ( duplicate_output_dict_node, diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 8b727d57af..47f382ed26 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -5,13 +5,12 @@ import kedro from kedro.pipeline import node +from kedro.pipeline._transcoding import _strip_transcoding, _transcode_split from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.pipeline.pipeline import ( CircularDependencyError, ConfirmNotUniqueError, OutputNotUniqueError, - _strip_transcoding, - _transcode_split, ) diff --git a/tests/pipeline/test_pipeline_with_transcoding.py b/tests/pipeline/test_pipeline_with_transcoding.py index 7e4ad04056..59ccafcd52 100644 --- a/tests/pipeline/test_pipeline_with_transcoding.py +++ b/tests/pipeline/test_pipeline_with_transcoding.py @@ -4,11 +4,11 @@ import kedro from kedro.pipeline import node +from kedro.pipeline._transcoding import _strip_transcoding from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.pipeline.pipeline import ( CircularDependencyError, OutputNotUniqueError, - _strip_transcoding, ) @@ -191,14 +191,6 @@ def test_transcoding_loop(self): ] ) - def test_transcoding_self_reference(self): - with pytest.raises(CircularDependencyError, match="node1"): - modular_pipeline( - [ - node(identity, "A@pandas", "A@spark", name="node1"), - ] - ) - class TestComplexPipelineWithTranscoding: """