Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk): Relax the requirement that component inputs/outputs must appear on the command line. #6268

Merged
merged 10 commits into from
Aug 10, 2021
31 changes: 1 addition & 30 deletions sdk/python/kfp/components/_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,35 +529,6 @@ def _func_to_component_spec_v2(
from kfp.components._structures import ExecutorInputPlaceholder
component_spec = _extract_component_interface(func)

component_inputs = component_spec.inputs or []
component_outputs = component_spec.outputs or []

outputs_passed_using_func_parameters = [
output for output in component_outputs
if output._passing_style is not None
]
arguments = []
for input in component_inputs + outputs_passed_using_func_parameters:
flag = "--{}-output-path".format(input.name.replace("_", "-"))

if input._passing_style in [InputPath, io_types.InputAnnotation]:
arguments_for_input = [flag, InputPathPlaceholder(input.name)]
elif input._passing_style in [OutputPath, io_types.OutputAnnotation]:
arguments_for_input = [flag, OutputPathPlaceholder(input.name)]
else:
arguments_for_input = [flag, InputValuePlaceholder(input.name)]

arguments.extend(arguments_for_input)

# Add output placeholders for return values from func.
func_outputs = [
output for output in component_outputs
if output._passing_style is None
]
for output in func_outputs:
flag = "--" + output.name.replace("_", "-")
arguments.extend([flag, OutputPathPlaceholder(output.name)])

component_spec.implementation=ContainerImplementation(
container=ContainerSpec(
image=base_image,
Expand All @@ -578,7 +549,7 @@ def _func_to_component_spec_v2(
"--executor_input",
ExecutorInputPlaceholder(),
"--function_to_execute", func.__name__,
] + arguments,
]
)
)
return component_spec
Expand Down
21 changes: 20 additions & 1 deletion sdk/python/kfp/dsl/_component_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,32 @@ def _create_container_op_from_component_and_arguments(
_container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value

component_meta = copy.copy(component_spec)
task._set_metadata(component_meta)
task._set_metadata(component_meta, original_arguments)
if component_ref:
component_ref_without_spec = copy.copy(component_ref)
component_ref_without_spec.spec = None
task._component_ref = component_ref_without_spec

task._parameter_arguments = resolved_cmd.inputs_consumed_by_value
name_to_spec_type = {}
if component_meta.inputs:
name_to_spec_type = {
input.name: input.type
for input in component_meta.inputs
}
if kfp.COMPILING_FOR_V2:
for name, spec_type in name_to_spec_type.items():
if (name in original_arguments and
type_utils.is_parameter_type(spec_type)):
task._parameter_arguments[name] = str(original_arguments[name])

for name in list(task.artifact_arguments.keys()):
if name in task._parameter_arguments:
del task.artifact_arguments[name]

for name in list(task.input_artifact_paths.keys()):
if name in task._parameter_arguments:
del task.input_artifact_paths[name]

# Previously, ContainerOp had strict requirements for the output names, so we
# had to convert all the names before passing them to the ContainerOp
Expand Down
53 changes: 43 additions & 10 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import inspect
import re
import warnings
from typing import Any, Dict, Iterable, List, TypeVar, Union, Callable, Optional, Sequence
Expand All @@ -26,7 +24,7 @@
V1Lifecycle, V1Volume)

