diff --git a/sdk/python/kfp_tekton/compiler/main.py b/sdk/python/kfp_tekton/compiler/main.py
index b24fb45804..e0baa4e16f 100644
--- a/sdk/python/kfp_tekton/compiler/main.py
+++ b/sdk/python/kfp_tekton/compiler/main.py
@@ -82,6 +82,7 @@ def compile_pyfile(pyfile, function_name, output_path, type_check, tekton_pipeli
         _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check, tekton_pipeline_conf)
     finally:
         del sys.path[0]
+        sys.modules.pop(os.path.splitext(filename)[0])
 
 
 def main():
diff --git a/sdk/python/tests/perf_test_config.yaml b/sdk/python/tests/perf_test_config.yaml
new file mode 100644
index 0000000000..308d2db7d8
--- /dev/null
+++ b/sdk/python/tests/perf_test_config.yaml
@@ -0,0 +1,39 @@
+# Copyright 2021 kubeflow.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pipelines to be loaded in performance_tests.py
+# relative paths are assumed to be relative to the project root
+pipeline_scripts:
+#  - name: "Sequential"
+#    path: "sdk/python/tests/compiler/testdata/sequential.py"
+#  - name: "Flip-Coin-testdata"
+#    path: "sdk/python/tests/compiler/testdata/condition.py"
+#  - name: "Retry"
+#    path: "sdk/python/tests/compiler/testdata/retry.py"
+#  - name: "Loop-Static"
+#    path: "sdk/python/tests/compiler/testdata/loop_static.py"
+#  - name: "Conditions-and-Loops"
+#    path: "sdk/python/tests/compiler/testdata/conditions_and_loops.py"
+#  - name: "With-item-Nested"
+#    path: "sdk/python/tests/compiler/testdata/withitem_nested.py"
+#  - name: "Condition-and-recur"
+#    path: "sdk/python/tests/compiler/testdata/cond_recur.py"
+  - name: "Flip-Coin-samples"
+    path: "samples/flip-coin/condition.py"  # condition with simple python
+  - name: "Flip-Coin-Custom-Task"
+    path: "samples/flip-coin-custom-task/condition.py"  # custom task condition with simple python
+  - path: "samples/lightweight-component/calc_pipeline.py"  # python in script with package install
+  - path: "samples/trusted-ai/trusted-ai.py"  # long-running python component with S3 file passing
+  - path: "samples/nested-loops/withitem_nested.py"  # nested loop pipelines, many CRDs
+#  - path: "sdk/python/tests/compiler/testdata/cond_recur.yaml"  # never finishes, calling itself 42 times recursively, controller cannot estimate duration
diff --git a/sdk/python/tests/compiler/performance_tests.py b/sdk/python/tests/performance_tests.py
similarity index 62%
rename from sdk/python/tests/compiler/performance_tests.py
rename to sdk/python/tests/performance_tests.py
index 8e6f10f22f..0eac09a355 100755
--- a/sdk/python/tests/compiler/performance_tests.py
+++ b/sdk/python/tests/performance_tests.py
@@ -18,20 +18,22 @@
 import datetime
 import functools
 import os
-import sys  # noqa
 import tempfile
 import time
 import threading
 import json
+import yaml
 
 from collections import defaultdict
 from datetime import datetime as dt
 from datetime import timedelta
 from os import environ as env
-from typing import Callable, Dict, Mapping
+from os.path import sep
+from pathlib import Path
+from typing import Dict, Mapping
 
 from kfp_server_api import ApiException, ApiRun, ApiRunDetail
-from kfp_tekton.compiler import TektonCompiler
+from kfp_tekton.compiler.main import compile_pyfile
 from kfp_tekton._client import TektonClient
 from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf
 
@@ -40,12 +42,13 @@
 # load test settings from environment variables
 # =============================================================================
 
-# TODO: turn env vars into script parameters
+# TODO: turn env vars into script parameters, use argparse
 PUBLIC_IP = env.get("PUBLIC_IP")
 NAMESPACE = env.get("NAMESPACE", None)
 USER_INFO = env.get("USER_INFO")
 CONNECT_SID = env.get("CONNECT_SID")
 NUM_WORKERS = int(env.get("NUM_WORKERS", 1))
+TEST_CONFIG = env.get("TEST_CONFIG") or Path(__file__).parents[0].joinpath("perf_test_config.yaml")
 EXPERIMENT = env.get("EXPERIMENT_NAME", "PERF_TEST")
 OUTPUT_FILE = env.get("OUTPUT_FILE", f"perf_test_{dt.now().strftime('%Y%m%d_%H%M%S')}_N{NUM_WORKERS}_{PUBLIC_IP}.csv")
 OUTPUT_SEP = env.get("OUTPUT_SEP", ",")
