Add alpha feature to output big data passing file path instead of task run name #993

Merged: 9 commits, merged Jul 19, 2022. Changes from all commits shown below.
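In short: when the environment variable OUTPUT_BIG_DATA_PATH is set to 'true' at compile time, each big-data output is published as a Tekton result named <output>-datap that carries the artifact's path under the shared workspace, instead of publishing the producing taskrun's name. A minimal usage sketch, assuming the `env` lookup in the compiler reads the process environment; the pipeline below is a made-up placeholder:

    import os
    import kfp.dsl as dsl
    from kfp_tekton.compiler import TektonCompiler

    @dsl.pipeline(name='demo')
    def demo_pipeline():
        # Trivial placeholder step; a real pipeline would pass large artifacts between steps.
        dsl.ContainerOp(name='echo', image='busybox', command=['echo', 'hello'])

    os.environ['OUTPUT_BIG_DATA_PATH'] = 'true'  # opt in to the alpha path-based data passing
    TektonCompiler().compile(demo_pipeline, 'demo.yaml')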
78 changes: 56 additions & 22 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
@@ -26,6 +26,7 @@
BIG_DATA_MIDPATH = "artifacts/$ORIG_PR_NAME"
BIG_DATA_PATH_FORMAT = "/".join(["$(workspaces.$TASK_NAME.path)", BIG_DATA_MIDPATH, "$TASKRUN_NAME", "$TASK_PARAM_NAME"])
ARTIFACT_OUTPUTLIST_ANNOTATION_KEY = 'artifact_outputs'
OUTPUT_RESULT_PATH_SUFFIX = '-datap'


def fix_big_data_passing(workflow: dict, loops_pipeline: dict, loop_name_prefix: str) -> dict:
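For orientation before the next hunks: the new OUTPUT_RESULT_PATH_SUFFIX is simply appended to an output name to form the Tekton result that carries the artifact path. A tiny illustration, with a hypothetical output and producer task name:

    OUTPUT_RESULT_PATH_SUFFIX = '-datap'
    task_output_name = 'model'                                   # hypothetical output name
    result_name = task_output_name + OUTPUT_RESULT_PATH_SUFFIX   # -> 'model-datap'
    result_ref = '$(tasks.producer.results.%s)' % result_name    # how a downstream task would read it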
@@ -324,6 +325,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
), # TODO: pipeline has no name, use pipelineRun name?
input_parameter['name']) in inputs_consumed_as_parameters
or input_parameter['name'].endswith("-trname")
or input_parameter['name'].endswith(OUTPUT_RESULT_PATH_SUFFIX)
]

# Remove output parameters unless they're used downstream
@@ -363,6 +365,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
parameter_argument['name']) not in inputs_consumed_as_artifacts
or task['name'] in resource_template_names
or parameter_argument['name'].endswith("-trname")
or parameter_argument['name'].endswith(OUTPUT_RESULT_PATH_SUFFIX)
]

# tekton results doesn't support underscore
@@ -506,6 +509,7 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
task_spec = task.get('taskSpec', {})
# Data passing for the task outputs
appended_taskrun_name = False
appended_taskrun_path_step = None
artifact_output_list = task_spec.get('metadata', {}).get('annotations', {}).get(ARTIFACT_OUTPUTLIST_ANNOTATION_KEY, '')
if artifact_output_list:
temp_list = json.loads(artifact_output_list)
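The 'artifact_outputs' annotation consulted here is a JSON-encoded list of output names. A minimal sketch of how this branch parses it, with made-up output names:

    import json

    annotations = {'artifact_outputs': '["model", "metrics"]'}    # hypothetical task annotations
    artifact_output_list = annotations.get('artifact_outputs', '')
    if artifact_output_list:
        temp_list = json.loads(artifact_output_list)              # -> ['model', 'metrics']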
@@ -524,21 +528,36 @@
task_output.get('name')))
workspaces_parameter = '$(workspaces.%s.path)/%s/%s/%s' % (
task_name, BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_output.get('name'))
# For child nodes to know the taskrun name, it has to pass to results via /tekton/results emptydir
if not appended_taskrun_name:
copy_taskrun_name_step = _get_base_step('output-taskrun-name')
copy_taskrun_name_step['command'].append('echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"')
task['taskSpec']['results'].append({"name": "taskrun-name", "type": "string"})
task['taskSpec']['steps'].append(copy_taskrun_name_step)
_append_original_pr_name_env(task)
appended_taskrun_name = True
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
# For child nodes to know the taskrun output path, it has to pass to results via /tekton/results emptydir
if not appended_taskrun_path_step:
appended_taskrun_path_step = _get_base_step('output-taskrun-path')
if len(appended_taskrun_path_step['command']) <= 2:
appended_taskrun_path_step['command'].append('')
appended_taskrun_path_step['command'][-1] += 'echo -n "%s/%s/%s" > $(results.%s%s.path)\n' % \
(BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_output.get('name'),
task_output.get('name'), OUTPUT_RESULT_PATH_SUFFIX)
task['taskSpec']['results'].append({"name": "%s%s" % (task_output.get('name'), OUTPUT_RESULT_PATH_SUFFIX),
"type": "string"})
else:
# For child nodes to know the taskrun name, it has to pass to results via /tekton/results emptydir
if not appended_taskrun_name:
copy_taskrun_name_step = _get_base_step('output-taskrun-name')
copy_taskrun_name_step['command'].append('echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"')
task['taskSpec']['results'].append({"name": "taskrun-name", "type": "string"})
task['taskSpec']['steps'].append(copy_taskrun_name_step)
_append_original_pr_name_env(task)
appended_taskrun_name = True
task['taskSpec'] = replace_big_data_placeholder(
task.get("taskSpec", {}), placeholder, workspaces_parameter)
artifact_items = pipelinerun_template['metadata']['annotations']['tekton.dev/artifact_items']
artifact_items[task['name']] = replace_big_data_placeholder(
artifact_items[task['name']], placeholder, workspaces_parameter)
pipelinerun_template['metadata']['annotations']['tekton.dev/artifact_items'] = \
artifact_items
if appended_taskrun_path_step:
task['taskSpec']['steps'].append(appended_taskrun_path_step)
_append_original_pr_name_env(task)

task_spec = task.get('taskSpec', {})
task_params = task_spec.get('params', [])
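To make the new branch concrete, here is roughly what it appends to a task for one output named 'model' when OUTPUT_BIG_DATA_PATH is 'true'. The output name is hypothetical and the base-step fields are simplified; the real step comes from the compiler's _get_base_step helper:

    # Result that exposes the artifact's workspace-relative path to downstream tasks.
    new_result = {'name': 'model-datap', 'type': 'string'}

    # Extra 'output-taskrun-path' step that writes that path into the result file
    # under /tekton/results.
    output_path_step = {
        'name': 'output-taskrun-path',
        'image': 'busybox',   # simplified; the compiler supplies its own base image
        'command': [
            'sh', '-ec',
            'echo -n "artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/model" '
            '> $(results.model-datap.path)\n',
        ],
    }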
@@ -566,34 +585,48 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
# If the param name is constructed with task_name-param_name,
# use the current task_name as the path prefix

def append_taskrun_params(task_name_append: str):
def append_taskrun_params(task_name_append: str, task_path_name: str):
taskrun_param_name = task_name_append + "-trname"
inserted_taskrun_param = False
for param in task['taskSpec'].get('params', []):
if param.get('name', "") == taskrun_param_name:
inserted_taskrun_param = True
break
if not inserted_taskrun_param:
task['taskSpec']['params'].append({"name": taskrun_param_name})
task['params'].append({"name": taskrun_param_name, "value": "$(tasks.%s.results.taskrun-name)" % task_name_append})
param_name = {"name": taskrun_param_name}
param_nested_name = task_name_append + '-taskrun-name'
param_content = {"name": taskrun_param_name, "value": "$(tasks.%s.results.taskrun-name)" % task_name_append}
param_nested_content = {'name': task_name_append + '-taskrun-name',
'value': '$(tasks.%s.results.taskrun-name)' % task_name_append}
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
param_path_name = '-'.join([task_name_append, task_path])
param_name = {"name": param_path_name}
param_nested_name = param_path_name
param_content = {"name": param_path_name,
"value": "$(tasks.%s.results.%s)" % (task_name_append, task_path_name)}
param_nested_content = param_content
else:
for param in task['taskSpec'].get('params', []):
if param.get('name', "") == taskrun_param_name:
inserted_taskrun_param = True
break
if (not inserted_taskrun_param) or env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
task['taskSpec']['params'].append(param_name)
task['params'].append(param_content)
parent_task_queue = [task['name']]
while parent_task_queue:
current_task = parent_task_queue.pop(0)
for loop_name, loop_spec in loops_pipeline.items():
# print(loop_name, loop_spec)
if current_task in loop_spec.get('task_list', []):
parent_task_queue.append(loop_name.replace(loop_name_prefix, ""))
loop_param_names = [loop_param['name'] for loop_param in loops_pipeline[loop_name]['spec']['params']]
if task_name_append + '-taskrun-name' in loop_param_names:
if param_nested_name in loop_param_names:
continue
loops_pipeline[loop_name]['spec']['params'].append({'name': task_name_append + '-taskrun-name',
'value': '$(tasks.%s.results.taskrun-name)' % task_name_append})
loops_pipeline[loop_name]['spec']['params'].append(param_nested_content)

if task_param_task_name:
workspaces_parameter = '$(workspaces.%s.path)/%s/$(params.%s-trname)/%s' % (
task_name, BIG_DATA_MIDPATH, task_param_task_name, task_param_param_name)
task_path = sanitize_k8s_name(task_param_param_name) + OUTPUT_RESULT_PATH_SUFFIX
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
workspaces_parameter = '$(workspaces.%s.path)/$(params.%s)' % (task_name, '-'.join([task_param_task_name, task_path]))
if task_param_task_name != task_name:
append_taskrun_params(task_param_task_name) # need to get taskrun name from parent path
append_taskrun_params(task_param_task_name, task_path) # need to get taskrun name from parent path
else:
workspaces_parameter = '$(workspaces.%s.path)/%s/%s/%s' % (
task_name, BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_param.get('name'))
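On the consuming side, a sketch of the parameter wiring that append_taskrun_params now produces when the flag is enabled, again with hypothetical task names 'producer' and 'consumer' and an output named 'model':

    # Parameter declared on the consumer's taskSpec and filled from the producer's result.
    param_path_name = 'producer-model-datap'
    task_spec_param = {'name': param_path_name}
    task_param = {'name': param_path_name,
                  'value': '$(tasks.producer.results.model-datap)'}

    # The consumer then resolves the artifact directly from the shared workspace:
    workspaces_parameter = '$(workspaces.consumer.path)/$(params.producer-model-datap)'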
@@ -653,7 +686,8 @@ def append_taskrun_params(task_name_append: str):
task.get("taskSpec", {})['params'] = [
param for param in task_spec.get('params', [])
if (task_name, param.get('name')) not in inputs_tasks or
param.get('name').endswith("-trname")
param.get('name').endswith("-trname") or
param.get('name').endswith(OUTPUT_RESULT_PATH_SUFFIX)
]

# Remove artifacts from task_spec