Skip to content

Commit

Permalink
feat(sdk): inline user defined custom tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
ScrapCodes committed Jun 25, 2021
1 parent 37d4f33 commit 98d2f90
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 68 deletions.
7 changes: 6 additions & 1 deletion guides/advanced_user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Here, the `apiVersion`, `kind`, and `name` are mandatory fields for all custom t
"--kind", "custom_task_kind",
"--name", "custom_task_name",
"--taskSpec", {"task_spec_key": "task_spec_value"},
"--taskRef", {"task_ref_key": "task_ref_value"},
"--other_custom_task_argument_keys", custom_task_argument_values],
file_outputs={"other_custom_task_argument_keys": '/anypath'}
)
Expand All @@ -70,7 +71,11 @@ Here, the `apiVersion`, `kind`, and `name` are mandatory fields for all custom t
- **--apiVersion**: Kubernetes API Version for your custom task CRD.
- **--kind**: Kubernetes Kind for your custom task CRD.
- **--name**: Kubernetes Resource reference name for your custom task CRD.
- **--taskSpec** (optional): Kubernetes Resource Spec for your custom task CRD. The value needs to define in Python Dictionary.
- **--taskRef** (optional): Kubernetes Resource Spec for your custom task CRD. One of `--taskSpec` or `--taskRef` can be specified at a time.
The value should be a Python Dictionary.
- **--taskSpec** (optional): Kubernetes Resource Spec for your custom task CRD. This gets inlined in the pipeline. One of `--taskSpec` or `--taskRef` can be specified at a time.
Custom task controller should support [embedded spec](https://github.com/tektoncd/pipeline/blob/main/docs/runs.md#2-specifying-the-target-custom-task-by-embedding-its-spec).
The value should be a Python Dictionary.
- **Other arguments** (optional): Parameters for your custom task CRD inputs.

Then, you can add any extra arguments to map as the custom task CRD's `spec.params` fields.
Expand Down
46 changes: 32 additions & 14 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ def is_custom_task_output(operand) -> bool:
for index, item in enumerate(container_args):
if item.startswith('--'):
custom_task_args[item[2:]] = container_args[index + 1]
non_param_keys = ['name', 'apiVersion', 'kind', 'taskSpec']
non_param_keys = ['name', 'apiVersion', 'kind', 'taskSpec', 'taskRef']
task_params = []
for key, value in custom_task_args.items():
if key not in non_param_keys:
Expand All @@ -755,26 +755,44 @@ def is_custom_task_output(operand) -> bool:
'kind': custom_task_args['kind']
}
}
if custom_task_args.get('taskRef', ''):
try:
custom_task_cr = {
'apiVersion': custom_task_args['apiVersion'],
'kind': custom_task_args['kind'],
'metadata': {
'name': custom_task_args['name']
},
'spec': ast.literal_eval(custom_task_args['taskRef'])
}
for existing_cr in self.custom_task_crs:
if existing_cr == custom_task_cr:
# Skip duplicated CR resource
custom_task_cr = {}
break
if custom_task_cr:
self.custom_task_crs.append(custom_task_cr)
except ValueError:
raise("Custom task spec %s is not a valid Python Dictionary" % custom_task_args['taskSpec'])
# Only one of --taskRef and --taskSpec allowed.
if custom_task_args.get('taskRef', '') and custom_task_args.get('taskSpec', ''):
raise ("Custom task invalid configuration %s, Only one of --taskRef and --taskSpec allowed." % custom_task_args)
# Setting --taskRef flag indicates, that spec be inlined.
if custom_task_args.get('taskSpec', ''):
try:
if custom_task_args['taskSpec']:
custom_task_cr = {
task_ref = {
'name': template['metadata']['name'],
'params': task_params,
# For processing Tekton parameter mapping later on.
'orig_params': task_ref['params'],
'taskSpec': {
'apiVersion': custom_task_args['apiVersion'],
'kind': custom_task_args['kind'],
'metadata': {
'name': custom_task_args['name']
},
'spec': ast.literal_eval(custom_task_args['taskSpec'])
}
for existing_cr in self.custom_task_crs:
if existing_cr == custom_task_cr:
# Skip duplicated CR resource
custom_task_cr = {}
break
if custom_task_cr:
self.custom_task_crs.append(custom_task_cr)
}
except ValueError:
raise("Custom task spec %s is not a valid Python Dictionary" % custom_task_args['taskSpec'])
raise ("Custom task ref %s is not valid." % custom_task_args['taskRef'])
# Pop custom task artifacts since we have no control of how
# custom task controller is handling the container/task execution.
self.artifact_items.pop(template['metadata']['name'], None)
Expand Down
56 changes: 33 additions & 23 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,14 @@ def test_custom_task_spec_workflow(self):
Test Tekton custom task with custom spec workflow.
"""
from .testdata.custom_task_spec import custom_task_pipeline
self._test_pipeline_workflow(custom_task_pipeline, 'custom_task_spec.yaml')
self._test_pipeline_workflow(custom_task_pipeline, 'custom_task_spec.yaml', skip_noninlined=True)

def test_custom_task_ref_workflow(self):
"""
Test Tekton custom task with custom ref workflow.
"""
from .testdata.custom_task_ref import custom_task_pipeline
self._test_pipeline_workflow(custom_task_pipeline, 'custom_task_ref.yaml', skip_noninlined=True)

def test_long_param_name_workflow(self):
"""
Expand Down Expand Up @@ -484,7 +491,7 @@ def test_tekton_pipeline_conf(self):
Test applying Tekton pipeline config to a workflow
"""
from .testdata.tekton_pipeline_conf import echo_pipeline
pipeline_conf = compiler.pipeline_utils.TektonPipelineConf()
pipeline_conf = TektonPipelineConf()
pipeline_conf.add_pipeline_label('test', 'label')
pipeline_conf.add_pipeline_label('test2', 'label2')
pipeline_conf.add_pipeline_annotation('test', 'annotation')
Expand Down Expand Up @@ -531,27 +538,30 @@ def _test_pipeline_workflow(self,
pipeline_function,
pipeline_yaml,
normalize_compiler_output_function=None,
tekton_pipeline_conf=TektonPipelineConf()):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml")
temp_dir = tempfile.mkdtemp()
compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml')
tekton_pipeline_conf.set_tekton_inline_spec(False)
try:
compiler.TektonCompiler().compile(pipeline_function,
compiled_yaml_file,
tekton_pipeline_conf=tekton_pipeline_conf)
with open(compiled_yaml_file, 'r') as f:
f = normalize_compiler_output_function(
f.read()) if normalize_compiler_output_function else f
compiled = yaml.safe_load(f)
self._verify_compiled_workflow(golden_yaml_file, compiled)
finally:
shutil.rmtree(temp_dir)
self._test_pipeline_workflow_inlined_spec(pipeline_function=pipeline_function,
pipeline_yaml=pipeline_yaml,
normalize_compiler_output_function=normalize_compiler_output_function,
tekton_pipeline_conf=tekton_pipeline_conf)
tekton_pipeline_conf=TektonPipelineConf(),
skip_noninlined=False):
self._test_pipeline_workflow_inlined_spec(
pipeline_function=pipeline_function,
pipeline_yaml=pipeline_yaml,
normalize_compiler_output_function=normalize_compiler_output_function,
tekton_pipeline_conf=tekton_pipeline_conf)
if not skip_noninlined:
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml")
temp_dir = tempfile.mkdtemp()
compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml')
tekton_pipeline_conf.set_tekton_inline_spec(False)
try:
compiler.TektonCompiler().compile(pipeline_function,
compiled_yaml_file,
tekton_pipeline_conf=tekton_pipeline_conf)
with open(compiled_yaml_file, 'r') as f:
f = normalize_compiler_output_function(
f.read()) if normalize_compiler_output_function else f
compiled = yaml.safe_load(f)
self._verify_compiled_workflow(golden_yaml_file, compiled)
finally:
shutil.rmtree(temp_dir)

def _test_workflow_without_decorator(self, pipeline_yaml, params_dict):
"""
Expand Down
51 changes: 51 additions & 0 deletions sdk/python/tests/compiler/testdata/custom_task_ref.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl

MY_CUSTOM_TASK_IMAGE_NAME = "veryunique/image:latest"
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES
TEKTON_CUSTOM_TASK_IMAGES = TEKTON_CUSTOM_TASK_IMAGES.append(MY_CUSTOM_TASK_IMAGE_NAME)


def getCustomOp():
CustomOp = dsl.ContainerOp(
name="any-name",
image=MY_CUSTOM_TASK_IMAGE_NAME,
command=["any", "command"],
arguments=["--apiVersion", "custom_task_api_version",
"--kind", "custom_task_kind",
"--name", "custom_task_name",
"--taskRef", {"raw": "raw"},
"--other_custom_task_argument_keys", "args"],
file_outputs={"other_custom_task_argument_keys": '/anypath'}
)
# Annotation to tell the Argo controller that this CustomOp is for specific Tekton runtime only.
CustomOp.add_pod_annotation("valid_container", "false")
return CustomOp


@dsl.pipeline(
name='Tekton custom task on Kubeflow Pipeline',
description='Shows how to use Tekton custom task with custom spec on KFP'
)
def custom_task_pipeline():
test = getCustomOp()
test2 = getCustomOp().after(test)


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(custom_task_pipeline, __file__.replace('.py', '.yaml'))

34 changes: 24 additions & 10 deletions sdk/python/tests/compiler/testdata/custom_task_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,35 @@ spec:
pipelineSpec:
tasks:
- name: any-name
params:
- name: other_custom_task_argument_keys
value: args
taskRef:
taskSpec:
apiVersion: custom_task_api_version
kind: custom_task_kind
name: custom_task_name
metadata:
annotations:
tekton.dev/template: ''
valid_container: 'false'
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
spec:
raw: raw
timeout: 0s
- name: any-name-2
params:
- name: other_custom_task_argument_keys
value: args
runAfter:
- any-name
taskRef:
taskSpec:
apiVersion: custom_task_api_version
kind: custom_task_kind
name: custom_task_name
metadata:
annotations:
tekton.dev/template: ''
valid_container: 'false'
labels:
pipelines.kubeflow.org/cache_enabled: 'true'
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/pipelinename: ''
spec:
raw: raw
timeout: 0s
timeout: 0s

This file was deleted.

0 comments on commit 98d2f90

Please sign in to comment.