@@ -57,6 +60,7 @@
       f"  USER_INFO:    {USER_INFO}\n"
       f"  CONNECT_SID:  {CONNECT_SID}\n"
       f"  NUM_WORKERS:  {NUM_WORKERS}\n"
+      f"  TEST_CONFIG:  {TEST_CONFIG}\n"
       f"  EXPERIMENT:   {EXPERIMENT}\n"
       f"  OUTPUT_FILE:  {OUTPUT_FILE}\n"
       f"  OUTPUT_SEP:   {OUTPUT_SEP}\n")
@@ -66,8 +70,6 @@
 # local variables
 # =============================================================================
 
-# kfp_tekton_root_dir = os.path.abspath(__file__).replace("sdk/python/tests/compiler/performance_tests.py", "")
-
 execution_times: Dict[str, Dict[str, timedelta]] = defaultdict(dict)
 
 
@@ -131,15 +133,24 @@ def _synchronized_function(*args, **kwargs):
 @time_it  # time_it inside the synchronized block so idle wait is not recorded
 def compile_pipeline(*,  # force kwargs for time_it decorator to get pipeline_name
                      pipeline_name: str,
-                     pipeline_func: Callable) -> str:
+                     pipeline_script: Path) -> str:
 
     file_name = pipeline_name + '.yaml'
     tmpdir = tempfile.gettempdir()  # TODO: keep compiled pipelines?
     pipeline_package_path = os.path.join(tmpdir, file_name)
     pipeline_conf = TektonPipelineConf()
-    TektonCompiler().compile(pipeline_func=pipeline_func,
-                             package_path=pipeline_package_path,
-                             pipeline_conf=pipeline_conf)
+
+    try:
+        compile_pyfile(pyfile=pipeline_script,
+                       function_name=None,
+                       output_path=pipeline_package_path,
+                       type_check=True,
+                       tekton_pipeline_conf=pipeline_conf)
+
+    except ValueError as e:
+        print(f"{e.__class__.__name__} trying to compile {pipeline_script}: {str(e)}")
+
+    # TODO: delete those files after running test or keep for inspection?
     return pipeline_package_path
 
 
