diff --git a/sdk/python/kfp/cli/__init__.py b/sdk/python/kfp/cli/__init__.py new file mode 100644 index 00000000000..c55179a49ed --- /dev/null +++ b/sdk/python/kfp/cli/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/sdk/python/kfp/cli/cli.py b/sdk/python/kfp/cli/cli.py new file mode 100644 index 00000000000..87f30fb1254 --- /dev/null +++ b/sdk/python/kfp/cli/cli.py @@ -0,0 +1,82 @@ +# Copyright 2018 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import sys + +import click +import typer +from kfp.cli import components +from kfp.cli.diagnose_me_cli import diagnose_me +from kfp.cli.experiment import experiment +from kfp.cli.output import OutputFormat +from kfp.cli.pipeline import pipeline +from kfp.cli.recurring_run import recurring_run +from kfp.cli.run import run +from kfp.client import Client + +_NO_CLIENT_COMMANDS = ['diagnose_me', 'components'] + + +@click.group() +@click.option('--endpoint', help='Endpoint of the KFP API service to connect.') +@click.option('--iap-client-id', help='Client ID for IAP protected endpoint.') +@click.option( + '-n', + '--namespace', + default='kubeflow', + show_default=True, + help='Kubernetes namespace to connect to the KFP API.') +@click.option( + '--other-client-id', + help='Client ID for IAP protected endpoint to obtain the refresh token.') +@click.option( + '--other-client-secret', + help='Client ID for IAP protected endpoint to obtain the refresh token.') +@click.option( + '--output', + type=click.Choice(list(map(lambda x: x.name, OutputFormat))), + default=OutputFormat.table.name, + show_default=True, + help='The formatting style for command output.') +@click.pass_context +def cli(ctx: click.Context, endpoint: str, iap_client_id: str, namespace: str, + other_client_id: str, other_client_secret: str, output: OutputFormat): + """kfp is the command line interface to KFP service. 
+ + Feature stage: + [Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha) + """ + if ctx.invoked_subcommand in _NO_CLIENT_COMMANDS: + # Do not create a client for these subcommands + return + ctx.obj['client'] = Client(endpoint, iap_client_id, namespace, + other_client_id, other_client_secret) + ctx.obj['namespace'] = namespace + ctx.obj['output'] = output + + +def main(): + logging.basicConfig(format='%(message)s', level=logging.INFO) + cli.add_command(run) + cli.add_command(recurring_run) + cli.add_command(pipeline) + cli.add_command(diagnose_me, 'diagnose_me') + cli.add_command(experiment) + cli.add_command(typer.main.get_command(components.app)) + try: + cli(obj={}, auto_envvar_prefix='KFP') + except Exception as e: + click.echo(str(e), err=True) + sys.exit(1) diff --git a/sdk/python/kfp/cli/components.py b/sdk/python/kfp/cli/components.py new file mode 100644 index 00000000000..81f25df6ae1 --- /dev/null +++ b/sdk/python/kfp/cli/components.py @@ -0,0 +1,409 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import configparser +import contextlib +import enum +import pathlib +import shutil +import subprocess +import tempfile +from typing import Any, List, Optional + +_DOCKER_IS_PRESENT = True +try: + import docker +except ImportError: + _DOCKER_IS_PRESENT = False + +import kfp as kfp +import typer +from kfp.components import component_factory, kfp_config, utils + +_REQUIREMENTS_TXT = 'requirements.txt' + +_DOCKERFILE = 'Dockerfile' + +_DOCKERFILE_TEMPLATE = ''' +FROM {base_image} + +WORKDIR {component_root_dir} +COPY requirements.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt +{maybe_copy_kfp_package} +RUN pip install --no-cache-dir {kfp_package_path} +COPY . . +''' + +_DOCKERIGNORE = '.dockerignore' + +# Location in which to write out shareable YAML for components. +_COMPONENT_METADATA_DIR = 'component_metadata' + +_DOCKERIGNORE_TEMPLATE = ''' +{}/ +'''.format(_COMPONENT_METADATA_DIR) + +# Location at which v2 Python function-based components will stored +# in containerized components. 
+_COMPONENT_ROOT_DIR = pathlib.Path('/usr/local/src/kfp/components') + + +@contextlib.contextmanager +def _registered_modules(): + registered_modules = {} + component_factory.REGISTERED_MODULES = registered_modules + try: + yield registered_modules + finally: + component_factory.REGISTERED_MODULES = None + + +class _Engine(str, enum.Enum): + """Supported container build engines.""" + DOCKER = 'docker' + KANIKO = 'kaniko' + CLOUD_BUILD = 'cloudbuild' + + +app = typer.Typer() + + +def _info(message: Any): + info = typer.style('INFO', fg=typer.colors.GREEN) + typer.echo('{}: {}'.format(info, message)) + + +def _warning(message: Any): + info = typer.style('WARNING', fg=typer.colors.YELLOW) + typer.echo('{}: {}'.format(info, message)) + + +def _error(message: Any): + info = typer.style('ERROR', fg=typer.colors.RED) + typer.echo('{}: {}'.format(info, message)) + + +class _ComponentBuilder(): + """Helper class for building containerized v2 KFP components.""" + + def __init__( + self, + context_directory: pathlib.Path, + kfp_package_path: Optional[pathlib.Path] = None, + component_filepattern: str = '**/*.py', + ): + """ComponentBuilder builds containerized components. + + Args: + context_directory: Directory containing one or more Python files + with one or more KFP v2 components. + kfp_package_path: Path to a pip-installable location for KFP. + This can either be pointing to KFP SDK root directory located in + a local clone of the KFP repo, or a git+https location. + If left empty, defaults to KFP on PyPi. + """ + self._context_directory = context_directory + self._dockerfile = self._context_directory / _DOCKERFILE + self._component_filepattern = component_filepattern + self._components: List[ + component_factory.component_factory.ComponentInfo] = [] + + # This is only set if we need to install KFP from local copy. 
+ self._maybe_copy_kfp_package = '' + + if kfp_package_path is None: + self._kfp_package_path = 'kfp=={}'.format(kfp.__version__) + elif kfp_package_path.is_dir(): + _info('Building KFP package from local directory {}'.format( + typer.style(str(kfp_package_path), fg=typer.colors.CYAN))) + temp_dir = pathlib.Path(tempfile.mkdtemp()) + try: + subprocess.run([ + 'python3', + kfp_package_path / 'setup.py', + 'bdist_wheel', + '--dist-dir', + str(temp_dir), + ], + cwd=kfp_package_path) + wheel_files = list(temp_dir.glob('*.whl')) + if len(wheel_files) != 1: + _error('Failed to find built KFP wheel under {}'.format( + temp_dir)) + raise typer.Exit(1) + + wheel_file = wheel_files[0] + shutil.copy(wheel_file, self._context_directory) + self._kfp_package_path = wheel_file.name + self._maybe_copy_kfp_package = 'COPY {wheel_name} {wheel_name}'.format( + wheel_name=self._kfp_package_path) + except subprocess.CalledProcessError as e: + _error('Failed to build KFP wheel locally:\n{}'.format(e)) + raise typer.Exit(1) + finally: + _info('Cleaning up temporary directory {}'.format(temp_dir)) + shutil.rmtree(temp_dir) + else: + self._kfp_package_path = kfp_package_path + + _info('Building component using KFP package path: {}'.format( + typer.style(str(self._kfp_package_path), fg=typer.colors.CYAN))) + + self._context_directory_files = [ + file.name + for file in self._context_directory.glob('*') + if file.is_file() + ] + + self._component_files = [ + file for file in self._context_directory.glob( + self._component_filepattern) if file.is_file() + ] + + self._base_image = None + self._target_image = None + self._load_components() + + def _load_components(self): + if not self._component_files: + _error( + 'No component files found matching pattern `{}` in directory {}' + .format(self._component_filepattern, self._context_directory)) + raise typer.Exit(1) + + for python_file in self._component_files: + with _registered_modules() as component_modules: + module_name = 
python_file.name[:-len('.py')] + module_directory = python_file.parent + utils.load_module( + module_name=module_name, module_directory=module_directory) + + formatted_module_file = typer.style( + str(python_file), fg=typer.colors.CYAN) + if not component_modules: + _error('No KFP components found in file {}'.format( + formatted_module_file)) + raise typer.Exit(1) + + _info('Found {} component(s) in file {}:'.format( + len(component_modules), formatted_module_file)) + for name, component in component_modules.items(): + _info('{}: {}'.format(name, component)) + self._components.append(component) + + base_images = set([info.base_image for info in self._components]) + target_images = set([info.target_image for info in self._components]) + + if len(base_images) != 1: + _error('Found {} unique base_image values {}. Components' + ' must specify the same base_image and target_image.'.format( + len(base_images), base_images)) + raise typer.Exit(1) + + self._base_image = base_images.pop() + if self._base_image is None: + _error('Did not find a base_image specified in any of the' + ' components. A base_image must be specified in order to' + ' build the component.') + raise typer.Exit(1) + _info('Using base image: {}'.format( + typer.style(self._base_image, fg=typer.colors.YELLOW))) + + if len(target_images) != 1: + _error('Found {} unique target_image values {}. Components' + ' must specify the same base_image and' + ' target_image.'.format(len(target_images), target_images)) + raise typer.Exit(1) + + self._target_image = target_images.pop() + if self._target_image is None: + _error('Did not find a target_image specified in any of the' + ' components. 
A target_image must be specified in order' + ' to build the component.') + raise typer.Exit(1) + _info('Using target image: {}'.format( + typer.style(self._target_image, fg=typer.colors.YELLOW))) + + def _maybe_write_file(self, + filename: str, + contents: str, + overwrite: bool = False): + formatted_filename = typer.style(filename, fg=typer.colors.CYAN) + if filename in self._context_directory_files: + _info('Found existing file {} under {}.'.format( + formatted_filename, self._context_directory)) + if not overwrite: + _info('Leaving this file untouched.') + return + else: + _warning( + 'Overwriting existing file {}'.format(formatted_filename)) + else: + _warning('{} not found under {}. Creating one.'.format( + formatted_filename, self._context_directory)) + + filepath = self._context_directory / filename + with open(filepath, 'w') as f: + f.write('# Generated by KFP.\n{}'.format(contents)) + _info('Generated file {}.'.format(filepath)) + + def maybe_generate_requirements_txt(self): + self._maybe_write_file(_REQUIREMENTS_TXT, '') + + def maybe_generate_dockerignore(self): + self._maybe_write_file(_DOCKERIGNORE, _DOCKERIGNORE_TEMPLATE) + + def write_component_files(self): + for component_info in self._components: + filename = ( + component_info.output_component_file or + component_info.function_name + '.yaml') + container_filename = ( + self._context_directory / _COMPONENT_METADATA_DIR / filename) + container_filename.parent.mkdir(exist_ok=True, parents=True) + component_info.component_spec.save_to_component_yaml( + str(container_filename)) + + def generate_kfp_config(self): + config = kfp_config.KFPConfig(config_directory=self._context_directory) + for component_info in self._components: + relative_path = component_info.module_path.relative_to( + self._context_directory) + config.add_component( + function_name=component_info.function_name, path=relative_path) + config.save() + + def maybe_generate_dockerfile(self, overwrite_dockerfile: bool = False): + 
dockerfile_contents = _DOCKERFILE_TEMPLATE.format( + base_image=self._base_image, + maybe_copy_kfp_package=self._maybe_copy_kfp_package, + component_root_dir=_COMPONENT_ROOT_DIR, + kfp_package_path=self._kfp_package_path) + + self._maybe_write_file(_DOCKERFILE, dockerfile_contents, + overwrite_dockerfile) + + def build_image(self, push_image: bool = True): + _info('Building image {} using Docker...'.format( + typer.style(self._target_image, fg=typer.colors.YELLOW))) + client = docker.from_env() + + docker_log_prefix = typer.style('Docker', fg=typer.colors.CYAN) + + try: + context = str(self._context_directory) + logs = client.api.build( + path=context, + dockerfile='Dockerfile', + tag=self._target_image, + decode=True, + ) + for log in logs: + message = log.get('stream', '').rstrip('\n') + if message: + _info('{}: {}'.format(docker_log_prefix, message)) + + except docker.errors.BuildError as e: + for log in e.build_log: + message = log.get('message', '').rstrip('\n') + if message: + _error('{}: {}'.format(docker_log_prefix, message)) + _error('{}: {}'.format(docker_log_prefix, e)) + raise typer.Exit(1) + + if not push_image: + return + + _info('Pushing image {}...'.format( + typer.style(self._target_image, fg=typer.colors.YELLOW))) + + try: + response = client.images.push( + self._target_image, stream=True, decode=True) + for log in response: + status = log.get('status', '').rstrip('\n') + layer = log.get('id', '') + if status: + _info('{}: {} {}'.format(docker_log_prefix, layer, status)) + except docker.errors.BuildError as e: + _error('{}: {}'.format(docker_log_prefix, e)) + raise e + + _info('Built and pushed component container {}'.format( + typer.style(self._target_image, fg=typer.colors.YELLOW))) + + +@app.callback() +def components(): + """Builds shareable, containerized components.""" + pass + + +@app.command() +def build(components_directory: pathlib.Path = typer.Argument( + ..., + help="Path to a directory containing one or more Python" + " files with KFP 
v2 components. The container will be built" + " with this directory as the context."), + component_filepattern: str = typer.Option( + '**/*.py', + help="Filepattern to use when searching for KFP components. The" + " default searches all Python files in the specified directory."), + engine: _Engine = typer.Option( + _Engine.DOCKER, + help="Engine to use to build the component's container."), + kfp_package_path: Optional[pathlib.Path] = typer.Option( + None, help="A pip-installable path to the KFP package."), + overwrite_dockerfile: bool = typer.Option( + False, + help="Set this to true to always generate a Dockerfile" + " as part of the build process"), + push_image: bool = typer.Option( + True, help="Push the built image to its remote repository.")): + """Builds containers for KFP v2 Python-based components.""" + components_directory = components_directory.resolve() + if not components_directory.is_dir(): + _error('{} does not seem to be a valid directory.'.format( + components_directory)) + raise typer.Exit(1) + + if engine != _Engine.DOCKER: + _error('Currently, only `docker` is supported for --engine.') + raise typer.Exit(1) + + if engine == _Engine.DOCKER: + if not _DOCKER_IS_PRESENT: + _error( + 'The `docker` Python package was not found in the current' + ' environment. Please run `pip install docker` to install it.' 
+ ' Optionally, you can also install KFP with all of its' + ' optional dependencies by running `pip install kfp[all]`.') + raise typer.Exit(1) + + builder = _ComponentBuilder( + context_directory=components_directory, + kfp_package_path=kfp_package_path, + component_filepattern=component_filepattern, + ) + builder.write_component_files() + builder.generate_kfp_config() + + builder.maybe_generate_requirements_txt() + builder.maybe_generate_dockerignore() + builder.maybe_generate_dockerfile(overwrite_dockerfile=overwrite_dockerfile) + builder.build_image(push_image=push_image) + + +if __name__ == '__main__': + app() diff --git a/sdk/python/kfp/cli/components_test.py b/sdk/python/kfp/cli/components_test.py new file mode 100644 index 00000000000..bc76a765e1a --- /dev/null +++ b/sdk/python/kfp/cli/components_test.py @@ -0,0 +1,494 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for `components` command group in KFP CLI.""" +import contextlib +import importlib +import pathlib +import sys +import textwrap +import unittest +from typing import List, Optional, Union +from unittest import mock + +from typer import testing + +# Docker is an optional install, but we need the import to succeed for tests. +# So we patch it before importing kfp.cli.components. 
+try: + import docker # pylint: disable=unused-import +except ImportError: + sys.modules['docker'] = mock.Mock() +from kfp.cli import components +from kfp.deprecated.cli import components + +_COMPONENT_TEMPLATE = ''' +from kfp.dsl import * + +@component( + base_image={base_image}, + target_image={target_image}, + output_component_file={output_component_file}) +def {func_name}(): + pass +''' + + +def _make_component(func_name: str, + base_image: Optional[str] = None, + target_image: Optional[str] = None, + output_component_file: Optional[str] = None) -> str: + return textwrap.dedent(''' + from kfp.dsl import * + + @component( + base_image={base_image}, + target_image={target_image}, + output_component_file={output_component_file}) + def {func_name}(): + pass + ''').format( + base_image=repr(base_image), + target_image=repr(target_image), + output_component_file=repr(output_component_file), + func_name=func_name) + + +def _write_file(filename: str, file_contents: str): + filepath = pathlib.Path(filename) + filepath.parent.mkdir(exist_ok=True, parents=True) + filepath.write_text(file_contents) + + +def _write_components(filename: str, component_template: Union[List[str], str]): + if isinstance(component_template, list): + file_contents = '\n\n'.join(component_template) + else: + file_contents = component_template + _write_file(filename=filename, file_contents=file_contents) + + +class Test(unittest.TestCase): + + def setUp(self) -> None: + self._runner = testing.CliRunner() + components._DOCKER_IS_PRESENT = True + + patcher = mock.patch('docker.from_env') + self._docker_client = patcher.start().return_value + self._docker_client.images.build.return_value = [{ + 'stream': 'Build logs' + }] + self._docker_client.images.push.return_value = [{'status': 'Pushed'}] + self.addCleanup(patcher.stop) + + self._app = components.app + with contextlib.ExitStack() as stack: + stack.enter_context(self._runner.isolated_filesystem()) + self._working_dir = pathlib.Path.cwd() + 
self.addCleanup(stack.pop_all().close) + + return super().setUp() + + def assertFileExists(self, path: str): + path_under_test_dir = self._working_dir / path + self.assertTrue(path_under_test_dir, f'File {path} does not exist!') + + def assertFileExistsAndContains(self, path: str, expected_content: str): + self.assertFileExists(path) + path_under_test_dir = self._working_dir / path + got_content = path_under_test_dir.read_text() + self.assertEqual(got_content, expected_content) + + def testKFPConfigForSingleFile(self): + preprocess_component = _make_component( + func_name='preprocess', target_image='custom-image') + train_component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', + [preprocess_component, train_component]) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 0) + + self.assertFileExistsAndContains( + 'kfp_config.ini', + textwrap.dedent('''\ + [Components] + preprocess = components.py + train = components.py + + ''')) + + def testKFPConfigForSingleFileUnderNestedDirectory(self): + preprocess_component = _make_component( + func_name='preprocess', target_image='custom-image') + train_component = _make_component( + func_name='train', target_image='custom-image') + _write_components('dir1/dir2/dir3/components.py', + [preprocess_component, train_component]) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 0) + + self.assertFileExistsAndContains( + 'kfp_config.ini', + textwrap.dedent('''\ + [Components] + preprocess = dir1/dir2/dir3/components.py + train = dir1/dir2/dir3/components.py + + ''')) + + def testKFPConfigForMultipleFiles(self): + component = _make_component( + func_name='preprocess', target_image='custom-image') + _write_components('preprocess_component.py', component) + + component = _make_component( + func_name='train', 
target_image='custom-image') + _write_components('train_component.py', component) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 0) + + self.assertFileExistsAndContains( + 'kfp_config.ini', + textwrap.dedent('''\ + [Components] + preprocess = preprocess_component.py + train = train_component.py + + ''')) + + def testKFPConfigForMultipleFilesUnderNestedDirectories(self): + component = _make_component( + func_name='preprocess', target_image='custom-image') + _write_components('preprocess/preprocess_component.py', component) + + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('train/train_component.py', component) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 0) + + self.assertFileExistsAndContains( + 'kfp_config.ini', + textwrap.dedent('''\ + [Components] + preprocess = preprocess/preprocess_component.py + train = train/train_component.py + + ''')) + + def testTargetImageMustBeTheSameInAllComponents(self): + component_one = _make_component(func_name='one', target_image='image-1') + component_two = _make_component(func_name='two', target_image='image-1') + _write_components('one_two/one_two.py', [component_one, component_two]) + + component_three = _make_component( + func_name='three', target_image='image-2') + component_four = _make_component( + func_name='four', target_image='image-3') + _write_components('three_four/three_four.py', + [component_three, component_four]) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 1) + + def testTargetImageMustBeTheSameInAllComponents(self): + component_one = _make_component( + func_name='one', base_image='image-1', target_image='target-image') + component_two = _make_component( + func_name='two', base_image='image-1', target_image='target-image') + 
_write_components('one_two/one_two.py', [component_one, component_two]) + + component_three = _make_component( + func_name='three', + base_image='image-2', + target_image='target-image') + component_four = _make_component( + func_name='four', base_image='image-3', target_image='target-image') + _write_components('three_four/three_four.py', + [component_three, component_four]) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 1) + + def testComponentFilepatternCanBeUsedToRestrictDiscovery(self): + component = _make_component( + func_name='preprocess', target_image='custom-image') + _write_components('preprocess/preprocess_component.py', component) + + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('train/train_component.py', component) + + result = self._runner.invoke( + self._app, + [ + 'build', + str(self._working_dir), '--component-filepattern=train/*' + ], + ) + self.assertEqual(result.exit_code, 0) + + self.assertFileExistsAndContains( + 'kfp_config.ini', + textwrap.dedent('''\ + [Components] + train = train/train_component.py + + ''')) + + def testEmptyRequirementsTxtFileIsGenerated(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + result = self._runner.invoke(self._app, + ['build', str(self._working_dir)]) + self.assertEqual(result.exit_code, 0) + self.assertFileExistsAndContains('requirements.txt', + '# Generated by KFP.\n') + + def testExistingRequirementsTxtFileIsUnchanged(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + _write_file('requirements.txt', 'Some pre-existing content') + + result = self._runner.invoke(self._app, + ['build', str(self._working_dir)]) + self.assertEqual(result.exit_code, 0) + self.assertFileExistsAndContains('requirements.txt', 
+ 'Some pre-existing content') + + def testDockerignoreFileIsGenerated(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + result = self._runner.invoke(self._app, + ['build', str(self._working_dir)]) + self.assertEqual(result.exit_code, 0) + self.assertFileExistsAndContains( + '.dockerignore', + textwrap.dedent('''\ + # Generated by KFP. + + component_metadata/ + ''')) + + def testExistingDockerignoreFileIsUnchanged(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + _write_file('.dockerignore', 'Some pre-existing content') + + result = self._runner.invoke(self._app, + ['build', str(self._working_dir)]) + self.assertEqual(result.exit_code, 0) + self.assertFileExistsAndContains('.dockerignore', + 'Some pre-existing content') + + def testDockerEngineIsSupported(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir), '--engine=docker']) + self.assertEqual(result.exit_code, 0) + self._docker_client.api.build.assert_called_once() + self._docker_client.images.push.assert_called_once_with( + 'custom-image', stream=True, decode=True) + + def testKanikoEngineIsNotSupported(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir), '--engine=kaniko'], + ) + self.assertEqual(result.exit_code, 1) + self._docker_client.api.build.assert_not_called() + self._docker_client.images.push.assert_not_called() + + def testCloudBuildEngineIsNotSupported(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + result = 
self._runner.invoke( + self._app, + ['build', str(self._working_dir), '--engine=cloudbuild'], + ) + self.assertEqual(result.exit_code, 1) + self._docker_client.api.build.assert_not_called() + self._docker_client.images.push.assert_not_called() + + def testDockerClientIsCalledToBuildAndPushByDefault(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 0) + + self._docker_client.api.build.assert_called_once() + self._docker_client.images.push.assert_called_once_with( + 'custom-image', stream=True, decode=True) + + def testDockerClientIsCalledToBuildButSkipsPushing(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir), '--no-push-image'], + ) + self.assertEqual(result.exit_code, 0) + + self._docker_client.api.build.assert_called_once() + self._docker_client.images.push.assert_not_called() + + @mock.patch('kfp.__version__', '1.2.3') + def testDockerfileIsCreatedCorrectly(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 0) + self._docker_client.api.build.assert_called_once() + self.assertFileExistsAndContains( + 'Dockerfile', + textwrap.dedent('''\ + # Generated by KFP. + + FROM python:3.7 + + WORKDIR /usr/local/src/kfp/components + COPY requirements.txt requirements.txt + RUN pip install --no-cache-dir -r requirements.txt + + RUN pip install --no-cache-dir kfp==1.8.11 + COPY . . 
+ ''')) + + def testExistingDockerfileIsUnchangedByDefault(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + _write_file('Dockerfile', 'Existing Dockerfile contents') + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir)], + ) + self.assertEqual(result.exit_code, 0) + self._docker_client.api.build.assert_called_once() + self.assertFileExistsAndContains('Dockerfile', + 'Existing Dockerfile contents') + + @mock.patch('kfp.__version__', '1.2.3') + def testExistingDockerfileCanBeOverwritten(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + _write_file('Dockerfile', 'Existing Dockerfile contents') + + result = self._runner.invoke( + self._app, + ['build', str(self._working_dir), '--overwrite-dockerfile'], + ) + self.assertEqual(result.exit_code, 0) + self._docker_client.api.build.assert_called_once() + self.assertFileExistsAndContains( + 'Dockerfile', + textwrap.dedent('''\ + # Generated by KFP. + + FROM python:3.7 + + WORKDIR /usr/local/src/kfp/components + COPY requirements.txt requirements.txt + RUN pip install --no-cache-dir -r requirements.txt + + RUN pip install --no-cache-dir kfp==1.8.11 + COPY . . + ''')) + + def testDockerfileCanContainCustomKFPPackage(self): + component = _make_component( + func_name='train', target_image='custom-image') + _write_components('components.py', component) + + result = self._runner.invoke( + self._app, + [ + 'build', + str(self._working_dir), + '--kfp-package-path=/Some/localdir/containing/kfp/source' + ], + ) + self.assertEqual(result.exit_code, 0) + self._docker_client.api.build.assert_called_once() + self.assertFileExistsAndContains( + 'Dockerfile', + textwrap.dedent('''\ + # Generated by KFP. 
+ + FROM python:3.7 + + WORKDIR /usr/local/src/kfp/components + COPY requirements.txt requirements.txt + RUN pip install --no-cache-dir -r requirements.txt + + RUN pip install --no-cache-dir /Some/localdir/containing/kfp/source + COPY . . + ''')) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/cli/diagnose_me/__init__.py b/sdk/python/kfp/cli/diagnose_me/__init__.py new file mode 100644 index 00000000000..5d32951430f --- /dev/null +++ b/sdk/python/kfp/cli/diagnose_me/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2019 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License,Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing,software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/sdk/python/kfp/cli/diagnose_me/dev_env.py b/sdk/python/kfp/cli/diagnose_me/dev_env.py new file mode 100644 index 00000000000..e32dc85a012 --- /dev/null +++ b/sdk/python/kfp/cli/diagnose_me/dev_env.py @@ -0,0 +1,72 @@ +# Lint as: python3 +# Copyright 2019 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License,Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing,software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. 
"""Functions for diagnostic data collection from the development environment."""

import enum

from kfp.cli.diagnose_me import utility


class Commands(enum.Enum):
    """Enum of pip/python developer-environment inspection commands."""
    # NOTE(review): several member names misspell "PYTHON3" as "PYHYON3".
    # They are public (referenced by the CLI and the unit tests), so they are
    # documented here rather than renamed.
    PIP3_LIST = 1
    PYTHON3_PIP_LIST = 2
    PIP3_VERSION = 3
    PYHYON3_PIP_VERSION = 4
    WHICH_PYHYON3 = 5
    WHICH_PIP3 = 6


# Maps each command to the shell command line it executes (space-separated).
_command_string = {
    Commands.PIP3_LIST: 'pip3 list',
    Commands.PYTHON3_PIP_LIST: 'python3 -m pip list',
    Commands.PIP3_VERSION: 'pip3 -V',
    Commands.PYHYON3_PIP_VERSION: 'python3 -m pip -V',
    Commands.WHICH_PYHYON3: 'which python3',
    Commands.WHICH_PIP3: 'which pip3',
}


def get_dev_env_configuration(
    configuration: Commands,
    human_readable: bool = False) -> utility.ExecutorResponse:
  """Captures the specified environment configuration.

  Captures the development environment configuration, including pip version
  and Python version, as specified by ``configuration``.

  Args:
    configuration: Commands for specific information to be retrieved
      - PIP3_LIST: captures ``pip3 list`` results
      - PYTHON3_PIP_LIST: captures ``python3 -m pip list`` results
      - PIP3_VERSION: captures ``pip3 -V`` results
      - PYHYON3_PIP_VERSION: captures ``python3 -m pip -V`` results
    human_readable: If True all output will be in human readable form
      instead of JSON.

  Returns:
    A utility.ExecutorResponse with the output results for the specified
    command.
  """
  command_list = _command_string[configuration].split(' ')
  # Version and path lookups do not accept pip's --format flag, so the JSON
  # format option is only appended for the list-style commands.
  if not human_readable and configuration not in (
      Commands.PIP3_VERSION,
      Commands.PYHYON3_PIP_VERSION,
      Commands.WHICH_PYHYON3,
      Commands.WHICH_PIP3,
  ):
    command_list.extend(['--format', 'json'])

  return utility.ExecutorResponse().execute_command(command_list)
+"""Integration tests for diagnose_me.dev_env.""" + +import unittest +from typing import Text +from unittest import mock + +from kfp.cli.diagnose_me import dev_env, utility + + +class DevEnvTest(unittest.TestCase): + + def test_Commands(self): + """Verify commands are formaated properly.""" + for command in dev_env.Commands: + self.assertIsInstance(dev_env._command_string[command], Text) + self.assertNotIn('\t', dev_env._command_string[command]) + self.assertNotIn('\n', dev_env._command_string[command]) + + @mock.patch.object(utility, 'ExecutorResponse', autospec=True) + def test_dev_env_configuration(self, mock_executor_response): + """Tests dev_env command execution.""" + dev_env.get_dev_env_configuration(dev_env.Commands.PIP3_LIST) + mock_executor_response().execute_command.assert_called_with( + ['pip3', 'list', '--format', 'json']) + + @mock.patch.object(utility, 'ExecutorResponse', autospec=True) + def test_dev_env_configuration_human_readable(self, mock_executor_response): + """Tests dev_env command execution.""" + dev_env.get_dev_env_configuration( + dev_env.Commands.PIP3_LIST, human_readable=True) + mock_executor_response().execute_command.assert_called_with( + ['pip3', 'list']) + + @mock.patch.object(utility, 'ExecutorResponse', autospec=True) + def test_dev_env_configuration_version(self, mock_executor_response): + """Tests dev_env command execution.""" + # human readable = false should not set format flag for version calls + dev_env.get_dev_env_configuration( + dev_env.Commands.PIP3_VERSION, human_readable=False) + mock_executor_response().execute_command.assert_called_with( + ['pip3', '-V']) + dev_env.get_dev_env_configuration( + dev_env.Commands.PYHYON3_PIP_VERSION, human_readable=False) + mock_executor_response().execute_command.assert_called_with( + ['python3', '-m', 'pip', '-V']) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/cli/diagnose_me/gcp.py b/sdk/python/kfp/cli/diagnose_me/gcp.py new file mode 100644 index 
"""Functions for collecting GCP related environment configurations."""

import enum
from typing import List, Optional, Text

from kfp.cli.diagnose_me import utility


class Commands(enum.Enum):
    """Enum for gcloud and gsutil commands."""
    GET_APIS = 1
    GET_CONTAINER_CLUSTERS = 2
    GET_CONTAINER_IMAGES = 3
    GET_DISKS = 4
    GET_GCLOUD_DEFAULT = 5
    GET_NETWORKS = 6
    GET_QUOTAS = 7
    GET_SCOPES = 8
    GET_SERVICE_ACCOUNTS = 9
    GET_STORAGE_BUCKETS = 10
    GET_GCLOUD_VERSION = 11
    GET_AUTH_LIST = 12


# Maps each command to its gcloud sub-command string; GET_STORAGE_BUCKETS
# ('ls') is the one exception that is executed via gsutil instead.
_command_string = {
    Commands.GET_APIS: 'services list',
    Commands.GET_CONTAINER_CLUSTERS: 'container clusters list',
    Commands.GET_CONTAINER_IMAGES: 'container images list',
    Commands.GET_DISKS: 'compute disks list',
    Commands.GET_GCLOUD_DEFAULT: 'config list --all',
    Commands.GET_NETWORKS: 'compute networks list',
    Commands.GET_QUOTAS: 'compute regions list',
    Commands.GET_SCOPES: 'compute instances list',
    Commands.GET_SERVICE_ACCOUNTS: 'iam service-accounts list',
    Commands.GET_STORAGE_BUCKETS: 'ls',
    Commands.GET_GCLOUD_VERSION: 'version',
    Commands.GET_AUTH_LIST: 'auth list',
}


def execute_gcloud_command(
    gcloud_command_list: List[Text],
    project_id: Optional[Text] = None,
    human_readable: Optional[bool] = False) -> utility.ExecutorResponse:
  """Function for invoking a gcloud command.

  Args:
    gcloud_command_list: a command string list to be passed to gcloud,
      example format is ['config', 'list', '--all'].
    project_id: specifies the project to run the commands against; if not
      provided the gcloud default project is used if one is configured,
      otherwise an error message is returned.
    human_readable: If False appends '--format json' to all calls, otherwise
      output will be in gcloud's human readable format.

  Returns:
    utility.ExecutorResponse with outputs from stdout, stderr and the
    execution code.
  """
  command_list = ['gcloud']
  command_list.extend(gcloud_command_list)
  if not human_readable:
    command_list.extend(['--format', 'json'])

  if project_id is not None:
    command_list.extend(['--project', project_id])

  return utility.ExecutorResponse().execute_command(command_list)


def execute_gsutil_command(
    gsutil_command_list: List[Text],
    project_id: Optional[Text] = None) -> utility.ExecutorResponse:
  """Function for invoking a gsutil command.

  This function takes in a gsutil parameter list and returns the results as a
  list of dictionaries.

  Args:
    gsutil_command_list: a command string list to be passed to gsutil,
      example format is ['config', 'list', '--all'].
    project_id: specific project to check the quotas for; if no project id is
      provided the gcloud default project is used if one is configured,
      otherwise an error message is returned.

  Returns:
    utility.ExecutorResponse with outputs from stdout, stderr and the
    execution code.
  """
  command_list = ['gsutil']
  command_list.extend(gsutil_command_list)

  if project_id is not None:
    command_list.extend(['-p', project_id])

  return utility.ExecutorResponse().execute_command(command_list)


def get_gcp_configuration(
    configuration: Commands,
    project_id: Optional[Text] = None,
    human_readable: Optional[bool] = False) -> utility.ExecutorResponse:
  """Captures the specified environment configuration.

  Captures the environment configuration for the specified setting such as
  networking configuration, project quotas, etc.

  Args:
    configuration: Commands for specific information to be retrieved
      - APIS: Captures a complete list of enabled APIs and their
        configuration details under the specified project.
      - CONTAINER_CLUSTERS: List all visible k8s clusters under the project.
      - CONTAINER_IMAGES: List of all container images under the project
        container repo.
      - DISKS: List of storage allocated by the project including notebook
        instances as well as k8s PDs with corresponding state.
      - GCLOUD_DEFAULT: Environment default configuration for gcloud.
      - NETWORKS: List all networks and their configuration under the
        project.
      - QUOTAS: Captures a complete list of quotas for the project per
        region, returned as a list of dictionaries.
      - SCOPES: list of scopes for each compute resource in the project.
      - SERVICE_ACCOUNTS: List of all service accounts that are enabled
        under this project.
      - STORAGE_BUCKETS: list of buckets and corresponding access
        information.
    project_id: specific project to check the quotas for; if no project id is
      provided the gcloud default project is used if one is configured,
      otherwise an error message is returned.
    human_readable: If True all output will be in human readable form
      instead of JSON.

  Returns:
    A utility.ExecutorResponse with the output results for the specified
    command.
  """
  # storage bucket call requires execute_gsutil_command
  if configuration is Commands.GET_STORAGE_BUCKETS:
    return execute_gsutil_command(
        [_command_string[Commands.GET_STORAGE_BUCKETS]], project_id)

  # For all other cases can execute the command directly
  return execute_gcloud_command(_command_string[configuration].split(' '),
                                project_id, human_readable)
+"""Tests for diagnose_me.gcp.""" + +import unittest +from typing import Text +from unittest import mock + +from kfp.cli.diagnose_me import gcp, utility + + +class GoogleCloudTest(unittest.TestCase): + + @mock.patch.object(gcp, 'execute_gcloud_command', autospec=True) + def test_project_configuration_gcloud(self, mock_execute_gcloud_command): + """Tests gcloud commands.""" + gcp.get_gcp_configuration(gcp.Commands.GET_APIS) + mock_execute_gcloud_command.assert_called_once_with( + ['services', 'list'], project_id=None, human_readable=False) + + @mock.patch.object(gcp, 'execute_gsutil_command', autospec=True) + def test_project_configuration_gsutil(self, mock_execute_gsutil_command): + """Test Gsutil commands.""" + gcp.get_gcp_configuration(gcp.Commands.GET_STORAGE_BUCKETS) + mock_execute_gsutil_command.assert_called_once_with(['ls'], + project_id=None) + + def test_Commands(self): + """Verify commands are formaated properly.""" + for command in gcp.Commands: + self.assertIsInstance(gcp._command_string[command], Text) + self.assertNotIn('\t', gcp._command_string[command]) + self.assertNotIn('\n', gcp._command_string[command]) + + @mock.patch.object(utility, 'ExecutorResponse', autospec=True) + def test_execute_gsutil_command(self, mock_executor_response): + """Test execute_gsutil_command.""" + gcp.execute_gsutil_command( + [gcp._command_string[gcp.Commands.GET_STORAGE_BUCKETS]]) + mock_executor_response().execute_command.assert_called_once_with( + ['gsutil', 'ls']) + + gcp.execute_gsutil_command( + [gcp._command_string[gcp.Commands.GET_STORAGE_BUCKETS]], + project_id='test_project') + mock_executor_response().execute_command.assert_called_with( + ['gsutil', 'ls', '-p', 'test_project']) + + @mock.patch.object(utility, 'ExecutorResponse', autospec=True) + def test_execute_gcloud_command(self, mock_executor_response): + """Test execute_gcloud_command.""" + gcp.execute_gcloud_command( + gcp._command_string[gcp.Commands.GET_APIS].split(' ')) + 
mock_executor_response().execute_command.assert_called_once_with( + ['gcloud', 'services', 'list', '--format', 'json']) + + gcp.execute_gcloud_command( + gcp._command_string[gcp.Commands.GET_APIS].split(' '), + project_id='test_project') + # verify project id is added correctly + mock_executor_response().execute_command.assert_called_with([ + 'gcloud', 'services', 'list', '--format', 'json', '--project', + 'test_project' + ]) + # verify human_readable removes json fromat flag + gcp.execute_gcloud_command( + gcp._command_string[gcp.Commands.GET_APIS].split(' '), + project_id='test_project', + human_readable=True) + mock_executor_response().execute_command.assert_called_with( + ['gcloud', 'services', 'list', '--project', 'test_project']) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py new file mode 100644 index 00000000000..9d0f4e0cc73 --- /dev/null +++ b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py @@ -0,0 +1,125 @@ +# Lint as: python3 +# Copyright 2019 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License,Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing,software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Functions for collecting diagnostic information on Kubernetes cluster.""" + +import enum +from typing import List, Text + +from kfp.cli.diagnose_me import utility + + +class Commands(enum.Enum): + """Enum for kubernetes commands.""" + GET_CONFIGURED_CONTEXT = 1 + GET_PODS = 2 + GET_PVCS = 3 + GET_PVS = 4 + GET_SECRETS = 5 + GET_SERVICES = 6 + GET_KUBECTL_VERSION = 7 + GET_CONFIG_MAPS = 8 + + +_command_string = { + Commands.GET_CONFIGURED_CONTEXT: 'config view', + Commands.GET_PODS: 'get pods', + Commands.GET_PVCS: 'get pvc', + Commands.GET_PVS: 'get pv', + Commands.GET_SECRETS: 'get secrets', + Commands.GET_SERVICES: 'get services', + Commands.GET_KUBECTL_VERSION: 'version', + Commands.GET_CONFIG_MAPS: 'get configmaps', +} + + +def execute_kubectl_command( + kubectl_command_list: List[Text], + human_readable: bool = False) -> utility.ExecutorResponse: + """Invokes the kubectl command. + + Args: + kubectl_command_list: a command string list to be past to kubectl example + format is ['config', 'view'] + human_readable: If false sets parameter -o json for all calls, otherwie + output will be in human readable format. + + Returns: + utility.ExecutorResponse with outputs from stdout,stderr and execution code. + """ + command_list = ['kubectl'] + command_list.extend(kubectl_command_list) + if not human_readable: + command_list.extend(['-o', 'json']) + + return utility.ExecutorResponse().execute_command(command_list) + + +def get_kubectl_configuration( + configuration: Commands, + kubernetes_context: Text = None, + namespace: Text = None, + human_readable: bool = False) -> utility.ExecutorResponse: + """Captures the specified environment configuration. + + Captures the environment state for the specified setting such as current + context, active pods, etc and returns it in as a dictionary format. if no + context is specified the system will use the current_context or error out of + none is specified. 
+ + Args: + configuration: + - K8_CONFIGURED_CONTEXT: returns all k8 configuration available in the + current env including current_context. + - PODS: returns all pods and their status details. + - PVCS: returns all PersistentVolumeClaim and their status details. + - SECRETS: returns all accessible k8 secrests. + - PVS: returns all PersistentVolume and their status details. + - SERVICES: returns all services and their status details. + kubernetes_context: Context to use to retrieve cluster specific commands, if + set to None calls will rely on current_context configured. + namespace: default name space to be used for the commaand, if not specifeid + --all-namespaces will be used. + human_readable: If true all output will be in human readable form insted of + Json. + + Returns: + A list of dictionaries matching gcloud / gsutil output for the specified + configuration,or an error message if any occurs during execution. + """ + + if configuration in (Commands.GET_CONFIGURED_CONTEXT, + Commands.GET_KUBECTL_VERSION): + return execute_kubectl_command( + (_command_string[configuration]).split(' '), human_readable) + + execution_command = _command_string[configuration].split(' ') + if kubernetes_context: + execution_command.extend(['--context', kubernetes_context]) + if namespace: + execution_command.extend(['--namespace', namespace]) + else: + execution_command.extend(['--all-namespaces']) + + return execute_kubectl_command(execution_command, human_readable) + + +def _get_kfp_runtime() -> Text: + """Captures the current version of kpf in k8 cluster. + + Returns: + Returns the run-time version of kfp in as a string. + """ + # TODO(chavoshi) needs to be implemented. 
"""Tests for diagnose_me.kubernetes_cluster."""

import unittest
from typing import Text
from unittest import mock

from kfp.cli.diagnose_me import kubernetes_cluster as dkc
from kfp.cli.diagnose_me import utility


class KubernetesClusterTest(unittest.TestCase):
    # Subprocess execution is mocked throughout; these tests verify only how
    # the kubectl command lines are constructed.

    @mock.patch.object(dkc, 'execute_kubectl_command', autospec=True)
    def test_project_configuration_gcloud(self, mock_execute_kubectl_command):
        """Tests kubectl command construction."""
        # With no namespace given, --all-namespaces is appended.
        dkc.get_kubectl_configuration(dkc.Commands.GET_PODS)
        mock_execute_kubectl_command.assert_called_once_with(
            ['get', 'pods', '--all-namespaces'], human_readable=False)

        # Context/version queries take no namespace or context flags.
        dkc.get_kubectl_configuration(dkc.Commands.GET_CONFIGURED_CONTEXT)
        mock_execute_kubectl_command.assert_called_with(['config', 'view'],
                                                        human_readable=False)

        dkc.get_kubectl_configuration(dkc.Commands.GET_KUBECTL_VERSION)
        mock_execute_kubectl_command.assert_called_with(['version'],
                                                        human_readable=False)

        dkc.get_kubectl_configuration(
            dkc.Commands.GET_PODS, kubernetes_context='test_context')
        mock_execute_kubectl_command.assert_called_with(
            ['get', 'pods', '--context', 'test_context', '--all-namespaces'],
            human_readable=False)

        # NOTE(review): this repeats the assertion above verbatim; it was
        # possibly intended to vary the namespace argument instead.
        dkc.get_kubectl_configuration(
            dkc.Commands.GET_PODS, kubernetes_context='test_context')
        mock_execute_kubectl_command.assert_called_with(
            ['get', 'pods', '--context', 'test_context', '--all-namespaces'],
            human_readable=False)

    def test_Commands(self):
        """Verify commands are formatted properly."""
        # Every enum member must map to a single-line command string.
        for command in dkc.Commands:
            self.assertIsInstance(dkc._command_string[command], Text)
            self.assertNotIn('\t', dkc._command_string[command])
            self.assertNotIn('\n', dkc._command_string[command])

    @mock.patch.object(utility, 'ExecutorResponse', autospec=True)
    def test_execute_kubectl_command(self, mock_executor_response):
        """Test execute_kubectl_command."""
        dkc.execute_kubectl_command(
            [dkc._command_string[dkc.Commands.GET_KUBECTL_VERSION]])
        mock_executor_response().execute_command.assert_called_once_with(
            ['kubectl', 'version', '-o', 'json'])

        # human_readable=True must suppress the '-o json' flag.
        dkc.execute_kubectl_command(
            [dkc._command_string[dkc.Commands.GET_KUBECTL_VERSION]],
            human_readable=True)
        mock_executor_response().execute_command.assert_called_with(
            ['kubectl', 'version'])


if __name__ == '__main__':
    unittest.main()
"""Supporting tools and classes for diagnose_me."""

import json
import subprocess
from typing import List, Text


class ExecutorResponse(object):
    """Class for keeping track of output of _executor methods.

    Data model for executing commands and capturing their response. This class
    defines the data model layer for execution results, based on MVC design
    pattern.

    TODO() This class should be extended to contain data structure to better
    represent the underlying data instead of dict for various response types.
    """

    def execute_command(self, command_list: List[Text]):
        """Executes the command in command_list.

        Sets values for _stdout, _stderr, and _returncode accordingly.

        TODO(): This method is kept in ExecutorResponse for simplicity,
        however this deviates from MVC design pattern. It should be factored
        out in future.

        Args:
          command_list: A list of strings that represents the command and
            parameters to be executed.

        Returns:
          Self (this ExecutorResponse instance), for call chaining.
        """
        try:
            # TODO() switch to subprocess.run to simplify the code.
            process = subprocess.Popen(
                command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = process.communicate()
            self._stdout = stdout.decode('utf-8')
            self._stderr = stderr.decode('utf-8')
            self._returncode = process.returncode
        except OSError as e:
            # Bug fix: store the error text, not the exception object, so
            # `stderr` is always a string regardless of which path ran
            # (callers format and JSON-serialize it).
            self._stderr = str(e)
            self._stdout = ''
            self._returncode = e.errno
        self._parse_raw_input()
        return self

    def _parse_raw_input(self):
        """Parses raw stdout and populates _json and _parsed_output.

        If stdout is valid JSON, _parsed_output is the decoded object and
        _json is the raw text; otherwise _parsed_output is the raw text and
        _json is that text re-encoded as a JSON string literal.
        """
        try:
            self._parsed_output = json.loads(self._stdout)
            self._json = self._stdout
        except json.JSONDecodeError:
            self._json = json.dumps(self._stdout)
            self._parsed_output = self._stdout

    @property
    def parsed_output(self) -> Text:
        """JSON-decoded stdout, or the raw stdout text if it was not JSON."""
        return self._parsed_output

    @property
    def has_error(self) -> bool:
        """Returns True if the execution return code was not 0."""
        return self._returncode != 0

    @property
    def json_output(self) -> Text:
        """The parsed stdout object (alias of parsed_output).

        Despite the name this returns the JSON-decoded object, not the raw
        JSON text — callers rely on this (e.g. membership checks against the
        decoded dict), so the behavior is kept and only documented here.
        """
        return self._parsed_output

    @property
    def stderr(self):
        """Captured stderr text (or the OS error message on launch failure)."""
        return self._stderr
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for diagnose_me.utility.""" + +import unittest + +from kfp.cli.diagnose_me import utility + + +class UtilityTest(unittest.TestCase): + + def test_parse_raw_input_json(self): + """Testing json stdout is correctly parsed.""" + response = utility.ExecutorResponse() + response._stdout = '{"key":"value"}' + response._parse_raw_input() + + self.assertEqual(response._json, '{"key":"value"}') + self.assertEqual(response._parsed_output, {'key': 'value'}) + + def test_parse_raw_input_text(self): + """Testing non-json stdout is correctly parsed.""" + response = utility.ExecutorResponse() + response._stdout = 'non-json string' + response._parse_raw_input() + + self.assertEqual(response._json, '"non-json string"') + self.assertEqual(response._parsed_output, 'non-json string') + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/cli/diagnose_me_cli.py b/sdk/python/kfp/cli/diagnose_me_cli.py new file mode 100644 index 00000000000..975df4e0599 --- /dev/null +++ b/sdk/python/kfp/cli/diagnose_me_cli.py @@ -0,0 +1,107 @@ +# Lint as: python3 +"""CLI interface for KFP diagnose_me tool.""" + +import json as json_library +import sys +from typing import Dict, Text + +import click +from kfp.cli.diagnose_me import dev_env, gcp +from kfp.cli.diagnose_me import kubernetes_cluster as k8 +from kfp.cli.diagnose_me import utility + + +@click.group() +def diagnose_me(): + """Prints diagnoses information for KFP environment.""" + pass + + +@diagnose_me.command() +@click.option( + '-j', + '--json', + is_flag=True, + help='Output in Json format, human readable format is set by default.') +@click.option( + '-p', + '--project-id', + type=Text, + help='Target project id. It will use environment default if not specified.') +@click.option( + '-n', + '--namespace', + type=Text, + help='Namespace to use for Kubernetes cluster.all-namespaces is used if not specified.' 
@click.pass_context
def diagnose_me(ctx: click.Context, json: bool, project_id: str,
                namespace: str):
    """Runs environment diagnostic with specified parameters.

    Feature stage:
    [Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha)
    """
    # NOTE(review): this command function rebinds the module-level name
    # `diagnose_me` (the click group defined above); confirm importers expect
    # the command object rather than the group.
    # NOTE(review): the `namespace` option is accepted but never used in this
    # body — presumably it should be forwarded to get_kubectl_configuration;
    # verify intended behavior.
    # validate kubectl, gcloud , and gsutil exist
    local_env_gcloud_sdk = gcp.get_gcp_configuration(
        gcp.Commands.GET_GCLOUD_VERSION,
        project_id=project_id,
        human_readable=False)
    for app in ['Google Cloud SDK', 'gsutil', 'kubectl']:
        # `json_output` here is the decoded `gcloud version` output —
        # presumably a dict keyed by component name, so membership checks
        # component presence; verify against gcloud's JSON schema.
        if app not in local_env_gcloud_sdk.json_output:
            raise RuntimeError(
                '%s is not installed, gcloud, gsutil and kubectl are required '
                % app + 'for this app to run. Please follow instructions at ' +
                'https://cloud.google.com/sdk/install to install the SDK.')

    click.echo('Collecting diagnostic information ...', file=sys.stderr)

    # default behaviour dump all configurations
    results = {}
    for gcp_command in gcp.Commands:
        results[gcp_command] = gcp.get_gcp_configuration(
            gcp_command, project_id=project_id, human_readable=not json)

    for k8_command in k8.Commands:
        results[k8_command] = k8.get_kubectl_configuration(
            k8_command, human_readable=not json)

    for dev_env_command in dev_env.Commands:
        results[dev_env_command] = dev_env.get_dev_env_configuration(
            dev_env_command, human_readable=not json)

    print_to_sdtout(results, not json)


# NOTE(review): name misspells "stdout"; kept because it is referenced by the
# command above and possibly elsewhere.
def print_to_sdtout(results: Dict[str, utility.ExecutorResponse],
                    human_readable: bool):
    """Viewer to print the ExecutorResponse results to stdout.

    Args:
      results: A dictionary with key: command names and val: execution
        response.
      human_readable: Print results in human readable format. If set to True
        command names will be printed as visual delimiters in new lines. If
        False results are printed as a dictionary with command as key.
    """

    output_dict = {}
    human_readable_result = []
    for key, val in results.items():
        if val.has_error:
            # Surface the failure message instead of (missing) output.
            output_dict[
                key.
                name] = 'Following error occurred during the diagnoses: %s' % val.stderr
            continue

        output_dict[key.name] = val.json_output
        human_readable_result.append('================ %s ===================' %
                                     (key.name))
        human_readable_result.append(val.parsed_output)

    if human_readable:
        result = '\n'.join(human_readable_result)
    else:
        result = json_library.dumps(
            output_dict, sort_keys=True, indent=2, separators=(',', ': '))

    click.echo(result)
)
@click.option(
    '--filter',
    help=(
        "filter: A url-encoded, JSON-serialized Filter protocol buffer "
        "(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
    ))
