Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): backport PipelineTaskFinalStatus change to sdk/release-1.8 branch #7483

Merged
merged 2 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from .structures import *
from ._data_passing import serialize_value, get_canonical_type_for_type_name

from kfp.v2.components.types import type_utils
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: update copyright to 2018-2022, I think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applies to multiple files

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding current year is optional per https://opensource.google/documentation/reference/copyright
I'll skip that for this branch. :)


_default_component_name = 'Component'


Expand Down Expand Up @@ -124,7 +126,8 @@ def load_component_from_text(text):


def load_component_from_spec(component_spec):
"""Loads component from a ComponentSpec and creates a task factory function.
"""Loads component from a ComponentSpec and creates a task factory
function.

Args:
component_spec: A ComponentSpec containing the component definition.
Expand All @@ -136,7 +139,7 @@ def load_component_from_spec(component_spec):
if component_spec is None:
raise TypeError
return _create_task_factory_from_component_spec(
component_spec=component_spec)
component_spec=component_spec)


def _fix_component_uri(uri: str) -> str:
Expand Down Expand Up @@ -416,7 +419,9 @@ def component_default_to_func_default(component_default: str,
port.type) if port.type else inspect.Parameter.empty),
default=component_default_to_func_default(port.default,
port.optional),
) for port in reordered_input_list
)
for port in reordered_input_list
if not type_utils.is_task_final_status_type(port.type)
]
factory_function_parameters = input_parameters #Outputs are no longer part of the task factory function signature. The paths are always generated by the system.

Expand Down Expand Up @@ -504,7 +509,8 @@ def expand_command_part(arg) -> Union[str, List[str], None]:
inputs_consumed_by_value[input_name] = serialized_argument
return serialized_argument
else:
if input_spec.optional:
if input_spec.optional or type_utils.is_task_final_status_type(
input_spec.type):
return None
else:
raise ValueError(
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/kfp/dsl/_component_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,15 @@ def _resolve_ir_placeholders_v2(
if isinstance(arg, _structures.InputValuePlaceholder):
input_name = arg.input_name
input_value = arguments.get(input_name, None)
if input_value is not None:
input_spec = inputs_dict[input_name]
if input_value is not None or type_utils.is_task_final_status_type(
input_spec.type):
return _input_parameter_placeholder(input_name)
elif input_spec.optional:
return None
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
return None
else:
raise ValueError(
'No value provided for input {}'.format(input_name))
raise ValueError(
'No value provided for input {}'.format(input_name))

elif isinstance(arg, _structures.InputUriPlaceholder):
input_name = arg.input_name
Expand Down
13 changes: 11 additions & 2 deletions sdk/python/kfp/dsl/_ops_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union

from typing import Optional, Union
import uuid

from kfp.dsl import _for_loop, _pipeline_param
Expand Down Expand Up @@ -144,8 +145,16 @@ class ExitHandler(OpsGroup):
op2 = ContainerOp(...)
"""

def __init__(self, exit_op: _container_op.ContainerOp):
def __init__(
self,
exit_op: _container_op.ContainerOp,
name: Optional[str] = None,
):
super(ExitHandler, self).__init__('exit_handler')

# Use user specified name as display name directly, instead of passing
# it to the super class constructor.
self.display_name = name
if exit_op.dependent_names:
raise ValueError('exit_op cannot depend on any other ops.')

Expand Down
8 changes: 8 additions & 0 deletions sdk/python/kfp/dsl/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,17 @@ def build_component_spec_from_structure(
result.executor_label = executor_label

for input_spec in component_spec.inputs or []:

# Special handling for PipelineTaskFinalStatus first.
if type_utils.is_task_final_status_type(input_spec.type):
result.input_definitions.parameters[
input_spec.name].type = pipeline_spec_pb2.PrimitiveType.STRING
continue

# skip inputs not present
if input_spec.name not in actual_inputs:
continue

if type_utils.is_parameter_type(input_spec.type):
result.input_definitions.parameters[
input_spec.name].type = type_utils.get_parameter_type(
Expand Down
16 changes: 15 additions & 1 deletion sdk/python/kfp/v2/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,13 @@ def _group_to_dag_spec(
deployment_config.executors[
importer_exec_label].importer.CopyFrom(
subgroup.importer_spec)
else:
for input_spec in subgroup._metadata.inputs or []:
if type_utils.is_task_final_status_type(
input_spec.type):
raise ValueError(
'PipelineTaskFinalStatus can only be used in an exit task.'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit / personal preference: consider using PipelineTaskFinalStatus.__name__ so that the string name will remain up-to-date if the dataclass name ever changes.

)

# Task level caching option.
subgroup.task_spec.caching_options.enable_cache = subgroup.enable_caching
Expand Down Expand Up @@ -1045,7 +1052,14 @@ def _create_pipeline_spec(
task_name = exit_handler_op.name
display_name = exit_handler_op.display_name

exit_handler_op.task_spec.task_info.name = display_name or task_name
exit_handler_op.task_spec.task_info.name = (
display_name or task_name)
for input_spec in exit_handler_op._metadata.inputs or []:
if type_utils.is_task_final_status_type(input_spec.type):
exit_handler_op.task_spec.inputs.parameters[
input_spec.name].task_final_status.producer_task = (
first_group.name)

exit_handler_op.task_spec.dependent_tasks.extend(
pipeline_spec.root.dag.tasks.keys())
exit_handler_op.task_spec.trigger_policy.strategy = (
Expand Down
41 changes: 41 additions & 0 deletions sdk/python/kfp/v2/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from kfp import components
from kfp.v2 import compiler
from kfp.v2 import dsl
from kfp.v2.dsl import PipelineTaskFinalStatus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(duplicate) see other import path comments

from kfp.dsl import types

VALID_PRODUCER_COMPONENT_SAMPLE = components.load_component_from_text("""
Expand Down Expand Up @@ -451,6 +452,46 @@ def my_pipeline():
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='result.json')

def test_use_task_final_status_in_non_exit_op(self):

@dsl.component
def print_op(status: PipelineTaskFinalStatus):
return status

@dsl.pipeline(name='test-pipeline')
def my_pipeline():
print_op()

with self.assertRaisesRegex(
ValueError,
'PipelineTaskFinalStatus can only be used in an exit task.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='result.json')

def test_use_task_final_status_in_non_exit_op_yaml(self):

print_op = components.load_component_from_text("""
name: Print Op
inputs:
- {name: message, type: PipelineTaskFinalStatus}
implementation:
container:
image: python:3.7
command:
- echo
- {inputValue: message}
""")

@dsl.pipeline(name='test-pipeline')
def my_pipeline():
print_op()

with self.assertRaisesRegex(
ValueError,
'PipelineTaskFinalStatus can only be used in an exit task.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='result.json')


if __name__ == '__main__':
unittest.main()
6 changes: 6 additions & 0 deletions sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ def test_experimental_v2_component(self):
def test_pipeline_with_gcpc_types(self):
self._test_compile_py_to_json('pipeline_with_gcpc_types')

def test_pipeline_with_task_final_status(self):
self._test_compile_py_to_json('pipeline_with_task_final_status')

def test_pipeline_with_task_final_status_yaml(self):
self._test_compile_py_to_json('pipeline_with_task_final_status_yaml')

def test_v2_component_with_pip_index_urls(self):
self._test_compile_py_to_json('v2_component_with_pip_index_urls')

Expand Down
Loading