@@ -151,27 +162,35 @@ def submit_pipeline_run(*,  # force kwargs for time_it decorator to get pipeline
     client = get_client()
     experiment = client.create_experiment(EXPERIMENT)  # get or create
 
-    run_result = None
-    while run_result is None:  # TODO: add timeout or max retries on ApiException
-        try:
-            run_result: ApiRun = client.run_pipeline(
-                experiment_id=experiment.id,
-                job_name=pipeline_name,
-                pipeline_package_path=pipeline_file,
-                params=arguments)
-        except ApiException as e:
-            print(f"KFP Server Exception: '{e.reason}' {e.status} '{e.body}'"
-                  f" {datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')}")
-            time.sleep(1)
-            client = get_client()
 
-    return run_result.id
+    try:
+        run_result: ApiRun = client.run_pipeline(
+            experiment_id=experiment.id,
+            job_name=pipeline_name,
+            pipeline_package_path=pipeline_file,
+            params=arguments)
+        return run_result.id
+
+    except ApiException as e:
+        print(f"KFP Server Exception trying to submit pipeline {pipeline_file}:"
+              f" '{e.reason}' {e.status} '{e.body}'"
+              f" {datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')}")
+
+    except Exception as e:
+        print(f"Exception trying to submit pipeline {pipeline_file}:"
+              f" '{str(e)}'"
+              f" {datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')}")
+
+    return None
 
 
 @time_it
 def wait_for_run_to_complete(*,  # force kwargs so the time_it decorator can get pipeline_name
                              pipeline_name: str,
                              run_id: str) -> ApiRunDetail:
+    if not run_id:
+        return None
+
     client = get_client()
 
     status = None
@@ -182,8 +201,8 @@ def wait_for_run_to_complete(*,  # force kwargs so the time_it decorator can get
             run: ApiRun = run_detail.run
             status = run.status
         except ApiException as e:  # TODO: add timeout or max retries on ApiError
-            print(f"KFP Server Exception: {e.reason}")
-            time.sleep(1)
+            print(f"KFP Server Exception waiting for {pipeline_name} run {run_id}: {e.reason}")
+            time.sleep(10)
 
         time.sleep(0.1)
 
@@ -205,91 +224,111 @@ def get_client() -> TektonClient:
     return client
 
 
-def load_pipeline_functions() -> [(Callable, str)]:
-    pipeline_functions = []
+def get_project_root_dir() -> Path:
+
+    script_path_presumed = "sdk/python/tests/performance_tests.py"
+    script_path_actually = Path(__file__)
+    project_root_folder = script_path_actually.parents[3]
+
+    assert script_path_actually == project_root_folder.joinpath(script_path_presumed), \
+        "Can not determine project root folder. Was this script file moved or renamed?"
+
+    return project_root_folder
+
+
+def load_test_config() -> dict:
 
-    from testdata.sequential import sequential_pipeline
-    pipeline_functions.append((sequential_pipeline, "sequential_pipeline"))
+    # script_path = Path(__file__)
+    # script_dir = script_path.parents[0]
+    # config_file = script_dir.joinpath("perf_test_config.yaml")
 
-    from testdata.condition import flipcoin
-    pipeline_functions.append((flipcoin, "flipcoin"))
+    with open(TEST_CONFIG, "r") as f:
+        test_config = yaml.safe_load(f)
 
-    from testdata.compose import save_most_frequent_word
-    pipeline_functions.append((save_most_frequent_word, "compose"))
+    return test_config
 
-    from testdata.retry import retry_sample_pipeline
-    pipeline_functions.append((retry_sample_pipeline, "retry"))
 
-    from testdata.loop_static import pipeline as loop_static
-    pipeline_functions.append((loop_static, "loop_static"))
+def load_pipeline_scripts() -> [(Path, str)]:
 
-    from testdata.conditions_and_loops import conditions_and_loops
-    pipeline_functions.append((conditions_and_loops, "conditions_and_loops"))
+    pipeline_files_with_name = []
+    test_config = load_test_config()
+    project_dir = get_project_root_dir()
 
-    # from testdata.loop_in_recursion import flipcoin as loop_in_loop
-    # pipeline_functions.append((loop_in_loop, "loop_in_recursion"))
-    #
-    # from testdata.condition_custom_task import flipcoin_pipeline
-    # pipeline_functions.append((flipcoin_pipeline, "condition_custom_task"))
+    for path_name_dict in test_config["pipeline_scripts"]:
 
-    # TODO: add more pipelines
+        path = path_name_dict["path"]
+        name = path_name_dict.get("name") or Path(path).stem
 
-    # NOTE: loading samples from outside package scope is hacky
-    # sys.path.insert(1, '/Users/dummy/projects/kfp-tekton/samples/lightweight-component')
-    # from calc_pipeline import calc_pipeline
-    # pipeline_functions.append(calc_pipeline)
+        if not path.startswith(sep):
+            # path assumed to be relative to project root
+            fp: Path = project_dir.joinpath(path)
+        else:
+            # path is absolute
+            fp = Path(path)
 
-    return pipeline_functions
+        assert fp.exists(), f"Cannot find file: {fp.resolve()}"
+        pipeline_files_with_name.append((fp, name))
 
-def run_concurrently(pipelinefunc_name_tuples: [(Callable, str)]) -> [(str, str)]:
-    pipeline_status = []
+    print(f"Loaded {len(pipeline_files_with_name)} pipelines from {TEST_CONFIG}\n")
+
+    return pipeline_files_with_name
+
+
+def run_concurrently(pipelinescript_name_tuples: [(Path, str)]):
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
         performance_tests = (
-            executor.submit(run_single_pipeline_performance_test, func, name)
-            for (func, name) in pipelinefunc_name_tuples
+            executor.submit(run_single_pipeline_performance_test, pipeline_script, name)
+            for (pipeline_script, name) in pipelinescript_name_tuples
        )
         for performance_test in concurrent.futures.as_completed(performance_tests):
             try:
-                run_details = performance_test.result().run
-                pipeline_status.append((run_details.name, run_details.status))
+                run_details = performance_test.result()  # noqa F841
             except Exception as e:
                 error = f"{e.__class__.__name__}: {str(e)}"
                 print(error)
-                pipeline_status.append(("unknown pipeline", error))
-
-    return pipeline_status
 
 
-def run_single_pipeline_performance_test(pipeline_func: Callable,
-                                         pipeline_name: str) -> ApiRunDetail:
+def run_single_pipeline_performance_test(pipeline_script: Path,
+                                         pipeline_name: str):
+    try:
+        pipeline_file = compile_pipeline(pipeline_name=pipeline_name, pipeline_script=pipeline_script)
+        run_id = submit_pipeline_run(pipeline_name=pipeline_name, pipeline_file=pipeline_file)
+        run_details = wait_for_run_to_complete(pipeline_name=pipeline_name, run_id=run_id)
 
-    pipeline_file = compile_pipeline(pipeline_name=pipeline_name, pipeline_func=pipeline_func)
-    run_id = submit_pipeline_run(pipeline_name=pipeline_name, pipeline_file=pipeline_file)
-    run_details = wait_for_run_to_complete(pipeline_name=pipeline_name, run_id=run_id)
-    task_details = parse_run_details(run_details)
+        status = run_details.run.status if run_details else "Error"
+        task_details = parse_run_details(run_details)
 
-    append_exec_times_to_output_file(pipeline_name, task_details)
+        append_exec_times_to_output_file(pipeline_name, status, task_details)
 
-    return run_details
+    except Exception as e:
+        error = f"{e.__class__.__name__} while testing '{pipeline_name}': {str(e)}"
+        print(error)
 
 
 def parse_run_details(run_details: ApiRunDetail) -> dict:
-    rev = {}
+    task_details = {}
+
+    if not run_details:
+        return {}
+
     pipelinerun = json.loads(run_details.to_dict()["pipeline_runtime"]["workflow_manifest"])
     status = pipelinerun["status"]
 
     def get_details(data):
+
         info = {}
         total = timedelta(0)
         count = 0
+        dt_fmt = "%Y-%m-%dT%H:%M:%SZ"
+
         for key in data.keys():
             run = data[key]
             status = run["status"]
             conditions = status["conditions"]
             state = conditions[len(conditions) - 1]['type']
-            elapsed = dt.strptime(status['completionTime'], "%Y-%m-%dT%H:%M:%SZ") - dt.strptime(status['startTime'], "%Y-%m-%dT%H:%M:%SZ")
+            elapsed = dt.strptime(status['completionTime'], dt_fmt) - dt.strptime(status['startTime'], dt_fmt)
             info[run['pipelineTaskName']] = {
                 "elapsed": elapsed,
                 "status": state
@@ -302,33 +341,37 @@ def get_details(data):
         return info
 
     if "taskRuns" in status:
-        rev["taskRuns"] = get_details(status["taskRuns"])
+        task_details["taskRuns"] = get_details(status["taskRuns"])
 
     if "runs" in status:
-        rev["run"] = get_details(status["runs"])
+        task_details["runs"] = get_details(status["runs"])
 
-    return rev
+    return task_details
 
 
-def append_exec_times_to_output_file(pipeline_name: str, tasks: dict):
+def append_exec_times_to_output_file(pipeline_name: str,
+                                     status: str = "",
+                                     tasks: dict = {}):
 
-    compile_time = str(execution_times[pipeline_name][compile_pipeline.__name__])
-    submit_time = str(execution_times[pipeline_name][submit_pipeline_run.__name__])
-    run_time = str(execution_times[pipeline_name][wait_for_run_to_complete.__name__])
+    compile_time = execution_times[pipeline_name][compile_pipeline.__name__]
+    submit_time = execution_times[pipeline_name][submit_pipeline_run.__name__]
+    run_time = execution_times[pipeline_name][wait_for_run_to_complete.__name__]
 
     taskruns = 0
     taskrun_elapsed = timedelta(0)
     runs = 0
     run_elapsed = timedelta(0)
+
     if "taskRuns" in tasks:
         taskruns = tasks["taskRuns"]["count"]
         taskrun_elapsed = tasks["taskRuns"]["total_elapsed"]
+
    if "runs" in tasks:
         runs = tasks["runs"]["count"]
         run_elapsed = tasks["runs"]["total_elapsed"]
 
     with open(OUTPUT_FILE, "a") as f:
         f.write(OUTPUT_SEP.join([
-            pipeline_name, compile_time, submit_time, run_time,
+            pipeline_name, status, str(compile_time), str(submit_time), str(run_time),
             str(taskruns), str(runs), str(taskrun_elapsed), str(run_elapsed)
         ]))
         f.write("\n")
@@ -337,20 +380,24 @@ def append_exec_times_to_output_file(pipeline_name: str, tasks: dict):
 def create_output_file():
 
     with open(OUTPUT_FILE, "w") as f:
-        f.write(OUTPUT_SEP.join(["Pipeline", "Compile", "Submit", "Run",
-                                 "Num_TaskRuns", "Num_Runs", "Total_TaskRun_Time", "Total_Run_Time"]) + "\n")
+
+        f.write(OUTPUT_SEP.join([
+            "Pipeline", "Status", "Compile_Time", "Submit_Time", "Run_Time",
+            "Num_TaskRuns", "Num_Runs", "Total_TaskRun_Time", "Total_Run_Time"
+        ]))
+        f.write("\n")
 
 
 def run_performance_tests():
 
     create_output_file()
-    pipeline_functions = load_pipeline_functions()
+    pipeline_scripts = load_pipeline_scripts()
 
     if NUM_WORKERS == 1:  # TODO: use `run_concurrently()` even with 1 worker
-        for func, name in pipeline_functions:
-            run_single_pipeline_performance_test(func, name)
+        for script, name in pipeline_scripts:
+            run_single_pipeline_performance_test(script, name)
     else:
-        run_concurrently(pipeline_functions)
+        run_concurrently(pipeline_scripts)
 
 
 if __name__ == '__main__':