Skip to content

Commit

Permalink
Ensure no nodes can depend on themselves even when transcoding is used (
Browse files Browse the repository at this point in the history
#3812)

* Factor out transcoding helpers into a private module

Signed-off-by: Ivan Danov <[email protected]>

* Ensure node input/output validation doesn't allow transcoded self-loops

Signed-off-by: Ivan Danov <[email protected]>

* Updated release note to avoid github warning

Signed-off-by: Elena Khaustova <[email protected]>

---------

Signed-off-by: Ivan Danov <[email protected]>
Signed-off-by: Elena Khaustova <[email protected]>
Co-authored-by: Elena Khaustova <[email protected]>
  • Loading branch information
idanov and ElenaKhaustova authored Apr 15, 2024
1 parent 28277a6 commit 2915024
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 57 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* Changed the path of where pipeline tests generated with `kedro pipeline create` from `<project root>/src/tests/pipelines/<pipeline name>` to `<project root>/tests/pipelines/<pipeline name>`.
* Updated ``.gitignore`` to prevent pushing Mlflow local runs folder to a remote forge when using mlflow and git.
* Fixed error handling message for malformed yaml/json files in OmegaConfigLoader.
* Fixed a bug in `node`-creation allowing self-dependencies when using transcoding, that is datasets named like `name@format`.

## Breaking changes to the API
* Methods `_is_project` and `_find_kedro_project` have been moved to `kedro.utils`. We recommend not using private methods in your code, but if you do, please update your code to use the new location.
Expand Down
2 changes: 1 addition & 1 deletion kedro/framework/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from kedro.config import AbstractConfigLoader, MissingConfigException
from kedro.framework.project import settings
from kedro.io import DataCatalog
from kedro.pipeline.pipeline import _transcode_split
from kedro.pipeline._transcoding import _transcode_split


def _is_relative_path(path_string: str) -> bool:
Expand Down
38 changes: 38 additions & 0 deletions kedro/pipeline/_transcoding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Tuple

TRANSCODING_SEPARATOR = "@"


def _transcode_split(element: str) -> Tuple[str, str]:
"""Split the name by the transcoding separator.
If the transcoding part is missing, empty string will be put in.
Returns:
Node input/output name before the transcoding separator, if present.
Raises:
ValueError: Raised if more than one transcoding separator
is present in the name.
"""
split_name = element.split(TRANSCODING_SEPARATOR)

if len(split_name) > 2: # noqa: PLR2004
raise ValueError(
f"Expected maximum 1 transcoding separator, found {len(split_name) - 1} "
f"instead: '{element}'."
)
if len(split_name) == 1:
split_name.append("")

return tuple(split_name) # type: ignore


def _strip_transcoding(element: str) -> str:
"""Strip out the transcoding separator and anything that follows.
Returns:
Node input/output name before the transcoding separator, if present.
Raises:
ValueError: Raised if more than one transcoding separator
is present in the name.
"""
return _transcode_split(element)[0]
9 changes: 3 additions & 6 deletions kedro/pipeline/modular_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
from typing import AbstractSet, Iterable

from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import (
TRANSCODING_SEPARATOR,
Pipeline,
_strip_transcoding,
_transcode_split,
)
from kedro.pipeline.pipeline import Pipeline

from ._transcoding import TRANSCODING_SEPARATOR, _strip_transcoding, _transcode_split


class ModularPipelineError(Exception):
Expand Down
8 changes: 6 additions & 2 deletions kedro/pipeline/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from more_itertools import spy, unzip

from ._transcoding import _strip_transcoding


class Node:
"""``Node`` is an auxiliary class facilitating the operations required to
Expand Down Expand Up @@ -528,11 +530,13 @@ def _validate_unique_outputs(self) -> None:
)

def _validate_inputs_dif_than_outputs(self) -> None:
common_in_out = set(self.inputs).intersection(set(self.outputs))
common_in_out = set(map(_strip_transcoding, self.inputs)).intersection(
set(map(_strip_transcoding, self.outputs))
)
if common_in_out:
raise ValueError(
f"Failed to create node {self}.\n"
f"A node cannot have the same inputs and outputs: "
f"A node cannot have the same inputs and outputs even if they are transcoded: "
f"{common_in_out}"
)

Expand Down
37 changes: 1 addition & 36 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,7 @@
import kedro
from kedro.pipeline.node import Node, _to_list

TRANSCODING_SEPARATOR = "@"


def _transcode_split(element: str) -> tuple[str, str]:
"""Split the name by the transcoding separator.
If the transcoding part is missing, empty string will be put in.
Returns:
Node input/output name before the transcoding separator, if present.
Raises:
ValueError: Raised if more than one transcoding separator
is present in the name.
"""
split_name = element.split(TRANSCODING_SEPARATOR)

if len(split_name) > 2: # noqa: PLR2004
raise ValueError(
f"Expected maximum 1 transcoding separator, found {len(split_name) - 1} "
f"instead: '{element}'."
)
if len(split_name) == 1:
split_name.append("")

return tuple(split_name) # type: ignore


def _strip_transcoding(element: str) -> str:
"""Strip out the transcoding separator and anything that follows.
Returns:
Node input/output name before the transcoding separator, if present.
Raises:
ValueError: Raised if more than one transcoding separator
is present in the name.
"""
return _transcode_split(element)[0]
from ._transcoding import _strip_transcoding


class OutputNotUniqueError(Exception):
Expand Down
10 changes: 9 additions & 1 deletion tests/pipeline/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ def input_same_as_output_node():
return biconcat, ["A", "B"], {"a": "A"}


def transcoded_input_same_as_output_node():
return identity, "A@excel", {"a": "A@csv"}


def duplicate_output_dict_node():
return identity, "A", {"a": "A", "b": "A"}

Expand All @@ -269,7 +273,11 @@ def bad_output_variable_name():
(no_input_or_output_node, r"it must have some 'inputs' or 'outputs'"),
(
input_same_as_output_node,
r"A node cannot have the same inputs and outputs: {\'A\'}",
r"A node cannot have the same inputs and outputs even if they are transcoded: {\'A\'}",
),
(
transcoded_input_same_as_output_node,
r"A node cannot have the same inputs and outputs even if they are transcoded: {\'A\'}",
),
(
duplicate_output_dict_node,
Expand Down
3 changes: 1 addition & 2 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@

import kedro
from kedro.pipeline import node
from kedro.pipeline._transcoding import _strip_transcoding, _transcode_split
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.pipeline.pipeline import (
CircularDependencyError,
ConfirmNotUniqueError,
OutputNotUniqueError,
_strip_transcoding,
_transcode_split,
)


Expand Down
10 changes: 1 addition & 9 deletions tests/pipeline/test_pipeline_with_transcoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

import kedro
from kedro.pipeline import node
from kedro.pipeline._transcoding import _strip_transcoding
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.pipeline.pipeline import (
CircularDependencyError,
OutputNotUniqueError,
_strip_transcoding,
)


Expand Down Expand Up @@ -191,14 +191,6 @@ def test_transcoding_loop(self):
]
)

def test_transcoding_self_reference(self):
with pytest.raises(CircularDependencyError, match="node1"):
modular_pipeline(
[
node(identity, "A@pandas", "A@spark", name="node1"),
]
)


class TestComplexPipelineWithTranscoding:
"""
Expand Down

0 comments on commit 2915024

Please sign in to comment.