Skip to content

Commit

Permalink
Fix code formatting and unit-test case issues
Browse files Browse the repository at this point in the history
  • Loading branch information
huixa committed Sep 29, 2021
1 parent 4185284 commit 822635f
Show file tree
Hide file tree
Showing 5 changed files with 1,006 additions and 3 deletions.
35 changes: 32 additions & 3 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,

# Use workspaces to tasks if big data passing instead of 'results', 'copy-inputs'
for task_template in container_templates:
task_template = big_data_passing_tasks(task_template,
task_template = big_data_passing_tasks(pipelinerun_name,
task_template,
pipelinerun_template,
inputs_consumed_as_artifacts,
outputs_consumed_as_artifacts)
Expand Down Expand Up @@ -470,8 +471,8 @@ def big_data_passing_pipelinerun(name: str, pr: dict, pw: set):
return pr, prw


def big_data_passing_tasks(task: dict, pipelinerun_template: dict,
inputs_tasks: set, outputs_tasks: set) -> dict:
def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
inputs_tasks: set, outputs_tasks: set) -> dict:
task_name = task.get('name')
task_spec = task.get('taskSpec', {})
# Data passing for the task outputs
Expand Down Expand Up @@ -524,6 +525,16 @@ def big_data_passing_tasks(task: dict, pipelinerun_template: dict,
# add input artifact processes
task = input_artifacts_tasks(task, task_artifact)

if (prname, task_artifact.get('name')) in inputs_tasks:
# add input artifact processes for pipeline parameter
if not task_artifact.setdefault('raw', {}):
for i in range(len(pipelinerun_template['spec']['params'])):
param_name = pipelinerun_template['spec']['params'][i]['name']
param_value = pipelinerun_template['spec']['params'][i]['value']
if (task_artifact.get('name') == param_name):
task_artifact['raw']['data'] = param_value
task = input_artifacts_tasks_pr_params(task, task_artifact)

# Remove artifacts parameter from params
task.get("taskSpec", {})['params'] = [
param for param in task_spec.get('params', [])
Expand All @@ -537,6 +548,24 @@ def big_data_passing_tasks(task: dict, pipelinerun_template: dict,
return task


def input_artifacts_tasks_pr_params(template: dict, artifact: dict) -> dict:
    """Prepend a 'copy-inputs' step that materializes a pipeline-parameter
    artifact as a file in the task's workspace.

    For each parameter of the task, an ``echo -n "<data>" > <workspace path>``
    line is appended to the copy step's script so the downstream step can read
    the value from a file instead of a Tekton result.

    :param template: the Tekton task template (mutated in place).
    :param artifact: the artifact dict; its ``raw.data`` payload is written out.
    :return: the mutated task template.
    """
    copy_inputs_step = _get_base_step('copy-inputs')
    task_name = template.get('name')
    task_spec = template.get('taskSpec', {})
    task_params = task_spec.get('params', [])
    # Hoisted out of the loop: the artifact does not change per parameter.
    if 'raw' in artifact:
        for task_param in task_params:
            workspaces_parameter = '$(workspaces.%s.path)/%s' % (
                task_name, task_param.get('name'))
            # NOTE(review): the same raw payload is written for every task
            # param, not only the one matching artifact['name'] — presumably
            # later artifacts overwrite their own paths; confirm intended.
            copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % (
                artifact['raw']['data'], workspaces_parameter)

    template['taskSpec']['steps'] = _prepend_steps(
        [copy_inputs_step], template['taskSpec']['steps'])

    return template


def input_artifacts_tasks(template: dict, artifact: dict) -> dict:
# The input artifacts in KFP is not pulling from s3, it will always be passed as a raw input.
# Visit https://github.com/kubeflow/pipelines/issues/336 for more details on the implementation.
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ def test_loop_in_recur_workflow(self):
from .testdata.loop_in_recursion import flipcoin
self._test_pipeline_workflow(flipcoin, 'loop_in_recursion.yaml')

def test_data_passing_pipeline_param_as_file(self):
    """Compile the pipeline-param-as-file workflow and compare it
    against the golden YAML.
    """
    from .testdata.data_passing_pipeline_param_as_file import data_passing_pipeline
    self._test_pipeline_workflow(
        data_passing_pipeline,
        'data_passing_pipeline_param_as_file.yaml')

def test_recur_nested_workflow(self):
"""
Test compiling a nested recursive workflow.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp
from kfp.components import create_component_from_func, InputPath
from kfp_tekton.compiler import TektonCompiler


# Consume as file
# Component: reads the file handed in via data_path (untyped InputPath) and
# echoes its contents to stdout. Deliberately kept free of docstrings/inline
# comments: create_component_from_func serializes the function source into the
# component spec, so added text inside the body would change the compiled
# YAML that the unit test compares against.
@create_component_from_func
def consume_anything_as_file(data_path: InputPath()):
    with open(data_path) as f:
        print("consume_anything_as_file: " + f.read())


# Component: same as consume_anything_as_file, but declares the custom
# artifact type 'Something' on its input — exercises type-specific
# param-as-file passing. No docstring/body comments on purpose: the function
# source is serialized into the component spec and golden-tested as YAML.
@create_component_from_func
def consume_something_as_file(data_path: InputPath('Something')):
    with open(data_path) as f:
        print("consume_something_as_file: " + f.read())


# Component: same pattern with a plain `str`-typed input — exercises
# builtin-typed param-as-file passing. No docstring/body comments on purpose:
# the function source is serialized into the component spec and golden-tested
# as YAML.
@create_component_from_func
def consume_string_as_file(data_path: InputPath(str)):
    with open(data_path) as f:
        print("consume_string_as_file: " + f.read())


# Pipeline: wires three pipeline parameters (untyped, custom 'Something'
# type, and str) into consumers that take their input as a file, covering the
# param-as-file combinations handled by the big-data-passing rewriter.
# No docstring on purpose: kfp uses the pipeline function's docstring as the
# pipeline description, which would alter the compiled YAML under test.
# Plain comments are safe here — the pipeline function is executed by the
# compiler, not source-serialized.
@kfp.dsl.pipeline(name='data_passing_pipeline')
def data_passing_pipeline(
    anything_param="anything_param",
    something_param: "Something" = "something_param",
    string_param: str = "string_param",
):

    # Pass each pipeline parameter; consume it as a file.
    # NOTE(review): call order matters — the compiler derives task names from
    # it, so reordering would change the golden YAML.
    consume_anything_as_file(anything_param)
    consume_anything_as_file(something_param)
    consume_anything_as_file(string_param)
    consume_something_as_file(anything_param)
    consume_something_as_file(something_param)
    consume_string_as_file(anything_param)
    consume_string_as_file(string_param)


if __name__ == "__main__":
    # Compile this pipeline to a Tekton YAML file next to the script.
    # (Removed the unused `kfp_endpoint = None` local — nothing read it.)
    TektonCompiler().compile(data_passing_pipeline,
                             __file__.replace('.py', '.yaml'))
Loading

0 comments on commit 822635f

Please sign in to comment.