Skip to content

Commit

Permalink
backport PipelineTaskFinalStatus change to sdk/release-1.8 branch
Browse files Browse the repository at this point in the history
  • Loading branch information
chensun committed Mar 30, 2022
1 parent 3d2508e commit 8ccb372
Show file tree
Hide file tree
Showing 20 changed files with 779 additions and 59 deletions.
14 changes: 10 additions & 4 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from .structures import *
from ._data_passing import serialize_value, get_canonical_type_for_type_name

from kfp.v2.components.types import type_utils

_default_component_name = 'Component'


Expand Down Expand Up @@ -124,7 +126,8 @@ def load_component_from_text(text):


def load_component_from_spec(component_spec):
"""Loads component from a ComponentSpec and creates a task factory function.
"""Loads component from a ComponentSpec and creates a task factory
function.
Args:
component_spec: A ComponentSpec containing the component definition.
Expand All @@ -136,7 +139,7 @@ def load_component_from_spec(component_spec):
if component_spec is None:
raise TypeError
return _create_task_factory_from_component_spec(
component_spec=component_spec)
component_spec=component_spec)


def _fix_component_uri(uri: str) -> str:
Expand Down Expand Up @@ -416,7 +419,9 @@ def component_default_to_func_default(component_default: str,
port.type) if port.type else inspect.Parameter.empty),
default=component_default_to_func_default(port.default,
port.optional),
) for port in reordered_input_list
)
for port in reordered_input_list
if not type_utils.is_task_final_status_type(port.type)
]
factory_function_parameters = input_parameters #Outputs are no longer part of the task factory function signature. The paths are always generated by the system.

Expand Down Expand Up @@ -504,7 +509,8 @@ def expand_command_part(arg) -> Union[str, List[str], None]:
inputs_consumed_by_value[input_name] = serialized_argument
return serialized_argument
else:
if input_spec.optional:
if input_spec.optional or type_utils.is_task_final_status_type(
input_spec.type):
return None
else:
raise ValueError(
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/kfp/dsl/_component_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,15 @@ def _resolve_ir_placeholders_v2(
if isinstance(arg, _structures.InputValuePlaceholder):
input_name = arg.input_name
input_value = arguments.get(input_name, None)
if input_value is not None:
input_spec = inputs_dict[input_name]
if input_value is not None or type_utils.is_task_final_status_type(
input_spec.type):
return _input_parameter_placeholder(input_name)
elif input_spec.optional:
return None
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
return None
else:
raise ValueError(
'No value provided for input {}'.format(input_name))
raise ValueError(
'No value provided for input {}'.format(input_name))

elif isinstance(arg, _structures.InputUriPlaceholder):
input_name = arg.input_name
Expand Down
13 changes: 11 additions & 2 deletions sdk/python/kfp/dsl/_ops_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union

from typing import Optional, Union
import uuid

from kfp.dsl import _for_loop, _pipeline_param
Expand Down Expand Up @@ -144,8 +145,16 @@ class ExitHandler(OpsGroup):
op2 = ContainerOp(...)
"""

def __init__(self, exit_op: _container_op.ContainerOp):
def __init__(
self,
exit_op: _container_op.ContainerOp,
name: Optional[str] = None,
):
super(ExitHandler, self).__init__('exit_handler')

# Use user specified name as display name directly, instead of passing
# it to the super class constructor.
self.display_name = name
if exit_op.dependent_names:
raise ValueError('exit_op cannot depend on any other ops.')

Expand Down
8 changes: 8 additions & 0 deletions sdk/python/kfp/dsl/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,17 @@ def build_component_spec_from_structure(
result.executor_label = executor_label

for input_spec in component_spec.inputs or []:

# Special handling for PipelineTaskFinalStatus first.
if type_utils.is_task_final_status_type(input_spec.type):
result.input_definitions.parameters[
input_spec.name].type = pipeline_spec_pb2.PrimitiveType.STRING
continue

# skip inputs not present
if input_spec.name not in actual_inputs:
continue

if type_utils.is_parameter_type(input_spec.type):
result.input_definitions.parameters[
input_spec.name].type = type_utils.get_parameter_type(
Expand Down
16 changes: 15 additions & 1 deletion sdk/python/kfp/v2/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,13 @@ def _group_to_dag_spec(
deployment_config.executors[
importer_exec_label].importer.CopyFrom(
subgroup.importer_spec)
else:
for input_spec in subgroup._metadata.inputs or []:
if type_utils.is_task_final_status_type(
input_spec.type):
raise ValueError(
'PipelineTaskFinalStatus can only be used in an exit task.'
)

# Task level caching option.
subgroup.task_spec.caching_options.enable_cache = subgroup.enable_caching
Expand Down Expand Up @@ -1045,7 +1052,14 @@ def _create_pipeline_spec(
task_name = exit_handler_op.name
display_name = exit_handler_op.display_name

exit_handler_op.task_spec.task_info.name = display_name or task_name
exit_handler_op.task_spec.task_info.name = (
display_name or task_name)
for input_spec in exit_handler_op._metadata.inputs or []:
if type_utils.is_task_final_status_type(input_spec.type):
exit_handler_op.task_spec.inputs.parameters[
input_spec.name].task_final_status.producer_task = (
first_group.name)

exit_handler_op.task_spec.dependent_tasks.extend(
pipeline_spec.root.dag.tasks.keys())
exit_handler_op.task_spec.trigger_policy.strategy = (
Expand Down
41 changes: 41 additions & 0 deletions sdk/python/kfp/v2/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from kfp import components
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import PipelineTaskFinalStatus
from kfp.dsl import types

VALID_PRODUCER_COMPONENT_SAMPLE = components.load_component_from_text("""
Expand Down Expand Up @@ -451,6 +452,46 @@ def my_pipeline():
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='result.json')

def test_use_task_final_status_in_non_exit_op(self):

@dsl.component
def print_op(status: PipelineTaskFinalStatus):
return status

@dsl.pipeline(name='test-pipeline')
def my_pipeline():
print_op()

with self.assertRaisesRegex(
ValueError,
'PipelineTaskFinalStatus can only be used in an exit task.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='result.json')

def test_use_task_final_status_in_non_exit_op_yaml(self):

print_op = components.load_component_from_text("""
name: Print Op
inputs:
- {name: message, type: PipelineTaskFinalStatus}
implementation:
container:
image: python:3.7
command:
- echo
- {inputValue: message}
""")

@dsl.pipeline(name='test-pipeline')
def my_pipeline():
print_op()

with self.assertRaisesRegex(
ValueError,
'PipelineTaskFinalStatus can only be used in an exit task.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='result.json')


if __name__ == '__main__':
unittest.main()
6 changes: 6 additions & 0 deletions sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ def test_experimental_v2_component(self):
def test_pipeline_with_gcpc_types(self):
self._test_compile_py_to_json('pipeline_with_gcpc_types')

def test_pipeline_with_task_final_status(self):
self._test_compile_py_to_json('pipeline_with_task_final_status')

def test_pipeline_with_task_final_status_yaml(self):
self._test_compile_py_to_json('pipeline_with_task_final_status_yaml')

def test_v2_component_with_pip_index_urls(self):
self._test_compile_py_to_json('v2_component_with_pip_index_urls')

Expand Down
Loading

0 comments on commit 8ccb372

Please sign in to comment.