Skip to content

Commit

Permalink
feat(sdk): add local execution config #localexecution
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-mccarthy committed Nov 14, 2023
1 parent 91f50da commit 7059d9a
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 0 deletions.
24 changes: 24 additions & 0 deletions sdk/python/kfp/local/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The KFP local runner."""

from kfp.local.config import init
from kfp.local.config import LocalRunnerType
from kfp.local.config import SubprocessRunner

__all__ = [
'SubprocessRunner',
'LocalRunnerType',
'init',
]
103 changes: 103 additions & 0 deletions sdk/python/kfp/local/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Objects for configuring local execution."""
import abc
import dataclasses
import os


class LocalRunnerType(abc.ABC):
"""The ABC for user-facing Runner configurations.
Subclasses should be a dataclass.
They should implement a .validate() method.
"""

@abc.abstractmethod
def validate(self) -> None:
"""Validates that the configuration arguments provided by the user are
valid."""
raise NotImplementedError


@dataclasses.dataclass
class SubprocessRunner(LocalRunnerType):
"""Runner that indicates to run tasks in a subprocess.
Args:
use_venv: Whether to use a virtual environment for any dependencies or install dependencies in the current environment. Setting this value to true is recommended.
"""
use_venv: bool = True

def validate(self) -> None:
# current do not test the KFP SDK on Windows
# consider providing support for windows later
is_windows = os.name == 'nt'
if is_windows and self.use_venv:
raise ValueError(
"This 'use_venv=True' is not supported on Windows.")


class LocalExecutionConfig:
instance = None

def __new__(
cls,
pipeline_root: str,
runner: LocalRunnerType,
cleanup: bool,
raise_on_error: bool,
) -> 'LocalExecutionConfig':
# singleton pattern
cls.instance = super(LocalExecutionConfig, cls).__new__(cls)
return cls.instance

def __init__(
self,
pipeline_root: str,
runner: LocalRunnerType,
cleanup: bool,
raise_on_error: bool,
) -> None:
self.pipeline_root = pipeline_root
self.runner = runner
self.runner.validate()
self.cleanup = cleanup
self.raise_on_error = raise_on_error


def init(
runner: LocalRunnerType,
pipeline_root: str = './local_outputs',
cleanup: bool = False,
raise_on_error: bool = True,
) -> None:
"""Initializes a local execution session.
Once called, components can be invoked locally outside of a pipeline definition.
Args:
runner: The runner to use. Currently only SubprocessRunner is supported.
pipeline_root: Destination for task outputs.
cleanup: Whether to ensure outputs are cleaned up after execution. If True, the task will be run in a temporary directory, rather than pipeline_root.
raise_on_error: If True, raises an exception when a local task execution fails. Else, fails gracefully.
"""
# updates a global config
LocalExecutionConfig(
pipeline_root=pipeline_root,
runner=runner,
cleanup=cleanup,
raise_on_error=raise_on_error,
)
109 changes: 109 additions & 0 deletions sdk/python/kfp/local/config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for config.py."""
import unittest

from kfp import local
from kfp.local import config


class LocalRunnerConfigTest(unittest.TestCase):

def setUp(self):
config.LocalExecutionConfig.instance = None

def test_local_runner_config_init(self):
"""Test instance attributes with one init() call."""
config.LocalExecutionConfig(
pipeline_root='my/local/root',
runner=local.SubprocessRunner(use_venv=True),
cleanup=True,
raise_on_error=True,
)

instance = config.LocalExecutionConfig.instance

self.assertEqual(instance.pipeline_root, 'my/local/root')
self.assertEqual(instance.runner, local.SubprocessRunner(use_venv=True))
self.assertIs(instance.cleanup, True)
self.assertIs(instance.raise_on_error, True)

def test_local_runner_config_is_singleton(self):
"""Test instance attributes with multiple init() calls."""
config.LocalExecutionConfig(
pipeline_root='my/local/root',
runner=local.SubprocessRunner(),
cleanup=True,
raise_on_error=True,
)
config.LocalExecutionConfig(
pipeline_root='other/local/root',
runner=local.SubprocessRunner(use_venv=False),
cleanup=False,
raise_on_error=False,
)

instance = config.LocalExecutionConfig.instance

self.assertEqual(instance.pipeline_root, 'other/local/root')
self.assertEqual(instance.runner,
local.SubprocessRunner(use_venv=False))
self.assertFalse(instance.cleanup, False)
self.assertFalse(instance.raise_on_error, False)


class TestInitCalls(unittest.TestCase):

def setUp(self):
config.LocalExecutionConfig.instance = None

def test_init_more_than_once(self):
"""Tests config instance attributes with one init() call."""
local.init(
pipeline_root='my/local/root',
runner=local.SubprocessRunner(use_venv=True),
cleanup=True,
)

instance = config.LocalExecutionConfig.instance

self.assertEqual(instance.pipeline_root, 'my/local/root')
self.assertEqual(instance.runner, local.SubprocessRunner(use_venv=True))
self.assertTrue(instance.cleanup, True)

def test_init_more_than_once(self):
"""Test config instance attributes with multiple init() calls."""
local.init(
pipeline_root='my/local/root',
runner=local.SubprocessRunner(),
cleanup=True,
)
local.init(
pipeline_root='other/local/root',
runner=local.SubprocessRunner(use_venv=False),
cleanup=False,
raise_on_error=False,
)

instance = config.LocalExecutionConfig.instance

self.assertEqual(instance.pipeline_root, 'other/local/root')
self.assertEqual(instance.runner,
local.SubprocessRunner(use_venv=False))
self.assertFalse(instance.cleanup, False)
self.assertFalse(instance.raise_on_error, False)


if __name__ == '__main__':
unittest.main()

0 comments on commit 7059d9a

Please sign in to comment.