From 98d2f90184d6d0bedb42620f3737fb60554e1ced Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Thu, 24 Jun 2021 19:55:09 +0530 Subject: [PATCH] feat(sdk): inline user defined custom tasks --- guides/advanced_user_guide.md | 7 ++- sdk/python/kfp_tekton/compiler/compiler.py | 46 ++++++++++----- sdk/python/tests/compiler/compiler_tests.py | 56 +++++++++++-------- .../compiler/testdata/custom_task_ref.py | 51 +++++++++++++++++ ...c_noninlined.yaml => custom_task_ref.yaml} | 0 .../compiler/testdata/custom_task_spec.yaml | 34 +++++++---- .../custom_task_spec_customtask_cr1.yaml | 20 ------- 7 files changed, 146 insertions(+), 68 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/custom_task_ref.py rename sdk/python/tests/compiler/testdata/{custom_task_spec_noninlined.yaml => custom_task_ref.yaml} (100%) delete mode 100644 sdk/python/tests/compiler/testdata/custom_task_spec_customtask_cr1.yaml diff --git a/guides/advanced_user_guide.md b/guides/advanced_user_guide.md index 1d422c068d..970a74ca8f 100644 --- a/guides/advanced_user_guide.md +++ b/guides/advanced_user_guide.md @@ -58,6 +58,7 @@ Here, the `apiVersion`, `kind`, and `name` are mandatory fields for all custom t "--kind", "custom_task_kind", "--name", "custom_task_name", "--taskSpec", {"task_spec_key": "task_spec_value"}, + "--taskRef", {"task_ref_key": "task_ref_value"}, "--other_custom_task_argument_keys", custom_task_argument_values], file_outputs={"other_custom_task_argument_keys": '/anypath'} ) @@ -70,7 +71,11 @@ Here, the `apiVersion`, `kind`, and `name` are mandatory fields for all custom t - **--apiVersion**: Kubernetes API Version for your custom task CRD. - **--kind**: Kubernetes Kind for your custom task CRD. - **--name**: Kubernetes Resource reference name for your custom task CRD. - - **--taskSpec** (optional): Kubernetes Resource Spec for your custom task CRD. The value needs to define in Python Dictionary. + - **--taskRef** (optional): Kubernetes Resource Spec for your custom task CRD. One of `--taskSpec` or `--taskRef` can be specified at a time. + The value should be a Python Dictionary. + - **--taskSpec** (optional): Kubernetes Resource Spec for your custom task CRD. This gets inlined in the pipeline. One of `--taskSpec` or `--taskRef` can be specified at a time. + Custom task controller should support [embedded spec](https://github.com/tektoncd/pipeline/blob/main/docs/runs.md#2-specifying-the-target-custom-task-by-embedding-its-spec). + The value should be a Python Dictionary. - **Other arguments** (optional): Parameters for your custom task CRD inputs. Then, you can add any extra arguments to map as the custom task CRD's `spec.params` fields. diff --git a/sdk/python/kfp_tekton/compiler/compiler.py b/sdk/python/kfp_tekton/compiler/compiler.py index dfd0ad6adb..c73faf5b36 100644 --- a/sdk/python/kfp_tekton/compiler/compiler.py +++ b/sdk/python/kfp_tekton/compiler/compiler.py @@ -739,7 +739,7 @@ def is_custom_task_output(operand) -> bool: for index, item in enumerate(container_args): if item.startswith('--'): custom_task_args[item[2:]] = container_args[index + 1] - non_param_keys = ['name', 'apiVersion', 'kind', 'taskSpec'] + non_param_keys = ['name', 'apiVersion', 'kind', 'taskSpec', 'taskRef'] task_params = [] for key, value in custom_task_args.items(): if key not in non_param_keys: @@ -755,26 +755,44 @@ def is_custom_task_output(operand) -> bool: 'kind': custom_task_args['kind'] } } + if custom_task_args.get('taskRef', ''): + try: + custom_task_cr = { + 'apiVersion': custom_task_args['apiVersion'], + 'kind': custom_task_args['kind'], + 'metadata': { + 'name': custom_task_args['name'] + }, + 'spec': ast.literal_eval(custom_task_args['taskRef']) + } + for existing_cr in self.custom_task_crs: + if existing_cr == custom_task_cr: + # Skip duplicated CR resource + custom_task_cr = {} + break + if custom_task_cr: + self.custom_task_crs.append(custom_task_cr) + except ValueError: + raise("Custom task spec %s is not a valid Python Dictionary" % custom_task_args['taskSpec']) + # Only one of --taskRef and --taskSpec allowed. + if custom_task_args.get('taskRef', '') and custom_task_args.get('taskSpec', ''): + raise ("Custom task invalid configuration %s, Only one of --taskRef and --taskSpec allowed." % custom_task_args) + # Setting --taskRef flag indicates, that spec be inlined. if custom_task_args.get('taskSpec', ''): try: - if custom_task_args['taskSpec']: - custom_task_cr = { + task_ref = { + 'name': template['metadata']['name'], + 'params': task_params, + # For processing Tekton parameter mapping later on. + 'orig_params': task_ref['params'], + 'taskSpec': { 'apiVersion': custom_task_args['apiVersion'], 'kind': custom_task_args['kind'], - 'metadata': { - 'name': custom_task_args['name'] - }, 'spec': ast.literal_eval(custom_task_args['taskSpec']) } - for existing_cr in self.custom_task_crs: - if existing_cr == custom_task_cr: - # Skip duplicated CR resource - custom_task_cr = {} - break - if custom_task_cr: - self.custom_task_crs.append(custom_task_cr) + } except ValueError: - raise("Custom task spec %s is not a valid Python Dictionary" % custom_task_args['taskSpec']) + raise ("Custom task ref %s is not valid." % custom_task_args['taskRef']) # Pop custom task artifacts since we have no control of how # custom task controller is handling the container/task execution. self.artifact_items.pop(template['metadata']['name'], None) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 9c4e25bf26..961cdca523 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -231,7 +231,14 @@ def test_custom_task_spec_workflow(self): Test Tekton custom task with custom spec workflow. """ from .testdata.custom_task_spec import custom_task_pipeline - self._test_pipeline_workflow(custom_task_pipeline, 'custom_task_spec.yaml') + self._test_pipeline_workflow(custom_task_pipeline, 'custom_task_spec.yaml', skip_noninlined=True) + + def test_custom_task_ref_workflow(self): + """ + Test Tekton custom task with custom ref workflow. + """ + from .testdata.custom_task_ref import custom_task_pipeline + self._test_pipeline_workflow(custom_task_pipeline, 'custom_task_ref.yaml', skip_noninlined=True) def test_long_param_name_workflow(self): """ @@ -484,7 +491,7 @@ def test_tekton_pipeline_conf(self): Test applying Tekton pipeline config to a workflow """ from .testdata.tekton_pipeline_conf import echo_pipeline - pipeline_conf = compiler.pipeline_utils.TektonPipelineConf() + pipeline_conf = TektonPipelineConf() pipeline_conf.add_pipeline_label('test', 'label') pipeline_conf.add_pipeline_label('test2', 'label2') pipeline_conf.add_pipeline_annotation('test', 'annotation') @@ -531,27 +538,30 @@ def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml, normalize_compiler_output_function=None, - tekton_pipeline_conf=TektonPipelineConf()): - test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') - golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml") - temp_dir = tempfile.mkdtemp() - compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml') - tekton_pipeline_conf.set_tekton_inline_spec(False) - try: - compiler.TektonCompiler().compile(pipeline_function, - compiled_yaml_file, - tekton_pipeline_conf=tekton_pipeline_conf) - with open(compiled_yaml_file, 'r') as f: - f = normalize_compiler_output_function( - f.read()) if normalize_compiler_output_function else f - compiled = yaml.safe_load(f) - self._verify_compiled_workflow(golden_yaml_file, compiled) - finally: - shutil.rmtree(temp_dir) - self._test_pipeline_workflow_inlined_spec(pipeline_function=pipeline_function, - pipeline_yaml=pipeline_yaml, - normalize_compiler_output_function=normalize_compiler_output_function, - tekton_pipeline_conf=tekton_pipeline_conf) + tekton_pipeline_conf=TektonPipelineConf(), + skip_noninlined=False): + self._test_pipeline_workflow_inlined_spec( + pipeline_function=pipeline_function, + pipeline_yaml=pipeline_yaml, + normalize_compiler_output_function=normalize_compiler_output_function, + tekton_pipeline_conf=tekton_pipeline_conf) + if not skip_noninlined: + test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') + golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml") + temp_dir = tempfile.mkdtemp() + compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml') + tekton_pipeline_conf.set_tekton_inline_spec(False) + try: + compiler.TektonCompiler().compile(pipeline_function, + compiled_yaml_file, + tekton_pipeline_conf=tekton_pipeline_conf) + with open(compiled_yaml_file, 'r') as f: + f = normalize_compiler_output_function( + f.read()) if normalize_compiler_output_function else f + compiled = yaml.safe_load(f) + self._verify_compiled_workflow(golden_yaml_file, compiled) + finally: + shutil.rmtree(temp_dir) def _test_workflow_without_decorator(self, pipeline_yaml, params_dict): """ diff --git a/sdk/python/tests/compiler/testdata/custom_task_ref.py b/sdk/python/tests/compiler/testdata/custom_task_ref.py new file mode 100644 index 0000000000..7304d45979 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/custom_task_ref.py @@ -0,0 +1,51 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp import dsl + +MY_CUSTOM_TASK_IMAGE_NAME = "veryunique/image:latest" +from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES +TEKTON_CUSTOM_TASK_IMAGES = TEKTON_CUSTOM_TASK_IMAGES.append(MY_CUSTOM_TASK_IMAGE_NAME) + + +def getCustomOp(): + CustomOp = dsl.ContainerOp( + name="any-name", + image=MY_CUSTOM_TASK_IMAGE_NAME, + command=["any", "command"], + arguments=["--apiVersion", "custom_task_api_version", + "--kind", "custom_task_kind", + "--name", "custom_task_name", + "--taskRef", {"raw": "raw"}, + "--other_custom_task_argument_keys", "args"], + file_outputs={"other_custom_task_argument_keys": '/anypath'} + ) + # Annotation to tell the Argo controller that this CustomOp is for specific Tekton runtime only. + CustomOp.add_pod_annotation("valid_container", "false") + return CustomOp + + +@dsl.pipeline( + name='Tekton custom task on Kubeflow Pipeline', + description='Shows how to use Tekton custom task with custom spec on KFP' +) +def custom_task_pipeline(): + test = getCustomOp() + test2 = getCustomOp().after(test) + + +if __name__ == '__main__': + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(custom_task_pipeline, __file__.replace('.py', '.yaml')) + diff --git a/sdk/python/tests/compiler/testdata/custom_task_spec_noninlined.yaml b/sdk/python/tests/compiler/testdata/custom_task_ref.yaml similarity index 100% rename from sdk/python/tests/compiler/testdata/custom_task_spec_noninlined.yaml rename to sdk/python/tests/compiler/testdata/custom_task_ref.yaml diff --git a/sdk/python/tests/compiler/testdata/custom_task_spec.yaml b/sdk/python/tests/compiler/testdata/custom_task_spec.yaml index 7f01c07ff1..2f0c7d350e 100644 --- a/sdk/python/tests/compiler/testdata/custom_task_spec.yaml +++ b/sdk/python/tests/compiler/testdata/custom_task_spec.yaml @@ -31,21 +31,35 @@ spec: pipelineSpec: tasks: - name: any-name - params: - - name: other_custom_task_argument_keys - value: args - taskRef: + taskSpec: apiVersion: custom_task_api_version kind: custom_task_kind - name: custom_task_name + metadata: + annotations: + tekton.dev/template: '' + valid_container: 'false' + labels: + pipelines.kubeflow.org/cache_enabled: 'true' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/pipelinename: '' + spec: + raw: raw + timeout: 0s - name: any-name-2 - params: - - name: other_custom_task_argument_keys - value: args runAfter: - any-name - taskRef: + taskSpec: apiVersion: custom_task_api_version kind: custom_task_kind - name: custom_task_name + metadata: + annotations: + tekton.dev/template: '' + valid_container: 'false' + labels: + pipelines.kubeflow.org/cache_enabled: 'true' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/pipelinename: '' + spec: + raw: raw + timeout: 0s timeout: 0s diff --git a/sdk/python/tests/compiler/testdata/custom_task_spec_customtask_cr1.yaml b/sdk/python/tests/compiler/testdata/custom_task_spec_customtask_cr1.yaml deleted file mode 100644 index f26f75df10..0000000000 --- a/sdk/python/tests/compiler/testdata/custom_task_spec_customtask_cr1.yaml +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright 2021 kubeflow.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -apiVersion: custom_task_api_version -kind: custom_task_kind -metadata: - name: custom_task_name -spec: - raw: raw