@click.pass_context
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
         filter: str):
    """List experiments."""
    client = ctx.obj['client']
    output_format = ctx.obj['output']

    response = client.list_experiments(
        page_token=page_token,
        page_size=max_size,
        sort_by=sort_by,
        filter=filter)
    if response.experiments:
        _display_experiments(response.experiments, output_format)
    else:
        # Keep machine-readable output valid JSON even when there is nothing
        # to list.
        if output_format == OutputFormat.json.name:
            msg = json.dumps([])
        else:
            msg = "No experiments found"
        click.echo(msg)


@experiment.command()
@click.argument("experiment-id")
@click.pass_context
def get(ctx: click.Context, experiment_id: str):
    """Get detailed information about an experiment."""
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]

    response = client.get_experiment(experiment_id)
    _display_experiment(response, output_format)


@experiment.command()
@click.argument("experiment-id")
@click.pass_context
def delete(ctx: click.Context, experiment_id: str):
    """Delete an experiment."""

    # Interactive confirmation: deleting an experiment that still owns runs
    # can break the RunDetails page for those runs.
    confirmation = "Caution. The RunDetails page could have an issue" \
                   " when it renders a run that has no experiment." \
                   " Do you want to continue?"
    if not click.confirm(confirmation):
        return

    client = ctx.obj["client"]

    client.delete_experiment(experiment_id)
    click.echo("{} is deleted.".format(experiment_id))


