use click for dsl-compile command (kubeflow#7560)
connor-mccarthy authored and abaland committed May 29, 2022
1 parent fe37e57 commit c48c036
Showing 2 changed files with 86 additions and 88 deletions.
57 changes: 34 additions & 23 deletions samples/test/utils/kfp/samples/test/utils.py
@@ -17,26 +17,29 @@
 import json
 import logging
 import os
-import sys
-import time
 import random
-import tempfile
 import subprocess
-from dataclasses import dataclass, asdict
+import sys
+import tempfile
+import time
+import unittest
+from dataclasses import asdict
+from dataclasses import dataclass
 from pprint import pprint
 from typing import Callable, Optional
-import unittest
-from google.protobuf.json_format import MessageToDict
-import nbformat
-from nbconvert import PythonExporter
 
 import kfp
-from kfp.deprecated.onprem import add_default_resource_spec
 import kfp.compiler
 import kfp_server_api
+import nbformat
+from google.protobuf.json_format import MessageToDict
+from kfp.deprecated.onprem import add_default_resource_spec
 from ml_metadata import metadata_store
 from ml_metadata.metadata_store.metadata_store import ListOptions
-from ml_metadata.proto import Event, Execution, metadata_store_pb2
+from ml_metadata.proto import Event
+from ml_metadata.proto import Execution
+from ml_metadata.proto import metadata_store_pb2
+from nbconvert import PythonExporter
 
 MINUTE = 60
 
@@ -66,7 +69,8 @@ def NEEDS_A_FIX(run_id, run, **kwargs):
 
 @dataclass
 class TestCase:
-    """Test case for running a KFP sample. One of pipeline_func or pipeline_file is required.
+    """Test case for running a KFP sample. One of pipeline_func or
+    pipeline_file is required.
 
     :param run_pipeline: when False, it means the test case just runs the python file.
     :param pipeline_file_compile_path: when specified, the pipeline file can compile
Expand Down Expand Up @@ -96,7 +100,8 @@ def run_pipeline_func(test_cases: list[TestCase]):

def test_wrapper(
run_pipeline: Callable[[
Callable, str, str, kfp.deprecated.dsl.PipelineExecutionMode, bool, dict, bool
Callable, str, str, kfp.deprecated.dsl
.PipelineExecutionMode, bool, dict, bool
], kfp_server_api.ApiRunDetail],
mlmd_connection_config: metadata_store_pb2.MetadataStoreClientConfig,
):
Expand Down Expand Up @@ -173,7 +178,8 @@ def test_wrapper(


def debug_verify(run_id: str, verify_func: Verifier):
'''Debug a verify function quickly using MLMD data from an existing KFP run ID.'''
"""Debug a verify function quickly using MLMD data from an existing KFP run
ID."""
t = unittest.TestCase()
t.maxDiff = None # we always want to see full diff
client = KfpMlmdClient()
Expand Down Expand Up @@ -216,10 +222,11 @@ def main(
metadata_service_host: Optional[str] = None,
metadata_service_port: int = 8080,
):
"""Test file CLI entrypoint used by Fire.
To configure KFP endpoint, configure env vars following:
https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api/#configure-sdk-client-by-environment-variables.
KFP UI endpoint can be configured by KF_PIPELINES_UI_ENDPOINT env var.
"""Test file CLI entrypoint used by Fire. To configure KFP endpoint,
configure env vars following:
https://www.kubeflow.org/docs/components/pipelines/sdk/connect-
api/#configure-sdk-client-by-environment-variables. KFP UI endpoint can
be configured by KF_PIPELINES_UI_ENDPOINT env var.
:param pipeline_root: pipeline root that holds intermediate
artifacts, example gs://your-bucket/path/to/workdir.
Expand Down Expand Up @@ -271,8 +278,8 @@ def run_pipeline(
pipeline_func: Optional[Callable],
pipeline_file: Optional[str],
pipeline_file_compile_path: Optional[str],
mode: kfp.deprecated.dsl.PipelineExecutionMode = kfp.deprecated.dsl.PipelineExecutionMode
.V2_ENGINE,
mode: kfp.deprecated.dsl.PipelineExecutionMode = kfp.deprecated.dsl
.PipelineExecutionMode.V2_ENGINE,
enable_caching: bool = False,
arguments: Optional[dict] = None,
dry_run: bool = False, # just compile the pipeline without running it
Expand Down Expand Up @@ -329,7 +336,8 @@ def _create_run():
else:
package_path = tempfile.mktemp(
suffix='.yaml', prefix="kfp_package")
from kfp.deprecated.compiler.main import compile_pyfile
from kfp.deprecated.compiler.main import \
compile_pyfile
compile_pyfile(
pyfile=pyfile,
output_path=package_path,
Expand Down Expand Up @@ -396,8 +404,8 @@ def run_v2_pipeline(
if file.endswith(".ipynb"):
pyfile = tempfile.mktemp(suffix='.py', prefix="pipeline_py_code")
_nb_sample_to_py(file, pyfile)
from kfp.compiler.main import compile_pyfile
compile_pyfile(pyfile=pyfile, package_path=original_pipeline_spec)
from kfp.cli.compile import dsl_compile
dsl_compile(py=pyfile, output=original_pipeline_spec)

# remove following overriding logic once we use create_run_from_job_spec to trigger kfp pipeline run
with open(original_pipeline_spec) as f:
Expand Down Expand Up @@ -721,7 +729,10 @@ def _disable_cache(task):


def _nb_sample_to_py(notebook_path: str, output_path: str):
"""nb_sample_to_py converts notebook kfp sample to a python file. Cells with tag "skip-in-test" will be omitted."""
"""nb_sample_to_py converts notebook kfp sample to a python file.
Cells with tag "skip-in-test" will be omitted.
"""
with open(notebook_path, 'r') as f:
nb = nbformat.read(f, as_version=4)
# Cells with skip-in-test tag will be omitted.
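In the hunk above, the test harness now imports the click command object and calls it directly (`dsl_compile(py=pyfile, output=original_pipeline_spec)`). For comparison, a minimal sketch of the other common way to exercise a click command from test code, using click's built-in `CliRunner`; the helper name and file paths are illustrative assumptions, not part of this commit:

```python
# Sketch only: drives the new click-based command through click's test runner.
import tempfile

from click.testing import CliRunner

from kfp.cli.compile import dsl_compile


def compile_sample(pyfile: str) -> str:
    """Compiles `pyfile` and returns the path of the emitted pipeline spec."""
    output = tempfile.mktemp(suffix='.json', prefix='pipeline_spec')
    # CliRunner parses argv exactly as a shell invocation would, so this also
    # exercises the @click.option declarations added in compile.py below.
    result = CliRunner().invoke(dsl_compile, ['--py', pyfile, '--output', output])
    assert result.exit_code == 0, result.output
    return output
```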
117 changes: 52 additions & 65 deletions sdk/python/kfp/compiler/main.py → sdk/python/kfp/cli/compile.py
@@ -1,4 +1,4 @@
-# Copyright 2020 The Kubeflow Authors
+# Copyright 2020-2022 The Kubeflow Authors
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,50 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """KFP SDK compiler CLI tool."""
 
-import argparse
 import json
 import logging
 import os
 import sys
 from typing import Any, Callable, List, Mapping, Optional
 
+import click
 from kfp import compiler
 from kfp.components import pipeline_context
 
 
-def parse_arguments() -> argparse.Namespace:
-    """Parse command line arguments."""
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        '--py',
-        type=str,
-        required=True,
-        help='local absolute path to a py file.')
-    parser.add_argument(
-        '--function',
-        type=str,
-        help='The name of the function to compile if there are multiple.')
-    parser.add_argument(
-        '--pipeline-parameters',
-        type=json.loads,
-        help='The pipeline parameters in JSON dict format.')
-    parser.add_argument(
-        '--namespace', type=str, help='The namespace for the pipeline function')
-    parser.add_argument(
-        '--output',
-        type=str,
-        required=True,
-        help='local path to the output PipelineJob json file.')
-    parser.add_argument(
-        '--disable-type-check',
-        action='store_true',
-        help='disable the type check, default is enabled.')
-
-    args = parser.parse_args()
-    return args
-
-
 def _compile_pipeline_function(
     pipeline_funcs: List[Callable],
     function_name: Optional[str],
Expand Down Expand Up @@ -119,46 +86,66 @@ def __exit__(self, *args):
pipeline_context.pipeline_decorator_handler = self.old_handler


def compile_pyfile(
pyfile: str,
package_path: str,
@click.command()
@click.option(
'--py', type=str, required=True, help='Local absolute path to a py file.')
@click.option(
'--output',
type=str,
required=True,
help='Local path to the output PipelineJob JSON file.')
@click.option(
'--function',
'function_name',
type=str,
default=None,
help='The name of the function to compile if there are multiple.')
@click.option(
'--pipeline-parameters',
type=str,
default=None,
help='The pipeline parameters in JSON dict format.')
@click.option(
'--disable-type-check',
is_flag=True,
default=False,
help='Disable the type check. Default: type check is not disabled.')
def dsl_compile(
py: str,
output: str,
function_name: Optional[str] = None,
pipeline_parameters: Optional[Mapping[str, Any]] = None,
type_check: bool = True,
pipeline_parameters: str = None,
disable_type_check: bool = True,
) -> None:
"""Compiles a pipeline written in a .py file.
Args:
pyfile: The path to the .py file that contains the pipeline definition.
function_name: The name of the pipeline function.
pipeline_parameters: The pipeline parameters as a dict of {name: value}.
package_path: The output path of the compiled result.
type_check: Whether to enable the type checking.
"""
sys.path.insert(0, os.path.dirname(pyfile))
"""Compiles a pipeline written in a .py file."""
sys.path.insert(0, os.path.dirname(py))
try:
filename = os.path.basename(pyfile)
filename = os.path.basename(py)
with PipelineCollectorContext() as pipeline_funcs:
__import__(os.path.splitext(filename)[0])
try:
parsed_parameters = json.loads(
pipeline_parameters) if pipeline_parameters is not None else {}
except json.JSONDecodeError as e:
logging.error(
f"Failed to parse --pipeline-parameters argument: {pipeline_parameters}"
)
raise e
_compile_pipeline_function(
pipeline_funcs=pipeline_funcs,
function_name=function_name,
pipeline_parameters=pipeline_parameters,
package_path=package_path,
type_check=type_check,
pipeline_parameters=parsed_parameters,
package_path=output,
type_check=not disable_type_check,
)
finally:
del sys.path[0]


def main():
args = parse_arguments()
if args.py is None:
raise ValueError('The --py option must be specified.')
compile_pyfile(
pyfile=args.py,
function_name=args.function,
pipeline_parameters=args.pipeline_parameters,
package_path=args.output,
type_check=not args.disable_type_check,
)
logging.basicConfig(format='%(message)s', level=logging.INFO)
try:
dsl_compile(obj={}, auto_envvar_prefix='KFP')
except Exception as e:
click.echo(str(e), err=True)
sys.exit(1)

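With this change, the command's interface lives in the `@click.option` declarations and `main()` reduces to handing control to click. Note that `--pipeline-parameters` now arrives as a raw string and is decoded with `json.loads` inside `dsl_compile`, so malformed JSON is reported before any compilation starts, and `auto_envvar_prefix='KFP'` lets click also read options from environment variables (e.g. `KFP_OUTPUT` for `--output`). A hypothetical end-to-end illustration; the pipeline below assumes the v2 SDK's `dsl.component`/`dsl.pipeline` decorators and is not part of the commit:

```python
# pipeline.py -- illustrative sample to compile with the reworked command.
from kfp import dsl


@dsl.component
def say_hello(name: str) -> str:
    message = f'Hello, {name}!'
    print(message)
    return message


@dsl.pipeline(name='hello-pipeline')
def hello_pipeline(name: str = 'world'):
    say_hello(name=name)

# Compiled through the click command, for example:
#   dsl-compile --py pipeline.py --output hello.json \
#       --pipeline-parameters '{"name": "KFP"}'
```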