Skip to content

Commit

Permalink
Fix loop issue that the loop param is from task output (kubeflow#452)
Browse files Browse the repository at this point in the history
* enhance the when condition UI display

* Fix loop issue that the loop param is from task output
  • Loading branch information
fenglixa authored Feb 2, 2021
1 parent 71c6494 commit 9b89a08
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 2 deletions.
3 changes: 1 addition & 2 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
if pipeline_param.op_name is None:
withparam_value = '$(params.%s)' % pipeline_param.name
else:
param_name = '%s-%s' % (
sanitize_k8s_name(pipeline_param.op_name), pipeline_param.name)
param_name = sanitize_k8s_name(pipeline_param.name)
withparam_value = '$(tasks.%s.results.%s)' % (
sanitize_k8s_name(pipeline_param.op_name),
param_name)
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,20 @@ def test_withparam_global_workflow(self):
from .testdata.withparam_global import pipeline
self._test_pipeline_workflow(pipeline, 'withparam_global.yaml')

def test_loop_over_lightweight_output_workflow(self):
"""
Test compiling a loop over lightweight output in workflow.
"""
from .testdata.loop_over_lightweight_output import pipeline
self._test_pipeline_workflow(pipeline, 'loop_over_lightweight_output.yaml')

def test_withparam_output_workflow(self):
"""
Test compiling a withparam output in workflow.
"""
from .testdata.withparam_output import pipeline
self._test_pipeline_workflow(pipeline, 'withparam_output.yaml')

def test_pipelineparams_workflow(self):
"""
Test compiling a pipelineparams workflow.
Expand Down
67 changes: 67 additions & 0 deletions sdk/python/tests/compiler/testdata/loop_over_lightweight_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp
from kfp import dsl
from kfp.dsl import _for_loop


class Coder:
def __init__(self, ):
self._code_id = 0

def get_code(self, ):
self._code_id += 1
return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS)


dsl.ParallelFor._get_unique_id_code = Coder().get_code

produce_op = kfp.components.load_component_from_text('''\
name: Produce list
outputs:
- name: data_list
implementation:
container:
image: busybox
command:
- sh
- -c
- echo "[1, 2, 3]" > "$0"
- outputPath: data_list
''')

consume_op = kfp.components.load_component_from_text('''\
name: Consume data
inputs:
- name: data
implementation:
container:
image: busybox
command:
- echo
- inputValue: data
''')


@dsl.pipeline(name='Loop over lightweight output', description='Test pipeline to verify functions of par loop.')
def pipeline():
source_task = produce_op()
with dsl.ParallelFor(source_task.output) as item:
consume_op(item)


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: loop-over-lightweight-output
annotations:
tekton.dev/output_artifacts: '{"produce-list": [{"key": "artifacts/$PIPELINERUN/produce-list/data_list.tgz",
"name": "produce-list-data_list", "path": "/tmp/outputs/data_list/data"}]}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"consume-data": [], "produce-list": [["data_list",
"$(results.data-list.path)"]]}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/pipeline_spec: '{"description": "Test pipeline to verify
functions of par loop.", "name": "Loop over lightweight output"}'
spec:
pipelineSpec:
tasks:
- name: produce-list
taskSpec:
steps:
- name: main
command: [sh, -c, 'echo "[1, 2, 3]" > "$0"', $(results.data-list.path)]
image: busybox
results:
- {name: data-list, description: /tmp/outputs/data_list/data}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation":
{"container": {"command": ["sh", "-c", "echo \"[1, 2, 3]\" > \"$0\"",
{"outputPath": "data_list"}], "image": "busybox"}}, "name": "Produce
list", "outputs": [{"name": "data_list"}]}'}
timeout: 0s
- runAfter: [produce-list]
name: for-loop-for-loop-00000001-1
taskRef: {apiVersion: custom.tekton.dev/v1alpha1, kind: PipelineLoop, name: for-loop-for-loop-00000001-1}
params:
- {name: produce-list-data_list-loop-item, value: $(tasks.produce-list.results.data-list)}
timeout: 0s
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
metadata: {name: for-loop-for-loop-00000001-1}
spec:
pipelineSpec:
params:
- {name: produce-list-data_list-loop-item, type: string}
tasks:
- name: consume-data
params:
- {name: produce-list-data_list-loop-item, value: $(params.produce-list-data_list-loop-item)}
taskSpec:
steps:
- name: main
command: [echo, $(inputs.params.produce-list-data_list-loop-item)]
image: busybox
params:
- {name: produce-list-data_list-loop-item, type: string}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation":
{"container": {"command": ["echo", {"inputValue": "data"}], "image":
"busybox"}}, "inputs": [{"name": "data"}], "name": "Consume data"}'}
timeout: 0s
iterateParam: produce-list-data_list-loop-item
2 changes: 2 additions & 0 deletions sdk/python/tests/compiler/testdata/withparam_output.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

[my-out-cop2 : main] do output op2, outp: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
59 changes: 59 additions & 0 deletions sdk/python/tests/compiler/testdata/withparam_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.dsl as dsl
from kfp.dsl import _for_loop


class Coder:
def __init__(self, ):
self._code_id = 0

def get_code(self, ):
self._code_id += 1
return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS)


dsl.ParallelFor._get_unique_id_code = Coder().get_code


@dsl.pipeline(name='my-pipeline')
def pipeline():
op0 = dsl.ContainerOp(
name="my-out-cop0",
image='python:alpine3.6',
command=["sh", "-c"],
arguments=['python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'],
file_outputs={'out': '/tmp/out.json'},
)

with dsl.ParallelFor(op0.output) as item:
op1 = dsl.ContainerOp(
name="my-in-cop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo do output op1 item: %s" % item],
)

op_out = dsl.ContainerOp(
name="my-out-cop2",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo do output op2, outp: %s" % op0.output],
)


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml'))
62 changes: 62 additions & 0 deletions sdk/python/tests/compiler/testdata/withparam_output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: my-pipeline
annotations:
tekton.dev/output_artifacts: '{"my-out-cop0": [{"key": "artifacts/$PIPELINERUN/my-out-cop0/out.tgz",
"name": "my-out-cop0-out", "path": "/tmp/out.json"}]}'
tekton.dev/input_artifacts: '{"my-out-cop2": [{"name": "my-out-cop0-out", "parent_task":
"my-out-cop0"}]}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"my-in-cop1": [], "my-out-cop0": [["out", "$(results.out.path)"]],
"my-out-cop2": []}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/pipeline_spec: '{"name": "my-pipeline"}'
spec:
pipelineSpec:
tasks:
- name: my-out-cop0
taskSpec:
steps:
- name: main
args: ['python -c "import json; import sys; json.dump([i for i in range(20,
31)], open(''$(results.out.path)'', ''w''))"']
command: [sh, -c]
image: python:alpine3.6
results:
- {name: out, description: /tmp/out.json}
timeout: 0s
- name: my-out-cop2
params:
- {name: my-out-cop0-out, value: $(tasks.my-out-cop0.results.out)}
taskSpec:
steps:
- name: main
args: ['echo do output op2, outp: $(inputs.params.my-out-cop0-out)']
command: [sh, -c]
image: library/bash:4.4.23
params:
- {name: my-out-cop0-out}
timeout: 0s
- runAfter: [my-out-cop0]
name: for-loop-for-loop-00000001-1
taskRef: {apiVersion: custom.tekton.dev/v1alpha1, kind: PipelineLoop, name: for-loop-for-loop-00000001-1}
params:
- {name: my-out-cop0-out-loop-item, value: $(tasks.my-out-cop0.results.out)}
timeout: 0s
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
metadata: {name: for-loop-for-loop-00000001-1}
spec:
pipelineSpec:
params:
- {name: my-out-cop0-out-loop-item, type: string}
tasks:
- name: my-in-cop1
params:
- {name: my-out-cop0-out-loop-item, value: $(params.my-out-cop0-out-loop-item)}
taskSpec:
steps:
- name: main
args: ['echo do output op1 item: $(inputs.params.my-out-cop0-out-loop-item)']
command: [sh, -c]
image: library/bash:4.4.23
params:
- {name: my-out-cop0-out-loop-item, type: string}
timeout: 0s
iterateParam: my-out-cop0-out-loop-item

0 comments on commit 9b89a08

Please sign in to comment.