forked from kubeflow/pipelines
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(sdk): add local execution config #localexecution (kubeflow#10234)
- Loading branch information
1 parent
b4d7e18
commit 0d7913c
Showing
3 changed files
with
210 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Copyright 2023 The Kubeflow Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""The KFP local runner.""" | ||
|
||
from kfp.local.config import init | ||
from kfp.local.config import SubprocessRunner | ||
|
||
# TODO: uncomment when local execution is publicly available | ||
# __all__ = [ | ||
# 'SubprocessRunner', | ||
# 'init', | ||
# ] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# Copyright 2023 The Kubeflow Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Objects for configuring local execution.""" | ||
import dataclasses | ||
|
||
|
||
@dataclasses.dataclass | ||
class SubprocessRunner: | ||
"""Runner that indicates that local tasks should be run in a subprocess. | ||
Args: | ||
use_venv: Whether to run the subprocess in a virtual environment. If True, dependencies will be installed in the virtual environment. If False, dependencies will be installed in the current environment. Using a virtual environment is recommended. | ||
""" | ||
use_venv: bool = True | ||
|
||
|
||
class LocalExecutionConfig: | ||
instance = None | ||
|
||
def __new__( | ||
cls, | ||
runner: SubprocessRunner, | ||
pipeline_root: str, | ||
cleanup: bool, | ||
raise_on_error: bool, | ||
) -> 'LocalExecutionConfig': | ||
# singleton pattern | ||
cls.instance = super(LocalExecutionConfig, cls).__new__(cls) | ||
return cls.instance | ||
|
||
def __init__( | ||
self, | ||
runner: SubprocessRunner, | ||
pipeline_root: str, | ||
cleanup: bool, | ||
raise_on_error: bool, | ||
) -> None: | ||
self.runner = runner | ||
self.pipeline_root = pipeline_root | ||
self.cleanup = cleanup | ||
self.raise_on_error = raise_on_error | ||
|
||
|
||
def init( | ||
# more runner types will eventually be supported | ||
runner: SubprocessRunner, | ||
pipeline_root: str = './local_outputs', | ||
cleanup: bool = False, | ||
raise_on_error: bool = True, | ||
) -> None: | ||
"""Initializes a local execution session. | ||
Once called, components can be invoked locally outside of a pipeline definition. | ||
Args: | ||
runner: The runner to use. Currently only SubprocessRunner is supported. | ||
pipeline_root: Destination for task outputs. | ||
cleanup: Whether to ensure outputs are cleaned up after execution. If True, the task will be run in a temporary directory, rather than pipeline_root. | ||
raise_on_error: If True, raises an exception when a local task execution fails. If Falls, fails gracefully and does not terminal the current program. | ||
""" | ||
# updates a global config | ||
LocalExecutionConfig( | ||
runner=runner, | ||
pipeline_root=pipeline_root, | ||
cleanup=cleanup, | ||
raise_on_error=raise_on_error, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
# Copyright 2023 The Kubeflow Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""Tests for config.py.""" | ||
import unittest | ||
|
||
from kfp import local | ||
from kfp.local import config | ||
|
||
|
||
class LocalRunnerConfigTest(unittest.TestCase): | ||
|
||
def setUp(self): | ||
config.LocalExecutionConfig.instance = None | ||
|
||
def test_local_runner_config_init(self): | ||
"""Test instance attributes with one constructor call.""" | ||
config.LocalExecutionConfig( | ||
pipeline_root='my/local/root', | ||
runner=local.SubprocessRunner(use_venv=True), | ||
cleanup=True, | ||
raise_on_error=True, | ||
) | ||
|
||
instance = config.LocalExecutionConfig.instance | ||
|
||
self.assertEqual(instance.pipeline_root, 'my/local/root') | ||
self.assertEqual(instance.runner, local.SubprocessRunner(use_venv=True)) | ||
self.assertIs(instance.cleanup, True) | ||
self.assertIs(instance.raise_on_error, True) | ||
|
||
def test_local_runner_config_is_singleton(self): | ||
"""Test instance attributes with multiple constructor calls.""" | ||
config.LocalExecutionConfig( | ||
pipeline_root='my/local/root', | ||
runner=local.SubprocessRunner(), | ||
cleanup=True, | ||
raise_on_error=True, | ||
) | ||
config.LocalExecutionConfig( | ||
pipeline_root='other/local/root', | ||
runner=local.SubprocessRunner(use_venv=False), | ||
cleanup=False, | ||
raise_on_error=False, | ||
) | ||
|
||
instance = config.LocalExecutionConfig.instance | ||
|
||
self.assertEqual(instance.pipeline_root, 'other/local/root') | ||
self.assertEqual(instance.runner, | ||
local.SubprocessRunner(use_venv=False)) | ||
self.assertFalse(instance.cleanup, False) | ||
self.assertFalse(instance.raise_on_error, False) | ||
|
||
|
||
class TestInitCalls(unittest.TestCase): | ||
|
||
def setUp(self): | ||
config.LocalExecutionConfig.instance = None | ||
|
||
def test_init_more_than_once(self): | ||
"""Tests config instance attributes with one init() call.""" | ||
local.init( | ||
pipeline_root='my/local/root', | ||
runner=local.SubprocessRunner(use_venv=True), | ||
cleanup=True, | ||
) | ||
|
||
instance = config.LocalExecutionConfig.instance | ||
|
||
self.assertEqual(instance.pipeline_root, 'my/local/root') | ||
self.assertEqual(instance.runner, local.SubprocessRunner(use_venv=True)) | ||
self.assertTrue(instance.cleanup, True) | ||
|
||
def test_init_more_than_once(self): | ||
"""Test config instance attributes with multiple init() calls.""" | ||
local.init( | ||
pipeline_root='my/local/root', | ||
runner=local.SubprocessRunner(), | ||
cleanup=True, | ||
) | ||
local.init( | ||
pipeline_root='other/local/root', | ||
runner=local.SubprocessRunner(use_venv=False), | ||
cleanup=False, | ||
raise_on_error=False, | ||
) | ||
|
||
instance = config.LocalExecutionConfig.instance | ||
|
||
self.assertEqual(instance.pipeline_root, 'other/local/root') | ||
self.assertEqual(instance.runner, | ||
local.SubprocessRunner(use_venv=False)) | ||
self.assertFalse(instance.cleanup, False) | ||
self.assertFalse(instance.raise_on_error, False) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |