diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index d41683c4dd1..a6ee6a22e5b 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -3,6 +3,9 @@ ## Major Features and Improvements * Add v2 placeholder variables [\#6693](https://github.com/kubeflow/pipelines/pull/6693) +* Add a new command in KFP's CLI, `components`, that enables users to manage and build + v2 components in a container with Docker [\#6417](https://github.com/kubeflow/pipelines/pull/6417) + ## Breaking Changes @@ -22,6 +25,7 @@ * Depends on `typing-extensions>=3.7.4,<4; python_version<"3.9"` [\#6683](https://github.com/kubeflow/pipelines/pull/6683) * Depends on `click>=7.1.2,<9` [\#6691](https://github.com/kubeflow/pipelines/pull/6691) * Depends on `cloudpickle>=2.0.0,<3` [\#6703](https://github.com/kubeflow/pipelines/pull/6703) +* Depends on `typer>=0.3.2,<1.0` [\#6417](https://github.com/kubeflow/pipelines/pull/6417) ## Documentation Updates diff --git a/sdk/python/kfp/cli/cli.py b/sdk/python/kfp/cli/cli.py index 55b2ae14314..48460535785 100644 --- a/sdk/python/kfp/cli/cli.py +++ b/sdk/python/kfp/cli/cli.py @@ -12,15 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import click import logging import sys + +import click +import typer + from kfp._client import Client from kfp.cli.run import run from kfp.cli.pipeline import pipeline from kfp.cli.diagnose_me_cli import diagnose_me from kfp.cli.experiment import experiment from kfp.cli.output import OutputFormat +from kfp.cli import components @click.group() @@ -67,6 +71,7 @@ def main(): cli.add_command(pipeline) cli.add_command(diagnose_me, 'diagnose_me') cli.add_command(experiment) + cli.add_command(typer.main.get_command(components.app)) try: cli(obj={}, auto_envvar_prefix='KFP') except Exception as e: diff --git a/sdk/python/kfp/cli/components.py b/sdk/python/kfp/cli/components.py new file mode 100644 index 00000000000..cfede9c39d0 --- /dev/null +++ b/sdk/python/kfp/cli/components.py @@ -0,0 +1,481 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import configparser
+import contextlib
+import enum
+import pathlib
+import shutil
+import subprocess
+import tempfile
+from typing import Any, List, Optional
+
+_DOCKER_IS_PRESENT = True
+try:
+    import docker
+except ImportError:
+    _DOCKER_IS_PRESENT = False
+
+import typer
+
+import kfp
+from kfp.v2.components import component_factory, kfp_config, utils
+
+_REQUIREMENTS_TXT = 'requirements.txt'
+
+_DOCKERFILE = 'Dockerfile'
+
+_DOCKERFILE_TEMPLATE = '''
+FROM {base_image}
+
+WORKDIR {component_root_dir}
+COPY requirements.txt requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+{maybe_copy_kfp_package}
+RUN pip install --no-cache-dir {kfp_package_path}
+COPY . .
+'''
+
+_DOCKERIGNORE = '.dockerignore'
+
+# Location in which to write out shareable YAML for components.
+_COMPONENT_METADATA_DIR = 'component_metadata'
+
+_DOCKERIGNORE_TEMPLATE = '''
+{}/
+'''.format(_COMPONENT_METADATA_DIR)
+
+# Location at which v2 Python function-based components will be stored
+# in containerized components.
+_COMPONENT_ROOT_DIR = pathlib.Path('/usr/local/src/kfp/components')
+
+
+@contextlib.contextmanager
+def _registered_modules():
+    """Exposes a fresh registry dict while component modules are loaded.
+
+    Points component_factory.REGISTERED_MODULES at a new, empty dict for the
+    duration of the `with` block and resets it to None on exit, even if the
+    block raises.
+
+    Yields:
+        The dict installed as component_factory.REGISTERED_MODULES.
+    """
+    registered_modules = {}
+    component_factory.REGISTERED_MODULES = registered_modules
+    try:
+        yield registered_modules
+    finally:
+        component_factory.REGISTERED_MODULES = None
+
+
+class _Engine(str, enum.Enum):
+    """Supported container build engines."""
+    DOCKER = 'docker'
+    KANIKO = 'kaniko'
+    CLOUD_BUILD = 'cloudbuild'
+
+
+app = typer.Typer()
+
+
+def _info(message: Any):
+    """Echoes `message` prefixed with a green INFO label."""
+    label = typer.style('INFO', fg=typer.colors.GREEN)
+    typer.echo('{}: {}'.format(label, message))
+
+
+def _warning(message: Any):
+    """Echoes `message` prefixed with a yellow WARNING label."""
+    label = typer.style('WARNING', fg=typer.colors.YELLOW)
+    typer.echo('{}: {}'.format(label, message))
+
+
+def _error(message: Any):
+    """Echoes `message` prefixed with a red ERROR label."""
+    label = typer.style('ERROR', fg=typer.colors.RED)
+    typer.echo('{}: {}'.format(label, message))
+
+
+class _ComponentBuilder():
+    """Helper class for building containerized v2 KFP components."""
+
+    def __init__(
+        self,
+        context_directory: pathlib.Path,
+        kfp_package_path: Optional[pathlib.Path] = None,
+        component_filepattern: str = '**/*.py',
+    ):
+        """ComponentBuilder builds containerized components.
+
+        Args:
+            context_directory: Directory containing one or more Python files
+              with one or more KFP v2 components.
+            kfp_package_path: Path to a pip-installable location for KFP.
+              This can either be pointing to KFP SDK root directory located in
+              a local clone of the KFP repo, or a git+https location.
+              If left empty, defaults to KFP on PyPi.
+            component_filepattern: Glob pattern, evaluated relative to
+              context_directory, selecting the files to load components from.
+
+        Raises:
+            typer.Exit: If a local KFP wheel cannot be built, or if the
+              component files cannot be turned into a single image (see
+              _load_components).
+        """
+        self._context_directory = context_directory
+        self._dockerfile = self._context_directory / _DOCKERFILE
+        self._component_filepattern = component_filepattern
+        self._components: List[component_factory.ComponentInfo] = []
+
+        # This is only set if we need to install KFP from local copy.
+        self._maybe_copy_kfp_package = ''
+
+        if kfp_package_path is None:
+            self._kfp_package_path = 'kfp=={}'.format(kfp.__version__)
+        elif kfp_package_path.is_dir():
+            self._kfp_package_path = self._build_local_kfp_wheel(
+                kfp_package_path)
+            self._maybe_copy_kfp_package = (
+                'COPY {wheel_name} {wheel_name}'.format(
+                    wheel_name=self._kfp_package_path))
+        else:
+            # NOTE(review): the value arrives as a pathlib.Path, and pathlib
+            # collapses repeated slashes, so a `git+https://...` location is
+            # rendered as `git+https:/...` in the Dockerfile. Fixing this
+            # needs the CLI option to be a plain string.
+            self._kfp_package_path = kfp_package_path
+
+        _info('Building component using KFP package path: {}'.format(
+            typer.style(str(self._kfp_package_path), fg=typer.colors.CYAN)))
+
+        # Snapshot of the top-level file names; _maybe_write_file uses it to
+        # decide whether a generated file would clobber an existing one.
+        self._context_directory_files = [
+            file.name
+            for file in self._context_directory.glob('*')
+            if file.is_file()
+        ]
+
+        # Only `.py` files can be loaded as modules. Sorting makes the load
+        # order independent of the order in which the OS lists files.
+        self._component_files = sorted(
+            file for file in self._context_directory.glob(
+                self._component_filepattern)
+            if file.is_file() and file.suffix == '.py')
+
+        self._base_image = None
+        self._target_image = None
+        self._load_components()
+
+    def _build_local_kfp_wheel(self, kfp_package_path: pathlib.Path) -> str:
+        """Builds a KFP wheel from a local directory into the build context.
+
+        Args:
+            kfp_package_path: Directory containing KFP's setup.py.
+
+        Returns:
+            The file name of the wheel copied into the context directory.
+
+        Raises:
+            typer.Exit: If the build command fails or does not produce
+              exactly one wheel.
+        """
+        _info('Building KFP package from local directory {}'.format(
+            typer.style(str(kfp_package_path), fg=typer.colors.CYAN)))
+        # The subprocess runs with the package directory as its working
+        # directory, so setup.py must be given as an absolute path; a
+        # relative one would be resolved against itself.
+        package_dir = kfp_package_path.resolve()
+        temp_dir = pathlib.Path(tempfile.mkdtemp())
+        try:
+            # check=True turns a non-zero exit status into
+            # CalledProcessError; without it the handler below never runs.
+            subprocess.run(
+                [
+                    'python3',
+                    str(package_dir / 'setup.py'),
+                    'bdist_wheel',
+                    '--dist-dir',
+                    str(temp_dir),
+                ],
+                cwd=str(package_dir),
+                check=True,
+            )
+            wheel_files = list(temp_dir.glob('*.whl'))
+            if len(wheel_files) != 1:
+                _error('Failed to find built KFP wheel under {}'.format(
+                    temp_dir))
+                raise typer.Exit(1)
+
+            wheel_file = wheel_files[0]
+            shutil.copy(wheel_file, self._context_directory)
+            return wheel_file.name
+        except subprocess.CalledProcessError as e:
+            _error('Failed to build KFP wheel locally:\n{}'.format(e))
+            raise typer.Exit(1)
+        finally:
+            _info('Cleaning up temporary directory {}'.format(temp_dir))
+            shutil.rmtree(temp_dir)
+
+    def _load_components(self):
+        """Loads all component files and picks the base and target images.
+
+        Every file in self._component_files is loaded as a module while a
+        registry is active, and each component registered by it is appended
+        to self._components. All collected components must agree on a single
+        base_image and a single target_image, neither of which may be None.
+
+        Raises:
+            typer.Exit: If no files matched, a file registered no components,
+              or the images are missing or inconsistent.
+        """
+        if not self._component_files:
+            _error(
+                'No component files found matching pattern `{}` in directory {}'
+                .format(self._component_filepattern, self._context_directory))
+            raise typer.Exit(1)
+
+        for python_file in self._component_files:
+            with _registered_modules() as component_modules:
+                utils.load_module(
+                    module_name=python_file.stem,
+                    module_directory=python_file.parent)
+
+                formatted_module_file = typer.style(
+                    str(python_file), fg=typer.colors.CYAN)
+                if not component_modules:
+                    _error('No KFP components found in file {}'.format(
+                        formatted_module_file))
+                    raise typer.Exit(1)
+
+                _info('Found {} component(s) in file {}:'.format(
+                    len(component_modules), formatted_module_file))
+                for name, component in component_modules.items():
+                    _info('{}: {}'.format(name, component))
+                    self._components.append(component)
+
+        # Validate across every loaded file, not just the last one: all
+        # components end up in the same image.
+        base_images = {info.base_image for info in self._components}
+        target_images = {info.target_image for info in self._components}
+
+        if len(base_images) != 1:
+            _error('Found {} unique base_image values {}. Components'
+                   ' must specify the same base_image and target_image.'.format(
+                       len(base_images), base_images))
+            raise typer.Exit(1)
+
+        self._base_image = base_images.pop()
+        if self._base_image is None:
+            _error('Did not find a base_image specified in any of the'
+                   ' components. A base_image must be specified in order to'
+                   ' build the component.')
+            raise typer.Exit(1)
+        _info('Using base image: {}'.format(
+            typer.style(self._base_image, fg=typer.colors.YELLOW)))
+
+        if len(target_images) != 1:
+            _error('Found {} unique target_image values {}. Components'
+                   ' must specify the same base_image and'
+                   ' target_image.'.format(len(target_images), target_images))
+            raise typer.Exit(1)
+
+        self._target_image = target_images.pop()
+        if self._target_image is None:
+            _error('Did not find a target_image specified in any of the'
+                   ' components. A target_image must be specified in order'
+                   ' to build the component.')
+            raise typer.Exit(1)
+        _info('Using target image: {}'.format(
+            typer.style(self._target_image, fg=typer.colors.YELLOW)))
+
+    def _maybe_write_file(self,
+                          filename: str,
+                          contents: str,
+                          overwrite: bool = False):
+        """Writes a generated file into the context directory.
+
+        Args:
+            filename: Name of the file, relative to the context directory.
+            contents: Text written after a `# Generated by KFP.` first line.
+            overwrite: Whether to replace a file that was already present
+              when this builder was created. Such a file is left untouched
+              when this is False.
+        """
+        formatted_filename = typer.style(filename, fg=typer.colors.CYAN)
+        if filename in self._context_directory_files:
+            _info('Found existing file {} under {}.'.format(
+                formatted_filename, self._context_directory))
+            if not overwrite:
+                _info('Leaving this file untouched.')
+                return
+            _warning('Overwriting existing file {}'.format(formatted_filename))
+        else:
+            _warning('{} not found under {}. Creating one.'.format(
+                formatted_filename, self._context_directory))
+
+        filepath = self._context_directory / filename
+        with open(filepath, 'w') as f:
+            f.write('# Generated by KFP.\n{}'.format(contents))
+        _info('Generated file {}.'.format(filepath))
+
+    def maybe_generate_requirements_txt(self):
+        """Creates an empty requirements.txt unless one already exists."""
+        self._maybe_write_file(_REQUIREMENTS_TXT, '')
+
+    def maybe_generate_dockerignore(self):
+        """Creates a .dockerignore unless one already exists."""
+        self._maybe_write_file(_DOCKERIGNORE, _DOCKERIGNORE_TEMPLATE)
+
+    def write_component_files(self):
+        """Saves each component's spec under the component metadata directory.
+
+        The file name is the component's output_component_file when set,
+        otherwise `<function_name>.yaml`.
+        """
+        for component_info in self._components:
+            filename = (
+                component_info.output_component_file or
+                component_info.function_name + '.yaml')
+            container_filename = (
+                self._context_directory / _COMPONENT_METADATA_DIR / filename)
+            container_filename.parent.mkdir(exist_ok=True, parents=True)
+            component_info.component_spec.save(container_filename)
+
+    def generate_kfp_config(self):
+        """Saves a KFP config mapping function names to their module paths.
+
+        Paths are recorded relative to the context directory.
+        """
+        config = kfp_config.KFPConfig(config_directory=self._context_directory)
+        for component_info in self._components:
+            relative_path = component_info.module_path.relative_to(
+                self._context_directory)
+            config.add_component(
+                function_name=component_info.function_name, path=relative_path)
+        config.save()
+
+    def maybe_generate_dockerfile(self, overwrite_dockerfile: bool = False):
+        """Renders the Dockerfile template and writes it if appropriate.
+
+        Args:
+            overwrite_dockerfile: Whether to replace a pre-existing
+              Dockerfile in the context directory.
+        """
+        dockerfile_contents = _DOCKERFILE_TEMPLATE.format(
+            base_image=self._base_image,
+            maybe_copy_kfp_package=self._maybe_copy_kfp_package,
+            component_root_dir=_COMPONENT_ROOT_DIR,
+            kfp_package_path=self._kfp_package_path)
+
+        self._maybe_write_file(_DOCKERFILE, dockerfile_contents,
+                               overwrite_dockerfile)
+
+    def build_image(self, push_image: bool = True):
+        """Builds the target image with Docker and optionally pushes it.
+
+        Args:
+            push_image: Whether to push the image after a successful build.
+
+        Raises:
+            typer.Exit: If Docker raises, or if the build or push log stream
+              contains an error entry.
+        """
+        _info('Building image {} using Docker...'.format(
+            typer.style(self._target_image, fg=typer.colors.YELLOW)))
+        client = docker.from_env()
+
+        docker_log_prefix = typer.style('Docker', fg=typer.colors.CYAN)
+
+        # The low-level build API reports failures as `error` entries in the
+        # decoded log stream rather than by raising, so the stream has to be
+        # inspected. The exit is raised after the `try` so that typer.Exit
+        # never passes through the Docker exception handler.
+        build_failed = False
+        try:
+            context = str(self._context_directory)
+            logs = client.api.build(
+                path=context,
+                dockerfile=_DOCKERFILE,
+                tag=self._target_image,
+                decode=True,
+            )
+            for log in logs:
+                error = log.get('error')
+                if error:
+                    build_failed = True
+                    _error('{}: {}'.format(docker_log_prefix,
+                                           str(error).rstrip('\n')))
+                    continue
+                message = log.get('stream', '').rstrip('\n')
+                if message:
+                    _info('{}: {}'.format(docker_log_prefix, message))
+
+        except docker.errors.DockerException as e:
+            # Only BuildError carries a build_log; other Docker errors
+            # are reported through their message alone.
+            for log in getattr(e, 'build_log', None) or []:
+                message = log.get('message', '').rstrip('\n')
+                if message:
+                    _error('{}: {}'.format(docker_log_prefix, message))
+            _error('{}: {}'.format(docker_log_prefix, e))
+            raise typer.Exit(1)
+
+        if build_failed:
+            raise typer.Exit(1)
+
+        if not push_image:
+            return
+
+        _info('Pushing image {}...'.format(
+            typer.style(self._target_image, fg=typer.colors.YELLOW)))
+
+        # A streamed push likewise reports failures (for example a denied
+        # login) as `error` entries in the stream.
+        push_failed = False
+        try:
+            response = client.images.push(
+                self._target_image, stream=True, decode=True)
+            for log in response:
+                error = log.get('error')
+                if error:
+                    push_failed = True
+                    _error('{}: {}'.format(docker_log_prefix,
+                                           str(error).rstrip('\n')))
+                    continue
+                status = log.get('status', '').rstrip('\n')
+                layer = log.get('id', '')
+                if status:
+                    _info('{}: {} {}'.format(docker_log_prefix, layer, status))
+        except docker.errors.DockerException as e:
+            _error('{}: {}'.format(docker_log_prefix, e))
+            raise typer.Exit(1)
+
+        if push_failed:
+            raise typer.Exit(1)
+
+        _info('Built and pushed component container {}'.format(
+            typer.style(self._target_image, fg=typer.colors.YELLOW)))
+
+
+@app.callback()
+def components():
+    """Builds shareable, containerized components."""
+    pass
+
+
+@app.command()
+def build(
+    components_directory: pathlib.Path = typer.Argument(
+        ...,
+        help="Path to a directory containing one or more Python"
+        " files with KFP v2 components. The container will be built"
+        " with this directory as the context."),
+    component_filepattern: str = typer.Option(
+        '**/*.py',
+        help="Filepattern to use when searching for KFP components. The"
+        " default searches all Python files in the specified directory."),
+    engine: _Engine = typer.Option(
+        _Engine.DOCKER,
+        help="Engine to use to build the component's container."),
+    kfp_package_path: Optional[pathlib.Path] = typer.Option(
+        None, help="A pip-installable path to the KFP package."),
+    overwrite_dockerfile: bool = typer.Option(
+        False,
+        help="Set this to true to always generate a Dockerfile"
+        " as part of the build process"),
+    push_image: bool = typer.Option(
+        True, help="Push the built image to its remote repository.")):
+    """
+    Builds containers for KFP v2 Python-based components.
+    """
+    # The docstring above is kept short on purpose: the command exposes it
+    # as user-facing help text.
+    components_directory = components_directory.resolve()
+    if not components_directory.is_dir():
+        _error('{} does not seem to be a valid directory.'.format(
+            components_directory))
+        raise typer.Exit(1)
+
+    if engine != _Engine.DOCKER:
+        _error('Currently, only `docker` is supported for --engine.')
+        raise typer.Exit(1)
+
+    # From here on the engine is known to be Docker.
+    if not _DOCKER_IS_PRESENT:
+        _error('The `docker` Python package was not found in the current'
+               ' environment. Please run `pip install docker` to install it.'
+               ' Optionally, you can also install KFP with all of its'
+               ' optional dependencies by running `pip install kfp[all]`.')
+        raise typer.Exit(1)
+
+    builder = _ComponentBuilder(
+        context_directory=components_directory,
+        kfp_package_path=kfp_package_path,
+        component_filepattern=component_filepattern,
+    )
+    builder.write_component_files()
+    builder.generate_kfp_config()
+
+    builder.maybe_generate_requirements_txt()
+    builder.maybe_generate_dockerignore()
+    builder.maybe_generate_dockerfile(overwrite_dockerfile=overwrite_dockerfile)
+    builder.build_image(push_image=push_image)
+
+
+if __name__ == '__main__':
+    app()
diff --git a/sdk/python/kfp/cli/components_test.py b/sdk/python/kfp/cli/components_test.py
new file mode 100644
index 00000000000..f0e7fdae436
--- /dev/null
+++ b/sdk/python/kfp/cli/components_test.py
@@ -0,0 +1,451 @@
+# Copyright 2021 The Kubeflow Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests for `components` command group in KFP CLI."""
+import contextlib
+import importlib
+# `import importlib` alone does not guarantee that the `util` submodule is
+# loaded, so import it explicitly before it is used below.
+import importlib.util
+import pathlib
+import sys
+import textwrap
+from typing import List, Optional, Union
+import unittest
+from unittest import mock
+
+from typer import testing
+
+# Docker is an optional install, but we need the import to succeed for tests.
+# So we patch it before importing kfp.cli.components.
+if importlib.util.find_spec('docker') is None:
+    sys.modules['docker'] = mock.Mock()
+from kfp.cli import components
+
+# Source of a module defining one no-op component. The decorator arguments
+# are filled in with repr() values by _make_component.
+_COMPONENT_TEMPLATE = '''
+from kfp.v2.dsl import *
+
+@component(
+    base_image={base_image},
+    target_image={target_image},
+    output_component_file={output_component_file})
+def {func_name}():
+    pass
+'''
+
+
+def _make_component(func_name: str,
+                    base_image: Optional[str] = None,
+                    target_image: Optional[str] = None,
+                    output_component_file: Optional[str] = None) -> str:
+    """Returns Python source defining a no-op component named `func_name`."""
+    return _COMPONENT_TEMPLATE.format(
+        base_image=repr(base_image),
+        target_image=repr(target_image),
+        output_component_file=repr(output_component_file),
+        func_name=func_name)
+
+
+def _write_file(filename: str, file_contents: str):
+    """Writes `file_contents` to `filename`, creating parent directories."""
+    filepath = pathlib.Path(filename)
+    filepath.parent.mkdir(exist_ok=True, parents=True)
+    filepath.write_text(file_contents)
+
+
+def _write_components(filename: str, component_template: Union[List[str], str]):
+    """Writes one component source, or several joined together, to a file."""
+    if isinstance(component_template, list):
+        file_contents = '\n\n'.join(component_template)
+    else:
+        file_contents = component_template
+    _write_file(filename=filename, file_contents=file_contents)
+
+
+class Test(unittest.TestCase):
+    """Runs the `build` command against a temporary directory and a mock Docker client."""
+
+    def setUp(self) -> None:
+        self._runner = testing.CliRunner()
+        components._DOCKER_IS_PRESENT = True
+
+        patcher = mock.patch('docker.from_env')
+        self._docker_client = patcher.start().return_value
+        # The command builds through the low-level `api.build` call, so that
+        # is the method whose log stream has to be stubbed.
+        self._docker_client.api.build.return_value = [{
+            'stream': 'Build logs'
+        }]
+        self._docker_client.images.push.return_value = [{'status': 'Pushed'}]
+        self.addCleanup(patcher.stop)
+
+        self._app = components.app
+        # Enter the isolated filesystem now and hand its exit over to
+        # addCleanup, so it stays active for the duration of the test.
+        with contextlib.ExitStack() as stack:
+            stack.enter_context(self._runner.isolated_filesystem())
+            self._working_dir = pathlib.Path.cwd()
+            self.addCleanup(stack.pop_all().close)
+
+        return super().setUp()
+
+    def assertFileExists(self, path: str):
+        """Fails unless `path` exists under the test's working directory."""
+        path_under_test_dir = self._working_dir / path
+        # Check existence explicitly: a Path object is always truthy.
+        self.assertTrue(path_under_test_dir.exists(),
+                        f'File {path} does not exist!')
+
+    def assertFileExistsAndContains(self, path: str, expected_content: str):
+        """Fails unless `path` exists and holds exactly `expected_content`."""
+        self.assertFileExists(path)
+        path_under_test_dir = self._working_dir / path
+        got_content = path_under_test_dir.read_text()
+        self.assertEqual(got_content, expected_content)
+
+    def testKFPConfigForSingleFile(self):
+        preprocess_component = _make_component(
+            func_name='preprocess', target_image='custom-image')
+        train_component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py',
+                          [preprocess_component, train_component])
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir)],
+        )
+        self.assertEqual(result.exit_code, 0)
+
+        self.assertFileExistsAndContains(
+            'kfp_config.ini',
+            textwrap.dedent('''\
+            [Components]
+            preprocess = components.py
+            train = components.py
+
+            '''))
+
+    def testKFPConfigForSingleFileUnderNestedDirectory(self):
+        preprocess_component = _make_component(
+            func_name='preprocess', target_image='custom-image')
+        train_component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('dir1/dir2/dir3/components.py',
+                          [preprocess_component, train_component])
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir)],
+        )
+        self.assertEqual(result.exit_code, 0)
+
+        self.assertFileExistsAndContains(
+            'kfp_config.ini',
+            textwrap.dedent('''\
+            [Components]
+            preprocess = dir1/dir2/dir3/components.py
+            train = dir1/dir2/dir3/components.py
+
+            '''))
+
+    def testKFPConfigForMultipleFiles(self):
+        component = _make_component(
+            func_name='preprocess', target_image='custom-image')
+        _write_components('preprocess_component.py', component)
+
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('train_component.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir)],
+        )
+        self.assertEqual(result.exit_code, 0)
+
+        self.assertFileExistsAndContains(
+            'kfp_config.ini',
+            textwrap.dedent('''\
+            [Components]
+            preprocess = preprocess_component.py
+            train = train_component.py
+
+            '''))
+
+    def testKFPConfigForMultipleFilesUnderNestedDirectories(self):
+        component = _make_component(
+            func_name='preprocess', target_image='custom-image')
+        _write_components('preprocess/preprocess_component.py', component)
+
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('train/train_component.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir)],
+        )
+        self.assertEqual(result.exit_code, 0)
+
+        self.assertFileExistsAndContains(
+            'kfp_config.ini',
+            textwrap.dedent('''\
+            [Components]
+            preprocess = preprocess/preprocess_component.py
+            train = train/train_component.py
+
+            '''))
+
+    def testComponentFilepatternCanBeUsedToRestrictDiscovery(self):
+        component = _make_component(
+            func_name='preprocess', target_image='custom-image')
+        _write_components('preprocess/preprocess_component.py', component)
+
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('train/train_component.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            [
+                'build',
+                str(self._working_dir), '--component-filepattern=train/*'
+            ],
+        )
+        self.assertEqual(result.exit_code, 0)
+
+        self.assertFileExistsAndContains(
+            'kfp_config.ini',
+            textwrap.dedent('''\
+            [Components]
+            train = train/train_component.py
+
+            '''))
+
+    def testEmptyRequirementsTxtFileIsGenerated(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        result = self._runner.invoke(self._app,
+                                     ['build', str(self._working_dir)])
+        self.assertEqual(result.exit_code, 0)
+        self.assertFileExistsAndContains('requirements.txt',
+                                         '# Generated by KFP.\n')
+
+    def testExistingRequirementsTxtFileIsUnchanged(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        _write_file('requirements.txt', 'Some pre-existing content')
+
+        result = self._runner.invoke(self._app,
+                                     ['build', str(self._working_dir)])
+        self.assertEqual(result.exit_code, 0)
+        self.assertFileExistsAndContains('requirements.txt',
+                                         'Some pre-existing content')
+
+    def testDockerignoreFileIsGenerated(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        result = self._runner.invoke(self._app,
+                                     ['build', str(self._working_dir)])
+        self.assertEqual(result.exit_code, 0)
+        self.assertFileExistsAndContains(
+            '.dockerignore',
+            textwrap.dedent('''\
+            # Generated by KFP.
+
+            component_metadata/
+            '''))
+
+    def testExistingDockerignoreFileIsUnchanged(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        _write_file('.dockerignore', 'Some pre-existing content')
+
+        result = self._runner.invoke(self._app,
+                                     ['build', str(self._working_dir)])
+        self.assertEqual(result.exit_code, 0)
+        self.assertFileExistsAndContains('.dockerignore',
+                                         'Some pre-existing content')
+
+    def testDockerEngineIsSupported(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir), '--engine=docker'])
+        self.assertEqual(result.exit_code, 0)
+        self._docker_client.api.build.assert_called_once()
+        self._docker_client.images.push.assert_called_once_with(
+            'custom-image', stream=True, decode=True)
+
+    def testKanikoEngineIsNotSupported(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir), '--engine=kaniko'],
+        )
+        self.assertEqual(result.exit_code, 1)
+        self._docker_client.api.build.assert_not_called()
+        self._docker_client.images.push.assert_not_called()
+
+    def testCloudBuildEngineIsNotSupported(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir), '--engine=cloudbuild'],
+        )
+        self.assertEqual(result.exit_code, 1)
+        self._docker_client.api.build.assert_not_called()
+        self._docker_client.images.push.assert_not_called()
+
+    def testDockerClientIsCalledToBuildAndPushByDefault(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir)],
+        )
+        self.assertEqual(result.exit_code, 0)
+
+        self._docker_client.api.build.assert_called_once()
+        self._docker_client.images.push.assert_called_once_with(
+            'custom-image', stream=True, decode=True)
+
+    def testDockerClientIsCalledToBuildButSkipsPushing(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir), '--no-push-image'],
+        )
+        self.assertEqual(result.exit_code, 0)
+
+        self._docker_client.api.build.assert_called_once()
+        self._docker_client.images.push.assert_not_called()
+
+    @mock.patch('kfp.__version__', '1.2.3')
+    def testDockerfileIsCreatedCorrectly(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir)],
+        )
+        self.assertEqual(result.exit_code, 0)
+        self._docker_client.api.build.assert_called_once()
+        self.assertFileExistsAndContains(
+            'Dockerfile',
+            textwrap.dedent('''\
+            # Generated by KFP.
+
+            FROM python:3.7
+
+            WORKDIR /usr/local/src/kfp/components
+            COPY requirements.txt requirements.txt
+            RUN pip install --no-cache-dir -r requirements.txt
+
+            RUN pip install --no-cache-dir kfp==1.2.3
+            COPY . .
+            '''))
+
+    def testExistingDockerfileIsUnchangedByDefault(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+        _write_file('Dockerfile', 'Existing Dockerfile contents')
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir)],
+        )
+        self.assertEqual(result.exit_code, 0)
+        self._docker_client.api.build.assert_called_once()
+        self.assertFileExistsAndContains('Dockerfile',
+                                         'Existing Dockerfile contents')
+
+    @mock.patch('kfp.__version__', '1.2.3')
+    def testExistingDockerfileCanBeOverwritten(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+        _write_file('Dockerfile', 'Existing Dockerfile contents')
+
+        result = self._runner.invoke(
+            self._app,
+            ['build', str(self._working_dir), '--overwrite-dockerfile'],
+        )
+        self.assertEqual(result.exit_code, 0)
+        self._docker_client.api.build.assert_called_once()
+        self.assertFileExistsAndContains(
+            'Dockerfile',
+            textwrap.dedent('''\
+            # Generated by KFP.
+
+            FROM python:3.7
+
+            WORKDIR /usr/local/src/kfp/components
+            COPY requirements.txt requirements.txt
+            RUN pip install --no-cache-dir -r requirements.txt
+
+            RUN pip install --no-cache-dir kfp==1.2.3
+            COPY . .
+            '''))
+
+    def testDockerfileCanContainCustomKFPPackage(self):
+        component = _make_component(
+            func_name='train', target_image='custom-image')
+        _write_components('components.py', component)
+
+        result = self._runner.invoke(
+            self._app,
+            [
+                'build',
+                str(self._working_dir),
+                '--kfp-package-path=/Some/localdir/containing/kfp/source'
+            ],
+        )
+        self.assertEqual(result.exit_code, 0)
+        self._docker_client.api.build.assert_called_once()
+        self.assertFileExistsAndContains(
+            'Dockerfile',
+            textwrap.dedent('''\
+            # Generated by KFP.
+
+            FROM python:3.7
+
+            WORKDIR /usr/local/src/kfp/components
+            COPY requirements.txt requirements.txt
+            RUN pip install --no-cache-dir -r requirements.txt
+
+            RUN pip install --no-cache-dir /Some/localdir/containing/kfp/source
+            COPY . .
+            '''))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/sdk/python/kfp/v2/components/component_decorator.py b/sdk/python/kfp/v2/components/component_decorator.py
index a8e4301e6ed..70db103a19d 100644
--- a/sdk/python/kfp/v2/components/component_decorator.py
+++ b/sdk/python/kfp/v2/components/component_decorator.py
@@ -21,14 +21,27 @@
 def component(func: Optional[Callable] = None,
               *,
               base_image: Optional[str] = None,
+              target_image: Optional[str] = None,
               packages_to_install: List[str] = None,
               output_component_file: Optional[str] = None,
               install_kfp_package: bool = True,
               kfp_package_path: Optional[str] = None):
     """Decorator for Python-function based components in KFP v2.
 
-    A lightweight component is a self-contained Python function that includes
-    all necessary imports and dependencies.
+    A KFP v2 component can either be a lightweight component, or a containerized
+    one.
+
+    If target_image is not specified, this function creates a lightweight
+    component. A lightweight component is a self-contained Python function that
+    includes all necessary imports and dependencies. In lightweight components,
+    packages_to_install will be used to install dependencies at runtime.
The + parameters install_kfp_package and kfp_package_path can be used to control + how KFP should be installed when the lightweight component is executed. + + If target_image is specified, this function creates a component definition + based around the target_image. The assumption is that the function in func + will be packaged by KFP into this target_image. Use the KFP CLI's `build` + command to package func into target_image. Example usage: @@ -55,14 +68,19 @@ def pipeline(): should have type annotations for all its arguments, indicating how it is intended to be used (e.g. as an input/output Artifact object, a plain parameter, or a path to a file). - base_image: The image to use when executing |func|. It should + base_image: The image to use when executing func. It should contain a default Python interpreter that is compatible with KFP. packages_to_install: A list of optional packages to install before - executing |func|. + executing func. These will always be installed at component runtime. + output_component_file: If specified, this function will write a + shareable/loadable version of the component spec into this file. install_kfp_package: Specifies if we should add a KFP Python package to - |packages_to_install|. Lightweight Python functions always require - an installation of KFP in |base_image| to work. If you specify - a |base_image| that already contains KFP, you can set this to False. + packages_to_install. Lightweight Python functions always require + an installation of KFP in base_image to work. If you specify + a base_image that already contains KFP, you can set this to False. + This flag is ignored when target_image is specified, which implies + we're building a containerized component. Containerized components + will always install KFP as part of the build process. kfp_package_path: Specifies the location from which to install KFP. By default, this will try to install from PyPi using the same version as that used when this component was created. 
KFP developers can @@ -72,11 +90,12 @@ def pipeline(): Returns: A component task factory that can be used in pipeline definitions. - """ + """ if func is None: return functools.partial( component, base_image=base_image, + target_image=target_image, packages_to_install=packages_to_install, output_component_file=output_component_file, install_kfp_package=install_kfp_package, @@ -85,6 +104,7 @@ def pipeline(): return component_factory.create_component_from_func( func, base_image=base_image, + target_image=target_image, packages_to_install=packages_to_install, output_component_file=output_component_file, install_kfp_package=install_kfp_package, diff --git a/sdk/python/kfp/v2/components/component_factory.py b/sdk/python/kfp/v2/components/component_factory.py index 670ccf81e62..6bdb8f07c57 100644 --- a/sdk/python/kfp/v2/components/component_factory.py +++ b/sdk/python/kfp/v2/components/component_factory.py @@ -11,11 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import dataclasses import inspect import itertools +import pathlib import re import textwrap -from typing import Callable, Dict, List, Mapping, Optional, TypeVar +from typing import Callable, List, Optional, Tuple import warnings import docstring_parser @@ -27,6 +29,28 @@ _DEFAULT_BASE_IMAGE = 'python:3.7' +@dataclasses.dataclass +class ComponentInfo(): + """A dataclass capturing registered v2 components. + + This will likely be subsumed/augmented with v2 BaseComponent. + """ + name: str + function_name: str + func: Callable + target_image: str + module_path: pathlib.Path + component_spec: structures.ComponentSpec + output_component_file: Optional[str] = None + base_image: str = _DEFAULT_BASE_IMAGE + + +# A map from component name to ComponentInfo. If not None, populated when a +# module containing KFP v2 components is loaded. 
Primarily used by KFP CLI +# component builder to package components in a file into containers. +REGISTERED_MODULES = None + + def _python_function_name_to_component_name(name): name_with_spaces = re.sub(' +', ' ', name.replace('_', ' ')).strip(' ') return name_with_spaces[0].upper() + name_with_spaces[1:] @@ -122,79 +146,6 @@ def _maybe_make_unique(name: str, names: List[str]): raise RuntimeError('Too many arguments with the name {}'.format(name)) -# TODO(KFPv2): Replace with v2 ComponentSpec. -def _func_to_component_spec( - func: Callable, - base_image: Optional[str] = None, - packages_to_install: Optional[List[str]] = None, - install_kfp_package: bool = True, - kfp_package_path: Optional[str] = None) -> structures.ComponentSpec: - decorator_base_image = getattr(func, '_component_base_image', None) - if decorator_base_image is not None: - if base_image is not None and decorator_base_image != base_image: - raise ValueError( - 'base_image ({}) conflicts with the decorator-specified base image metadata ({})' - .format(base_image, decorator_base_image)) - else: - base_image = decorator_base_image - else: - if base_image is None: - base_image = _DEFAULT_BASE_IMAGE - if isinstance(base_image, Callable): - base_image = base_image() - - imports_source = [ - "import kfp", - "from kfp.v2 import dsl", - "from kfp.v2.dsl import *", - "from typing import *", - ] - - func_source = _get_function_source_definition(func) - - source = textwrap.dedent(""" - {imports_source} - - {func_source}\n""").format( - imports_source='\n'.join(imports_source), func_source=func_source) - - packages_to_install = packages_to_install or [] - if install_kfp_package: - if kfp_package_path is None: - kfp_package_path = _get_default_kfp_package_path() - packages_to_install.append(kfp_package_path) - - packages_to_install_command = _get_packages_to_install_command( - package_list=packages_to_install) - - from kfp.components._structures import ExecutorInputPlaceholder - component_spec = 
extract_component_interface(func) - - component_spec.implementation = structures.ContainerImplementation( - container=structures.ContainerSpec( - image=base_image, - command=packages_to_install_command + [ - 'sh', - '-ec', - textwrap.dedent('''\ - program_path=$(mktemp -d) - printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.v2.components.executor_main \ - --component_module_path \ - "$program_path/ephemeral_component.py" \ - "$@" - '''), - source, - ], - args=[ - "--executor_input", - ExecutorInputPlaceholder(), - "--function_to_execute", - func.__name__, - ])) - return component_spec - - def extract_component_interface(func: Callable) -> structures.ComponentSpec: single_output_name_const = 'Output' @@ -249,8 +200,8 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: passing_style == type_annotations.InputPath and parameter.default is None): raise ValueError( - 'Path inputs only support default values of None. Default values for outputs are not supported.' - ) + 'Path inputs only support default values of None. Default' + ' values for outputs are not supported.') type_struct = _annotation_to_type_struct(parameter_type) @@ -283,8 +234,8 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: parameter.default, outer_type_name) except Exception as ex: warnings.warn( - 'Could not serialize the default value of the parameter "{}". {}' - .format(parameter.name, ex)) + 'Could not serialize the default value of the' + ' parameter "{}". {}'.format(parameter.name, ex)) input_spec._passing_style = passing_style input_spec._parameter_name = parameter.name inputs.append(input_spec) @@ -313,12 +264,14 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: output_spec._passing_style = None output_spec._return_tuple_field_name = field_name outputs.append(output_spec) - # Deprecated dict-based way of declaring multiple outputs. 
Was only used by the @component decorator + # Deprecated dict-based way of declaring multiple outputs. Was only used by + # the @component decorator elif isinstance(return_ann, dict): warnings.warn( - "The ability to specify multiple outputs using the dict syntax has been deprecated." - "It will be removed soon after release 0.1.32." - "Please use typing.NamedTuple to declare multiple outputs.") + "The ability to specify multiple outputs using the dict syntax" + " has been deprecated. It will be removed soon after release" + " 0.1.32. Please use typing.NamedTuple to declare multiple" + " outputs.") for output_name, output_type_annotation in return_ann.items(): output_type_struct = _annotation_to_type_struct( output_type_annotation) @@ -329,7 +282,8 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: outputs.append(output_spec) elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty: output_name = _maybe_make_unique(single_output_name_const, output_names) - # Fixes exotic, but possible collision: `def func(output_path: OutputPath()) -> str: ...` + # Fixes exotic, but possible collision: + # `def func(output_path: OutputPath()) -> str: ...` output_names.add(output_name) type_struct = _annotation_to_type_struct(signature.return_annotation) output_spec = structures.OutputSpec( @@ -339,9 +293,11 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: output_spec._passing_style = None outputs.append(output_spec) - # Component name and description are derived from the function's name and docstring. - # The name can be overridden by setting setting func.__name__ attribute (of the legacy func._component_human_name attribute). - # The description can be overridden by setting the func.__doc__ attribute (or the legacy func._component_description attribute). + # Component name and description are derived from the function's name and + # docstring. 
The name can be overridden by setting the func.__name__ + # attribute (or the legacy func._component_human_name attribute). The + # description can be overridden by setting the func.__doc__ attribute (or + # the legacy func._component_description attribute). component_name = getattr(func, '_component_human_name', None) or _python_function_name_to_component_name( func.__name__) @@ -359,48 +315,135 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: return component_spec +def _get_command_and_args_for_lightweight_component( + func: Callable) -> Tuple[List[str], List[str]]: + imports_source = [ + "import kfp", + "from kfp.v2 import dsl", + "from kfp.v2.dsl import *", + "from typing import *", + ] + + func_source = _get_function_source_definition(func) + source = textwrap.dedent(""" + {imports_source} + + {func_source}\n""").format( + imports_source='\n'.join(imports_source), func_source=func_source) + command = [ + 'sh', + '-ec', + textwrap.dedent('''\ + program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" + python3 -m kfp.v2.components.executor_main \ + --component_module_path \ + "$program_path/ephemeral_component.py" \ + "$@" + '''), + source, + ] + + args = [ + "--executor_input", + structures.ExecutorInputPlaceholder(), + "--function_to_execute", + func.__name__, + ] + + return command, args + + +def _get_command_and_args_for_containerized_component( + function_name: str) -> Tuple[List[str], List[str]]: + command = [ + 'python3', + '-m', + 'kfp.v2.components.executor_main', + ] + + args = [ + "--executor_input", + structures.ExecutorInputPlaceholder(), + "--function_to_execute", + function_name, + ] + return command, args + + def create_component_from_func(func: Callable, base_image: Optional[str] = None, + target_image: Optional[str] = None, packages_to_install: List[str] = None, output_component_file: Optional[str] = None, install_kfp_package: bool = True, kfp_package_path: Optional[str] = None): 
- """Converts a Python function to a v2 lightweight component. - - A lightweight component is a self-contained Python function that includes - all necessary imports and dependencies. - - Args: - func: The python function to create a component from. The function - should have type annotations for all its arguments, indicating how - it is intended to be used (e.g. as an input/output Artifact object, - a plain parameter, or a path to a file). - base_image: The image to use when executing |func|. It should - contain a default Python interpreter that is compatible with KFP. - packages_to_install: A list of optional packages to install before - executing |func|. - install_kfp_package: Specifies if we should add a KFP Python package to - |packages_to_install|. Lightweight Python functions always require - an installation of KFP in |base_image| to work. If you specify - a |base_image| that already contains KFP, you can set this to False. - kfp_package_path: Specifies the location from which to install KFP. By - default, this will try to install from PyPi using the same version - as that used when this component was created. KFP developers can - choose to override this to point to a Github pull request or - other pip-compatible location when testing changes to lightweight - Python functions. - - Returns: - A component task factory that can be used in pipeline definitions. + """Implementation for the @component decorator. + + The decorator is defined under component_decorator.py. See the decorator + for the canonical documentation for this function. 
""" - component_spec = _func_to_component_spec( + packages_to_install = packages_to_install or [] + + if install_kfp_package and target_image is None: + if kfp_package_path is None: + kfp_package_path = _get_default_kfp_package_path() + packages_to_install.append(kfp_package_path) + + packages_to_install_command = _get_packages_to_install_command( + package_list=packages_to_install) + + command = [] + args = [] + if base_image is None: + base_image = _DEFAULT_BASE_IMAGE + + component_image = base_image + + if target_image: + component_image = target_image + command, args = _get_command_and_args_for_containerized_component( + function_name=func.__name__,) + else: + command, args = _get_command_and_args_for_lightweight_component( + func=func) + + component_spec = extract_component_interface(func) + component_spec.implementation = structures.ContainerImplementation( + container=structures.ContainerSpec( + image=component_image, + command=packages_to_install_command + command, + args=args, + )) + + module_path = pathlib.Path(inspect.getsourcefile(func)) + module_path.resolve() + + component_name = _python_function_name_to_component_name(func.__name__) + component_info = ComponentInfo( + name=component_name, + function_name=func.__name__, func=func, - base_image=base_image, - packages_to_install=packages_to_install, - install_kfp_package=install_kfp_package, - kfp_package_path=kfp_package_path) + target_image=target_image, + module_path=module_path, + component_spec=component_spec, + output_component_file=output_component_file, + base_image=base_image) + + if REGISTERED_MODULES is not None: + REGISTERED_MODULES[component_name] = component_info + if output_component_file: component_spec.save(output_component_file) # TODO(KFPv2): Replace with v2 BaseComponent. 
- return _components._create_task_factory_from_component_spec(component_spec) + task_factory = _components._create_task_factory_from_component_spec( + component_spec) + + # TODO(KFPv2): Once this returns a BaseComponent, we should check for this + # in the Executor, and get the appropriate callable. For now, we'll look for + # this special attribute to hold the Python function in the task factory + # during runtime. + setattr(task_factory, 'python_func', func) + + return task_factory diff --git a/sdk/python/kfp/v2/components/executor.py b/sdk/python/kfp/v2/components/executor.py index e650c69c138..2e0a57f06ec 100644 --- a/sdk/python/kfp/v2/components/executor.py +++ b/sdk/python/kfp/v2/components/executor.py @@ -22,7 +22,11 @@ class Executor(): """Executor executes v2-based Python function components.""" def __init__(self, executor_input: Dict, function_to_execute: Callable): - self._func = function_to_execute + if hasattr(function_to_execute, 'python_func'): + self._func = function_to_execute.python_func + else: + self._func = function_to_execute + self._input = executor_input self._input_artifacts: Dict[str, artifact_types.Artifact] = {} self._output_artifacts: Dict[str, artifact_types.Artifact] = {} diff --git a/sdk/python/kfp/v2/components/executor_main.py b/sdk/python/kfp/v2/components/executor_main.py index e27e1a2dded..0b535110d96 100644 --- a/sdk/python/kfp/v2/components/executor_main.py +++ b/sdk/python/kfp/v2/components/executor_main.py @@ -13,48 +13,37 @@ # limitations under the License. import argparse import json -import importlib +import logging import os import sys from kfp.v2.components import executor as component_executor +from kfp.v2.components import kfp_config +from kfp.v2.components import utils -def _load_module(module_name: str, module_directory: str): - """Dynamically imports the Python module with the given name and package - path. 
- - E.g., Assuming there is a file called `my_module.py` under - `/some/directory/my_module`, we can use:: - - _load_module('my_module', '/some/directory') - - to effectively `import mymodule`. - - Args: - module_name: The name of the module. - package_path: The package under which the specified module resides. - """ - module_spec = importlib.util.spec_from_file_location( - name=module_name, - location=os.path.join(module_directory, f'{module_name}.py')) - module = importlib.util.module_from_spec(module_spec) - sys.modules[module_spec.name] = module - module_spec.loader.exec_module(module) - return module +def _setup_logging(): + logging_format = '[KFP Executor %(asctime)s %(levelname)s]: %(message)s' + logging.basicConfig( + stream=sys.stdout, format=logging_format, level=logging.INFO) def executor_main(): + _setup_logging() parser = argparse.ArgumentParser(description='KFP Component Executor.') + parser.add_argument( '--component_module_path', type=str, help='Path to a module containing the KFP component.') + parser.add_argument( '--function_to_execute', type=str, + required=True, help='The name of the component function in ' '--component_module_path file that is to be executed.') + parser.add_argument( '--executor_input', type=str, @@ -63,16 +52,47 @@ def executor_main(): args, _ = parser.parse_known_args() - module_directory = os.path.dirname(args.component_module_path) - module_name = os.path.basename(args.component_module_path)[:-len('.py')] - print('Loading KFP component module {} from dir {}'.format( - module_name, module_directory)) - - module = _load_module( + func_name = args.function_to_execute + module_path = None + module_directory = None + module_name = None + + if args.component_module_path is not None: + logging.info( + 'Looking for component `{}` in --component_module_path `{}`'.format( + func_name, args.component_module_path)) + module_path = args.component_module_path + module_directory = os.path.dirname(args.component_module_path) + module_name 
= os.path.basename(args.component_module_path)[:-len('.py')] + else: + # Look for module directory using kfp_config.ini + logging.info('--component_module_path is not specified. Looking for' + ' component `{}` in config file `kfp_config.ini`' + ' instead'.format(func_name)) + config = kfp_config.KFPConfig() + components = config.get_components() + if not components: + raise RuntimeError('No components found in `kfp_config.ini`') + try: + module_path = components[func_name] + except KeyError: + raise RuntimeError( + 'Could not find component `{}` in `kfp_config.ini`. Found the ' + ' following components instead:\n{}'.format( + func_name, components)) + + module_directory = str(module_path.parent) + module_name = str(module_path.name)[:-len('.py')] + + logging.info( + 'Loading KFP component "{}" from {} (directory "{}" and module name' + ' "{}")'.format(func_name, module_path, module_directory, module_name)) + + module = utils.load_module( module_name=module_name, module_directory=module_directory) executor_input = json.loads(args.executor_input) - function_to_execute = getattr(module, args.function_to_execute) + function_to_execute = getattr(module, func_name) executor = component_executor.Executor( executor_input=executor_input, function_to_execute=function_to_execute) diff --git a/sdk/python/kfp/v2/components/kfp_config.py b/sdk/python/kfp/v2/components/kfp_config.py new file mode 100644 index 00000000000..d525fd45fea --- /dev/null +++ b/sdk/python/kfp/v2/components/kfp_config.py @@ -0,0 +1,108 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Dict, Optional + +import configparser +import pathlib +import warnings + +_KFP_CONFIG_FILE = 'kfp_config.ini' + +_COMPONENTS_SECTION = 'Components' + + +class KFPConfig(): + """Class for managing KFP component configuration. + + The configuration is an .ini file named `kfp_config.ini` that can be parsed by + Python's native configparser module. Currently, this class supports a single + `Components` section, which lists components as key-value pairs. The key is + the component name (i.e. the function name), and the value is the path to + the file containing this function. The path is usually relative to the + location of the configuration file, but absolute paths should also work. + + At runtime, the KFP v2 Executor, defined in executor_main.py, will look + for this configuration file in its current working directory. If found, + it will load its contents, and use this to find the file containing the + component to execute. + + Example of the file's contents: + + [Components] + my_component_1 = my_dir_1/my_component_1.py + my_component_2 = my_dir_2/my_component_2.py + ... + """ + + def __init__(self, config_directory: Optional[pathlib.Path] = None): + """Creates a KFPConfig object. + + Loads the config from an existing `kfp_config.ini` file if found. + + Args: + config_directory: Looks for a file named `kfp_config.ini` in this + directory. Defaults to the current directory. + """ + self._config_parser = configparser.ConfigParser() + # Preserve case for keys. 
+ self._config_parser.optionxform = lambda x: x + + if config_directory is None: + self._config_filepath = pathlib.Path(_KFP_CONFIG_FILE) + else: + self._config_filepath = config_directory / _KFP_CONFIG_FILE + + try: + with open(str(self._config_filepath), 'r') as f: + self._config_parser.read_file(f) + except IOError: + warnings.warn('No existing KFP Config file found') + + if not self._config_parser.has_section(_COMPONENTS_SECTION): + self._config_parser.add_section(_COMPONENTS_SECTION) + + self._components = {} + + def add_component(self, function_name: str, path: pathlib.Path): + """Adds a KFP component. + + Args: + function_name: The name of the component function. + path: A path to the file containing the component. + """ + self._components[function_name] = str(path) + + def save(self): + """Writes out a KFP config file.""" + # Always write out components in alphabetical order for determinism, + # especially in tests. + for function_name in sorted(self._components.keys()): + self._config_parser[_COMPONENTS_SECTION][ + function_name] = self._components[function_name] + + with open(str(self._config_filepath), 'w') as f: + self._config_parser.write(f) + + def get_components(self) -> Dict[str, pathlib.Path]: + """Returns a mapping of known KFP components. + + Returns: + A dictionary from component name (function name) to a pathlib.Path + pointing to the Python file with this component's definition. + """ + result = { + function_name: pathlib.Path(module_path) for function_name, + module_path in self._config_parser[_COMPONENTS_SECTION].items() + } + return result diff --git a/sdk/python/kfp/v2/components/utils.py b/sdk/python/kfp/v2/components/utils.py index 86a7e35a244..3eaf7ae8895 100644 --- a/sdk/python/kfp/v2/components/utils.py +++ b/sdk/python/kfp/v2/components/utils.py @@ -13,7 +13,35 @@ # limitations under the License. 
"""Definitions of utils methods.""" +import importlib +import os import re +import sys +import types + + +def load_module(module_name: str, module_directory: str) -> types.ModuleType: + """Dynamically imports the Python module with the given name and package + path. + + E.g., Assuming there is a file called `my_module.py` under + `/some/directory`, we can use:: + + load_module('my_module', '/some/directory') + + to effectively `import my_module`. + + Args: + module_name: The name of the module. + module_directory: The directory in which the module's .py file resides. + """ + module_spec = importlib.util.spec_from_file_location( + name=module_name, + location=os.path.join(module_directory, f'{module_name}.py')) + module = importlib.util.module_from_spec(module_spec) + sys.modules[module_spec.name] = module + module_spec.loader.exec_module(module) + return module def maybe_rename_for_k8s(name: str) -> str: diff --git a/sdk/python/requirements.in b/sdk/python/requirements.in index 29b8374a863..7ae3de7820e 100644 --- a/sdk/python/requirements.in +++ b/sdk/python/requirements.in @@ -27,6 +27,7 @@ requests-toolbelt>=0.8.0,<1 # CLI tabulate>=0.8.6,<1 click>=7.1.2,<9 +typer>=0.3.2,<1.0 # kfp.v2 absl-py>=0.9,<=0.11 diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt index bc368b04045..2d3646acc93 100644 --- a/sdk/python/requirements.txt +++ b/sdk/python/requirements.txt @@ -17,8 +17,10 @@ certifi==2021.5.30 # requests charset-normalizer==2.0.6 # via requests -click>=7.1.2,<9 - # via -r requirements.in +click==8.0.3 + # via + # -r requirements.in + # typer cloudpickle==2.0.0 # via -r requirements.in deprecated==1.2.13 @@ -136,6 +138,8 @@ tabulate==0.8.9 # via -r requirements.in termcolor==1.1.0 # via fire +typer==0.4.0 + # via -r requirements.in typing-extensions==3.10.0.2 # via pydantic uritemplate==3.0.1 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 3d8ecd88229..555494b55ec 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ 
-54,6 +54,7 @@ 'pydantic>=1.8.2,<2', # Standard library backports 'dataclasses;python_version<"3.7"', + 'typer>=0.3.2,<1.0', 'typing-extensions>=3.7.4,<4;python_version<"3.9"', ] @@ -61,6 +62,10 @@ 'frozendict', ] +EXTRAS_REQUIRE = { + 'all': ['docker'], +} + def find_version(*file_path_parts): here = os.path.abspath(os.path.dirname(__file__)) @@ -85,13 +90,18 @@ def find_version(*file_path_parts): author='The Kubeflow Authors', url="https://github.com/kubeflow/pipelines", project_urls={ - "Documentation": "https://kubeflow-pipelines.readthedocs.io/en/stable/", - "Bug Tracker": "https://github.com/kubeflow/pipelines/issues", - "Source": "https://github.com/kubeflow/pipelines/tree/master/sdk", - "Changelog": "https://github.com/kubeflow/pipelines/blob/master/sdk/RELEASE.md", + "Documentation": + "https://kubeflow-pipelines.readthedocs.io/en/stable/", + "Bug Tracker": + "https://github.com/kubeflow/pipelines/issues", + "Source": + "https://github.com/kubeflow/pipelines/tree/master/sdk", + "Changelog": + "https://github.com/kubeflow/pipelines/blob/master/sdk/RELEASE.md", }, install_requires=REQUIRES, tests_require=TESTS_REQUIRE, + extras_require=EXTRAS_REQUIRE, packages=[ 'kfp', 'kfp.auth',