feat(sdk): ImporterSpec v2 #6917

Merged · 10 commits · Nov 18, 2021
Changes from 9 commits
124 changes: 124 additions & 0 deletions pipeline.yaml
@@ -0,0 +1,124 @@
apiVersion: argoproj.io/v1alpha1
[Review comment · Member] revert this file?

kind: Workflow
metadata:
  generateName: pipeline-flip-coin-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9, pipelines.kubeflow.org/pipeline_compilation_time: '2021-11-17T11:37:42.129215',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "shows how to use dsl.Condition.",
      "name": "pipeline flip coin"}'}
  labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.9}
spec:
  entrypoint: pipeline-flip-coin
  templates:
  - name: condition-1
    outputs:
      parameters:
      - name: flip-again-output
        valueFrom: {parameter: '{{tasks.flip-again.outputs.parameters.flip-again-output}}'}
    dag:
      tasks:
      - name: condition-2
        template: condition-2
        when: '"{{tasks.flip-again.outputs.parameters.flip-again-output}}" == "tails"'
        dependencies: [flip-again]
        arguments:
          parameters:
          - {name: flip-again-output, value: '{{tasks.flip-again.outputs.parameters.flip-again-output}}'}
      - {name: flip-again, template: flip-again}
  - name: condition-2
    inputs:
      parameters:
      - {name: flip-again-output}
    dag:
      tasks:
      - name: print1
        template: print1
        arguments:
          parameters:
          - {name: flip-again-output, value: '{{inputs.parameters.flip-again-output}}'}
  - name: condition-3
    inputs:
      parameters:
      - {name: flip-again-output}
    dag:
      tasks:
      - name: print2
        template: print2
        arguments:
          parameters:
          - {name: flip-again-output, value: '{{inputs.parameters.flip-again-output}}'}
  - name: flip
    container:
      args: ['python -c "import random; result = ''heads'' if random.randint(0,1)
          == 0 else ''tails''; print(result)" | tee /tmp/output']
      command: [sh, -c]
      image: python:alpine3.6
    outputs:
      parameters:
      - name: flip-output
        valueFrom: {path: /tmp/output}
      artifacts:
      - {name: flip-output, path: /tmp/output}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: flip-again
    container:
      args: ['python -c "import random; result = ''heads'' if random.randint(0,1)
          == 0 else ''tails''; print(result)" | tee /tmp/output']
      command: [sh, -c]
      image: python:alpine3.6
    outputs:
      parameters:
      - name: flip-again-output
        valueFrom: {path: /tmp/output}
      artifacts:
      - {name: flip-again-output, path: /tmp/output}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: pipeline-flip-coin
    dag:
      tasks:
      - name: condition-1
        template: condition-1
        when: '"{{tasks.flip.outputs.parameters.flip-output}}" == "heads"'
        dependencies: [flip]
      - name: condition-3
        template: condition-3
        when: '"{{tasks.flip.outputs.parameters.flip-output}}" == "tails"'
        dependencies: [condition-1, flip]
        arguments:
          parameters:
          - {name: flip-again-output, value: '{{tasks.condition-1.outputs.parameters.flip-again-output}}'}
      - {name: flip, template: flip}
  - name: print1
    container:
      command: [echo, '{{inputs.parameters.flip-again-output}}']
      image: alpine:3.6
    inputs:
      parameters:
      - {name: flip-again-output}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  - name: print2
    container:
      command: [echo, '{{inputs.parameters.flip-again-output}}']
      image: alpine:3.6
    inputs:
      parameters:
      - {name: flip-again-output}
    metadata:
      labels:
        pipelines.kubeflow.org/kfp_sdk_version: 1.8.9
        pipelines.kubeflow.org/pipeline-sdk-type: kfp
        pipelines.kubeflow.org/enable_caching: "true"
  arguments:
    parameters: []
  serviceAccountName: pipeline-runner
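The workflow above is the compiled form of the KFP flip-coin conditional sample: `flip` runs first, `condition-1` (flip again, and print on tails) runs when the first flip is heads, and `condition-3` runs when it is tails. The branching implied by the `when:` clauses can be sketched in plain Python — a simplified trace of which templates execute, not real KFP/Argo code:

```python
import random


def flip(rng):
    # Mirrors the container command:
    # 'heads' if random.randint(0, 1) == 0 else 'tails'.
    return 'heads' if rng.randint(0, 1) == 0 else 'tails'


def run_pipeline(rng):
    """Return the templates that would execute, following the when: clauses."""
    trace = ['flip']
    first = flip(rng)
    if first == 'heads':              # condition-1: flip again
        trace += ['condition-1', 'flip-again']
        if flip(rng) == 'tails':      # condition-2: print the second result
            trace += ['condition-2', 'print1']
    if first == 'tails':              # condition-3: print the (skipped) output
        trace += ['condition-3', 'print2']
    return trace
```

Note that `condition-1` and `condition-3` guard on opposite outcomes of the same flip, so at most one of the two branches ever runs.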
1 change: 1 addition & 0 deletions sdk/RELEASE.md
@@ -6,6 +6,7 @@
 * Implement experimental v2 `@component` component [\#6825](https://github.com/kubeflow/pipelines/pull/6825)
 * Add load_component_from_* for v2 [\#6822](https://github.com/kubeflow/pipelines/pull/6822)
 * Merge v2 experimental change back to v2 namespace [\#6890](https://github.com/kubeflow/pipelines/pull/6890)
+* Add ImporterSpec v2 [\#6917](https://github.com/kubeflow/pipelines/pull/6917)

## Breaking Changes

20 changes: 12 additions & 8 deletions sdk/python/kfp/v2/compiler/compiler.py
@@ -907,17 +907,21 @@ def _build_spec_by_group(
                 task_name_to_component_spec[
                     subgroup.name] = subgroup_component_spec

-                # TODO: handler importer spec.
-
-                subgroup_container_spec = builder.build_container_spec_for_task(
-                    task=subgroup)
-
                 executor_label = subgroup_component_spec.executor_label

                 if executor_label not in deployment_config.executors:
-                    deployment_config.executors[
-                        executor_label].container.CopyFrom(
-                            subgroup_container_spec)
+                    if hasattr(subgroup, 'container_spec'):
+                        subgroup_container_spec = builder.build_container_spec_for_task(
+                            task=subgroup)
+                        deployment_config.executors[
+                            executor_label].container.CopyFrom(
+                                subgroup_container_spec)
+                    elif hasattr(subgroup, 'importer_spec'):
+                        subgroup_importer_spec = builder.build_importer_spec_for_task(
+                            task=subgroup)
+                        deployment_config.executors[
+                            executor_label].importer.CopyFrom(
+                                subgroup_importer_spec)

             elif isinstance(subgroup, dsl.ParallelFor):

[Review comment · Member, on `if hasattr(subgroup, 'container_spec'):`] Given that we document both container_spec and importer_spec as class attributes (in the docstring of PipelineTask), hasattr seems to be a bit contradictory. How about we initialize them to None and check if subgroup.container_spec is not None here?
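The reviewer's suggested alternative to `hasattr` can be sketched with toy stand-ins (these classes and function names are illustrative, not the real kfp types):

```python
class PipelineTask:
    """Toy stand-in for kfp's PipelineTask, with both documented
    attributes initialized to None as the review suggests."""

    def __init__(self, container_spec=None, importer_spec=None):
        self.container_spec = container_spec
        self.importer_spec = importer_spec


def pick_executor_field(task):
    # 'is not None' replaces hasattr(): the attributes always exist,
    # so the check reads as a value test rather than a shape test.
    if task.container_spec is not None:
        return 'container'
    if task.importer_spec is not None:
        return 'importer'
    raise ValueError('task defines neither container_spec nor importer_spec')
```

The benefit is that the documented attributes match the runtime shape of the object, and a typo in the attribute name raises `AttributeError` instead of silently falling through.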
25 changes: 24 additions & 1 deletion sdk/python/kfp/v2/compiler/pipeline_spec_builder.py
@@ -341,6 +341,29 @@ def build_component_spec_for_task(

     return component_spec

+def build_importer_spec_for_task(
+    task: pipeline_task.PipelineTask
+) -> pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec:
+    """Builds ImporterSpec for a pipeline task.
+
+    Args:
+        task: The task to build an ImporterSpec for.
+
+    Returns:
+        An ImporterSpec object for the task.
+    """
+    type_schema = type_utils.get_artifact_type_schema(
+        task.importer_spec.type_schema)
+    importer_spec = pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec(
+        type_schema=type_schema,
+        reimport=task.importer_spec.reimport)
+
+    if isinstance(task.importer_spec.artifact_uri,
+                  pipeline_channel.PipelineParameterChannel):
+        importer_spec.artifact_uri.runtime_parameter = 'uri'
+    elif isinstance(task.importer_spec.artifact_uri, str):
+        importer_spec.artifact_uri.constant.string_value = (
+            task.importer_spec.artifact_uri)
+
+    return importer_spec
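The `artifact_uri` dispatch above distinguishes a URI that is fixed at compile time (a plain string, stored as a constant) from one supplied at runtime through a pipeline parameter (a channel, wired to the `'uri'` input). A self-contained sketch of that decision, with a fake stand-in for kfp's channel type:

```python
class PipelineParameterChannel:
    """Toy stand-in for kfp's pipeline_channel.PipelineParameterChannel."""

    def __init__(self, name):
        self.name = name


def resolve_artifact_uri(artifact_uri):
    # A channel means the URI arrives at runtime via the 'uri' parameter;
    # a plain string is baked into the spec as a constant.
    if isinstance(artifact_uri, PipelineParameterChannel):
        return {'runtime_parameter': 'uri'}
    if isinstance(artifact_uri, str):
        return {'constant': artifact_uri}
    raise TypeError(f'unsupported artifact_uri: {type(artifact_uri).__name__}')
```

The channel case is checked first because a channel is not a `str`, but ordering the more specific type first keeps the intent obvious if the type hierarchy ever changes.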


 def build_container_spec_for_task(
     task: pipeline_task.PipelineTask
@@ -351,7 +374,7 @@ def build_container_spec_for_task(
         task: The task to build a ComponentSpec for.

     Returns:
-        A PipelineContaienrSpec object for the task.
+        A PipelineContainerSpec object for the task.
     """
     container_spec = (
         pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec(
5 changes: 2 additions & 3 deletions sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py
@@ -100,9 +100,8 @@ def test_two_step_pipeline(self):
             'two_step_pipeline',
             ['--pipeline-parameters', '{"text":"Hello KFP!"}'])

-    # TODO: re-enable the test, re-implement importer using v2 structures.
-    # def test_pipeline_with_importer(self):
-    #     self._test_compile_py_to_json('pipeline_with_importer')
+    def test_pipeline_with_importer(self):
+        self._test_compile_py_to_json('pipeline_with_importer')

     # TODO: re-enable the test, debug load_component_from_file error
     # def test_pipeline_with_ontology(self):
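The re-enabled `test_pipeline_with_importer` follows the suite's golden-file pattern: compile the sample pipeline, then compare the result against a checked-in JSON snapshot. A minimal sketch of that pattern — helper names are illustrative, not the actual KFP test harness, and the ignored field is an assumption (snapshot tests commonly drop version metadata before comparing):

```python
import json


def normalize(spec):
    # Drop fields that legitimately differ between runs before comparing,
    # e.g. a version stamp written by the compiler (assumed field name).
    spec = dict(spec)
    spec.pop('sdkVersion', None)
    return spec


def matches_golden(compiled, golden):
    # Serializing with sorted keys gives an order-independent comparison.
    return (json.dumps(normalize(compiled), sort_keys=True) ==
            json.dumps(normalize(golden), sort_keys=True))
```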