diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index 096b57b22b9..10dfed97543 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -14,7 +14,7 @@ import re import warnings -from typing import Any, Dict, Iterable, List, TypeVar, Union, Callable, Optional, Sequence +from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar, Tuple, Union from kubernetes.client import V1Toleration, V1Affinity from kubernetes.client.models import (V1Container, V1EnvVar, V1EnvFromSource, @@ -1062,6 +1062,15 @@ def __init__(self, argument, input=None, path=None): self.path = path +def _is_legacy_output_name(output_name: str) -> Tuple[bool, str]: + normalized_output_name = re.sub('[^a-zA-Z0-9]', '-', output_name.lower()) + if normalized_output_name in [ + 'mlpipeline-ui-metadata', + 'mlpipeline-metrics', + ]: + return True, normalized_output_name + return False, output_name + class ContainerOp(BaseOp): """Represents an op implemented by a container image. @@ -1257,11 +1266,9 @@ def _decorated(*args, **kwargs): # outputs that should always be saved as artifacts # TODO: Remove when outputs are always saved as artifacts for output_name, path in dict(file_outputs).items(): - normalized_output_name = re.sub('[^a-zA-Z0-9]', '-', output_name.lower()) - if normalized_output_name in [ - 'mlpipeline-ui-metadata', 'mlpipeline-metrics' - ]: - output_artifact_paths[normalized_output_name] = path + is_legacy_name, normalized_name = _is_legacy_output_name(output_name) + if is_legacy_name: + output_artifact_paths[normalized_name] = path del file_outputs[output_name] # attributes specific to `ContainerOp` @@ -1382,6 +1389,14 @@ def _set_metadata(self, metadata, arguments: Optional[Dict[str, Any]] = None): output_filename = _components._generate_output_file_name(output.name) self.file_outputs[output.name] = output_filename + if not kfp.COMPILING_FOR_V2: + for output_name, path in dict(self.file_outputs).items(): + is_legacy_name, normalized_name = _is_legacy_output_name(output_name) + if is_legacy_name: + self.output_artifact_paths[normalized_name] = path + del self.file_outputs[output_name] + del self.outputs[output_name] + if arguments is not None: for input_name, value in arguments.items(): self.artifact_arguments[input_name] = str(value)