Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure no nodes can depend on themselves even when transcoding is used #3812

Merged
merged 4 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* Changed the location of pipeline tests generated with `kedro pipeline create` from `<project root>/src/tests/pipelines/<pipeline name>` to `<project root>/tests/pipelines/<pipeline name>`.
* Updated ``.gitignore`` to prevent pushing Mlflow local runs folder to a remote forge when using mlflow and git.
* Fixed error handling message for malformed yaml/json files in OmegaConfigLoader.
* Fixed a bug in `node` creation that allowed a node to depend on itself when using transcoding, i.e. when its inputs and outputs are datasets named like `name@format` that share the same `name`.

## Breaking changes to the API
* Methods `_is_project` and `_find_kedro_project` have been moved to `kedro.utils`. We recommend not using private methods in your code, but if you do, please update your code to use the new location.
Expand Down
2 changes: 1 addition & 1 deletion kedro/framework/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from kedro.config import AbstractConfigLoader, MissingConfigException
from kedro.framework.project import settings
from kedro.io import DataCatalog
from kedro.pipeline.pipeline import _transcode_split
from kedro.pipeline._transcoding import _transcode_split


def _is_relative_path(path_string: str) -> bool:
Expand Down
38 changes: 38 additions & 0 deletions kedro/pipeline/_transcoding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Tuple

TRANSCODING_SEPARATOR = "@"


def _transcode_split(element: str) -> Tuple[str, str]:
    """Separate a dataset name into its base name and transcoding suffix.

    Args:
        element: Node input/output name, optionally written as
            ``name@transcoding``.

    Returns:
        A ``(name, transcoding)`` pair. ``transcoding`` is an empty string
        when ``element`` contains no transcoding separator.

    Raises:
        ValueError: Raised if more than one transcoding separator
            is present in the name.
    """
    separator_count = element.count(TRANSCODING_SEPARATOR)
    if separator_count > 1:
        raise ValueError(
            f"Expected maximum 1 transcoding separator, found {separator_count} "
            f"instead: '{element}'."
        )

    # With at most one separator left, partition yields exactly the two parts
    # and leaves the suffix empty when the separator is absent.
    name, _, transcoding = element.partition(TRANSCODING_SEPARATOR)
    return name, transcoding


def _strip_transcoding(element: str) -> str:
    """Return a dataset name with the transcoding separator and suffix removed.

    Args:
        element: Node input/output name, optionally carrying a
            transcoding suffix.

    Returns:
        Node input/output name before the transcoding separator, if present;
        otherwise the name unchanged.

    Raises:
        ValueError: Raised if more than one transcoding separator
            is present in the name.
    """
    base_name, _suffix = _transcode_split(element)
    return base_name
9 changes: 3 additions & 6 deletions kedro/pipeline/modular_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
from typing import AbstractSet, Iterable

from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import (
TRANSCODING_SEPARATOR,
Pipeline,
_strip_transcoding,
_transcode_split,
)
from kedro.pipeline.pipeline import Pipeline

from ._transcoding import TRANSCODING_SEPARATOR, _strip_transcoding, _transcode_split


class ModularPipelineError(Exception):
Expand Down
8 changes: 6 additions & 2 deletions kedro/pipeline/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from more_itertools import spy, unzip

from ._transcoding import _strip_transcoding


class Node:
"""``Node`` is an auxiliary class facilitating the operations required to
Expand Down Expand Up @@ -528,11 +530,13 @@ def _validate_unique_outputs(self) -> None:
)

def _validate_inputs_dif_than_outputs(self) -> None:
common_in_out = set(self.inputs).intersection(set(self.outputs))
common_in_out = set(map(_strip_transcoding, self.inputs)).intersection(
set(map(_strip_transcoding, self.outputs))
)
if common_in_out:
raise ValueError(
f"Failed to create node {self}.\n"
f"A node cannot have the same inputs and outputs: "
f"A node cannot have the same inputs and outputs even if they are transcoded: "
f"{common_in_out}"
)

Expand Down
37 changes: 1 addition & 36 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,7 @@
import kedro
from kedro.pipeline.node import Node, _to_list

TRANSCODING_SEPARATOR = "@"


def _transcode_split(element: str) -> tuple[str, str]:
"""Split the name by the transcoding separator.
If the transcoding part is missing, empty string will be put in.

Returns:
Node input/output name before the transcoding separator, if present.
Raises:
ValueError: Raised if more than one transcoding separator
is present in the name.
"""
split_name = element.split(TRANSCODING_SEPARATOR)

if len(split_name) > 2: # noqa: PLR2004
raise ValueError(
f"Expected maximum 1 transcoding separator, found {len(split_name) - 1} "
f"instead: '{element}'."
)
if len(split_name) == 1:
split_name.append("")

return tuple(split_name) # type: ignore


def _strip_transcoding(element: str) -> str:
"""Strip out the transcoding separator and anything that follows.

Returns:
Node input/output name before the transcoding separator, if present.
Raises:
ValueError: Raised if more than one transcoding separator
is present in the name.
"""
return _transcode_split(element)[0]
from ._transcoding import _strip_transcoding


class OutputNotUniqueError(Exception):
Expand Down
10 changes: 9 additions & 1 deletion tests/pipeline/test_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ def input_same_as_output_node():
return biconcat, ["A", "B"], {"a": "A"}


def transcoded_input_same_as_output_node():
    """Node arguments that read and write dataset ``A`` under two transcodings."""
    func = identity
    inputs = "A@excel"
    outputs = {"a": "A@csv"}
    return func, inputs, outputs


def duplicate_output_dict_node():
    """Node arguments whose output dict maps two keys to the same dataset ``A``."""
    func = identity
    inputs = "A"
    outputs = {"a": "A", "b": "A"}
    return func, inputs, outputs

Expand All @@ -269,7 +273,11 @@ def bad_output_variable_name():
(no_input_or_output_node, r"it must have some 'inputs' or 'outputs'"),
(
input_same_as_output_node,
r"A node cannot have the same inputs and outputs: {\'A\'}",
r"A node cannot have the same inputs and outputs even if they are transcoded: {\'A\'}",
),
(
transcoded_input_same_as_output_node,
r"A node cannot have the same inputs and outputs even if they are transcoded: {\'A\'}",
),
(
duplicate_output_dict_node,
Expand Down
3 changes: 1 addition & 2 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@

import kedro
from kedro.pipeline import node
from kedro.pipeline._transcoding import _strip_transcoding, _transcode_split
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.pipeline.pipeline import (
CircularDependencyError,
ConfirmNotUniqueError,
OutputNotUniqueError,
_strip_transcoding,
_transcode_split,
)


Expand Down
10 changes: 1 addition & 9 deletions tests/pipeline/test_pipeline_with_transcoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

import kedro
from kedro.pipeline import node
from kedro.pipeline._transcoding import _strip_transcoding
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.pipeline.pipeline import (
CircularDependencyError,
OutputNotUniqueError,
_strip_transcoding,
)


Expand Down Expand Up @@ -191,14 +191,6 @@ def test_transcoding_loop(self):
]
)

def test_transcoding_self_reference(self):
with pytest.raises(CircularDependencyError, match="node1"):
modular_pipeline(
[
node(identity, "A@pandas", "A@spark", name="node1"),
]
)


class TestComplexPipelineWithTranscoding:
"""
Expand Down