def _display_experiments(experiments: List[ApiExperiment],
                         output_format: OutputFormat):
    # Tabular summary used by `experiment list`.
    headers = ["Experiment ID", "Name", "Created at"]
    data = [
        [exp.id, exp.name, exp.created_at.isoformat()] for exp in experiments
    ]
    print_output(data, headers, output_format, table_format="grid")


def _display_experiment(exp: kfp_server_api.ApiExperiment,
                        output_format: OutputFormat):
    # Key/value detail view used by `experiment create` and `experiment get`.
    table = [
        ["ID", exp.id],
        ["Name", exp.name],
        ["Description", exp.description],
        ["Created at", exp.created_at.isoformat()],
    ]
    if output_format == OutputFormat.table.name:
        print_output([], ["Experiment Details"], output_format)
        print_output(table, [], output_format, table_format="plain")
    elif output_format == OutputFormat.json.name:
        print_output(dict(table), [], output_format)


@experiment.command()
@click.option(
    "--experiment-id",
    default=None,
    help="The ID of the experiment to archive, can only supply either an experiment ID or name."
)
@click.option(
    "--experiment-name",
    default=None,
    help="The name of the experiment to archive, can only supply either an experiment ID or name."
)
@click.pass_context
def archive(ctx: click.Context, experiment_id: str, experiment_name: str):
    """Archive an experiment."""
    client = ctx.obj["client"]

    # Exactly one of --experiment-id / --experiment-name must be supplied.
    if (experiment_id is None) == (experiment_name is None):
        raise ValueError('Either experiment_id or experiment_name is required')

    if not experiment_id:
        # Resolve the name to an ID before archiving.
        experiment = client.get_experiment(experiment_name=experiment_name)
        experiment_id = experiment.id

    client.archive_experiment(experiment_id=experiment_id)
