Skip to content

Commit

Permalink
feat(sdk): add executor output path and executor input message placeh…
Browse files Browse the repository at this point in the history
…olders (kubeflow#10240)

* add support for '{{$.outputs.output_file}}' placeholder

* dedupe executor input code
  • Loading branch information
connor-mccarthy authored Nov 20, 2023
1 parent 0d7913c commit d3323c0
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 21 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Current Version (in development)

## Features
* Add support for `dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER` and `dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER` [\#10240](https://github.com/kubeflow/pipelines/pull/10240)

## Breaking changes

Expand Down
29 changes: 29 additions & 0 deletions sdk/python/kfp/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
'PIPELINE_JOB_ID_PLACEHOLDER',
'PIPELINE_TASK_NAME_PLACEHOLDER',
'PIPELINE_TASK_ID_PLACEHOLDER',
'PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER',
'PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER',
'PIPELINE_ROOT_PLACEHOLDER',
'PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER',
'PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER',
Expand Down Expand Up @@ -134,6 +136,33 @@ def my_pipeline():
)
"""

PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER = '{{$.outputs.output_file}}'
"""A placeholder used to obtain the path to the executor_output.json file within the task container.
Example:
::
@dsl.pipeline
def my_pipeline():
create_artifact_with_metadata(
metadata={'foo': 'bar'},
executor_output_destination=dsl.PIPELINE_TASK_EXECUTOR_OUTPUT_PATH_PLACEHOLDER,
)
"""

PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER = '{{$}}'
"""A placeholder used to obtain executor input message passed to the task.
Example:
::
@dsl.pipeline
def my_pipeline():
custom_container_op(
executor_input=dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
)
"""

PIPELINE_ROOT_PLACEHOLDER = '{{$.pipeline_root}}'
"""A placeholder used to obtain the pipeline root.
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/kfp/dsl/component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import docstring_parser
import kfp
from kfp import dsl
from kfp.dsl import container_component_artifact_channel
from kfp.dsl import container_component_class
from kfp.dsl import graph_component
Expand Down Expand Up @@ -479,7 +480,7 @@ def _get_command_and_args_for_lightweight_component(

args = [
'--executor_input',
placeholders.ExecutorInputPlaceholder(),
dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
'--function_to_execute',
func.__name__,
]
Expand All @@ -497,7 +498,7 @@ def _get_command_and_args_for_containerized_component(

args = [
'--executor_input',
placeholders.ExecutorInputPlaceholder()._to_string(),
dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
'--function_to_execute',
function_name,
]
Expand Down
9 changes: 2 additions & 7 deletions sdk/python/kfp/dsl/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
from typing import Any, Dict, List, Optional, Union

from kfp import dsl
from kfp.dsl import utils
from kfp.dsl.types import type_utils

Expand All @@ -42,12 +43,6 @@ def __eq__(self, other: Any) -> bool:
self.__class__) and self.__dict__ == other.__dict__


class ExecutorInputPlaceholder(Placeholder):

def _to_string(self) -> str:
return '{{$}}'


class InputValuePlaceholder(Placeholder):

def __init__(self, input_name: str) -> None:
Expand Down Expand Up @@ -417,7 +412,7 @@ def maybe_convert_v1_yaml_placeholder_to_v2_placeholder(
])

elif first_key == 'executorInput':
return ExecutorInputPlaceholder()
return dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER

elif 'if' in arg:
if_ = arg['if']
Expand Down
8 changes: 0 additions & 8 deletions sdk/python/kfp/dsl/placeholders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@
from kfp.dsl import placeholders


class TestExecutorInputPlaceholder(parameterized.TestCase):

def test(self):
self.assertEqual(placeholders.ExecutorInputPlaceholder()._to_string(),
'{{$}}')


class TestInputValuePlaceholder(parameterized.TestCase):

def test(self):
Expand Down Expand Up @@ -489,7 +482,6 @@ def test_pass_through(self, val: Any):
val)

@parameterized.parameters([
(placeholders.ExecutorInputPlaceholder(), '{{$}}'),
(placeholders.InputValuePlaceholder('input1'),
"""{{$.inputs.parameters['input1']}}"""),
(placeholders.OutputPathPlaceholder('output1'),
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/kfp/dsl/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,7 @@ def check_placeholder_references_valid_io_name(
for arg in arg.items:
check_placeholder_references_valid_io_name(inputs_dict,
outputs_dict, arg)
elif not isinstance(
arg, placeholders.ExecutorInputPlaceholder) and not isinstance(
arg, str):
elif not isinstance(arg, str):
raise TypeError(f'Unexpected argument "{arg}" of type {type(arg)}.')


Expand Down
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/structures_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
],
args=[
'--executor_input',
placeholders.ExecutorInputPlaceholder(),
dsl.PIPELINE_TASK_EXECUTOR_INPUT_PLACEHOLDER,
'--function_name',
'test_function',
])),
Expand Down

0 comments on commit d3323c0

Please sign in to comment.