From c48c03601077f83a28ddcdcb051d8001c2dffb22 Mon Sep 17 00:00:00 2001
From: Connor McCarthy
Date: Wed, 20 Apr 2022 14:57:10 -0600
Subject: [PATCH] use click for dsl-compile command (#7560)

---
 samples/test/utils/kfp/samples/test/utils.py |  57 +++++----
 .../kfp/{compiler/main.py => cli/compile.py} | 117 ++++++++----------
 2 files changed, 86 insertions(+), 88 deletions(-)
 rename sdk/python/kfp/{compiler/main.py => cli/compile.py} (59%)

diff --git a/samples/test/utils/kfp/samples/test/utils.py b/samples/test/utils/kfp/samples/test/utils.py
index 61168144ea89..e33627ae0c13 100644
--- a/samples/test/utils/kfp/samples/test/utils.py
+++ b/samples/test/utils/kfp/samples/test/utils.py
@@ -17,26 +17,29 @@
 import json
 import logging
 import os
-import sys
-import time
 import random
-import tempfile
 import subprocess
-from dataclasses import dataclass, asdict
+import sys
+import tempfile
+import time
+import unittest
+from dataclasses import asdict
+from dataclasses import dataclass
 from pprint import pprint
 from typing import Callable, Optional
-import unittest
 
-from google.protobuf.json_format import MessageToDict
-import nbformat
-from nbconvert import PythonExporter
 import kfp
-from kfp.deprecated.onprem import add_default_resource_spec
 import kfp.compiler
 import kfp_server_api
+import nbformat
+from google.protobuf.json_format import MessageToDict
+from kfp.deprecated.onprem import add_default_resource_spec
 from ml_metadata import metadata_store
 from ml_metadata.metadata_store.metadata_store import ListOptions
-from ml_metadata.proto import Event, Execution, metadata_store_pb2
+from ml_metadata.proto import Event
+from ml_metadata.proto import Execution
+from ml_metadata.proto import metadata_store_pb2
+from nbconvert import PythonExporter
 
 
 MINUTE = 60
@@ -66,7 +69,8 @@ def NEEDS_A_FIX(run_id, run, **kwargs):
 
 @dataclass
 class TestCase:
-    """Test case for running a KFP sample. One of pipeline_func or pipeline_file is required.
+    """Test case for running a KFP sample. One of pipeline_func or
+    pipeline_file is required.
 
     :param run_pipeline: when False, it means the test case just runs the python file.
     :param pipeline_file_compile_path: when specified, the pipeline file can compile
@@ -96,7 +100,8 @@ def run_pipeline_func(test_cases: list[TestCase]):
 
     def test_wrapper(
         run_pipeline: Callable[[
-            Callable, str, str, kfp.deprecated.dsl.PipelineExecutionMode, bool, dict, bool
+            Callable, str, str, kfp.deprecated.dsl
+            .PipelineExecutionMode, bool, dict, bool
         ], kfp_server_api.ApiRunDetail],
         mlmd_connection_config: metadata_store_pb2.MetadataStoreClientConfig,
     ):
@@ -173,7 +178,8 @@ def test_wrapper(
 
 
 def debug_verify(run_id: str, verify_func: Verifier):
-    '''Debug a verify function quickly using MLMD data from an existing KFP run ID.'''
+    """Debug a verify function quickly using MLMD data from an existing KFP run
+    ID."""
     t = unittest.TestCase()
     t.maxDiff = None  # we always want to see full diff
     client = KfpMlmdClient()
@@ -216,10 +222,11 @@ def main(
         metadata_service_host: Optional[str] = None,
         metadata_service_port: int = 8080,
     ):
-        """Test file CLI entrypoint used by Fire.
-        To configure KFP endpoint, configure env vars following:
-        https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/#configure-sdk-client-by-environment-variables.
-        KFP UI endpoint can be configured by KF_PIPELINES_UI_ENDPOINT env var.
+        """Test file CLI entrypoint used by Fire. To configure KFP endpoint,
+        configure env vars following:
+        https://www.kubeflow.org/docs/components/pipelines/sdk/connect-
+        api/#configure-sdk-client-by-environment-variables. KFP UI endpoint can
+        be configured by KF_PIPELINES_UI_ENDPOINT env var.
 
         :param pipeline_root: pipeline root that holds intermediate artifacts, example
           gs://your-bucket/path/to/workdir.
@@ -271,8 +278,8 @@ def run_pipeline(
             pipeline_func: Optional[Callable],
             pipeline_file: Optional[str],
             pipeline_file_compile_path: Optional[str],
-            mode: kfp.deprecated.dsl.PipelineExecutionMode = kfp.deprecated.dsl.PipelineExecutionMode
-            .V2_ENGINE,
+            mode: kfp.deprecated.dsl.PipelineExecutionMode = kfp.deprecated.dsl
+            .PipelineExecutionMode.V2_ENGINE,
             enable_caching: bool = False,
             arguments: Optional[dict] = None,
             dry_run: bool = False,  # just compile the pipeline without running it
@@ -329,7 +336,8 @@ def _create_run():
                 else:
                     package_path = tempfile.mktemp(
                         suffix='.yaml', prefix="kfp_package")
-                    from kfp.deprecated.compiler.main import compile_pyfile
+                    from kfp.deprecated.compiler.main import \
+                        compile_pyfile
                     compile_pyfile(
                         pyfile=pyfile,
                         output_path=package_path,
@@ -396,8 +404,8 @@ def run_v2_pipeline(
     if file.endswith(".ipynb"):
         pyfile = tempfile.mktemp(suffix='.py', prefix="pipeline_py_code")
         _nb_sample_to_py(file, pyfile)
-        from kfp.compiler.main import compile_pyfile
-        compile_pyfile(pyfile=pyfile, package_path=original_pipeline_spec)
+        from kfp.cli.compile import dsl_compile
+        dsl_compile(py=pyfile, output=original_pipeline_spec)
 
     # remove following overriding logic once we use create_run_from_job_spec to trigger kfp pipeline run
     with open(original_pipeline_spec) as f:
@@ -721,7 +729,10 @@ def _disable_cache(task):
 
 
 def _nb_sample_to_py(notebook_path: str, output_path: str):
-    """nb_sample_to_py converts notebook kfp sample to a python file. Cells with tag "skip-in-test" will be omitted."""
+    """nb_sample_to_py converts notebook kfp sample to a python file.
+
+    Cells with tag "skip-in-test" will be omitted.
+    """
     with open(notebook_path, 'r') as f:
         nb = nbformat.read(f, as_version=4)
     # Cells with skip-in-test tag will be omitted.
diff --git a/sdk/python/kfp/compiler/main.py b/sdk/python/kfp/cli/compile.py
similarity index 59%
rename from sdk/python/kfp/compiler/main.py
rename to sdk/python/kfp/cli/compile.py
index 2038a9f262be..4f6b55bc30a7 100644
--- a/sdk/python/kfp/compiler/main.py
+++ b/sdk/python/kfp/cli/compile.py
@@ -1,4 +1,4 @@
-# Copyright 2020 The Kubeflow Authors
+# Copyright 2020-2022 The Kubeflow Authors
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,50 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
"""KFP SDK compiler CLI tool.""" - -import argparse import json +import logging import os import sys from typing import Any, Callable, List, Mapping, Optional +import click from kfp import compiler from kfp.components import pipeline_context -def parse_arguments() -> argparse.Namespace: - """Parse command line arguments.""" - - parser = argparse.ArgumentParser() - parser.add_argument( - '--py', - type=str, - required=True, - help='local absolute path to a py file.') - parser.add_argument( - '--function', - type=str, - help='The name of the function to compile if there are multiple.') - parser.add_argument( - '--pipeline-parameters', - type=json.loads, - help='The pipeline parameters in JSON dict format.') - parser.add_argument( - '--namespace', type=str, help='The namespace for the pipeline function') - parser.add_argument( - '--output', - type=str, - required=True, - help='local path to the output PipelineJob json file.') - parser.add_argument( - '--disable-type-check', - action='store_true', - help='disable the type check, default is enabled.') - - args = parser.parse_args() - return args - - def _compile_pipeline_function( pipeline_funcs: List[Callable], function_name: Optional[str], @@ -119,46 +86,66 @@ def __exit__(self, *args): pipeline_context.pipeline_decorator_handler = self.old_handler -def compile_pyfile( - pyfile: str, - package_path: str, +@click.command() +@click.option( + '--py', type=str, required=True, help='Local absolute path to a py file.') +@click.option( + '--output', + type=str, + required=True, + help='Local path to the output PipelineJob JSON file.') +@click.option( + '--function', + 'function_name', + type=str, + default=None, + help='The name of the function to compile if there are multiple.') +@click.option( + '--pipeline-parameters', + type=str, + default=None, + help='The pipeline parameters in JSON dict format.') +@click.option( + '--disable-type-check', + is_flag=True, + default=False, + help='Disable the type check. Default: type check is not disabled.') +def dsl_compile( + py: str, + output: str, function_name: Optional[str] = None, - pipeline_parameters: Optional[Mapping[str, Any]] = None, - type_check: bool = True, + pipeline_parameters: str = None, + disable_type_check: bool = True, ) -> None: - """Compiles a pipeline written in a .py file. - - Args: - pyfile: The path to the .py file that contains the pipeline definition. - function_name: The name of the pipeline function. - pipeline_parameters: The pipeline parameters as a dict of {name: value}. - package_path: The output path of the compiled result. - type_check: Whether to enable the type checking. 
- """ - sys.path.insert(0, os.path.dirname(pyfile)) + """Compiles a pipeline written in a .py file.""" + sys.path.insert(0, os.path.dirname(py)) try: - filename = os.path.basename(pyfile) + filename = os.path.basename(py) with PipelineCollectorContext() as pipeline_funcs: __import__(os.path.splitext(filename)[0]) + try: + parsed_parameters = json.loads( + pipeline_parameters) if pipeline_parameters is not None else {} + except json.JSONDecodeError as e: + logging.error( + f"Failed to parse --pipeline-parameters argument: {pipeline_parameters}" + ) + raise e _compile_pipeline_function( pipeline_funcs=pipeline_funcs, function_name=function_name, - pipeline_parameters=pipeline_parameters, - package_path=package_path, - type_check=type_check, + pipeline_parameters=parsed_parameters, + package_path=output, + type_check=not disable_type_check, ) finally: del sys.path[0] def main(): - args = parse_arguments() - if args.py is None: - raise ValueError('The --py option must be specified.') - compile_pyfile( - pyfile=args.py, - function_name=args.function, - pipeline_parameters=args.pipeline_parameters, - package_path=args.output, - type_check=not args.disable_type_check, - ) + logging.basicConfig(format='%(message)s', level=logging.INFO) + try: + dsl_compile(obj={}, auto_envvar_prefix='KFP') + except Exception as e: + click.echo(str(e), err=True) + sys.exit(1)