# --- new file: sdk/python/kfp/cli/output.py ---
# Copyright 2020 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from enum import Enum, unique
from typing import Union

import click
from tabulate import tabulate


@unique
class OutputFormat(Enum):
    """Enumerated class with the allowed output format constants."""
    table = "table"
    json = "json"


def print_output(data: Union[list, dict],
                 headers: list,
                 output_format: str,
                 table_format: str = "simple"):
    """Prints the output from the cli command execution based on the specified
    format.

    Args:
      data (Union[list, dict]): Nested list of values representing the rows to
        be printed.
      headers (list): List of values representing the column names to be
        printed for the ``data``.
      output_format (str): The desired formatting of the text from the command
        output.
      table_format (str): The formatting for the table ``output_format``.
        Default value set to ``simple``.

    Returns:
      None: Prints the output.

    Raises:
      NotImplementedError: If the ``output_format`` is unknown.
    """
    if output_format == OutputFormat.table.name:
        click.echo(tabulate(data, headers=headers, tablefmt=table_format))
    elif output_format == OutputFormat.json.name:
        if not headers:
            # No column names: emit the data structure as-is.
            output = data
        else:
            # Zip each row with the headers so the JSON is self-describing.
            output = []
            for row in data:
                output.append(dict(zip(headers, row)))
        click.echo(json.dumps(output, indent=4))
    else:
        raise NotImplementedError(
            "Unknown Output Format: {}".format(output_format))
