From cd2037331037469e3612c440f16fbdfaf92a57f1 Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Tue, 23 Nov 2021 13:46:47 -0800 Subject: [PATCH] feat(sdk): add set_env_variable for Pipeline task (#6919) * feat(sdk): add set_env_variable * release notes * enable test * fix test * fix test * address comments --- sdk/RELEASE.md | 1 + sdk/python/kfp/v2/compiler/compiler.py | 3 - .../kfp/v2/compiler/pipeline_spec_builder.py | 12 +- .../compiler_cli_tests/compiler_cli_tests.py | 5 +- .../test_data/pipeline_with_env.json | 170 +++++++++--------- .../test_data/pipeline_with_env.py | 2 +- sdk/python/kfp/v2/components/pipeline_task.py | 18 +- .../kfp/v2/components/pipeline_task_test.py | 9 + 8 files changed, 118 insertions(+), 102 deletions(-) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 1b76b759e93..d51c80b8ed5 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -7,6 +7,7 @@ * Add load_component_from_* for v2 [\#6822](https://github.com/kubeflow/pipelines/pull/6822) * Merge v2 experimental change back to v2 namespace [\#6890](https://github.com/kubeflow/pipelines/pull/6890) * Add ImporterSpec v2 [\#6917](https://github.com/kubeflow/pipelines/pull/6917) +* Add add set_env_variable for Pipeline task [\#6919](https://github.com/kubeflow/pipelines/pull/6919) ## Breaking Changes diff --git a/sdk/python/kfp/v2/compiler/compiler.py b/sdk/python/kfp/v2/compiler/compiler.py index 25612bb1073..cb560a17e87 100644 --- a/sdk/python/kfp/v2/compiler/compiler.py +++ b/sdk/python/kfp/v2/compiler/compiler.py @@ -20,10 +20,8 @@ import collections import inspect -import json import re import uuid -import warnings from typing import (Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Union) @@ -922,7 +920,6 @@ def _build_spec_by_group( deployment_config.executors[ executor_label].importer.CopyFrom( subgroup_importer_spec) - elif isinstance(subgroup, dsl.ParallelFor): # "Punch the hole", adding additional inputs (other than loop diff --git a/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py b/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py index 3f9e678165b..608145ed23b 100644 --- a/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py @@ -381,15 +381,13 @@ def build_container_spec_for_task( image=task.container_spec.image, command=task.container_spec.commands, args=task.container_spec.arguments, + env=[ + pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec + .EnvVar(name=name, value=value) + for name, value in (task.container_spec.env or {}).items() + ] )) - if task.container_spec.env is not None: - container_spec.env = [ - pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec - .EnvVar(name=name, value=value) - for name, value in task.container_spec.env.items() - ] - if task.container_spec.resources is not None: container_spec.reources.cpu_limit = ( task.container_spec.resources.cpu_limit) diff --git a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py index 531aab6cb81..fdae3cdf2a1 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py @@ -171,9 +171,8 @@ def test_pipeline_with_metrics_outputs(self): def test_pipeline_with_exit_handler(self): self._test_compile_py_to_json('pipeline_with_exit_handler') - # TODO: re-enable the test, add set_env_variable to PipelineTask - # def test_pipeline_with_env(self): - # self._test_compile_py_to_json('pipeline_with_env') + def test_pipeline_with_env(self): + self._test_compile_py_to_json('pipeline_with_env') def test_v2_component_with_optional_inputs(self): self._test_compile_py_to_json('v2_component_with_optional_inputs') diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.json index 78ec22e9660..e2497600d73 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.json @@ -1,98 +1,94 @@ { - "pipelineSpec": { - "components": { - "comp-print-env": { - "executorLabel": "exec-print-env" + "components": { + "comp-print-env": { + "executorLabel": "exec-print-env" + }, + "comp-print-env-op": { + "executorLabel": "exec-print-env-op" + } + }, + "defaultPipelineRoot": "dummy_root", + "deploymentSpec": { + "executors": { + "exec-print-env": { + "container": { + "command": [ + "sh", + "-c", + "set -e -x\necho \"$ENV2\"\necho \"$ENV3\"\n" + ], + "env": [ + { + "name": "ENV2", + "value": "val2" + }, + { + "name": "ENV3", + "value": "val3" + } + ], + "image": "alpine" + } }, - "comp-print-env-op": { - "executorLabel": "exec-print-env-op" + "exec-print-env-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_env_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.9' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_env_op():\n import os\n print(os.environ['ENV1'])\n\n" + ], + "env": [ + { + "name": "ENV1", + "value": "val1" + } + ], + "image": "python:3.7" + } } - }, - "deploymentSpec": { - "executors": { - "exec-print-env": { - "container": { - "command": [ - "sh", - "-c", - "set -e -x\necho \"$ENV2\"\necho \"$ENV3\"\n" - ], - "env": [ - { - "name": "ENV2", - "value": "val2" - }, - { - "name": "ENV3", - "value": "val3" - } - ], - "image": "alpine" + } + }, + "pipelineInfo": { + "name": "pipeline-with-env" + }, + "root": { + "dag": { + "tasks": { + "print-env": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-env" + }, + "taskInfo": { + "name": "print-env" } }, - "exec-print-env-op": { - "container": { - "args": [ - "--executor_input", - "{{$}}", - "--function_to_execute", - "print_env_op" - ], - "command": [ - "sh", - "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n", - "sh", - "-ec", - "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", - "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_env_op():\n import os\n print(os.environ['ENV1'])\n\n" - ], - "env": [ - { - "name": "ENV1", - "value": "val1" - } - ], - "image": "python:3.7" - } - } - } - }, - "pipelineInfo": { - "name": "pipeline-with-env" - }, - "root": { - "dag": { - "tasks": { - "print-env": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-print-env" - }, - "taskInfo": { - "name": "print-env" - } + "print-env-op": { + "cachingOptions": { + "enableCache": true }, - "print-env-op": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-print-env-op" - }, - "taskInfo": { - "name": "print-env-op" - } + "componentRef": { + "name": "comp-print-env-op" + }, + "taskInfo": { + "name": "print-env-op" } } } - }, - "schemaVersion": "2.1.0", - "sdkVersion": "kfp-1.8.6" + } }, - "runtimeConfig": { - "gcsOutputDirectory": "dummy_root" - } + "schemaVersion": "2.1.0", + "sdkVersion": "kfp-1.8.9" } \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.py b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.py index ca00738fe12..5598b050fcf 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_env.py @@ -29,7 +29,7 @@ def print_env_op(): implementation: container: image: alpine - command: + commands: - sh - -c - | diff --git a/sdk/python/kfp/v2/components/pipeline_task.py b/sdk/python/kfp/v2/components/pipeline_task.py index 37e7d086060..47a8753ea41 100644 --- a/sdk/python/kfp/v2/components/pipeline_task.py +++ b/sdk/python/kfp/v2/components/pipeline_task.py @@ -15,7 +15,7 @@ import re import copy -from typing import Any, Callable, List, Mapping, Optional, Union +from typing import Any, List, Mapping, Optional, Union from kfp.v2.components import constants from kfp.v2.components import pipeline_channel @@ -506,6 +506,22 @@ def set_display_name(self, name: str) -> 'PipelineTask': self.task_spec.display_name = name return self + def set_env_variable(self, name: str, value: str) -> 'PipelineTask': + """Set environment variable for the pipelineTask. + + Args: + name: The name of the environment variable. + value: The value of the environment variable. + + Returns: + Self return to allow chained setting calls. + """ + if self.container_spec.env is not None: + self.container_spec.env[name] = value + else: + self.container_spec.env = {name: value} + return self + def after(self, *tasks) -> 'PipelineTask': """Specify explicit dependency on other tasks. diff --git a/sdk/python/kfp/v2/components/pipeline_task_test.py b/sdk/python/kfp/v2/components/pipeline_task_test.py index 19f5cd95791..0224ff1b23d 100644 --- a/sdk/python/kfp/v2/components/pipeline_task_test.py +++ b/sdk/python/kfp/v2/components/pipeline_task_test.py @@ -353,6 +353,15 @@ def test_add_node_selector_constraint_accelerator_count(self): accelerator_type='TPU_V3', accelerator_count=5), task.container_spec.resources) + def test_set_env_variable(self): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.load_from_component_yaml( + V2_YAML), + arguments={'input1': 'value'}, + ) + task.set_env_variable('env_name', 'env_value') + self.assertEqual({'env_name': 'env_value'}, task.container_spec.env) + def test_set_display_name(self): task = pipeline_task.PipelineTask( component_spec=structures.ComponentSpec.load_from_component_yaml(