import kfp
from kfp.components import _structures
from kfp.components import _components, _structures
from kfp.dsl import _pipeline_param
from kfp.dsl import dsl_utils
from kfp.pipeline_spec import pipeline_spec_pb2
Expand Down Expand Up @@ -396,7 +394,7 @@ def set_gpu_limit(self, gpu: Union[str, _pipeline_param.PipelineParam], vendor:
ignored in v2.
"""

if not isinstance(gpu,_pipeline_param.PipelineParam) or not isinstance(gpu,_pipeline_param.PipelineParam):
if not isinstance(gpu,_pipeline_param.PipelineParam) or not isinstance(gpu,_pipeline_param.PipelineParam):
self._validate_positive_number(gpu, 'gpu')

if self._container_spec:
Expand Down Expand Up @@ -832,6 +830,11 @@ def __init__(self,
# used to mark this op with loop arguments
self.loop_args = None

# Placeholder for inputs when adding ComponentSpec metadata to this
# ContainerOp. This holds inputs defined in ComponentSpec that have
# a corresponding PipelineParam.
self._component_spec_inputs_with_pipeline_params = []

# attributes specific to `BaseOp`
self._inputs = []
self.dependent_names = []
Expand All @@ -848,7 +851,7 @@ def inputs(self):
# called the 1st time (because there are in-place updates to `PipelineParam`
# during compilation - remove in-place updates for easier debugging?)
if not self._inputs:
self._inputs = []
self._inputs = self._component_spec_inputs_with_pipeline_params or []
# TODO replace with proper k8s obj?
for key in self.attrs_with_pipelineparams:
self._inputs += _pipeline_param.extract_pipelineparams_from_any(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like _component_spec_inputs is a list of inputs reference PipelineParam, in another word, inputs with constant values would not appear in this list. It that right?
If so, 1) maybe we can rename it to better reflect this fact. 2) I wonder if this self._inputs += _pipeline_param.extract_pipelineparams_from_any is still needed, because it looks to me you would have covered them when you construct _component_spec_inputs down below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Re: 1) I updated the names to be more specific and added a comment.

Re: 2), I can't remove that, as some params are specified on things like the container image. So this won't work I think. Also, the same code is used by things like ResourceOp, which does not have a component spec so it won't work there either.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. That makes sense.
(This also reminds me the topic that component may have implicit inputs not captured in its inputs spec, and whether we should allow this to happen)

Expand Down Expand Up @@ -1280,6 +1283,12 @@ def _decorated(*args, **kwargs):
for name in file_outputs.keys()
}

self._set_single_output_attribute()

self.pvolumes = {}
self.add_pvolumes(pvolumes)

def _set_single_output_attribute(self):
# Syntactic sugar: Add task.output attribute if the component has a single
# output.
# TODO: Currently the "MLPipeline UI Metadata" output is removed from
Expand All @@ -1291,9 +1300,6 @@ def _decorated(*args, **kwargs):
else:
self.output = _MultipleOutputsError()

self.pvolumes = {}
self.add_pvolumes(pvolumes)

@property
def is_v2(self):
return self._is_v2
Expand Down Expand Up @@ -1351,17 +1357,42 @@ def immediate_value_pipeline():
"""
return self._container

def _set_metadata(self, metadata):
def _set_metadata(self, metadata, arguments: Optional[Dict[str, Any]] = None):
"""Passes the ContainerOp the metadata information and configures the right output.

Args:
metadata (ComponentSpec): component metadata
arguments: Dictionary of input arguments to the component.
"""
if not isinstance(metadata, _structures.ComponentSpec):
raise ValueError('_set_metadata is expecting ComponentSpec.')

self._metadata = metadata

if self._metadata.outputs:
declared_outputs = {
output.name: _pipeline_param.PipelineParam(
output.name, op_name=self.name)
for output in self._metadata.outputs
}
self.outputs.update(declared_outputs)

for output in self._metadata.outputs:
if output.name not in self.file_outputs:
output_filename = _components._generate_output_file_name(output.name)
self.file_outputs[output.name] = output_filename

if arguments is not None:
for input_name, value in arguments.items():
self.artifact_arguments[input_name] = str(value)
if (isinstance(value, _pipeline_param.PipelineParam)):
self._component_spec_inputs_with_pipeline_params.append(value)

if input_name not in self.input_artifact_paths:
input_artifact_path = _components._generate_input_file_name(
input_name)
self.input_artifact_paths[input_name] = input_artifact_path

if self.file_outputs:
for output in self.file_outputs.keys():
output_type = self.outputs[output].param_type
Expand All @@ -1370,6 +1401,8 @@ def _set_metadata(self, metadata):
output_type = output_meta.type
self.outputs[output].param_type = output_type

self._set_single_output_attribute()

def add_pvolumes(self, pvolumes: Dict[str, V1Volume] = None):
"""Updates the existing pvolumes dict, extends volumes and volume_mounts and redefines the pvolume attribute.

Expand Down Expand Up @@ -1411,7 +1444,7 @@ def add_node_selector_constraint(self, label_name: Union[str, _pipeline_param.P
Returns:
self return to allow chained call with other resource specification.
"""
if self.container_spec and not(isinstance(label_name, _pipeline_param.PipelineParam) or isinstance(value, _pipeline_param.PipelineParam)):
if self.container_spec and not(isinstance(label_name, _pipeline_param.PipelineParam) or isinstance(value, _pipeline_param.PipelineParam)):
accelerator_cnt = 1
if self.container_spec.resources.accelerator.count > 1:
# Reserve the number if already set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"preprocess",
"--message-output-path",
"{{$.inputs.parameters['message']}}",
"--output-dataset-one-output-path",
"{{$.outputs.artifacts['output_dataset_one'].path}}",
"--output-dataset-two-output-path",
"{{$.outputs.artifacts['output_dataset_two'].path}}",
"--output-parameter-output-path",
"{{$.outputs.parameters['output_parameter'].output_file}}",
"--output-bool-parameter-output-path",
"{{$.outputs.parameters['output_bool_parameter'].output_file}}",
"--output-dict-parameter-output-path",
"{{$.outputs.parameters['output_dict_parameter'].output_file}}",
"--output-list-parameter-output-path",
"{{$.outputs.parameters['output_list_parameter'].output_file}}"
"preprocess"
],
"command": [
"sh",
Expand All @@ -125,23 +111,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"train",
"--dataset-one-output-path",
"{{$.inputs.artifacts['dataset_one'].path}}",
"--dataset-two-output-path",
"{{$.inputs.artifacts['dataset_two'].path}}",
"--message-output-path",
"{{$.inputs.parameters['message']}}",
"--input-bool-output-path",
"{{$.inputs.parameters['input_bool']}}",
"--input-dict-output-path",
"{{$.inputs.parameters['input_dict']}}",
"--input-list-output-path",
"{{$.inputs.parameters['input_list']}}",
"--num-steps-output-path",
"{{$.inputs.parameters['num_steps']}}",
"--model-output-path",
"{{$.outputs.artifacts['model'].path}}"
"train"
],
"command": [
"sh",
Expand Down Expand Up @@ -260,4 +230,4 @@
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"add_numbers",
"--first-output-path",
"{{$.inputs.parameters['first']}}",
"--second-output-path",
"{{$.inputs.parameters['second']}}",
"--Output",
"{{$.outputs.parameters['Output'].output_file}}"
"add_numbers"
],
"command": [
"sh",
Expand All @@ -129,13 +123,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"concat_message",
"--first-output-path",
"{{$.inputs.parameters['first']}}",
"--second-output-path",
"{{$.inputs.parameters['second']}}",
"--Output",
"{{$.outputs.parameters['Output'].output_file}}"
"concat_message"
],
"command": [
"sh",
Expand All @@ -155,13 +143,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"output_artifact",
"--number-output-path",
"{{$.inputs.parameters['number']}}",
"--message-output-path",
"{{$.inputs.parameters['message']}}",
"--Output",
"{{$.outputs.artifacts['Output'].path}}"
"output_artifact"
],
"command": [
"sh",
Expand All @@ -181,15 +163,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"output_named_tuple",
"--artifact-output-path",
"{{$.inputs.artifacts['artifact'].path}}",
"--scalar",
"{{$.outputs.parameters['scalar'].output_file}}",
"--metrics",
"{{$.outputs.artifacts['metrics'].path}}",
"--model",
"{{$.outputs.artifacts['model'].path}}"
"output_named_tuple"
],
"command": [
"sh",
Expand Down Expand Up @@ -354,4 +328,4 @@
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"training_op",
"--input1-output-path",
"{{$.inputs.parameters['input1']}}"
"training_op"
],
"command": [
"sh",
Expand All @@ -62,9 +60,7 @@
"--executor_input",
"{{$}}",
"--function_to_execute",
"training_op",
"--input1-output-path",
"{{$.inputs.parameters['input1']}}"
"training_op"
],
"command": [
"sh",
Expand Down Expand Up @@ -174,4 +170,4 @@
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
}
}
Loading