# --- new file: sdk/python/kfp/cli/pipeline.py ---
# Copyright 2019 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import List, Optional

import click
import kfp_server_api
from kfp.cli.output import OutputFormat, print_output


@click.group()
def pipeline():
    """Manage pipeline resources."""
    pass


@pipeline.command()
@click.option("-p", "--pipeline-name", help="Name of the pipeline.")
@click.option("-d", "--description", help="Description for the pipeline.")
@click.argument("package-file")
@click.pass_context
def upload(ctx: click.Context,
           pipeline_name: str,
           package_file: str,
           description: str = None):
    """Upload a KFP pipeline."""
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]
    # NOTE(review): split(".")[0] keeps any directory prefix and cuts at the
    # FIRST dot, so "a.b.yaml" -> "a". Presumably meant as "file name without
    # extension" -- confirm before changing.
    if not pipeline_name:
        pipeline_name = package_file.split(".")[0]

    pipeline = client.upload_pipeline(package_file, pipeline_name, description)
    _display_pipeline(pipeline, output_format)


@pipeline.command()
@click.option("-p", "--pipeline-id", help="ID of the pipeline", required=False)
@click.option("-n", "--pipeline-name", help="Name of pipeline", required=False)
@click.option(
    "-v",
    "--pipeline-version",
    help="Name of the pipeline version",
    required=True)
