diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index 1e6bf932201..c5c8f45f7f7 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -484,6 +484,29 @@ message PipelineTaskSpec { // Iterator to iterate over a parameter input. ParameterIteratorSpec parameter_iterator = 10; } + + // User-configured task-level retry. + message RetryPolicy { + // Number of retries before considering a task as failed. Set to 0 or + // unspecified to disallow retry." + int32 max_retry_count = 1; + + // The time interval between retries. Defaults to zero (an immediate retry). + google.protobuf.Duration backoff_duration = 2; + + // The exponential backoff factor applied to backoff_duration. If + // unspecified, will default to 2. + double backoff_factor = 3; + + // The maximum duration during which the task will be retried according to + // the backoff strategy. Max allowed is 1 hour - higher value will be capped + // to this limit. If unspecified, will set to 1 hour. + google.protobuf.Duration backoff_max_duration = 4; + } + + // User-configured task-level retry. + // Applicable only to component tasks. + RetryPolicy retry_policy = 11; } // The spec of an artifact iterator. It supports fan-out a workflow from a list diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index 538f0844da6..bea2afa5e7f 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -7,6 +7,7 @@ import ( structpb "github.com/golang/protobuf/ptypes/struct" "github.com/ghodss/yaml" + "github.com/golang/glog" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/common/util" @@ -36,6 +37,7 @@ func (t *V2Spec) ScheduledWorkflow(apiJob *api.Job) (*scheduledworkflow.Schedule } job.RuntimeConfig = jobRuntimeConfig wf, err := argocompiler.Compile(job, nil) + glog.Infof("wf value: %+v", wf) if err != nil { return nil, util.Wrap(err, "Failed to compile job") } @@ -147,6 +149,7 @@ func (t *V2Spec) RunWorkflow(apiRun *api.Run, options RunWorkflowOptions) (*util } job.RuntimeConfig = jobRuntimeConfig wf, err := argocompiler.Compile(job, nil) + glog.Infof("wf value: %+v", wf) if err != nil { return nil, util.Wrap(err, "Failed to compile job") } diff --git a/backend/src/v2/cmd/compiler/main.go b/backend/src/v2/cmd/compiler/main.go index 94f94f55912..9f755307be5 100644 --- a/backend/src/v2/cmd/compiler/main.go +++ b/backend/src/v2/cmd/compiler/main.go @@ -42,6 +42,9 @@ func main() { flag.Parse() noSpec := specPath == nil || *specPath == "" noJob := jobPath == nil || *jobPath == "" + glog.Info("Hiiiiiiiiiiiiiiii!!!!!!!!!!!!!!!!!!!!!!!!!!!") + noSpec = true + noJob = true if noSpec && noJob { glog.Exitf("spec or job must be specified") } @@ -70,6 +73,7 @@ func compile(job *pipelinespec.PipelineJob) error { LauncherImage: *launcher, PipelineRoot: *pipelineRoot, }) + glog.Infof("wf value: %+v", wf) if err != nil { return err } diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index c4fb4868e6b..eb798d0e2dd 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -19,6 +19,7 @@ import ( "strings" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/golang/glog" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" 
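The new RetryPolicy message describes its backoff behavior only in field comments. Below is a minimal, illustrative reading of those comments in plain Python — not the backend implementation (the actual behavior is whatever the compiled workflow's retry strategy does): delays grow by backoff_factor starting from backoff_duration, and retrying stops once the one-hour-capped backoff_max_duration window is used up.

```python
# Illustrative only: one plausible reading of the RetryPolicy comments above.
from datetime import timedelta
from typing import List


def retry_schedule(max_retry_count: int,
                   backoff_duration: timedelta = timedelta(0),
                   backoff_factor: float = 2.0,
                   backoff_max_duration: timedelta = timedelta(hours=1)) -> List[timedelta]:
    """Returns the wait before each retry under the documented defaults."""
    # "Max allowed is 1 hour - higher value will be capped to this limit."
    window = min(backoff_max_duration, timedelta(hours=1))
    delays: List[timedelta] = []
    elapsed = timedelta(0)
    for attempt in range(max_retry_count):
        delay = backoff_duration * (backoff_factor ** attempt)
        if elapsed + delay > window:
            break  # the backoff window is exhausted; stop retrying
        delays.append(delay)
        elapsed += delay
    return delays


# Example: 3 retries, 30s initial backoff, factor 2 -> waits of 30s, 60s, 120s.
print(retry_schedule(3, timedelta(seconds=30)))
```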
"github.com/kubeflow/pipelines/backend/src/v2/compiler" "google.golang.org/protobuf/proto" @@ -73,6 +74,16 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow, } } } + // fill in exit handler + var exit_task string + all_root_tasks := spec.GetRoot().GetDag().GetTasks() + for task_name, task_spec := range all_root_tasks { + glog.Infof("task name, task spec: %s, %+v", task_name, task_spec) + if task_spec.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" { + exit_task = task_name + glog.Infof("exit task: %s", exit_task) + } + } // initialization wf := &wfapi.Workflow{ TypeMeta: k8smeta.TypeMeta{ @@ -101,13 +112,15 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow, }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, + OnExit: exit_task, }, } c := &workflowCompiler{ wf: wf, templates: make(map[string]*wfapi.Template), // TODO(chensun): release process and update the images. - driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", + // driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", + driverImage: "gcr.io/ling-kfp/dev/kfp-driver:latest", launcherImage: "gcr.io/ml-pipeline-test/dev/kfp-launcher-v2:latest", job: job, spec: spec, diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 87eb211892f..d5d2b62f783 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -122,6 +122,7 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio } // TODO(Bobgy): fill in run resource. pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, opts.Namespace, "run-resource", pipelineRoot) + glog.Infof("metadata pipeline: %+v", pipeline) if err != nil { return nil, err } @@ -414,6 +415,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E executorInput := &pipelinespec.ExecutorInput{ Inputs: inputs, } + glog.Infof("executorInput value: %+v", executorInput) execution = &Execution{ExecutorInput: executorInput} condition := opts.Task.GetTriggerPolicy().GetCondition() if condition != "" { @@ -436,14 +438,37 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E return execution, fmt.Errorf("ArtifactIterator is not implemented") } isIterator := opts.Task.GetParameterIterator() != nil && opts.IterationIndex < 0 + // Fan out iterations if execution.WillTrigger() && isIterator { iterator := opts.Task.GetParameterIterator() - value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] report := func(err error) error { return fmt.Errorf("iterating on item input %q failed: %w", iterator.GetItemInput(), err) } - if !ok { - return execution, report(fmt.Errorf("cannot find input parameter")) + // Check the items type of parameterIterator: + // It can be "inputParameter" or "Raw" + var value *structpb.Value + var ok bool + switch iterator.GetItems().GetKind().(type) { + case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: + value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] + if !ok { + return execution, report(fmt.Errorf("cannot find input parameter")) + } + case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw: + value_raw := iterator.GetItems().GetRaw() + var unmarshalled_raw interface{} + err = json.Unmarshal([]byte(value_raw), &unmarshalled_raw) + if err != nil { + return execution, fmt.Errorf("error unmarshall raw string: %q", err) + } + 
value, err = structpb.NewValue(unmarshalled_raw) + if err != nil { + return execution, fmt.Errorf("error converting unmarshalled raw string into protobuf Value type: %q", err) + } + // Add the raw input to the executor input + execution.ExecutorInput.Inputs.ParameterValues[iterator.GetItemInput()] = value + default: + return execution, fmt.Errorf("cannot find parameter iterator") } items, err := getItems(value) if err != nil { @@ -724,7 +749,16 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, case task.GetArtifactIterator() != nil: return nil, fmt.Errorf("artifact iterator not implemented yet") case task.GetParameterIterator() != nil: - itemsInput := task.GetParameterIterator().GetItems().GetInputParameter() + var itemsInput string + if task.GetParameterIterator().GetItems().GetInputParameter() != "" { + // input comes from outside the component + itemsInput = task.GetParameterIterator().GetItems().GetInputParameter() + } else if task.GetParameterIterator().GetItemInput() != "" { + // input comes from static input + itemsInput = task.GetParameterIterator().GetItemInput() + } else { + return nil, fmt.Errorf("cannot retrieve parameter iterator.") + } items, err := getItems(inputs.ParameterValues[itemsInput]) if err != nil { return nil, err diff --git a/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst b/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst index 5261df82bca..b1871f88740 100644 --- a/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst +++ b/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst @@ -16,4 +16,5 @@ Components google_cloud_pipeline_components.experimental.forecasting google_cloud_pipeline_components.experimental.hyperparameter_tuning_job google_cloud_pipeline_components.experimental.tensorflow_probability + google_cloud_pipeline_components.experimental.vertex_notification_email google_cloud_pipeline_components.experimental.wait_gcp_resources diff --git a/components/google-cloud/google_cloud_pipeline_components/aiplatform/evaluation/import.yaml b/components/google-cloud/google_cloud_pipeline_components/aiplatform/evaluation/import.yaml new file mode 100644 index 00000000000..4ee11d9e1e4 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/aiplatform/evaluation/import.yaml @@ -0,0 +1,34 @@ +name: import_model_evaluation +description: | + Calls ModelService.ImportModelEvaluation to import a model evaluation file to Vertex + + Args: + metrics (system.Metrics): + Path of metrics generated from an evaluation component. + explanation (system.Metrics): + Path for model explanation metrics generated from an evaluation comonent. + model (google.VertexModel): + Vertex model resource that will be the parent resource of the uploaded evaluation. + metrics_schema_uri (str): + GCS link to the schema URI for model evaluation metrics. 
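With this driver change, ParallelFor items can come either from an upstream input parameter or from a raw JSON literal embedded in the task spec. The sketch below is a Python paraphrase of the Go branching above, for readability only; the dict keys ('inputParameter', 'raw') mirror the IR field names and their JSON casing is assumed.

```python
import json
from typing import Any, Dict, List


def resolve_loop_items(items_spec: Dict[str, Any],
                       item_input: str,
                       input_parameters: Dict[str, Any]) -> List[Any]:
    """Readable paraphrase of the Go switch above (field casing assumed)."""
    if 'inputParameter' in items_spec:
        # Items fan in from an upstream parameter.
        key = items_spec['inputParameter']
        if key not in input_parameters:
            raise KeyError(f'cannot find input parameter {key!r}')
        value = input_parameters[key]
    elif 'raw' in items_spec:
        # Items are a JSON literal embedded in the task spec (static loops).
        value = json.loads(items_spec['raw'])
        # Register the parsed list under the loop's item input, as the driver does.
        input_parameters[item_input] = value
    else:
        raise ValueError('cannot find parameter iterator items')
    if not isinstance(value, list):
        raise TypeError('parameter iterator items must be a JSON list')
    return value


# Static loop, mirroring samples/core/loop_static/loop_static_v2.py later in this diff:
params: Dict[str, Any] = {}
items = resolve_loop_items(
    {'raw': '[{"a": "1", "b": "2"}, {"a": "10", "b": "20"}]'},
    item_input='loop-item-param',
    input_parameters=params)
print(items)  # [{'a': '1', 'b': '2'}, {'a': '10', 'b': '20'}]
```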
+inputs: + - {name: metrics, type: Metrics} + - {name: explanation, type: Metrics, optional: True} + - {name: model, type: google.VertexModel} + - {name: metrics_schema_uri, type: String} +implementation: + container: + image: gcr.io/ml-pipeline/google-cloud-pipeline-components:latest + command: [python3, -u, -m, google_cloud_pipeline_components.container.experimental.evaluation.import_model_evaluation] + args: + - --metrics + - {inputPath: metrics} + - if: + cond: {isPresent: explanation} + then: + - --explanation + - "{{$.inputs.artifacts['explanation'].metadata['explanation_gcs_path']}}" + - --metrics_schema_uri + - {inputValue: metrics_schema_uri} + - --model_name + - "{{$.inputs.artifacts['model'].metadata['resourceName']}}" diff --git a/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/__init__.py b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/__init__.py new file mode 100644 index 00000000000..e73b72155a1 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Google Cloud Pipeline Evaluation Components root.""" diff --git a/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/import_model_evaluation.py b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/import_model_evaluation.py new file mode 100644 index 00000000000..c646d13a78b --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/import_model_evaluation.py @@ -0,0 +1,112 @@ +# Copyright 2022 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
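The component above delegates to the new import_model_evaluation module whose full source follows in this diff. For orientation, here is a minimal sketch of what that entry point does with its inputs; the file name and schema URI are placeholder examples, and the real component converts the metrics into protobuf Value objects before calling ImportModelEvaluation.

```python
import json

# Hypothetical inputs, mirroring the component flags declared above.
model_name = 'projects/123/locations/us-central1/models/456'
metrics_schema_uri = ('gs://google-cloud-aiplatform/schema/modelevaluation/'
                      'classification_metrics_1.0.0.yaml')  # example value

# The importer splits the canonical resource name to recover the region:
# projects/{project}/locations/{location}/models/{model}
_, project_id, _, location, _, model_id = model_name.split('/')
api_endpoint = f'{location}-aiplatform.googleapis.com'

# Evaluation metrics files carry a `slicedMetrics` list; the importer reads
# the first slice and keeps its single metrics payload.
with open('metrics.json') as f:  # path normally injected via --metrics
    sliced = json.load(f)['slicedMetrics'][0]['metrics']
metrics_payload = next(iter(sliced.values()))

model_evaluation = {
    'metrics': metrics_payload,  # the real component converts this to a protobuf Value
    'metrics_schema_uri': metrics_schema_uri,
}
# The component then calls:
#   aiplatform.gapic.ModelServiceClient(...).import_model_evaluation(
#       parent=model_name, model_evaluation=model_evaluation)
```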
+"""Module for importing a model evaluation to an existing Vertex model resource.""" + +import sys +import argparse +import json +import six + +from google.cloud import aiplatform +from google.api_core import gapic_v1 +from google.protobuf.struct_pb2 import Value, Struct, NULL_VALUE, ListValue + + +def main(argv): + """Calls ModelService.ImportModelEvaluation""" + parser = argparse.ArgumentParser( + prog='Vertex Model Service evaluation importer', description='') + parser.add_argument( + '--metrics', + dest='metrics', + type=str, + required=True, + default=argparse.SUPPRESS) + parser.add_argument( + '--explanation', dest='explanation', type=str, default=None) + parser.add_argument( + '--metrics_schema_uri', + dest='metrics_schema_uri', + type=str, + required=True, + default=argparse.SUPPRESS) + parser.add_argument( + '--model_name', + dest='model_name', + type=str, + required=True, + default=argparse.SUPPRESS) + + parsed_args, _ = parser.parse_known_args(argv) + + _, project_id, _, location, _, model_id = parsed_args.model_name.split('/') + + with open(parsed_args.metrics) as metrics_file: + model_evaluation = { + 'metrics': + to_value( + next( + iter( + json.loads(metrics_file.read())['slicedMetrics'][0] + ['metrics'].values()))), + 'metrics_schema_uri': + parsed_args.metrics_schema_uri, + } + + if parsed_args.explanation: + with open('/gcs' + parsed_args.explanation[4:]) as explanation_file: + model_evaluation['model_explanation'] = { + 'mean_attributions': [{ + 'feature_attributions': + to_value( + json.loads(explanation_file.read())['explanation'] + ['attributions'][0]['featureAttributions']) + }] + } + print(model_evaluation) + aiplatform.gapic.ModelServiceClient( + client_info=gapic_v1.client_info.ClientInfo( + user_agent='google-cloud-pipeline-components',), + client_options={ + 'api_endpoint': location + '-aiplatform.googleapis.com', + }).import_model_evaluation( + parent=parsed_args.model_name, + model_evaluation=model_evaluation, + ) + + +def to_value(value): + if value is None: + return Value(null_value=NULL_VALUE) + elif isinstance(value, bool): + # This check needs to happen before isinstance(value, int), + # isinstance(value, int) returns True when value is bool. 
+ return Value(bool_value=value) + elif isinstance(value, six.integer_types) or isinstance(value, float): + return Value(number_value=value) + elif isinstance(value, six.string_types) or isinstance(value, six.text_type): + return Value(string_value=value) + elif isinstance(value, dict): + return Value( + struct_value=Struct(fields={k: to_value(v) for k, v in value.items()})) + elif isinstance(value, list): + return Value( + list_value=ListValue(values=[to_value(item) for item in value])) + else: + raise ValueError('Unsupported data type: {}'.format(type(value))) + + +if __name__ == '__main__': + print(sys.argv) + main(sys.argv[1:]) diff --git a/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py index d2d960c7625..a4b8b796d4a 100644 --- a/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py +++ b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py @@ -24,7 +24,7 @@ 'CvTrainerOp', 'InfraValidatorOp', 'Stage1TunerOp', 'EnsembleOp', 'StatsAndExampleGenOp', 'FeatureSelectionOp', 'TransformOp', 'FinalizerOp', 'WideAndDeepTrainerOp', 'BuiltinAlgorithmHyperparameterTuningJobOp', - 'TabNetTrainerOp' + 'TabNetTrainerOp', 'FeatureTransformEngineOp' ] CvTrainerOp = load_component_from_file( @@ -41,6 +41,8 @@ os.path.join(os.path.dirname(__file__), 'feature_selection.yaml')) TransformOp = load_component_from_file( os.path.join(os.path.dirname(__file__), 'transform.yaml')) +FeatureTransformEngineOp = load_component_from_file( + os.path.join(os.path.dirname(__file__), 'feature_transform_engine.yaml')) FinalizerOp = load_component_from_file( os.path.join(os.path.dirname(__file__), 'finalizer.yaml')) WideAndDeepTrainerOp = load_component_from_file( diff --git a/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/feature_transform_engine.yaml b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/feature_transform_engine.yaml new file mode 100644 index 00000000000..efdd2a9216e --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/feature_transform_engine.yaml @@ -0,0 +1,123 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: feature_transform_engine +description: | + Feature transform engine to transform raw data to engineered features. + + Args: + project (str): + Required. Project to run feature transform engine. + location (Optional[str]): + Location for running the feature transform engine. If not set, + default to us-central1. + root_dir (str): The Cloud Storage location to store the output. + analyze_data (Dataset): Configuration of the dataset to be analyzed. + transform_data (Dataset): Configuration of the dataset to be transformed. + transform_config (str): Feature transformation configurations. 
+ dataflow_machine_type (Optional[str]): + The machine type used for dataflow jobs. If not set, default to n1-standard-16. + dataflow_max_num_workers (Optional[int]): + The number of workers to run the dataflow job. If not set, default to 25. + dataflow_disk_size_gb (Optional[int]): + The disk size, in gigabytes, to use on each Dataflow worker instance. If not set, + default to 40. + dataflow_subnetwork (Optional[str]): + Dataflow's fully qualified subnetwork name, when empty the default subnetwork will be + used. More details: + https://cloud.google.com/dataflow/docs/guides/specifying-networks#example_network_and_subnetwork_specifications + dataflow_use_public_ips (Optional[bool]): + Specifies whether Dataflow workers use public IP addresses. + encryption_spec_key_name (Optional[str]): + Customer-managed encryption key. + + Returns: + materialized_data (Dataset): The materialized dataset. + transform_output (TransformOutput): The transform output artifact. + gcp_resources (str): + GCP resources created by this component. + For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md. +inputs: +- {name: project, type: String} +- {name: location, type: String, default: "us-central1"} +- {name: root_dir, type: String} +- {name: analyze_data, type: Dataset} +- {name: transform_data, type: Dataset} +- {name: transform_config, type: String} +- {name: dataflow_machine_type, type: String, default: "n1-standard-16"} +- {name: dataflow_max_num_workers, type: Integer, default: "25"} +- {name: dataflow_disk_size_gb, type: Integer, default: "40"} +- {name: dataflow_subnetwork, type: String, default: ""} +- {name: dataflow_use_public_ips, type: Boolean, default: "true"} +- {name: encryption_spec_key_name, type: String, default: ""} + +outputs: +- {name: materialized_data, type: Dataset} +- {name: transform_output, type: TransformOutput} +- {name: gcp_resources, type: String} + +implementation: + container: + image: gcr.io/ml-pipeline/google-cloud-pipeline-components:latest + command: [python3, -u, -m, google_cloud_pipeline_components.container.v1.gcp_launcher.launcher] + args: [ + --type, CustomJob, + --project, {inputValue: project}, + --location, {inputValue: location}, + --gcp_resources, {outputPath: gcp_resources}, + --payload, + concat: [ + '{"display_name": "feature-transform-engine-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}", "encryption_spec": {"kms_key_name":"', + {inputValue: encryption_spec_key_name}, + '"}, "job_spec": {"worker_pool_specs": [{"replica_count": 1, "machine_spec": {"machine_type": "n1-standard-8"}, "container_spec": {"image_uri":"', + 'us-docker.pkg.dev/vertex-ai-restricted/automl-tabular/training:prod', + '", "args": ["feature_transform_engine", "--transform_output_artifact_path=', + {outputUri: transform_output}, + '", "--transform_output_path=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/transform", "--materialized_data_path=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/materialized_data","--materialized_data_artifact_path=', + {outputUri: materialized_data}, + '", "--transform_config=', + {inputValue: transform_config}, + '", "--analyze_data_path=', + {inputUri: analyze_data}, + '", "--transform_data_path=', + {inputUri: transform_data}, + '", "--job_name=feature-transform-engine-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}', + '", "--dataflow_project=', + {inputValue: project}, + '", "--error_file_path=', + 
{inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/error.pb", "--dataflow_staging_dir=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/dataflow_staging", "--dataflow_tmp_dir=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/dataflow_tmp", "--dataflow_max_num_workers=', + {inputValue: dataflow_max_num_workers}, + '", "--dataflow_machine_type=', + {inputValue: dataflow_machine_type}, + '", "--dataflow_worker_container_image=', + 'us-docker.pkg.dev/vertex-ai/automl-tabular/dataflow-worker:prod', + '", "--dataflow_disk_size_gb=', + {inputValue: dataflow_disk_size_gb}, + '", "--dataflow_subnetwork_fully_qualified=', + {inputValue: dataflow_subnetwork}, + '", "--dataflow_use_public_ips=', + {inputValue: dataflow_use_public_ips}, + '", "--dataflow_kms_key=', + {inputValue: encryption_spec_key_name}, + '"]}}]}}' + ]] diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000000..19bad04b634 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,4 @@ +# Global options: + +[mypy] +ignore_missing_imports = true diff --git a/samples/core/loop_static/loop_static_v2.py b/samples/core/loop_static/loop_static_v2.py index a5c1eafe9c0..62f92b6e814 100644 --- a/samples/core/loop_static/loop_static_v2.py +++ b/samples/core/loop_static/loop_static_v2.py @@ -19,15 +19,12 @@ def concat_op(a: str, b: str) -> str: return a + b -_DEFAULT_LOOP_ARGUMENTS = [{'a': '1', 'b': '2'}, {'a': '10', 'b': '20'}] - - @dsl.pipeline(name='pipeline-with-loop-static') def my_pipeline( - static_loop_arguments: List[dict] = _DEFAULT_LOOP_ARGUMENTS, greeting: str = 'this is a test for looping through parameters', ): print_task = print_op(text=greeting) + static_loop_arguments = [{'a': '1', 'b': '2'}, {'a': '10', 'b': '20'}] with dsl.ParallelFor(static_loop_arguments) as item: concat_task = concat_op(a=item.a, b=item.b) diff --git a/sdk/python/kfp/cli/__main__.py b/sdk/python/kfp/cli/__main__.py new file mode 100644 index 00000000000..3f77eac5205 --- /dev/null +++ b/sdk/python/kfp/cli/__main__.py @@ -0,0 +1,41 @@ +# Copyright 2018-2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
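With FeatureTransformEngineOp now exported from the tabular package `__init__`, it can be wired into a pipeline like any other first-party component. The snippet below is a hedged usage sketch, not an official sample: the bucket paths are placeholders, the upstream datasets are stubbed with dsl.importer, and the dsl import path may need adjusting to your installed KFP SDK version (e.g. `from kfp.v2 import dsl` on KFP 1.8.x).

```python
# Hedged usage sketch; bucket paths are placeholders.
from kfp import dsl
from google_cloud_pipeline_components.experimental.automl.tabular import (
    FeatureTransformEngineOp)


@dsl.pipeline(name='feature-transform-engine-demo')
def fte_pipeline(project: str, root_dir: str):
    # Placeholder datasets: in a real pipeline these artifacts come from an
    # upstream component such as stats and example gen.
    analyze = dsl.importer(
        artifact_uri='gs://my-bucket/analyze_split', artifact_class=dsl.Dataset)
    transform = dsl.importer(
        artifact_uri='gs://my-bucket/transform_split', artifact_class=dsl.Dataset)

    FeatureTransformEngineOp(
        project=project,
        root_dir=root_dir,
        analyze_data=analyze.output,
        transform_data=transform.output,
        transform_config='gs://my-bucket/transform_config.json',
        dataflow_machine_type='n1-standard-16',
        dataflow_max_num_workers=25)
```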
+ +import logging +import sys + +import click +import typer +from kfp.cli import cli +from kfp.cli import components +from kfp.cli import diagnose_me_cli +from kfp.cli import experiment +from kfp.cli import pipeline +from kfp.cli import recurring_run +from kfp.cli import run + + +def main(): + logging.basicConfig(format='%(message)s', level=logging.INFO) + cli.cli.add_command(run.run) + cli.cli.add_command(recurring_run.recurring_run) + cli.cli.add_command(pipeline.pipeline) + cli.cli.add_command(diagnose_me_cli.diagnose_me) + cli.cli.add_command(experiment.experiment) + cli.cli.add_command(typer.main.get_command(components.app)) + try: + cli.cli(obj={}, auto_envvar_prefix='KFP') + except Exception as e: + click.echo(str(e), err=True) + sys.exit(1) \ No newline at end of file diff --git a/sdk/python/kfp/cli/cli.py b/sdk/python/kfp/cli/cli.py index 87f30fb1254..dd42ceba536 100644 --- a/sdk/python/kfp/cli/cli.py +++ b/sdk/python/kfp/cli/cli.py @@ -12,22 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import sys - import click -import typer -from kfp.cli import components -from kfp.cli.diagnose_me_cli import diagnose_me -from kfp.cli.experiment import experiment from kfp.cli.output import OutputFormat -from kfp.cli.pipeline import pipeline -from kfp.cli.recurring_run import recurring_run -from kfp.cli.run import run from kfp.client import Client -_NO_CLIENT_COMMANDS = ['diagnose_me', 'components'] - @click.group() @click.option('--endpoint', help='Endpoint of the KFP API service to connect.') @@ -58,25 +46,11 @@ def cli(ctx: click.Context, endpoint: str, iap_client_id: str, namespace: str, Feature stage: [Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha) """ - if ctx.invoked_subcommand in _NO_CLIENT_COMMANDS: + NO_CLIENT_COMMANDS = ['diagnose_me', 'components'] + if ctx.invoked_subcommand in NO_CLIENT_COMMANDS: # Do not create a client for these subcommands return ctx.obj['client'] = Client(endpoint, iap_client_id, namespace, other_client_id, other_client_secret) ctx.obj['namespace'] = namespace ctx.obj['output'] = output - - -def main(): - logging.basicConfig(format='%(message)s', level=logging.INFO) - cli.add_command(run) - cli.add_command(recurring_run) - cli.add_command(pipeline) - cli.add_command(diagnose_me, 'diagnose_me') - cli.add_command(experiment) - cli.add_command(typer.main.get_command(components.app)) - try: - cli(obj={}, auto_envvar_prefix='KFP') - except Exception as e: - click.echo(str(e), err=True) - sys.exit(1) diff --git a/sdk/python/kfp/cli/components.py b/sdk/python/kfp/cli/components.py index 81f25df6ae1..8e9be563c9e 100644 --- a/sdk/python/kfp/cli/components.py +++ b/sdk/python/kfp/cli/components.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import configparser import contextlib import enum import pathlib @@ -50,9 +49,9 @@ # Location in which to write out shareable YAML for components. _COMPONENT_METADATA_DIR = 'component_metadata' -_DOCKERIGNORE_TEMPLATE = ''' -{}/ -'''.format(_COMPONENT_METADATA_DIR) +_DOCKERIGNORE_TEMPLATE = f''' +{_COMPONENT_METADATA_DIR}/ +''' # Location at which v2 Python function-based components will stored # in containerized components. 
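The new kfp/cli/__main__.py keeps bridging the Typer-based components app onto the Click group via typer.main.get_command. A self-contained illustration of that pattern follows, using toy command names rather than KFP's:

```python
import click
import typer

# Toy Typer app standing in for kfp.cli.components.app.
app = typer.Typer(name='components')


@app.command()
def build(directory: str = '.'):
    """Example Typer subcommand."""
    typer.echo(f'building components under {directory}')


@click.group()
def cli():
    """Example Click group standing in for the kfp CLI group."""


# The same bridge __main__.py uses: a Typer app becomes a Click command,
# so both command styles live under one group.
cli.add_command(typer.main.get_command(app))

if __name__ == '__main__':
    cli()
```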
@@ -81,17 +80,17 @@ class _Engine(str, enum.Enum): def _info(message: Any): info = typer.style('INFO', fg=typer.colors.GREEN) - typer.echo('{}: {}'.format(info, message)) + typer.echo(f'{info}: {message}') def _warning(message: Any): info = typer.style('WARNING', fg=typer.colors.YELLOW) - typer.echo('{}: {}'.format(info, message)) + typer.echo(f'{info}: {message}') def _error(message: Any): info = typer.style('ERROR', fg=typer.colors.RED) - typer.echo('{}: {}'.format(info, message)) + typer.echo(f'{info}: {message}') class _ComponentBuilder(): @@ -116,17 +115,17 @@ def __init__( self._context_directory = context_directory self._dockerfile = self._context_directory / _DOCKERFILE self._component_filepattern = component_filepattern - self._components: List[ - component_factory.component_factory.ComponentInfo] = [] + self._components: List[component_factory.ComponentInfo] = [] # This is only set if we need to install KFP from local copy. self._maybe_copy_kfp_package = '' if kfp_package_path is None: - self._kfp_package_path = 'kfp=={}'.format(kfp.__version__) + self._kfp_package_path = f'kfp=={kfp.__version__}' elif kfp_package_path.is_dir(): - _info('Building KFP package from local directory {}'.format( - typer.style(str(kfp_package_path), fg=typer.colors.CYAN))) + _info( + f'Building KFP package from local directory {typer.style(str(kfp_package_path), fg=typer.colors.CYAN)}' + ) temp_dir = pathlib.Path(tempfile.mkdtemp()) try: subprocess.run([ @@ -139,8 +138,7 @@ def __init__( cwd=kfp_package_path) wheel_files = list(temp_dir.glob('*.whl')) if len(wheel_files) != 1: - _error('Failed to find built KFP wheel under {}'.format( - temp_dir)) + _error(f'Failed to find built KFP wheel under {temp_dir}') raise typer.Exit(1) wheel_file = wheel_files[0] @@ -149,16 +147,17 @@ def __init__( self._maybe_copy_kfp_package = 'COPY {wheel_name} {wheel_name}'.format( wheel_name=self._kfp_package_path) except subprocess.CalledProcessError as e: - _error('Failed to build KFP wheel locally:\n{}'.format(e)) + _error(f'Failed to build KFP wheel locally:\n{e}') raise typer.Exit(1) finally: - _info('Cleaning up temporary directory {}'.format(temp_dir)) + _info(f'Cleaning up temporary directory {temp_dir}') shutil.rmtree(temp_dir) else: - self._kfp_package_path = kfp_package_path + self._kfp_package_path = str(kfp_package_path) - _info('Building component using KFP package path: {}'.format( - typer.style(str(self._kfp_package_path), fg=typer.colors.CYAN))) + _info( + f'Building component using KFP package path: {typer.style(self._kfp_package_path, fg=typer.colors.CYAN)}' + ) self._context_directory_files = [ file.name @@ -178,8 +177,8 @@ def __init__( def _load_components(self): if not self._component_files: _error( - 'No component files found matching pattern `{}` in directory {}' - .format(self._component_filepattern, self._context_directory)) + f'No component files found matching pattern `{self._component_filepattern}` in directory {self._context_directory}' + ) raise typer.Exit(1) for python_file in self._component_files: @@ -192,23 +191,25 @@ def _load_components(self): formatted_module_file = typer.style( str(python_file), fg=typer.colors.CYAN) if not component_modules: - _error('No KFP components found in file {}'.format( - formatted_module_file)) + _error( + f'No KFP components found in file {formatted_module_file}' + ) raise typer.Exit(1) - _info('Found {} component(s) in file {}:'.format( - len(component_modules), formatted_module_file)) + _info( + f'Found {len(component_modules)} component(s) in file 
{formatted_module_file}:' + ) for name, component in component_modules.items(): - _info('{}: {}'.format(name, component)) + _info(f'{name}: {component}') self._components.append(component) - base_images = set([info.base_image for info in self._components]) - target_images = set([info.target_image for info in self._components]) + base_images = {info.base_image for info in self._components} + target_images = {info.target_image for info in self._components} if len(base_images) != 1: - _error('Found {} unique base_image values {}. Components' - ' must specify the same base_image and target_image.'.format( - len(base_images), base_images)) + _error( + f'Found {len(base_images)} unique base_image values {base_images}. Components must specify the same base_image and target_image.' + ) raise typer.Exit(1) self._base_image = base_images.pop() @@ -217,13 +218,14 @@ def _load_components(self): ' components. A base_image must be specified in order to' ' build the component.') raise typer.Exit(1) - _info('Using base image: {}'.format( - typer.style(self._base_image, fg=typer.colors.YELLOW))) + _info( + f'Using base image: {typer.style(self._base_image, fg=typer.colors.YELLOW)}' + ) if len(target_images) != 1: - _error('Found {} unique target_image values {}. Components' - ' must specify the same base_image and' - ' target_image.'.format(len(target_images), target_images)) + _error( + f'Found {len(target_images)} unique target_image values {target_images}. Components must specify the same base_image and target_image.' + ) raise typer.Exit(1) self._target_image = target_images.pop() @@ -232,8 +234,9 @@ def _load_components(self): ' components. A target_image must be specified in order' ' to build the component.') raise typer.Exit(1) - _info('Using target image: {}'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Using target image: {typer.style(self._target_image, fg=typer.colors.YELLOW)}' + ) def _maybe_write_file(self, filename: str, @@ -241,22 +244,23 @@ def _maybe_write_file(self, overwrite: bool = False): formatted_filename = typer.style(filename, fg=typer.colors.CYAN) if filename in self._context_directory_files: - _info('Found existing file {} under {}.'.format( - formatted_filename, self._context_directory)) + _info( + f'Found existing file {formatted_filename} under {self._context_directory}.' + ) if not overwrite: _info('Leaving this file untouched.') return else: - _warning( - 'Overwriting existing file {}'.format(formatted_filename)) + _warning(f'Overwriting existing file {formatted_filename}') else: - _warning('{} not found under {}. Creating one.'.format( - formatted_filename, self._context_directory)) + _warning( + f'{formatted_filename} not found under {self._context_directory}. Creating one.' 
+ ) filepath = self._context_directory / filename with open(filepath, 'w') as f: - f.write('# Generated by KFP.\n{}'.format(contents)) - _info('Generated file {}.'.format(filepath)) + f.write(f'# Generated by KFP.\n{contents}') + _info(f'Generated file {filepath}.') def maybe_generate_requirements_txt(self): self._maybe_write_file(_REQUIREMENTS_TXT, '') @@ -266,9 +270,8 @@ def maybe_generate_dockerignore(self): def write_component_files(self): for component_info in self._components: - filename = ( - component_info.output_component_file or - component_info.function_name + '.yaml') + filename = component_info.output_component_file or f'{component_info.function_name}.yaml' + container_filename = ( self._context_directory / _COMPONENT_METADATA_DIR / filename) container_filename.parent.mkdir(exist_ok=True, parents=True) @@ -295,8 +298,9 @@ def maybe_generate_dockerfile(self, overwrite_dockerfile: bool = False): overwrite_dockerfile) def build_image(self, push_image: bool = True): - _info('Building image {} using Docker...'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Building image {typer.style(self._target_image, fg=typer.colors.YELLOW)} using Docker...' + ) client = docker.from_env() docker_log_prefix = typer.style('Docker', fg=typer.colors.CYAN) @@ -312,21 +316,22 @@ def build_image(self, push_image: bool = True): for log in logs: message = log.get('stream', '').rstrip('\n') if message: - _info('{}: {}'.format(docker_log_prefix, message)) + _info(f'{docker_log_prefix}: {message}') except docker.errors.BuildError as e: for log in e.build_log: message = log.get('message', '').rstrip('\n') if message: - _error('{}: {}'.format(docker_log_prefix, message)) - _error('{}: {}'.format(docker_log_prefix, e)) + _error(f'{docker_log_prefix}: {message}') + _error(f'{docker_log_prefix}: {e}') raise typer.Exit(1) if not push_image: return - _info('Pushing image {}...'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Pushing image {typer.style(self._target_image, fg=typer.colors.YELLOW)}...' + ) try: response = client.images.push( @@ -335,13 +340,14 @@ def build_image(self, push_image: bool = True): status = log.get('status', '').rstrip('\n') layer = log.get('id', '') if status: - _info('{}: {} {}'.format(docker_log_prefix, layer, status)) + _info(f'{docker_log_prefix}: {layer} {status}') except docker.errors.BuildError as e: - _error('{}: {}'.format(docker_log_prefix, e)) + _error(f'{docker_log_prefix}: {e}') raise e - _info('Built and pushed component container {}'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Built and pushed component container {typer.style(self._target_image, fg=typer.colors.YELLOW)}' + ) @app.callback() @@ -374,22 +380,19 @@ def build(components_directory: pathlib.Path = typer.Argument( """Builds containers for KFP v2 Python-based components.""" components_directory = components_directory.resolve() if not components_directory.is_dir(): - _error('{} does not seem to be a valid directory.'.format( - components_directory)) + _error(f'{components_directory} does not seem to be a valid directory.') raise typer.Exit(1) if engine != _Engine.DOCKER: _error('Currently, only `docker` is supported for --engine.') raise typer.Exit(1) - if engine == _Engine.DOCKER: - if not _DOCKER_IS_PRESENT: - _error( - 'The `docker` Python package was not found in the current' - ' environment. Please run `pip install docker` to install it.' 
- ' Optionally, you can also install KFP with all of its' - ' optional dependencies by running `pip install kfp[all]`.') - raise typer.Exit(1) + if not _DOCKER_IS_PRESENT: + _error('The `docker` Python package was not found in the current' + ' environment. Please run `pip install docker` to install it.' + ' Optionally, you can also install KFP with all of its' + ' optional dependencies by running `pip install kfp[all]`.') + raise typer.Exit(1) builder = _ComponentBuilder( context_directory=components_directory, diff --git a/sdk/python/kfp/cli/components_test.py b/sdk/python/kfp/cli/components_test.py index bc76a765e1a..f34159435b8 100644 --- a/sdk/python/kfp/cli/components_test.py +++ b/sdk/python/kfp/cli/components_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Tests for `components` command group in KFP CLI.""" import contextlib -import importlib import pathlib import sys import textwrap @@ -30,18 +29,6 @@ except ImportError: sys.modules['docker'] = mock.Mock() from kfp.cli import components -from kfp.deprecated.cli import components - -_COMPONENT_TEMPLATE = ''' -from kfp.dsl import * - -@component( - base_image={base_image}, - target_image={target_image}, - output_component_file={output_component_file}) -def {func_name}(): - pass -''' def _make_component(func_name: str, @@ -222,7 +209,7 @@ def testTargetImageMustBeTheSameInAllComponents(self): ) self.assertEqual(result.exit_code, 1) - def testTargetImageMustBeTheSameInAllComponents(self): + def testTargetImageMustBeTheSameInAllComponentsWithBaseImage(self): component_one = _make_component( func_name='one', base_image='image-1', target_image='target-image') component_two = _make_component( @@ -412,7 +399,7 @@ def testDockerfileIsCreatedCorrectly(self): COPY requirements.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txt - RUN pip install --no-cache-dir kfp==1.8.11 + RUN pip install --no-cache-dir kfp==1.2.3 COPY . . ''')) @@ -455,7 +442,7 @@ def testExistingDockerfileCanBeOverwritten(self): COPY requirements.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txt - RUN pip install --no-cache-dir kfp==1.8.11 + RUN pip install --no-cache-dir kfp==1.2.3 COPY . . ''')) diff --git a/sdk/python/kfp/cli/diagnose_me/dev_env.py b/sdk/python/kfp/cli/diagnose_me/dev_env.py index e32dc85a012..0a3d5999866 100644 --- a/sdk/python/kfp/cli/diagnose_me/dev_env.py +++ b/sdk/python/kfp/cli/diagnose_me/dev_env.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/dev_env_test.py b/sdk/python/kfp/cli/diagnose_me/dev_env_test.py index f859e724c45..34ba85da387 100644 --- a/sdk/python/kfp/cli/diagnose_me/dev_env_test.py +++ b/sdk/python/kfp/cli/diagnose_me/dev_env_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/gcp.py b/sdk/python/kfp/cli/diagnose_me/gcp.py index 5dc47b1baae..7adcde813da 100644 --- a/sdk/python/kfp/cli/diagnose_me/gcp.py +++ b/sdk/python/kfp/cli/diagnose_me/gcp.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); @@ -98,9 +97,7 @@ def execute_gsutil_command( Returns: utility.ExecutorResponse with outputs from stdout,stderr and execution code. 
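The rename of the second testTargetImageMustBeTheSameInAllComponents in components_test.py matters because duplicate method names in a Python class silently shadow each other, so only the later test ever ran. A minimal demonstration with a hypothetical test class:

```python
import unittest


class DuplicateNameDemo(unittest.TestCase):

    def test_target_image(self):
        self.fail('never collected: shadowed by the later definition')

    def test_target_image(self):  # noqa: F811 (redefinition silently wins)
        self.assertTrue(True)


if __name__ == '__main__':
    # Exactly one test runs: the second definition replaces the first in the
    # class namespace, so the failing variant is never executed.
    unittest.main()
```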
""" - command_list = ['gsutil'] - command_list.extend(gsutil_command_list) - + command_list = ['gsutil', *gsutil_command_list] if project_id is not None: command_list.extend(['-p', project_id]) diff --git a/sdk/python/kfp/cli/diagnose_me/gcp_test.py b/sdk/python/kfp/cli/diagnose_me/gcp_test.py index de441559868..7edeceec874 100644 --- a/sdk/python/kfp/cli/diagnose_me/gcp_test.py +++ b/sdk/python/kfp/cli/diagnose_me/gcp_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py index 9d0f4e0cc73..0dcae1595e9 100644 --- a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py +++ b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); @@ -58,8 +57,7 @@ def execute_kubectl_command( Returns: utility.ExecutorResponse with outputs from stdout,stderr and execution code. """ - command_list = ['kubectl'] - command_list.extend(kubectl_command_list) + command_list = ['kubectl', *kubectl_command_list] if not human_readable: command_list.extend(['-o', 'json']) diff --git a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py index 74999738b75..6b2376476aa 100644 --- a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py +++ b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/utility.py b/sdk/python/kfp/cli/diagnose_me/utility.py index f83984a091f..13b666cf32f 100644 --- a/sdk/python/kfp/cli/diagnose_me/utility.py +++ b/sdk/python/kfp/cli/diagnose_me/utility.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); @@ -55,7 +54,7 @@ def execute_command(self, command_list: List[Text]): self._stderr = stderr.decode('utf-8') self._returncode = process.returncode except OSError as e: - self._stderr = e + self._stderr = str(e) self._stdout = '' self._returncode = e.errno self._parse_raw_input() diff --git a/sdk/python/kfp/cli/diagnose_me/utility_test.py b/sdk/python/kfp/cli/diagnose_me/utility_test.py index 0c3569de993..4430dfcfade 100644 --- a/sdk/python/kfp/cli/diagnose_me/utility_test.py +++ b/sdk/python/kfp/cli/diagnose_me/utility_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. 
# # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me_cli.py b/sdk/python/kfp/cli/diagnose_me_cli.py index 975df4e0599..8a3564b0b33 100644 --- a/sdk/python/kfp/cli/diagnose_me_cli.py +++ b/sdk/python/kfp/cli/diagnose_me_cli.py @@ -1,23 +1,20 @@ -# Lint as: python3 """CLI interface for KFP diagnose_me tool.""" import json as json_library import sys -from typing import Dict, Text +from typing import Dict, List, Text, Union import click from kfp.cli.diagnose_me import dev_env, gcp +from kfp.cli.diagnose_me import kubernetes_cluster from kfp.cli.diagnose_me import kubernetes_cluster as k8 from kfp.cli.diagnose_me import utility +ResultsType = Dict[Union[gcp.Commands, dev_env.Commands, + kubernetes_cluster.Commands], utility.ExecutorResponse] -@click.group() -def diagnose_me(): - """Prints diagnoses information for KFP environment.""" - pass - -@diagnose_me.command() +@click.command() @click.option( '-j', '--json', @@ -37,7 +34,7 @@ def diagnose_me(): @click.pass_context def diagnose_me(ctx: click.Context, json: bool, project_id: str, namespace: str): - """Runs environment diagnostic with specified parameters. + """Runs KFP environment diagnostic with specified parameters. Feature stage: [Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha) @@ -50,17 +47,18 @@ def diagnose_me(ctx: click.Context, json: bool, project_id: str, for app in ['Google Cloud SDK', 'gsutil', 'kubectl']: if app not in local_env_gcloud_sdk.json_output: raise RuntimeError( - '%s is not installed, gcloud, gsutil and kubectl are required ' - % app + 'for this app to run. Please follow instructions at ' + + f'{app} is not installed, gcloud, gsutil and kubectl are required ' + + 'for this app to run. Please follow instructions at ' + 'https://cloud.google.com/sdk/install to install the SDK.') click.echo('Collecting diagnostic information ...', file=sys.stderr) # default behaviour dump all configurations - results = {} - for gcp_command in gcp.Commands: - results[gcp_command] = gcp.get_gcp_configuration( + results: ResultsType = { + gcp_command: gcp.get_gcp_configuration( gcp_command, project_id=project_id, human_readable=not json) + for gcp_command in gcp.Commands + } for k8_command in k8.Commands: results[k8_command] = k8.get_kubectl_configuration( @@ -73,8 +71,7 @@ def diagnose_me(ctx: click.Context, json: bool, project_id: str, print_to_sdtout(results, not json) -def print_to_sdtout(results: Dict[str, utility.ExecutorResponse], - human_readable: bool): +def print_to_sdtout(results: ResultsType, human_readable: bool): """Viewer to print the ExecutorResponse results to stdout. Args: @@ -85,18 +82,18 @@ def print_to_sdtout(results: Dict[str, utility.ExecutorResponse], """ output_dict = {} - human_readable_result = [] + human_readable_result: List[str] = [] for key, val in results.items(): if val.has_error: output_dict[ key. 
- name] = 'Following error occurred during the diagnoses: %s' % val.stderr + name] = f'Following error occurred during the diagnoses: {val.stderr}' continue output_dict[key.name] = val.json_output - human_readable_result.append('================ %s ===================' % - (key.name)) - human_readable_result.append(val.parsed_output) + human_readable_result.extend( + (f'================ {key.name} ===================', + val.parsed_output)) if human_readable: result = '\n'.join(human_readable_result) diff --git a/sdk/python/kfp/cli/experiment.py b/sdk/python/kfp/cli/experiment.py index b896c029ea9..c8e292653a9 100644 --- a/sdk/python/kfp/cli/experiment.py +++ b/sdk/python/kfp/cli/experiment.py @@ -91,7 +91,7 @@ def delete(ctx: click.Context, experiment_id: str): client = ctx.obj["client"] client.delete_experiment(experiment_id) - click.echo("{} is deleted.".format(experiment_id)) + click.echo(f"{experiment_id} is deleted.") def _display_experiments(experiments: List[ApiExperiment], diff --git a/sdk/python/kfp/cli/output.py b/sdk/python/kfp/cli/output.py index aa1d7bf45ad..e818c857f1c 100644 --- a/sdk/python/kfp/cli/output.py +++ b/sdk/python/kfp/cli/output.py @@ -29,7 +29,7 @@ class OutputFormat(Enum): def print_output(data: Union[list, dict], headers: list, - output_format: str, + output_format: OutputFormat, table_format: str = "simple"): """Prints the output from the cli command execution based on the specified format. @@ -51,13 +51,7 @@ def print_output(data: Union[list, dict], if output_format == OutputFormat.table.name: click.echo(tabulate(data, headers=headers, tablefmt=table_format)) elif output_format == OutputFormat.json.name: - if not headers: - output = data - else: - output = [] - for row in data: - output.append(dict(zip(headers, row))) + output = [dict(zip(headers, row)) for row in data] if headers else data click.echo(json.dumps(output, indent=4)) else: - raise NotImplementedError( - "Unknown Output Format: {}".format(output_format)) + raise NotImplementedError(f"Unknown Output Format: {output_format}") diff --git a/sdk/python/kfp/cli/pipeline.py b/sdk/python/kfp/cli/pipeline.py index ba0dad49e51..f2e752cc72e 100644 --- a/sdk/python/kfp/cli/pipeline.py +++ b/sdk/python/kfp/cli/pipeline.py @@ -13,7 +13,7 @@ # limitations under the License. 
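The refactored print_output builds its JSON output in a single expression: each row is zipped with the headers when headers are present, otherwise the data is emitted unchanged. A standalone illustration with made-up rows:

```python
import json

# Made-up rows in the shape the CLI passes to print_output.
headers = ['Experiment ID', 'Name', 'Created at']
data = [
    ['exp-1', 'default', '2022-03-01T00:00:00'],
    ['exp-2', 'tuning', '2022-03-02T00:00:00'],
]

# Same expression the refactored helper uses for OutputFormat.json.
output = [dict(zip(headers, row)) for row in data] if headers else data
print(json.dumps(output, indent=4))
```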
import json -from typing import List, Optional +from typing import Any, Dict, List, Optional, Union import click import kfp_server_api @@ -68,8 +68,8 @@ def upload_version(ctx: click.Context, if pipeline_name is not None: pipeline_id = client.get_pipeline_id(name=pipeline_name) if pipeline_id is None: - raise ValueError("Can't find a pipeline with name: %s" % - pipeline_name) + raise ValueError( + f"Can't find a pipeline with name: {pipeline_name}") version = client.pipeline_uploads.upload_pipeline_version( package_file, name=pipeline_version, pipelineid=pipeline_id) _display_pipeline_version(version, output_format) @@ -239,11 +239,9 @@ def _display_pipeline(pipeline: kfp_server_api.ApiPipeline, print_output(table, [], output_format, table_format="plain") print_output(data, headers, output_format, table_format="grid") elif output_format == OutputFormat.json.name: - output = dict() - output["Pipeline Details"] = dict(table) - params = [] - for item in data: - params.append(dict(zip(headers, item))) + OutputType = Dict[str, Union[Dict[str, str], List[Dict[str, Any]]]] + output: OutputType = {"Pipeline Details": dict(table)} + params = [dict(zip(headers, item)) for item in data] output["Pipeline Parameters"] = params print_output(output, [], output_format) diff --git a/sdk/python/kfp/cli/recurring_run.py b/sdk/python/kfp/cli/recurring_run.py index b9f61b8d990..3b66c42aa9b 100644 --- a/sdk/python/kfp/cli/recurring_run.py +++ b/sdk/python/kfp/cli/recurring_run.py @@ -89,7 +89,6 @@ def create(ctx: click.Context, end_time: Optional[str] = None, interval_second: Optional[int] = None, max_concurrency: Optional[int] = None, - params: Optional[dict] = None, pipeline_package_path: Optional[str] = None, pipeline_id: Optional[str] = None, start_time: Optional[str] = None, diff --git a/sdk/python/kfp/cli/run.py b/sdk/python/kfp/cli/run.py index 5282206abc3..e686db7def4 100644 --- a/sdk/python/kfp/cli/run.py +++ b/sdk/python/kfp/cli/run.py @@ -21,8 +21,8 @@ import click import kfp_server_api +from kfp import client from kfp.cli.output import OutputFormat, print_output -from kfp.client import Client @click.group() @@ -159,7 +159,7 @@ def get(ctx: click.Context, watch: bool, detail: bool, run_id: str): _display_run(client, namespace, run_id, watch, output_format, detail) -def _display_run(client: click.Context, +def _display_run(client: client.Client, namespace: str, run_id: str, watch: bool, @@ -200,8 +200,7 @@ def _display_run(client: click.Context, argo_workflow_name = manifest['metadata']['name'] break if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']: - click.echo('Run is finished with status {}.'.format( - run_detail.run.status)) + click.echo(f'Run is finished with status {run_detail.run.status}.') return if argo_workflow_name: subprocess.run( @@ -209,7 +208,7 @@ def _display_run(client: click.Context, _print_runs([run], output_format) -def _wait_for_run_completion(client: Client, run_id: str, timeout: int, +def _wait_for_run_completion(client: client.Client, run_id: str, timeout: int, output_format: OutputFormat): run_detail = client.wait_for_run_completion(run_id, timeout) _print_runs([run_detail.run], output_format) diff --git a/sdk/python/requirements-dev.txt b/sdk/python/requirements-dev.txt index d97a64fa2f2..9552a937755 100644 --- a/sdk/python/requirements-dev.txt +++ b/sdk/python/requirements-dev.txt @@ -4,4 +4,5 @@ pylint==2.12.2 types-protobuf==3.19.15 types-PyYAML==6.0.5 types-requests==2.27.14 +types-tabulate==0.8.6 yapf==0.32.0 \ No newline at end of file diff --git 
a/sdk/python/setup.py b/sdk/python/setup.py index 782b46b56f4..93b0071a359 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -91,6 +91,6 @@ def find_version(*file_path_parts: str) -> str: 'console_scripts': [ 'dsl-compile = kfp.compiler.main:main', 'dsl-compile-deprecated = kfp.deprecated.compiler.main:main', - 'kfp=kfp.cli.cli:main', + 'kfp=kfp.cli.__main__:main', ] })
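setup.py now points the kfp console script at kfp.cli.__main__:main, matching the relocated entry point earlier in this diff. A hedged smoke-test sketch of the same wiring using click's test runner, assuming the kfp package from this branch is importable:

```python
from click.testing import CliRunner

from kfp.cli import cli, experiment, pipeline, recurring_run, run

# Mirror the registration performed in kfp.cli.__main__.main() before dispatch.
cli.cli.add_command(run.run)
cli.cli.add_command(recurring_run.recurring_run)
cli.cli.add_command(pipeline.pipeline)
cli.cli.add_command(experiment.experiment)

result = CliRunner().invoke(cli.cli, ['--help'])
assert result.exit_code == 0
assert 'run' in result.output  # registered subcommands show up in the group help
```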