diff --git a/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py b/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py index 5bd8399258..0bdd3d8b76 100644 --- a/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py +++ b/sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py @@ -300,7 +300,8 @@ def mark_upstream_ios_of_output(template_output, marked_inputs, # Use workspaces to tasks if big data passing instead of 'results', 'copy-inputs' for task_template in container_templates: - task_template = big_data_passing_tasks(task_template, + task_template = big_data_passing_tasks(pipelinerun_name, + task_template, pipelinerun_template, inputs_consumed_as_artifacts, outputs_consumed_as_artifacts) @@ -470,8 +471,8 @@ def big_data_passing_pipelinerun(name: str, pr: dict, pw: set): return pr, prw -def big_data_passing_tasks(task: dict, pipelinerun_template: dict, - inputs_tasks: set, outputs_tasks: set) -> dict: +def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict, + inputs_tasks: set, outputs_tasks: set) -> dict: task_name = task.get('name') task_spec = task.get('taskSpec', {}) # Data passing for the task outputs @@ -524,6 +525,16 @@ def big_data_passing_tasks(task: dict, pipelinerun_template: dict, # add input artifact processes task = input_artifacts_tasks(task, task_artifact) + if (prname, task_artifact.get('name')) in inputs_tasks: + # add input artifact processes for pipeline parameter + if not task_artifact.setdefault('raw', {}): + for i in range(len(pipelinerun_template['spec']['params'])): + param_name = pipelinerun_template['spec']['params'][i]['name'] + param_value = pipelinerun_template['spec']['params'][i]['value'] + if (task_artifact.get('name') == param_name): + task_artifact['raw']['data'] = param_value + task = input_artifacts_tasks_pr_params(task, task_artifact) + # Remove artifacts parameter from params task.get("taskSpec", {})['params'] = [ param for param in task_spec.get('params', []) @@ -537,6 +548,24 @@ def big_data_passing_tasks(task: dict, pipelinerun_template: dict, return task +def input_artifacts_tasks_pr_params(template: dict, artifact: dict) -> dict: + copy_inputs_step = _get_base_step('copy-inputs') + task_name = template.get('name') + task_spec = template.get('taskSpec', {}) + task_params = task_spec.get('params', []) + for task_param in task_params: + workspaces_parameter = '$(workspaces.%s.path)/%s' % ( + task_name, task_param.get('name')) + if 'raw' in artifact: + copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % ( + artifact['raw']['data'], workspaces_parameter) + + template['taskSpec']['steps'] = _prepend_steps( + [copy_inputs_step], template['taskSpec']['steps']) + + return template + + def input_artifacts_tasks(template: dict, artifact: dict) -> dict: # The input artifacts in KFP is not pulling from s3, it will always be passed as a raw input. # Visit https://github.com/kubeflow/pipelines/issues/336 for more details on the implementation. diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 72a87a7e6a..6b68fffdc6 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -126,6 +126,13 @@ def test_loop_in_recur_workflow(self): from .testdata.loop_in_recursion import flipcoin self._test_pipeline_workflow(flipcoin, 'loop_in_recursion.yaml') + def test_data_passing_pipeline_param_as_file(self): + """ + Test compiling a pipeline_param_as_file workflow. + """ + from .testdata.data_passing_pipeline_param_as_file import data_passing_pipeline + self._test_pipeline_workflow(data_passing_pipeline, 'data_passing_pipeline_param_as_file.yaml') + def test_recur_nested_workflow(self): """ Test compiling a nested recursive workflow. diff --git a/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file.py b/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file.py new file mode 100644 index 0000000000..31e46fd5cf --- /dev/null +++ b/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file.py @@ -0,0 +1,59 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp +from kfp.components import create_component_from_func, InputPath +from kfp_tekton.compiler import TektonCompiler + + +# Consume as file +@create_component_from_func +def consume_anything_as_file(data_path: InputPath()): + with open(data_path) as f: + print("consume_anything_as_file: " + f.read()) + + +@create_component_from_func +def consume_something_as_file(data_path: InputPath('Something')): + with open(data_path) as f: + print("consume_something_as_file: " + f.read()) + + +@create_component_from_func +def consume_string_as_file(data_path: InputPath(str)): + with open(data_path) as f: + print("consume_string_as_file: " + f.read()) + + +# Pipeline +@kfp.dsl.pipeline(name='data_passing_pipeline') +def data_passing_pipeline( + anything_param="anything_param", + something_param: "Something" = "something_param", + string_param: str = "string_param", +): + + # Pass pipeline parameter; consume as file + consume_anything_as_file(anything_param) + consume_anything_as_file(something_param) + consume_anything_as_file(string_param) + consume_something_as_file(anything_param) + consume_something_as_file(something_param) + consume_string_as_file(anything_param) + consume_string_as_file(string_param) + + +if __name__ == "__main__": + kfp_endpoint = None + TektonCompiler().compile(data_passing_pipeline, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file.yaml b/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file.yaml new file mode 100644 index 0000000000..954a7e2f2b --- /dev/null +++ b/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file.yaml @@ -0,0 +1,454 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: data-passing-pipeline + annotations: + tekton.dev/output_artifacts: '{}' + tekton.dev/input_artifacts: '{}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"consume-anything-as-file": [], "consume-anything-as-file-2": + [], "consume-anything-as-file-3": [], "consume-something-as-file": [], "consume-something-as-file-2": + [], "consume-string-as-file": [], "consume-string-as-file-2": []}' + sidecar.istio.io/inject: "false" + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "anything_param", + "name": "anything_param", "optional": true}, {"default": "something_param", + "name": "something_param", "optional": true, "type": "Something"}, {"default": + "string_param", "name": "string_param", "optional": true, "type": "String"}], + "name": "data_passing_pipeline"}' +spec: + params: + - name: anything_param + value: anything_param + - name: something_param + value: something_param + - name: string_param + value: string_param + pipelineSpec: + params: + - name: anything_param + default: anything_param + - name: something_param + default: something_param + - name: string_param + default: string_param + tasks: + - name: consume-anything-as-file + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "anything_param" > $(workspaces.consume-anything-as-file.path)/anything_param + - name: main + args: + - --data + - $(workspaces.consume-anything-as-file.path)/anything_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_anything_as_file(data_path): + with open(data_path) as f: + print("consume_anything_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume anything as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_anything_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_anything_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + anything as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume + anything as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-anything-as-file + timeout: 0s + workspaces: + - name: consume-anything-as-file + workspace: data-passing-pipeline + - name: consume-anything-as-file-2 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "something_param" > $(workspaces.consume-anything-as-file-2.path)/something_param + - name: main + args: + - --data + - $(workspaces.consume-anything-as-file-2.path)/something_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_anything_as_file(data_path): + with open(data_path) as f: + print("consume_anything_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume anything as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_anything_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_anything_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + anything as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume + anything as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-anything-as-file-2 + timeout: 0s + workspaces: + - name: consume-anything-as-file-2 + workspace: data-passing-pipeline + - name: consume-anything-as-file-3 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "string_param" > $(workspaces.consume-anything-as-file-3.path)/string_param + - name: main + args: + - --data + - $(workspaces.consume-anything-as-file-3.path)/string_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_anything_as_file(data_path): + with open(data_path) as f: + print("consume_anything_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume anything as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_anything_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_anything_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + anything as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume + anything as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-anything-as-file-3 + timeout: 0s + workspaces: + - name: consume-anything-as-file-3 + workspace: data-passing-pipeline + - name: consume-something-as-file + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "anything_param" > $(workspaces.consume-something-as-file.path)/anything_param + - name: main + args: + - --data + - $(workspaces.consume-something-as-file.path)/anything_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_something_as_file(data_path): + with open(data_path) as f: + print("consume_something_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume something as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_something_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_something_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_something_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + something as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_something_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "Something"}], + "name": "Consume something as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-something-as-file + timeout: 0s + workspaces: + - name: consume-something-as-file + workspace: data-passing-pipeline + - name: consume-something-as-file-2 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "something_param" > $(workspaces.consume-something-as-file-2.path)/something_param + - name: main + args: + - --data + - $(workspaces.consume-something-as-file-2.path)/something_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_something_as_file(data_path): + with open(data_path) as f: + print("consume_something_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume something as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_something_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_something_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_something_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + something as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_something_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "Something"}], + "name": "Consume something as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-something-as-file-2 + timeout: 0s + workspaces: + - name: consume-something-as-file-2 + workspace: data-passing-pipeline + - name: consume-string-as-file + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "anything_param" > $(workspaces.consume-string-as-file.path)/anything_param + - name: main + args: + - --data + - $(workspaces.consume-string-as-file.path)/anything_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_string_as_file(data_path): + with open(data_path) as f: + print("consume_string_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume string as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_string_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_string_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_string_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + string as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_string_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "String"}], + "name": "Consume string as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-string-as-file + timeout: 0s + workspaces: + - name: consume-string-as-file + workspace: data-passing-pipeline + - name: consume-string-as-file-2 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "string_param" > $(workspaces.consume-string-as-file-2.path)/string_param + - name: main + args: + - --data + - $(workspaces.consume-string-as-file-2.path)/string_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_string_as_file(data_path): + with open(data_path) as f: + print("consume_string_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume string as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_string_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_string_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_string_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + string as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_string_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "String"}], + "name": "Consume string as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-string-as-file-2 + timeout: 0s + workspaces: + - name: consume-string-as-file-2 + workspace: data-passing-pipeline + workspaces: + - name: data-passing-pipeline + timeout: 0s + workspaces: + - name: data-passing-pipeline + volumeClaimTemplate: + spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 2Gi diff --git a/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file_noninlined.yaml b/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file_noninlined.yaml new file mode 100644 index 0000000000..954a7e2f2b --- /dev/null +++ b/sdk/python/tests/compiler/testdata/data_passing_pipeline_param_as_file_noninlined.yaml @@ -0,0 +1,454 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: data-passing-pipeline + annotations: + tekton.dev/output_artifacts: '{}' + tekton.dev/input_artifacts: '{}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"consume-anything-as-file": [], "consume-anything-as-file-2": + [], "consume-anything-as-file-3": [], "consume-something-as-file": [], "consume-something-as-file-2": + [], "consume-string-as-file": [], "consume-string-as-file-2": []}' + sidecar.istio.io/inject: "false" + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "anything_param", + "name": "anything_param", "optional": true}, {"default": "something_param", + "name": "something_param", "optional": true, "type": "Something"}, {"default": + "string_param", "name": "string_param", "optional": true, "type": "String"}], + "name": "data_passing_pipeline"}' +spec: + params: + - name: anything_param + value: anything_param + - name: something_param + value: something_param + - name: string_param + value: string_param + pipelineSpec: + params: + - name: anything_param + default: anything_param + - name: something_param + default: something_param + - name: string_param + default: string_param + tasks: + - name: consume-anything-as-file + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "anything_param" > $(workspaces.consume-anything-as-file.path)/anything_param + - name: main + args: + - --data + - $(workspaces.consume-anything-as-file.path)/anything_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_anything_as_file(data_path): + with open(data_path) as f: + print("consume_anything_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume anything as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_anything_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_anything_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + anything as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume + anything as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-anything-as-file + timeout: 0s + workspaces: + - name: consume-anything-as-file + workspace: data-passing-pipeline + - name: consume-anything-as-file-2 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "something_param" > $(workspaces.consume-anything-as-file-2.path)/something_param + - name: main + args: + - --data + - $(workspaces.consume-anything-as-file-2.path)/something_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_anything_as_file(data_path): + with open(data_path) as f: + print("consume_anything_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume anything as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_anything_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_anything_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + anything as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume + anything as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-anything-as-file-2 + timeout: 0s + workspaces: + - name: consume-anything-as-file-2 + workspace: data-passing-pipeline + - name: consume-anything-as-file-3 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "string_param" > $(workspaces.consume-anything-as-file-3.path)/string_param + - name: main + args: + - --data + - $(workspaces.consume-anything-as-file-3.path)/string_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_anything_as_file(data_path): + with open(data_path) as f: + print("consume_anything_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume anything as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_anything_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_anything_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + anything as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume + anything as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-anything-as-file-3 + timeout: 0s + workspaces: + - name: consume-anything-as-file-3 + workspace: data-passing-pipeline + - name: consume-something-as-file + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "anything_param" > $(workspaces.consume-something-as-file.path)/anything_param + - name: main + args: + - --data + - $(workspaces.consume-something-as-file.path)/anything_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_something_as_file(data_path): + with open(data_path) as f: + print("consume_something_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume something as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_something_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_something_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_something_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + something as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_something_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "Something"}], + "name": "Consume something as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-something-as-file + timeout: 0s + workspaces: + - name: consume-something-as-file + workspace: data-passing-pipeline + - name: consume-something-as-file-2 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "something_param" > $(workspaces.consume-something-as-file-2.path)/something_param + - name: main + args: + - --data + - $(workspaces.consume-something-as-file-2.path)/something_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_something_as_file(data_path): + with open(data_path) as f: + print("consume_something_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume something as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_something_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_something_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_something_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + something as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_something_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "Something"}], + "name": "Consume something as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-something-as-file-2 + timeout: 0s + workspaces: + - name: consume-something-as-file-2 + workspace: data-passing-pipeline + - name: consume-string-as-file + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "anything_param" > $(workspaces.consume-string-as-file.path)/anything_param + - name: main + args: + - --data + - $(workspaces.consume-string-as-file.path)/anything_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_string_as_file(data_path): + with open(data_path) as f: + print("consume_string_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume string as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_string_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_string_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_string_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + string as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_string_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "String"}], + "name": "Consume string as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-string-as-file + timeout: 0s + workspaces: + - name: consume-string-as-file + workspace: data-passing-pipeline + - name: consume-string-as-file-2 + taskSpec: + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "string_param" > $(workspaces.consume-string-as-file-2.path)/string_param + - name: main + args: + - --data + - $(workspaces.consume-string-as-file-2.path)/string_param + command: + - sh + - -ec + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def consume_string_as_file(data_path): + with open(data_path) as f: + print("consume_string_as_file: " + f.read()) + + import argparse + _parser = argparse.ArgumentParser(prog='Consume string as file', description='') + _parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = consume_string_as_file(**_parsed_args) + image: python:3.7 + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec", + "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 + -u \"$program_path\" \"$@\"\n", "def consume_string_as_file(data_path):\n with + open(data_path) as f:\n print(\"consume_string_as_file: \" + + f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume + string as file'', description='''')\n_parser.add_argument(\"--data\", + dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args + = vars(_parser.parse_args())\n\n_outputs = consume_string_as_file(**_parsed_args)\n"], + "image": "python:3.7"}}, "inputs": [{"name": "data", "type": "String"}], + "name": "Consume string as file"}' + tekton.dev/template: '' + workspaces: + - name: consume-string-as-file-2 + timeout: 0s + workspaces: + - name: consume-string-as-file-2 + workspace: data-passing-pipeline + workspaces: + - name: data-passing-pipeline + timeout: 0s + workspaces: + - name: data-passing-pipeline + volumeClaimTemplate: + spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 2Gi