@click.argument("package-file")
@click.pass_context
def upload_version(ctx: click.Context,
                   package_file: str,
                   pipeline_version: str,
                   pipeline_id: Optional[str] = None,
                   pipeline_name: Optional[str] = None):
    """Upload a version of the KFP pipeline."""
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]
    # Exactly one of pipeline-id / pipeline-name must be supplied.
    if bool(pipeline_id) == bool(pipeline_name):
        raise ValueError("Need to supply 'pipeline-name' or 'pipeline-id'")
    if pipeline_name is not None:
        pipeline_id = client.get_pipeline_id(name=pipeline_name)
        if pipeline_id is None:
            raise ValueError("Can't find a pipeline with name: %s" %
                             pipeline_name)
    version = client.pipeline_uploads.upload_pipeline_version(
        package_file, name=pipeline_version, pipelineid=pipeline_id)
    _display_pipeline_version(version, output_format)


@pipeline.command()
@click.option(
    '--page-token', default='', help="Token for starting of the page.")
@click.option(
    '-m', '--max-size', default=100, help="Max size of the listed pipelines.")
@click.option(
    '--sort-by',
    default="created_at desc",
    help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
    '--filter',
    help=(
        "filter: A url-encoded, JSON-serialized Filter protocol buffer "
        "(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
    ))
