Skip to content

Commit

Permalink
feat(sdk): add set_env_variable for Pipeline task (#6919)
Browse files Browse the repository at this point in the history
* feat(sdk): add set_env_variable

* release notes

* enable test

* fix test

* fix test

* address comments
  • Loading branch information
ji-yaqi authored Nov 23, 2021
1 parent 4484ccd commit cd20373
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 102 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Add load_component_from_* for v2 [\#6822](https://github.com/kubeflow/pipelines/pull/6822)
* Merge v2 experimental change back to v2 namespace [\#6890](https://github.com/kubeflow/pipelines/pull/6890)
* Add ImporterSpec v2 [\#6917](https://github.com/kubeflow/pipelines/pull/6917)
* Add add set_env_variable for Pipeline task [\#6919](https://github.com/kubeflow/pipelines/pull/6919)

## Breaking Changes

Expand Down
3 changes: 0 additions & 3 deletions sdk/python/kfp/v2/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@

import collections
import inspect
import json
import re
import uuid
import warnings
from typing import (Any, Callable, Dict, List, Mapping, Optional, Set, Tuple,
Union)

Expand Down Expand Up @@ -922,7 +920,6 @@ def _build_spec_by_group(
deployment_config.executors[
executor_label].importer.CopyFrom(
subgroup_importer_spec)

elif isinstance(subgroup, dsl.ParallelFor):

# "Punch the hole", adding additional inputs (other than loop
Expand Down
12 changes: 5 additions & 7 deletions sdk/python/kfp/v2/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,15 +381,13 @@ def build_container_spec_for_task(
image=task.container_spec.image,
command=task.container_spec.commands,
args=task.container_spec.arguments,
env=[
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
.EnvVar(name=name, value=value)
for name, value in (task.container_spec.env or {}).items()
]
))

if task.container_spec.env is not None:
container_spec.env = [
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
.EnvVar(name=name, value=value)
for name, value in task.container_spec.env.items()
]

if task.container_spec.resources is not None:
container_spec.reources.cpu_limit = (
task.container_spec.resources.cpu_limit)
Expand Down
5 changes: 2 additions & 3 deletions sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ def test_pipeline_with_metrics_outputs(self):
def test_pipeline_with_exit_handler(self):
self._test_compile_py_to_json('pipeline_with_exit_handler')

# TODO: re-enable the test, add set_env_variable to PipelineTask
# def test_pipeline_with_env(self):
# self._test_compile_py_to_json('pipeline_with_env')
def test_pipeline_with_env(self):
self._test_compile_py_to_json('pipeline_with_env')

def test_v2_component_with_optional_inputs(self):
self._test_compile_py_to_json('v2_component_with_optional_inputs')
Expand Down
170 changes: 83 additions & 87 deletions sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.json
Original file line number Diff line number Diff line change
@@ -1,98 +1,94 @@
{
"pipelineSpec": {
"components": {
"comp-print-env": {
"executorLabel": "exec-print-env"
"components": {
"comp-print-env": {
"executorLabel": "exec-print-env"
},
"comp-print-env-op": {
"executorLabel": "exec-print-env-op"
}
},
"defaultPipelineRoot": "dummy_root",
"deploymentSpec": {
"executors": {
"exec-print-env": {
"container": {
"command": [
"sh",
"-c",
"set -e -x\necho \"$ENV2\"\necho \"$ENV3\"\n"
],
"env": [
{
"name": "ENV2",
"value": "val2"
},
{
"name": "ENV3",
"value": "val3"
}
],
"image": "alpine"
}
},
"comp-print-env-op": {
"executorLabel": "exec-print-env-op"
"exec-print-env-op": {
"container": {
"args": [
"--executor_input",
"{{$}}",
"--function_to_execute",
"print_env_op"
],
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.9' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_env_op():\n import os\n print(os.environ['ENV1'])\n\n"
],
"env": [
{
"name": "ENV1",
"value": "val1"
}
],
"image": "python:3.7"
}
}
},
"deploymentSpec": {
"executors": {
"exec-print-env": {
"container": {
"command": [
"sh",
"-c",
"set -e -x\necho \"$ENV2\"\necho \"$ENV3\"\n"
],
"env": [
{
"name": "ENV2",
"value": "val2"
},
{
"name": "ENV3",
"value": "val3"
}
],
"image": "alpine"
}
},
"pipelineInfo": {
"name": "pipeline-with-env"
},
"root": {
"dag": {
"tasks": {
"print-env": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-print-env"
},
"taskInfo": {
"name": "print-env"
}
},
"exec-print-env-op": {
"container": {
"args": [
"--executor_input",
"{{$}}",
"--function_to_execute",
"print_env_op"
],
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_env_op():\n import os\n print(os.environ['ENV1'])\n\n"
],
"env": [
{
"name": "ENV1",
"value": "val1"
}
],
"image": "python:3.7"
}
}
}
},
"pipelineInfo": {
"name": "pipeline-with-env"
},
"root": {
"dag": {
"tasks": {
"print-env": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-print-env"
},
"taskInfo": {
"name": "print-env"
}
"print-env-op": {
"cachingOptions": {
"enableCache": true
},
"print-env-op": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-print-env-op"
},
"taskInfo": {
"name": "print-env-op"
}
"componentRef": {
"name": "comp-print-env-op"
},
"taskInfo": {
"name": "print-env-op"
}
}
}
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-1.8.6"
}
},
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
}
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-1.8.9"
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def print_env_op():
implementation:
container:
image: alpine
command:
commands:
- sh
- -c
- |
Expand Down
18 changes: 17 additions & 1 deletion sdk/python/kfp/v2/components/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import re
import copy
from typing import Any, Callable, List, Mapping, Optional, Union
from typing import Any, List, Mapping, Optional, Union

from kfp.v2.components import constants
from kfp.v2.components import pipeline_channel
Expand Down Expand Up @@ -506,6 +506,22 @@ def set_display_name(self, name: str) -> 'PipelineTask':
self.task_spec.display_name = name
return self

def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
"""Set environment variable for the pipelineTask.
Args:
name: The name of the environment variable.
value: The value of the environment variable.
Returns:
Self return to allow chained setting calls.
"""
if self.container_spec.env is not None:
self.container_spec.env[name] = value
else:
self.container_spec.env = {name: value}
return self

def after(self, *tasks) -> 'PipelineTask':
"""Specify explicit dependency on other tasks.
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/kfp/v2/components/pipeline_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,15 @@ def test_add_node_selector_constraint_accelerator_count(self):
accelerator_type='TPU_V3', accelerator_count=5),
task.container_spec.resources)

def test_set_env_variable(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.load_from_component_yaml(
V2_YAML),
arguments={'input1': 'value'},
)
task.set_env_variable('env_name', 'env_value')
self.assertEqual({'env_name': 'env_value'}, task.container_spec.env)

def test_set_display_name(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.load_from_component_yaml(
Expand Down

0 comments on commit cd20373

Please sign in to comment.