@click.pass_context
def list(ctx: click.Context, page_token: str, max_size: int, sort_by: str,
         filter: str):
    """List uploaded KFP pipelines."""
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]

    response = client.list_pipelines(
        page_token=page_token,
        page_size=max_size,
        sort_by=sort_by,
        filter=filter)
    if response.pipelines:
        _print_pipelines(response.pipelines, output_format)
    else:
        # Keep machine-readable output valid JSON even when empty.
        if output_format == OutputFormat.json.name:
            msg = json.dumps([])
        else:
            msg = "No pipelines found"
        click.echo(msg)


@pipeline.command()
@click.argument("pipeline-id")
@click.option(
    '--page-token', default='', help="Token for starting of the page.")
@click.option(
    '-m',
    '--max-size',
    default=100,
    help="Max size of the listed pipeline versions.")
@click.option(
    '--sort-by',
    default="created_at desc",
    help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
    '--filter',
    help=(
        "filter: A url-encoded, JSON-serialized Filter protocol buffer "
        "(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
    ))
@click.pass_context
def list_versions(ctx: click.Context, pipeline_id: str, page_token: str,
                  max_size: int, sort_by: str, filter: str):
    """List versions of an uploaded KFP pipeline."""
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]

    response = client.list_pipeline_versions(
        pipeline_id,
        page_token=page_token,
        page_size=max_size,
        sort_by=sort_by,
        filter=filter)
    if response.versions:
        _print_pipeline_versions(response.versions, output_format)
    else:
        if output_format == OutputFormat.json.name:
            msg = json.dumps([])
        else:
            msg = "No pipeline or version found"
        click.echo(msg)


@pipeline.command()
@click.argument("version-id")
@click.pass_context
def delete_version(ctx: click.Context, version_id: str):
    """Delete pipeline version.

    Args:
      version_id: id of the pipeline version.

    Returns:
      Object. If the method is called asynchronously, returns the request
      thread.

    Throws:
      Exception if pipeline version is not found.
    """
    client = ctx.obj["client"]
    return client.delete_pipeline_version(version_id)


@pipeline.command()
@click.argument("pipeline-id")
@click.pass_context
def get(ctx: click.Context, pipeline_id: str):
    """Get detailed information about an uploaded KFP pipeline."""
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]

    pipeline = client.get_pipeline(pipeline_id)
    _display_pipeline(pipeline, output_format)


@pipeline.command()
@click.argument("pipeline-id")
@click.pass_context
def delete(ctx: click.Context, pipeline_id: str):
    """Delete an uploaded KFP pipeline."""
    client = ctx.obj["client"]

    client.delete_pipeline(pipeline_id)
    click.echo(f"{pipeline_id} is deleted")


def _print_pipelines(pipelines: List[kfp_server_api.ApiPipeline],
                     output_format: OutputFormat):
    # Tabular summary used by `pipeline list`.
    headers = ["Pipeline ID", "Name", "Uploaded at"]
    data = [[pipeline.id, pipeline.name,
             pipeline.created_at.isoformat()] for pipeline in pipelines]
    print_output(data, headers, output_format, table_format="grid")


def _print_pipeline_versions(versions: List[kfp_server_api.ApiPipelineVersion],
                             output_format: OutputFormat):
    headers = ["Version ID", "Version name", "Uploaded at", "Pipeline ID"]
    # Each version carries resource references; pick the PIPELINE one to show
    # which pipeline the version belongs to.
    data = [[
        version.id, version.name,
        version.created_at.isoformat(),
        next(rr
             for rr in version.resource_references
             if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).key.id
    ]
            for version in versions]
    print_output(data, headers, output_format, table_format="grid")


def _display_pipeline(pipeline: kfp_server_api.ApiPipeline,
                      output_format: OutputFormat):
    # Pipeline information
    table = [["Pipeline ID", pipeline.id], ["Name", pipeline.name],
             ["Description", pipeline.description],
             ["Uploaded at", pipeline.created_at.isoformat()],
             ["Version ID", pipeline.default_version.id]]

    # Pipeline parameter details
    headers = ["Parameter Name", "Default Value"]
    data = []
    if pipeline.parameters is not None:
        data = [[param.name, param.value] for param in pipeline.parameters]

    if output_format == OutputFormat.table.name:
        print_output([], ["Pipeline Details"], output_format)
        print_output(table, [], output_format, table_format="plain")
        print_output(data, headers, output_format, table_format="grid")
    elif output_format == OutputFormat.json.name:
        output = dict()
        output["Pipeline Details"] = dict(table)
        params = []
        for item in data:
            params.append(dict(zip(headers, item)))
        output["Pipeline Parameters"] = params
        print_output(output, [], output_format)


def _display_pipeline_version(version: kfp_server_api.ApiPipelineVersion,
                              output_format: OutputFormat):
    # Resolve the owning pipeline from the version's resource references.
    pipeline_id = next(
        rr for rr in version.resource_references
        if rr.key.type == kfp_server_api.ApiResourceType.PIPELINE).key.id
    table = [["Pipeline ID", pipeline_id], ["Version name", version.name],
             ["Uploaded at", version.created_at.isoformat()],
             ["Version ID", version.id]]

    if output_format == OutputFormat.table.name:
        print_output([], ["Pipeline Version Details"], output_format)
        print_output(table, [], output_format, table_format="plain")
    elif output_format == OutputFormat.json.name:
        print_output(dict(table), [], output_format)
# --- new file: sdk/python/kfp/cli/recurring_run.py ---
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import Any, Dict, List, Optional

import click
import kfp_server_api
from kfp.cli.output import OutputFormat, print_output


@click.group()
def recurring_run():
    """Manage recurring-run resources."""
    pass


@recurring_run.command()
@click.option(
    '--catchup/--no-catchup',
    help='Whether the recurring run should catch up if behind schedule.',
    type=bool)
@click.option(
    '--cron-expression',
    help='A cron expression representing a set of times, using 6 space-separated fields, e.g. "0 0 9 ? * 2-6".'
)
@click.option('--description', help='The description of the recurring run.')
@click.option(
    '--enable-caching/--disable-caching',
    help='Optional. Whether or not to enable caching for the run.',
    type=bool)
@click.option(
    '--enabled/--disabled',
    help='A bool indicating whether the recurring run is enabled or disabled.',
    type=bool)
@click.option(
    '--end-time',
    help='The RFC3339 time string of the time when to end the job.')
@click.option(
    '--experiment-id',
    help='The ID of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option(
    '--experiment-name',
    help='The name of the experiment to create the recurring run under, can only supply either an experiment ID or name.'
)
@click.option('--job-name', help='The name of the recurring run.')
@click.option(
    '--interval-second',
    help='Integer indicating the seconds between two recurring runs in for a periodic schedule.'
)
@click.option(
    '--max-concurrency',
    help='Integer indicating how many jobs can be run in parallel.',
    type=int)
@click.option(
    '--pipeline-id',
    help='The ID of the pipeline to use to create the recurring run.')
@click.option(
    '--pipeline-package-path',
    help='Local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).'
+) +@click.option( + '--start-time', + help='The RFC3339 time string of the time when to start the job.') +@click.option('--version-id', help='The id of a pipeline version.') +@click.argument("args", nargs=-1) +@click.pass_context +def create(ctx: click.Context, + job_name: str, + experiment_id: Optional[str] = None, + experiment_name: Optional[str] = None, + catchup: Optional[bool] = None, + cron_expression: Optional[str] = None, + enabled: Optional[bool] = None, + description: Optional[str] = None, + enable_caching: Optional[bool] = None, + end_time: Optional[str] = None, + interval_second: Optional[int] = None, + max_concurrency: Optional[int] = None, + params: Optional[dict] = None, + pipeline_package_path: Optional[str] = None, + pipeline_id: Optional[str] = None, + start_time: Optional[str] = None, + version_id: Optional[str] = None, + args: Optional[List[str]] = None): + """Create a recurring run.""" + client = ctx.obj["client"] + output_format = ctx.obj['output'] + + if (experiment_id is None) == (experiment_name is None): + raise ValueError('Either experiment_id or experiment_name is required') + if not experiment_id: + experiment = client.create_experiment(experiment_name) + experiment_id = experiment.id + + # Ensure we only split on the first equals char so the value can contain + # equals signs too. 
+ split_args: List = [arg.split("=", 1) for arg in args or []] + params: Dict[str, Any] = dict(split_args) + recurring_run = client.create_recurring_run( + cron_expression=cron_expression, + description=description, + enabled=enabled, + enable_caching=enable_caching, + end_time=end_time, + experiment_id=experiment_id, + interval_second=interval_second, + job_name=job_name, + max_concurrency=max_concurrency, + no_catchup=not catchup, + params=params, + pipeline_package_path=pipeline_package_path, + pipeline_id=pipeline_id, + start_time=start_time, + version_id=version_id) + + _display_recurring_run(recurring_run, output_format) + + +@recurring_run.command() +@click.option( + '-e', + '--experiment-id', + help='Parent experiment ID of listed recurring runs.') +@click.option( + '--page-token', default='', help="Token for starting of the page.") +@click.option( + '-m', + '--max-size', + default=100, + help="Max size of the listed recurring runs.") +@click.option( + '--sort-by', + default="created_at desc", + help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'." +) +@click.option( + '--filter', + help=( + "filter: A url-encoded, JSON-serialized Filter protocol buffer " + "(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))." 
    ))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
         sort_by: str, filter: str):
    """List recurring runs."""
    client = ctx.obj['client']
    output_format = ctx.obj['output']

    response = client.list_recurring_runs(
        experiment_id=experiment_id,
        page_token=page_token,
        page_size=max_size,
        sort_by=sort_by,
        filter=filter)
    if response.jobs:
        _display_recurring_runs(response.jobs, output_format)
    else:
        # Keep machine-readable output valid JSON even when empty.
        if output_format == OutputFormat.json.name:
            msg = json.dumps([])
        else:
            msg = "No recurring runs found"
        click.echo(msg)


@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def get(ctx: click.Context, job_id: str):
    """Get detailed information about a recurring run."""
    # DOC FIX: the original docstring said "about an experiment"; this command
    # fetches a recurring run (job), not an experiment.
    client = ctx.obj["client"]
    output_format = ctx.obj["output"]

    response = client.get_recurring_run(job_id)
    _display_recurring_run(response, output_format)


@recurring_run.command()
@click.argument("job-id")
@click.pass_context
def delete(ctx: click.Context, job_id: str):
    """Delete a recurring run."""
    client = ctx.obj["client"]
    client.delete_job(job_id)


def _display_recurring_runs(recurring_runs: List[kfp_server_api.ApiJob],
                            output_format: OutputFormat):
    # Tabular summary used by `recurring-run list`.
    headers = ["Recurring Run ID", "Name"]
    data = [[rr.id, rr.name] for rr in recurring_runs]
    print_output(data, headers, output_format, table_format="grid")


def _display_recurring_run(recurring_run: kfp_server_api.ApiJob,
                           output_format: OutputFormat):
    # Key/value detail view used by `recurring-run create` and `get`.
    table = [
        ["Recurring Run ID", recurring_run.id],
        ["Name", recurring_run.name],
    ]
    if output_format == OutputFormat.table.name:
        print_output([], ["Recurring Run Details"], output_format)
        print_output(table, [], output_format, table_format="plain")
    elif output_format == OutputFormat.json.name:
        print_output(dict(table), [], output_format)
# --- new file: sdk/python/kfp/cli/run.py ---
# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import shutil
import subprocess
import sys
import time
from typing import List

import click
import kfp_server_api
from kfp.cli.output import OutputFormat, print_output
from kfp.client import Client


@click.group()
def run():
    """manage run resources."""
    pass


@run.command()
@click.option(
    '-e', '--experiment-id', help='Parent experiment ID of listed runs.')
@click.option(
    '--page-token', default='', help="Token for starting of the page.")
@click.option(
    '-m', '--max-size', default=100, help="Max size of the listed runs.")
@click.option(
    '--sort-by',
    default="created_at desc",
    help="Can be '[field_name]', '[field_name] desc'. For example, 'name desc'."
)
@click.option(
    '--filter',
    help=(
        "filter: A url-encoded, JSON-serialized Filter protocol buffer "
        "(see [filter.proto](https://github.com/kubeflow/pipelines/blob/master/backend/api/filter.proto))."
    ))
@click.pass_context
def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
         sort_by: str, filter: str):
    """list recent KFP runs."""
    client = ctx.obj['client']
    output_format = ctx.obj['output']
    response = client.list_runs(
        experiment_id=experiment_id,
        page_token=page_token,
        page_size=max_size,
        sort_by=sort_by,
        filter=filter)
    if response and response.runs:
        _print_runs(response.runs, output_format)
    else:
        # Keep machine-readable output valid JSON even when empty.
        if output_format == OutputFormat.json.name:
            msg = json.dumps([])
        else:
            msg = 'No runs found.'
        click.echo(msg)


@run.command()
@click.option(
    '-e',
    '--experiment-name',
    required=True,
    help='Experiment name of the run.')
@click.option('-r', '--run-name', help='Name of the run.')
@click.option(
    '-f',
    '--package-file',
    type=click.Path(exists=True, dir_okay=False),
    help='Path of the pipeline package file.')
@click.option('-p', '--pipeline-id', help='ID of the pipeline template.')
@click.option('-n', '--pipeline-name', help='Name of the pipeline template.')
@click.option(
    '-w',
    '--watch',
    is_flag=True,
    default=False,
    help='Watch the run status until it finishes.')
@click.option('-v', '--version', help='ID of the pipeline version.')
@click.option(
    '-t',
    '--timeout',
    default=0,
    help='Wait for a run to complete until timeout in seconds.',
    type=int)
@click.argument('args', nargs=-1)
@click.pass_context
def submit(ctx: click.Context, experiment_name: str, run_name: str,
           package_file: str, pipeline_id: str, pipeline_name: str, watch: bool,
           timeout: int, version: str, args: List[str]):
    """submit a KFP run."""
    client = ctx.obj['client']
    namespace = ctx.obj['namespace']
    output_format = ctx.obj['output']
    if not run_name:
        # Fall back to the experiment name when no run name was given.
        run_name = experiment_name

    if not pipeline_id and pipeline_name:
        pipeline_id = client.get_pipeline_id(name=pipeline_name)

    if not package_file and not pipeline_id and not version:
        click.echo(
            'You must provide one of [package_file, pipeline_id, version].',
            err=True)
        sys.exit(1)

    # Split each KEY=VALUE arg on the first '=' only so values may contain '='.
    arg_dict = dict(arg.split('=', maxsplit=1) for arg in args)

    experiment = client.create_experiment(experiment_name)
    run = client.run_pipeline(
        experiment.id,
        run_name,
        package_file,
        arg_dict,
        pipeline_id,
        version_id=version)
    if timeout > 0:
        _wait_for_run_completion(client, run.id, timeout, output_format)
    else:
        _display_run(client, namespace, run.id, watch, output_format)


@run.command()
@click.option(
    '-w',
    '--watch',
    is_flag=True,
    default=False,
    help='Watch the run status until it finishes.')
@click.option(
    '-d',
    '--detail',
    is_flag=True,
    default=False,
    help='Get detailed information of the run in json format.')
@click.argument('run-id')
@click.pass_context
def get(ctx: click.Context, watch: bool, detail: bool, run_id: str):
    """display the details of a KFP run."""
    client = ctx.obj['client']
    namespace = ctx.obj['namespace']
    output_format = ctx.obj['output']

    _display_run(client, namespace, run_id, watch, output_format, detail)


# TYPE FIX: the first parameter was annotated `click.Context`, but every call
# site passes the KFP `Client` instance and the body invokes `client.get_run`.
def _display_run(client: Client,
                 namespace: str,
                 run_id: str,
                 watch: bool,
                 output_format: OutputFormat,
                 detail: bool = False):
    run = client.get_run(run_id).run

    if detail:
        data = {
            key:
            value.isoformat() if isinstance(value, datetime.datetime) else value
            for key, value in run.to_dict().items()
            if key not in ['pipeline_spec'
                          ]  # useless but too much detailed field
        }
        click.echo(data)
        return

    _print_runs([run], output_format)
    if not watch:
        return
    # Watching delegates to the `argo` CLI, which must be on $PATH.
    argo_path = shutil.which('argo')
    if not argo_path:
        raise RuntimeError(
            "argo isn't found in $PATH. It's necessary for watch. "
            "Please make sure it's installed and available. "
            "Installation instructions be found here - "
            "https://github.com/argoproj/argo-workflows/releases")

    argo_workflow_name = None
    # Poll until the workflow manifest (and hence the Argo workflow name) is
    # available, or the run reaches a terminal state first.
    while True:
        time.sleep(1)
        run_detail = client.get_run(run_id)
        run = run_detail.run
        if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest:
            manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest)
            if manifest['metadata'] and manifest['metadata']['name']:
                argo_workflow_name = manifest['metadata']['name']
                break
        if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']:
            click.echo('Run is finished with status {}.'.format(
                run_detail.run.status))
            return
    if argo_workflow_name:
        subprocess.run(
            [argo_path, 'watch', argo_workflow_name, '-n', namespace])
        _print_runs([run], output_format)


def _wait_for_run_completion(client: Client, run_id: str, timeout: int,
                             output_format: OutputFormat):
    # Block until the run finishes (or `timeout` seconds pass), then print it.
    run_detail = client.wait_for_run_completion(run_id, timeout)
    _print_runs([run_detail.run], output_format)


def _print_runs(runs: List[kfp_server_api.ApiRun], output_format: OutputFormat):
    headers = ['run id', 'name', 'status', 'created at', 'experiment id']
    # Resolve each run's parent experiment from its resource references.
    data = [[
        run.id, run.name, run.status,
        run.created_at.isoformat(),
        next(rr
             for rr in run.resource_references
             if rr.key.type == kfp_server_api.ApiResourceType.EXPERIMENT).key.id
    ]
            for run in runs]
    print_output(data, headers, output_format, table_format='grid')
succeed for tests. # So we patch it before importing kfp.cli.components. -if importlib.util.find_spec('docker') is None: +try: + import docker # pylint: disable=unused-import +except ImportError: sys.modules['docker'] = mock.Mock() from kfp.deprecated.cli import components diff --git a/sdk/python/setup.py b/sdk/python/setup.py index e9408daf172..782b46b56f4 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -91,6 +91,6 @@ def find_version(*file_path_parts: str) -> str: 'console_scripts': [ 'dsl-compile = kfp.compiler.main:main', 'dsl-compile-deprecated = kfp.deprecated.compiler.main:main', - 'kfp=kfp.__main__:main', + 'kfp=kfp.cli.cli:main', ] })