From 31f0cc245f14cf08abffbe5990202b57bf487a5a Mon Sep 17 00:00:00 2001 From: Lingqing Gan Date: Thu, 31 Mar 2022 20:20:08 +0000 Subject: [PATCH 01/17] support IR YAML format in API --- backend/src/apiserver/server/util.go | 4 +- backend/src/apiserver/template/template.go | 3 +- .../src/apiserver/template/template_test.go | 68 ++++++++++++++++++- backend/src/apiserver/template/v2_template.go | 4 +- 4 files changed, 73 insertions(+), 6 deletions(-) diff --git a/backend/src/apiserver/server/util.go b/backend/src/apiserver/server/util.go index 48ff655959c..6b6b6fab7ee 100644 --- a/backend/src/apiserver/server/util.go +++ b/backend/src/apiserver/server/util.go @@ -13,7 +13,6 @@ import ( "net/url" "strings" - "github.com/golang/protobuf/jsonpb" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/golang/glog" @@ -23,6 +22,7 @@ import ( "github.com/kubeflow/pipelines/backend/src/common/util" "github.com/pkg/errors" authorizationv1 "k8s.io/api/authorization/v1" + "github.com/ghodss/yaml" ) // These are valid conditions of a ScheduledWorkflow. @@ -323,7 +323,7 @@ func validatePipelineManifest(pipelineManifest string) error { if pipelineManifest != "" { // Verify valid IR spec spec := &pipelinespec.PipelineSpec{} - if err := jsonpb.UnmarshalString(pipelineManifest, spec); err != nil { + if err := yaml.Unmarshal([]byte(pipelineManifest), spec); err != nil { return util.NewInvalidInputErrorWithDetails(err, "Invalid IR spec format.") } diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index 5bcd5258cc9..d76f92c6eea 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -107,7 +107,8 @@ func isArgoWorkflow(template []byte) bool { // isPipelineSpec returns whether template is in KFP api/v2alpha1/PipelineSpec format. 
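This patch swaps JSON-only unmarshalling for github.com/ghodss/yaml, which converts YAML to JSON before decoding, so a single code path accepts both formats; isPipelineSpec below pairs yaml.YAMLToJSON with protojson.Unmarshal for the proto-aware variant. A minimal standalone sketch of that variant follows; the helper name parseIRSpec is illustrative, not part of the patch.

package main

import (
	"fmt"

	"github.com/ghodss/yaml"
	"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
	"google.golang.org/protobuf/encoding/protojson"
)

// parseIRSpec accepts a v2 pipeline spec written as either YAML or JSON.
func parseIRSpec(template []byte) (*pipelinespec.PipelineSpec, error) {
	// JSON input round-trips through YAMLToJSON, since JSON is a subset of YAML.
	templateJSON, err := yaml.YAMLToJSON(template)
	if err != nil {
		return nil, fmt.Errorf("converting template to JSON: %w", err)
	}
	spec := &pipelinespec.PipelineSpec{}
	if err := protojson.Unmarshal(templateJSON, spec); err != nil {
		return nil, fmt.Errorf("unmarshalling PipelineSpec: %w", err)
	}
	return spec, nil
}

func main() {
	spec, err := parseIRSpec([]byte("pipelineInfo:\n  name: hello-world\nschemaVersion: 2.0.0\n"))
	if err != nil {
		panic(err)
	}
	fmt.Println(spec.GetPipelineInfo().GetName()) // hello-world
}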
func isPipelineSpec(template []byte) bool { var spec pipelinespec.PipelineSpec - err := protojson.Unmarshal(template, &spec) + templateJson, _ := yaml.YAMLToJSON(template) + err := protojson.Unmarshal(templateJson, &spec) return err == nil && spec.GetPipelineInfo().GetName() != "" && spec.GetRoot() != nil } diff --git a/backend/src/apiserver/template/template_test.go b/backend/src/apiserver/template/template_test.go index b7899922636..0c586a13ab2 100644 --- a/backend/src/apiserver/template/template_test.go +++ b/backend/src/apiserver/template/template_test.go @@ -72,7 +72,7 @@ apiVersion: argoproj.io/v1alpha2 kind: Workflow`, templateType: V1, }, { - template: v2SpecHelloWorld, + template: v2SpecHelloWorldJSON, templateType: V2, }, { template: "", @@ -95,7 +95,11 @@ kind: CronWorkflow`, }, { template: `{"abc": "def", "b": {"key": 3}}`, templateType: Unknown, + }, { + template: v2SpecHelloWorldYAML, + templateType: V2, }} + for _, test := range tt { format := inferTemplateFormat([]byte(test.template)) if format != test.templateType { @@ -145,7 +149,7 @@ spec: container: image: docker/whalesay:latest` -var v2SpecHelloWorld = ` +var v2SpecHelloWorldJSON = ` { "components": { "comp-hello-world": { @@ -217,6 +221,66 @@ var v2SpecHelloWorld = ` } ` +var v2SpecHelloWorldYAML = ` +# this is a comment +components: + comp-hello-world: + executorLabel: exec-hello-world + inputDefinitions: + parameters: + text: + type: STRING +deploymentSpec: + executors: + exec-hello-world: + container: + args: + - "--text" + - "{{$.inputs.parameters['text']}}" + command: + - sh + - "-ec" + - | + program_path=$(mktemp) + printf "%s" "$0" > "$program_path" + python3 -u "$program_path" "$@" + - | + def hello_world(text): + print(text) + return text + + import argparse + _parser = argparse.ArgumentParser(prog='Hello world', description='') + _parser.add_argument("--text", dest="text", type=str, required=True, default=argparse.SUPPRESS) + _parsed_args = vars(_parser.parse_args()) + + _outputs = hello_world(**_parsed_args) + image: python:3.7 +pipelineInfo: + name: namespace/n1/pipeline/hello-world +root: + dag: + tasks: + hello-world: + cachingOptions: + enableCache: true + componentRef: + name: comp-hello-world + inputs: + parameters: + text: + componentInputParameter: text + taskInfo: + name: hello-world + inputDefinitions: + parameters: + text: + type: STRING +schemaVersion: 2.0.0 +sdkVersion: kfp-1.6.5 +` + + func TestToSwfCRDResourceGeneratedName_SpecialCharsAndSpace(t *testing.T) { name, err := toSWFCRDResourceGeneratedName("! 
HaVe ä £unky name") assert.Nil(t, err) diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index f24300bb859..b1aa692a160 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -13,6 +13,7 @@ import ( "github.com/kubeflow/pipelines/backend/src/v2/compiler/argocompiler" "google.golang.org/protobuf/encoding/protojson" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/ghodss/yaml" ) type V2Spec struct { @@ -68,7 +69,8 @@ func (t *V2Spec) GetTemplateType() TemplateType { func NewV2SpecTemplate(template []byte) (*V2Spec, error) { var spec pipelinespec.PipelineSpec - err := protojson.Unmarshal(template, &spec) + templateJson, _ := yaml.YAMLToJSON(template) + err := protojson.Unmarshal(templateJson, &spec) if err != nil { return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("invalid v2 pipeline spec: %s", err.Error())) } From bf63df8f40f3ee6fc8add966a393cdde4cbf2838 Mon Sep 17 00:00:00 2001 From: Linchin Date: Thu, 31 Mar 2022 22:34:45 +0000 Subject: [PATCH 02/17] Check the error message and return false if it is not nil --- backend/src/apiserver/template/template.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/backend/src/apiserver/template/template.go b/backend/src/apiserver/template/template.go index d76f92c6eea..881c10cd6fe 100644 --- a/backend/src/apiserver/template/template.go +++ b/backend/src/apiserver/template/template.go @@ -107,8 +107,11 @@ func isArgoWorkflow(template []byte) bool { // isPipelineSpec returns whether template is in KFP api/v2alpha1/PipelineSpec format. func isPipelineSpec(template []byte) bool { var spec pipelinespec.PipelineSpec - templateJson, _ := yaml.YAMLToJSON(template) - err := protojson.Unmarshal(templateJson, &spec) + templateJson, err := yaml.YAMLToJSON(template) + if err != nil { + return false + } + err = protojson.Unmarshal(templateJson, &spec) return err == nil && spec.GetPipelineInfo().GetName() != "" && spec.GetRoot() != nil } From 4d97abfe4f84c85061968d452acad44c2f7cfefd Mon Sep 17 00:00:00 2001 From: Linchin Date: Fri, 1 Apr 2022 00:10:14 +0000 Subject: [PATCH 03/17] update error message --- backend/src/apiserver/template/v2_template.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index b1aa692a160..538f0844da6 100644 --- a/backend/src/apiserver/template/v2_template.go +++ b/backend/src/apiserver/template/v2_template.go @@ -6,6 +6,7 @@ import ( structpb "github.com/golang/protobuf/ptypes/struct" + "github.com/ghodss/yaml" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/common/util" @@ -13,7 +14,6 @@ import ( "github.com/kubeflow/pipelines/backend/src/v2/compiler/argocompiler" "google.golang.org/protobuf/encoding/protojson" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/ghodss/yaml" ) type V2Spec struct { @@ -69,8 +69,11 @@ func (t *V2Spec) GetTemplateType() TemplateType { func NewV2SpecTemplate(template []byte) (*V2Spec, error) { var spec pipelinespec.PipelineSpec - templateJson, _ := yaml.YAMLToJSON(template) - err := protojson.Unmarshal(templateJson, &spec) + templateJson, err := yaml.YAMLToJSON(template) + if err != nil { + return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("cannot 
convert v2 pipeline spec to json format: %s", err.Error())) + } + err = protojson.Unmarshal(templateJson, &spec) if err != nil { return nil, util.NewInvalidInputErrorWithDetails(ErrorInvalidPipelineSpec, fmt.Sprintf("invalid v2 pipeline spec: %s", err.Error())) } From 927d0257d6fe4b1793ed124b6b029b739aefc9fa Mon Sep 17 00:00:00 2001 From: Linchin Date: Mon, 18 Apr 2022 21:03:30 +0000 Subject: [PATCH 04/17] fixed simple loop but need cleaning up --- backend/src/v2/compiler/argocompiler/argo.go | 2 +- backend/src/v2/driver/driver.go | 110 ++++++++++++++++++- 2 files changed, 105 insertions(+), 7 deletions(-) diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index c4fb4868e6b..a00e24c6f4a 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -107,7 +107,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow, wf: wf, templates: make(map[string]*wfapi.Template), // TODO(chensun): release process and update the images. - driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", + driverImage: "gcr.io/ling-kfp/dev/kfp-driver:latest", launcherImage: "gcr.io/ml-pipeline-test/dev/kfp-launcher-v2:latest", job: job, spec: spec, diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 87eb211892f..f4c2724b059 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -407,13 +407,32 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if err != nil { return nil, err } + glog.Infof("ctx value: %+v\n"+ + "dag value: %+v\n"+ + "interationIndex value: %+v\n"+ + "pipeline value: %+v\n"+ + "opts.Task value: %+v\n"+ + "opts.Component.GetInputDefinitions() value: %#+v\n"+ + "mlmd value: %+v\n"+ + "expr value: %+v\n", + ctx, + dag, + iterationIndex, + pipeline, + opts.Task, + opts.Component.GetInputDefinitions(), + mlmd, + expr, + ) inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts.Task, opts.Component.GetInputDefinitions(), mlmd, expr) if err != nil { return nil, err } + // unwrap static inputs executorInput := &pipelinespec.ExecutorInput{ Inputs: inputs, } + glog.Infof("executorInput value: %+v", executorInput) execution = &Execution{ExecutorInput: executorInput} condition := opts.Task.GetTriggerPolicy().GetCondition() if condition != "" { @@ -436,15 +455,76 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E return execution, fmt.Errorf("ArtifactIterator is not implemented") } isIterator := opts.Task.GetParameterIterator() != nil && opts.IterationIndex < 0 + // Fan out iterations if execution.WillTrigger() && isIterator { iterator := opts.Task.GetParameterIterator() - value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] report := func(err error) error { return fmt.Errorf("iterating on item input %q failed: %w", iterator.GetItemInput(), err) } - if !ok { - return execution, report(fmt.Errorf("cannot find input parameter")) + glog.Infof("iterator kind: %+v", iterator.GetItems().GetKind()) + glog.Infof("This log outputs variables used in the iterator.\n"+ + "Val of iterator: %+v\n"+ + "Val of executorInput: %+v\n", + iterator, + executorInput, + ) + glog.Infof("executorInput.GetInputs() value: %+v", executorInput.GetInputs()) + glog.Infof("iterator.GetItems() value: %+v", iterator.GetItems()) + // Check the items type of parameterIterator: + // It can be "inputParameter" or "Raw" + 
value := structpb.NewNullValue() + var ok bool + switch iterator.GetItems().GetKind().(type) { + case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: + glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) + value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] + // value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetRaw()] + glog.Infof("iterating on item input %q failed: %w, \n"+ + " (--lingqing print--) map type: %T, map value: %+v, \n"+ + " (--lingqing print--) key type: %T, key val: %q, \n"+ + " (--lingqing print--) inputs type: %T, inputs val: %+v, \n"+ + " (--lingqing print--) pipelineType: %T, pipelineVal: %+v, \n"+ + " (--lingqing print--) pipelineCtx: %+v, \n"+ + " (--lingqing print--) pipelineRunCtx: %+v, ", + iterator.GetItemInput(), + err, + executorInput.GetInputs().GetParameterValues(), + executorInput.GetInputs().GetParameterValues(), + iterator.GetItems().GetInputParameter(), + iterator.GetItems().GetInputParameter(), // this is empty too + inputs, + *inputs, // parameter field is empty + pipeline, + *pipeline, + pipeline.GetRunCtxID(), // run context id + pipeline.GetCtxID(), // context id + ) + if !ok { + return execution, report(fmt.Errorf("cannot find input parameter")) + } + glog.Infof("inputParameter value: %+v", value) + case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw: + glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) + value_raw := iterator.GetItems().GetRaw() + glog.Info("raw_string_value: ", value_raw) + var unmarshalled_raw interface{} + err = json.Unmarshal([]byte(value_raw), &unmarshalled_raw) + if err != nil { + return execution, fmt.Errorf("error unmarshall raw string: %q", err) + } + glog.Infof("unmarshalled_raw value: %+v", unmarshalled_raw) + value, err = structpb.NewValue(unmarshalled_raw) + if err != nil { + return execution, fmt.Errorf("error converting unmarshalled raw string into protobuf Value type: %q", err) + } + // Add the raw input to the executor input + execution.ExecutorInput.Inputs.ParameterValues[iterator.GetItemInput()] = value + // Add the raw input to the task + // opts.Task. 
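The Raw branch above reduces to two steps: json.Unmarshal parses the raw items text into a generic Go value, and structpb.NewValue wraps that value so getItems can fan it out into per-iteration items. A minimal standalone sketch of the conversion; the helper name rawItemsToValue is illustrative, and the sketch assumes the structpb package from google.golang.org/protobuf.

package main

import (
	"encoding/json"
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"
)

// rawItemsToValue turns a raw items string such as `[1, 2, 3]` into a
// protobuf list Value, mirroring the unmarshal-then-wrap steps above.
func rawItemsToValue(raw string) (*structpb.Value, error) {
	var parsed interface{}
	if err := json.Unmarshal([]byte(raw), &parsed); err != nil {
		return nil, fmt.Errorf("unmarshalling raw items: %w", err)
	}
	return structpb.NewValue(parsed)
}

func main() {
	value, err := rawItemsToValue(`["a", "b", "c"]`)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(value.GetListValue().GetValues())) // 3, one Value per iteration
}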
+ default: + return execution, fmt.Errorf("cannot find parameter iterator") } + glog.Infof("value value: %+v", value) items, err := getItems(value) if err != nil { return execution, report(err) @@ -460,6 +540,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } glog.Infof("Created execution: %s", createdExecution) execution.ID = createdExecution.GetID() + glog.Infof("created execution: %+v\n", execution) return execution, nil } @@ -482,6 +563,8 @@ func getItems(value *structpb.Value) (items []*structpb.Value, err error) { } } +// Get iteratation items from a raw string + func reuseCachedOutputs(ctx context.Context, executorInput *pipelinespec.ExecutorInput, outputDefinitions *pipelinespec.ComponentOutputsSpec, mlmd *metadata.Client, cachedMLMDExecutionID string) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) { cachedMLMDExecutionIDInt64, err := strconv.ParseInt(cachedMLMDExecutionID, 10, 64) if err != nil { @@ -594,6 +677,7 @@ func validateNonRoot(opts Options) error { } func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, task *pipelinespec.PipelineTaskSpec, inputsSpec *pipelinespec.ComponentInputsSpec, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) { + glog.Infof("task: %+v", task) // task seems fine defer func() { if err != nil { err = fmt.Errorf("failed to resolve inputs: %w", err) @@ -603,12 +687,12 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if err != nil { return nil, err } - glog.Infof("parent DAG input parameters %+v", inputParams) + glog.Infof("parent DAG input parameters %+v", inputParams) // not related to us inputs = &pipelinespec.ExecutorInput_Inputs{ ParameterValues: make(map[string]*structpb.Value), Artifacts: make(map[string]*pipelinespec.ArtifactList), } - isIterationDriver := iterationIndex != nil + isIterationDriver := iterationIndex != nil // false handleParameterExpressionSelector := func() error { for name, paramSpec := range task.GetInputs().GetParameters() { @@ -724,7 +808,17 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, case task.GetArtifactIterator() != nil: return nil, fmt.Errorf("artifact iterator not implemented yet") case task.GetParameterIterator() != nil: - itemsInput := task.GetParameterIterator().GetItems().GetInputParameter() + var itemsInput string + if task.GetParameterIterator().GetItems().GetInputParameter() != "" { + itemsInput = task.GetParameterIterator().GetItems().GetInputParameter() + } else if task.GetParameterIterator().GetItemInput() != "" { + itemsInput = task.GetParameterIterator().GetItemInput() + } else { + return nil, fmt.Errorf("cannot retrieve parameter iterator.") + } + glog.Infof("task value: %+v", task) + glog.Infof("itemsInput value: %+v", itemsInput) + glog.Infof("inputs value: %+v", inputs) items, err := getItems(inputs.ParameterValues[itemsInput]) if err != nil { return nil, err @@ -737,6 +831,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, default: return nil, fmt.Errorf("bug: iteration_index>=0, but task iterator is empty") } + glog.Infof("inputs value %+v", inputs) return inputs, nil } // get executions in context on demand @@ -752,7 +847,9 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, tasksCache = tasks return tasks, nil } + glog.Infof("task.GetInputs(): %+v", task.GetInputs()) // this is empty for name, paramSpec 
:= range task.GetInputs().GetParameters() { + glog.Infof("name: %q, paramSpec: %+v", name, paramSpec) paramError := func(err error) error { return fmt.Errorf("resolving input parameter %s with spec %s: %w", name, paramSpec, err) } @@ -853,6 +950,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } } // TODO(Bobgy): validate executor inputs match component inputs definition + glog.Infof("inputs value: %+v", inputs) return inputs, nil } From 52760475e79f4c07b0f4ef91f13f731815998e13 Mon Sep 17 00:00:00 2001 From: Linchin Date: Mon, 18 Apr 2022 22:30:14 +0000 Subject: [PATCH 05/17] Deleted debug logs --- backend/src/v2/driver/driver.go | 66 ++------------------------------- 1 file changed, 4 insertions(+), 62 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index f4c2724b059..dff9ffefa84 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -407,23 +407,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if err != nil { return nil, err } - glog.Infof("ctx value: %+v\n"+ - "dag value: %+v\n"+ - "interationIndex value: %+v\n"+ - "pipeline value: %+v\n"+ - "opts.Task value: %+v\n"+ - "opts.Component.GetInputDefinitions() value: %#+v\n"+ - "mlmd value: %+v\n"+ - "expr value: %+v\n", - ctx, - dag, - iterationIndex, - pipeline, - opts.Task, - opts.Component.GetInputDefinitions(), - mlmd, - expr, - ) inputs, err := resolveInputs(ctx, dag, iterationIndex, pipeline, opts.Task, opts.Component.GetInputDefinitions(), mlmd, expr) if err != nil { return nil, err @@ -461,15 +444,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E report := func(err error) error { return fmt.Errorf("iterating on item input %q failed: %w", iterator.GetItemInput(), err) } - glog.Infof("iterator kind: %+v", iterator.GetItems().GetKind()) - glog.Infof("This log outputs variables used in the iterator.\n"+ - "Val of iterator: %+v\n"+ - "Val of executorInput: %+v\n", - iterator, - executorInput, - ) - glog.Infof("executorInput.GetInputs() value: %+v", executorInput.GetInputs()) - glog.Infof("iterator.GetItems() value: %+v", iterator.GetItems()) // Check the items type of parameterIterator: // It can be "inputParameter" or "Raw" value := structpb.NewNullValue() @@ -478,27 +452,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] - // value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetRaw()] - glog.Infof("iterating on item input %q failed: %w, \n"+ - " (--lingqing print--) map type: %T, map value: %+v, \n"+ - " (--lingqing print--) key type: %T, key val: %q, \n"+ - " (--lingqing print--) inputs type: %T, inputs val: %+v, \n"+ - " (--lingqing print--) pipelineType: %T, pipelineVal: %+v, \n"+ - " (--lingqing print--) pipelineCtx: %+v, \n"+ - " (--lingqing print--) pipelineRunCtx: %+v, ", - iterator.GetItemInput(), - err, - executorInput.GetInputs().GetParameterValues(), - executorInput.GetInputs().GetParameterValues(), - iterator.GetItems().GetInputParameter(), - iterator.GetItems().GetInputParameter(), // this is empty too - inputs, - *inputs, // parameter field is empty - pipeline, - *pipeline, - pipeline.GetRunCtxID(), // run context id - pipeline.GetCtxID(), // 
context id - ) if !ok { return execution, report(fmt.Errorf("cannot find input parameter")) } @@ -519,12 +472,9 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } // Add the raw input to the executor input execution.ExecutorInput.Inputs.ParameterValues[iterator.GetItemInput()] = value - // Add the raw input to the task - // opts.Task. default: return execution, fmt.Errorf("cannot find parameter iterator") } - glog.Infof("value value: %+v", value) items, err := getItems(value) if err != nil { return execution, report(err) @@ -540,7 +490,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } glog.Infof("Created execution: %s", createdExecution) execution.ID = createdExecution.GetID() - glog.Infof("created execution: %+v\n", execution) return execution, nil } @@ -563,8 +512,6 @@ func getItems(value *structpb.Value) (items []*structpb.Value, err error) { } } -// Get iteratation items from a raw string - func reuseCachedOutputs(ctx context.Context, executorInput *pipelinespec.ExecutorInput, outputDefinitions *pipelinespec.ComponentOutputsSpec, mlmd *metadata.Client, cachedMLMDExecutionID string) (*pipelinespec.ExecutorOutput, []*metadata.OutputArtifact, error) { cachedMLMDExecutionIDInt64, err := strconv.ParseInt(cachedMLMDExecutionID, 10, 64) if err != nil { @@ -677,7 +624,6 @@ func validateNonRoot(opts Options) error { } func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, pipeline *metadata.Pipeline, task *pipelinespec.PipelineTaskSpec, inputsSpec *pipelinespec.ComponentInputsSpec, mlmd *metadata.Client, expr *expression.Expr) (inputs *pipelinespec.ExecutorInput_Inputs, err error) { - glog.Infof("task: %+v", task) // task seems fine defer func() { if err != nil { err = fmt.Errorf("failed to resolve inputs: %w", err) @@ -687,12 +633,12 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, if err != nil { return nil, err } - glog.Infof("parent DAG input parameters %+v", inputParams) // not related to us + glog.Infof("parent DAG input parameters %+v", inputParams) inputs = &pipelinespec.ExecutorInput_Inputs{ ParameterValues: make(map[string]*structpb.Value), Artifacts: make(map[string]*pipelinespec.ArtifactList), } - isIterationDriver := iterationIndex != nil // false + isIterationDriver := iterationIndex != nil handleParameterExpressionSelector := func() error { for name, paramSpec := range task.GetInputs().GetParameters() { @@ -810,15 +756,14 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, case task.GetParameterIterator() != nil: var itemsInput string if task.GetParameterIterator().GetItems().GetInputParameter() != "" { + // input comes from outside the component itemsInput = task.GetParameterIterator().GetItems().GetInputParameter() } else if task.GetParameterIterator().GetItemInput() != "" { + // input comes from static input itemsInput = task.GetParameterIterator().GetItemInput() } else { return nil, fmt.Errorf("cannot retrieve parameter iterator.") } - glog.Infof("task value: %+v", task) - glog.Infof("itemsInput value: %+v", itemsInput) - glog.Infof("inputs value: %+v", inputs) items, err := getItems(inputs.ParameterValues[itemsInput]) if err != nil { return nil, err @@ -847,9 +792,7 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, tasksCache = tasks return tasks, nil } - glog.Infof("task.GetInputs(): %+v", task.GetInputs()) // this is empty for name, paramSpec := range 
task.GetInputs().GetParameters() { - glog.Infof("name: %q, paramSpec: %+v", name, paramSpec) paramError := func(err error) error { return fmt.Errorf("resolving input parameter %s with spec %s: %w", name, paramSpec, err) } @@ -950,7 +893,6 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, } } // TODO(Bobgy): validate executor inputs match component inputs definition - glog.Infof("inputs value: %+v", inputs) return inputs, nil } From 483248f00a7bb92254f560718eb9505c553c48a5 Mon Sep 17 00:00:00 2001 From: Linchin Date: Tue, 19 Apr 2022 23:00:01 +0000 Subject: [PATCH 06/17] remove logs and fix some format --- backend/src/v2/driver/driver.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index dff9ffefa84..34d622073eb 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -411,7 +411,6 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if err != nil { return nil, err } - // unwrap static inputs executorInput := &pipelinespec.ExecutorInput{ Inputs: inputs, } @@ -455,17 +454,14 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E if !ok { return execution, report(fmt.Errorf("cannot find input parameter")) } - glog.Infof("inputParameter value: %+v", value) case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw: glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value_raw := iterator.GetItems().GetRaw() - glog.Info("raw_string_value: ", value_raw) var unmarshalled_raw interface{} err = json.Unmarshal([]byte(value_raw), &unmarshalled_raw) if err != nil { return execution, fmt.Errorf("error unmarshall raw string: %q", err) } - glog.Infof("unmarshalled_raw value: %+v", unmarshalled_raw) value, err = structpb.NewValue(unmarshalled_raw) if err != nil { return execution, fmt.Errorf("error converting unmarshalled raw string into protobuf Value type: %q", err) From 94a4459a42cb5053735be0a9eecd7ca75f069b1d Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Wed, 20 Apr 2022 07:36:39 -0700 Subject: [PATCH 07/17] Add feature transform engine component definition. 
PiperOrigin-RevId: 443091002 --- .../experimental/automl/tabular/__init__.py | 4 +- .../tabular/feature_transform_engine.yaml | 123 ++++++++++++++++++ 2 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/feature_transform_engine.yaml diff --git a/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py index d2d960c7625..a4b8b796d4a 100644 --- a/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py +++ b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/__init__.py @@ -24,7 +24,7 @@ 'CvTrainerOp', 'InfraValidatorOp', 'Stage1TunerOp', 'EnsembleOp', 'StatsAndExampleGenOp', 'FeatureSelectionOp', 'TransformOp', 'FinalizerOp', 'WideAndDeepTrainerOp', 'BuiltinAlgorithmHyperparameterTuningJobOp', - 'TabNetTrainerOp' + 'TabNetTrainerOp', 'FeatureTransformEngineOp' ] CvTrainerOp = load_component_from_file( @@ -41,6 +41,8 @@ os.path.join(os.path.dirname(__file__), 'feature_selection.yaml')) TransformOp = load_component_from_file( os.path.join(os.path.dirname(__file__), 'transform.yaml')) +FeatureTransformEngineOp = load_component_from_file( + os.path.join(os.path.dirname(__file__), 'feature_transform_engine.yaml')) FinalizerOp = load_component_from_file( os.path.join(os.path.dirname(__file__), 'finalizer.yaml')) WideAndDeepTrainerOp = load_component_from_file( diff --git a/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/feature_transform_engine.yaml b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/feature_transform_engine.yaml new file mode 100644 index 00000000000..efdd2a9216e --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/experimental/automl/tabular/feature_transform_engine.yaml @@ -0,0 +1,123 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: feature_transform_engine +description: | + Feature transform engine to transform raw data to engineered features. + + Args: + project (str): + Required. Project to run feature transform engine. + location (Optional[str]): + Location for running the feature transform engine. If not set, + default to us-central1. + root_dir (str): The Cloud Storage location to store the output. + analyze_data (Dataset): Configuration of the dataset to be analyzed. + transform_data (Dataset): Configuration of the dataset to be transformed. + transform_config (str): Feature transformation configurations. + dataflow_machine_type (Optional[str]): + The machine type used for dataflow jobs. If not set, default to n1-standard-16. + dataflow_max_num_workers (Optional[int]): + The number of workers to run the dataflow job. If not set, default to 25. 
+ dataflow_disk_size_gb (Optional[int]): + The disk size, in gigabytes, to use on each Dataflow worker instance. If not set, + default to 40. + dataflow_subnetwork (Optional[str]): + Dataflow's fully qualified subnetwork name, when empty the default subnetwork will be + used. More details: + https://cloud.google.com/dataflow/docs/guides/specifying-networks#example_network_and_subnetwork_specifications + dataflow_use_public_ips (Optional[bool]): + Specifies whether Dataflow workers use public IP addresses. + encryption_spec_key_name (Optional[str]): + Customer-managed encryption key. + + Returns: + materialized_data (Dataset): The materialized dataset. + transform_output (TransformOutput): The transform output artifact. + gcp_resources (str): + GCP resources created by this component. + For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md. +inputs: +- {name: project, type: String} +- {name: location, type: String, default: "us-central1"} +- {name: root_dir, type: String} +- {name: analyze_data, type: Dataset} +- {name: transform_data, type: Dataset} +- {name: transform_config, type: String} +- {name: dataflow_machine_type, type: String, default: "n1-standard-16"} +- {name: dataflow_max_num_workers, type: Integer, default: "25"} +- {name: dataflow_disk_size_gb, type: Integer, default: "40"} +- {name: dataflow_subnetwork, type: String, default: ""} +- {name: dataflow_use_public_ips, type: Boolean, default: "true"} +- {name: encryption_spec_key_name, type: String, default: ""} + +outputs: +- {name: materialized_data, type: Dataset} +- {name: transform_output, type: TransformOutput} +- {name: gcp_resources, type: String} + +implementation: + container: + image: gcr.io/ml-pipeline/google-cloud-pipeline-components:latest + command: [python3, -u, -m, google_cloud_pipeline_components.container.v1.gcp_launcher.launcher] + args: [ + --type, CustomJob, + --project, {inputValue: project}, + --location, {inputValue: location}, + --gcp_resources, {outputPath: gcp_resources}, + --payload, + concat: [ + '{"display_name": "feature-transform-engine-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}", "encryption_spec": {"kms_key_name":"', + {inputValue: encryption_spec_key_name}, + '"}, "job_spec": {"worker_pool_specs": [{"replica_count": 1, "machine_spec": {"machine_type": "n1-standard-8"}, "container_spec": {"image_uri":"', + 'us-docker.pkg.dev/vertex-ai-restricted/automl-tabular/training:prod', + '", "args": ["feature_transform_engine", "--transform_output_artifact_path=', + {outputUri: transform_output}, + '", "--transform_output_path=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/transform", "--materialized_data_path=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/materialized_data","--materialized_data_artifact_path=', + {outputUri: materialized_data}, + '", "--transform_config=', + {inputValue: transform_config}, + '", "--analyze_data_path=', + {inputUri: analyze_data}, + '", "--transform_data_path=', + {inputUri: transform_data}, + '", "--job_name=feature-transform-engine-{{$.pipeline_job_uuid}}-{{$.pipeline_task_uuid}}', + '", "--dataflow_project=', + {inputValue: project}, + '", "--error_file_path=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/error.pb", "--dataflow_staging_dir=', + {inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/dataflow_staging", "--dataflow_tmp_dir=', + 
{inputValue: root_dir}, + '/{{$.pipeline_job_uuid}}/{{$.pipeline_task_uuid}}/dataflow_tmp", "--dataflow_max_num_workers=', + {inputValue: dataflow_max_num_workers}, + '", "--dataflow_machine_type=', + {inputValue: dataflow_machine_type}, + '", "--dataflow_worker_container_image=', + 'us-docker.pkg.dev/vertex-ai/automl-tabular/dataflow-worker:prod', + '", "--dataflow_disk_size_gb=', + {inputValue: dataflow_disk_size_gb}, + '", "--dataflow_subnetwork_fully_qualified=', + {inputValue: dataflow_subnetwork}, + '", "--dataflow_use_public_ips=', + {inputValue: dataflow_use_public_ips}, + '", "--dataflow_kms_key=', + {inputValue: encryption_spec_key_name}, + '"]}}]}}' + ]] From 265e9f55a780ba80bc74e3d085cc665530dd45e7 Mon Sep 17 00:00:00 2001 From: Yang Pan Date: Wed, 20 Apr 2022 08:54:53 -0700 Subject: [PATCH 08/17] chore(components): add email notification doc PiperOrigin-RevId: 443107567 --- .../source/google_cloud_pipeline_components.experimental.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst b/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst index 5261df82bca..b1871f88740 100644 --- a/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst +++ b/components/google-cloud/docs/source/google_cloud_pipeline_components.experimental.rst @@ -16,4 +16,5 @@ Components google_cloud_pipeline_components.experimental.forecasting google_cloud_pipeline_components.experimental.hyperparameter_tuning_job google_cloud_pipeline_components.experimental.tensorflow_probability + google_cloud_pipeline_components.experimental.vertex_notification_email google_cloud_pipeline_components.experimental.wait_gcp_resources From b58a31b30787ab51e81f5d745f9fb3090578cb6d Mon Sep 17 00:00:00 2001 From: Connor McCarthy Date: Wed, 20 Apr 2022 10:40:38 -0600 Subject: [PATCH 09/17] chore(sdk): clean up v2 CLI (#7558) * remove linting comments * move global variable to local * simplify diagnose_me command * use f-strings * idiomatic python refactorings * remove unused import * add typestub library * silence unresolvable mypy error * update type information * remove unused variable * remove unused import * use __main__.py for cli entrypoint * expand imports --- mypy.ini | 4 + sdk/python/kfp/cli/__main__.py | 41 +++++ sdk/python/kfp/cli/cli.py | 30 +--- sdk/python/kfp/cli/components.py | 143 +++++++++--------- sdk/python/kfp/cli/components_test.py | 19 +-- sdk/python/kfp/cli/diagnose_me/dev_env.py | 1 - .../kfp/cli/diagnose_me/dev_env_test.py | 1 - sdk/python/kfp/cli/diagnose_me/gcp.py | 5 +- sdk/python/kfp/cli/diagnose_me/gcp_test.py | 1 - .../kfp/cli/diagnose_me/kubernetes_cluster.py | 4 +- .../diagnose_me/kubernetes_cluster_test.py | 1 - sdk/python/kfp/cli/diagnose_me/utility.py | 3 +- .../kfp/cli/diagnose_me/utility_test.py | 1 - sdk/python/kfp/cli/diagnose_me_cli.py | 39 +++-- sdk/python/kfp/cli/experiment.py | 2 +- sdk/python/kfp/cli/output.py | 12 +- sdk/python/kfp/cli/pipeline.py | 14 +- sdk/python/kfp/cli/recurring_run.py | 1 - sdk/python/kfp/cli/run.py | 9 +- sdk/python/requirements-dev.txt | 1 + sdk/python/setup.py | 2 +- 21 files changed, 160 insertions(+), 174 deletions(-) create mode 100644 mypy.ini create mode 100644 sdk/python/kfp/cli/__main__.py diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000000..19bad04b634 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,4 @@ +# Global options: + +[mypy] +ignore_missing_imports = true diff --git 
a/sdk/python/kfp/cli/__main__.py b/sdk/python/kfp/cli/__main__.py new file mode 100644 index 00000000000..3f77eac5205 --- /dev/null +++ b/sdk/python/kfp/cli/__main__.py @@ -0,0 +1,41 @@ +# Copyright 2018-2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import sys + +import click +import typer +from kfp.cli import cli +from kfp.cli import components +from kfp.cli import diagnose_me_cli +from kfp.cli import experiment +from kfp.cli import pipeline +from kfp.cli import recurring_run +from kfp.cli import run + + +def main(): + logging.basicConfig(format='%(message)s', level=logging.INFO) + cli.cli.add_command(run.run) + cli.cli.add_command(recurring_run.recurring_run) + cli.cli.add_command(pipeline.pipeline) + cli.cli.add_command(diagnose_me_cli.diagnose_me) + cli.cli.add_command(experiment.experiment) + cli.cli.add_command(typer.main.get_command(components.app)) + try: + cli.cli(obj={}, auto_envvar_prefix='KFP') + except Exception as e: + click.echo(str(e), err=True) + sys.exit(1) \ No newline at end of file diff --git a/sdk/python/kfp/cli/cli.py b/sdk/python/kfp/cli/cli.py index 87f30fb1254..dd42ceba536 100644 --- a/sdk/python/kfp/cli/cli.py +++ b/sdk/python/kfp/cli/cli.py @@ -12,22 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
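The new __main__.py mixes two CLI frameworks: plain Click command groups and the Typer app that backs `kfp components`, bridged through typer.main.get_command. A minimal self-contained sketch of that bridging pattern; the command and group names here are illustrative, not the real KFP CLI.

import click
import typer

app = typer.Typer(name="components")


@app.command()
def build(directory: str):
    """Example Typer subcommand."""
    typer.echo(f"building {directory}")


@click.group()
def cli():
    """Example Click root group."""


# get_command converts the Typer app into a click.Command, so it registers
# on a Click group like any native subcommand.
cli.add_command(typer.main.get_command(app))

if __name__ == "__main__":
    cli()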
-import logging -import sys - import click -import typer -from kfp.cli import components -from kfp.cli.diagnose_me_cli import diagnose_me -from kfp.cli.experiment import experiment from kfp.cli.output import OutputFormat -from kfp.cli.pipeline import pipeline -from kfp.cli.recurring_run import recurring_run -from kfp.cli.run import run from kfp.client import Client -_NO_CLIENT_COMMANDS = ['diagnose_me', 'components'] - @click.group() @click.option('--endpoint', help='Endpoint of the KFP API service to connect.') @@ -58,25 +46,11 @@ def cli(ctx: click.Context, endpoint: str, iap_client_id: str, namespace: str, Feature stage: [Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha) """ - if ctx.invoked_subcommand in _NO_CLIENT_COMMANDS: + NO_CLIENT_COMMANDS = ['diagnose_me', 'components'] + if ctx.invoked_subcommand in NO_CLIENT_COMMANDS: # Do not create a client for these subcommands return ctx.obj['client'] = Client(endpoint, iap_client_id, namespace, other_client_id, other_client_secret) ctx.obj['namespace'] = namespace ctx.obj['output'] = output - - -def main(): - logging.basicConfig(format='%(message)s', level=logging.INFO) - cli.add_command(run) - cli.add_command(recurring_run) - cli.add_command(pipeline) - cli.add_command(diagnose_me, 'diagnose_me') - cli.add_command(experiment) - cli.add_command(typer.main.get_command(components.app)) - try: - cli(obj={}, auto_envvar_prefix='KFP') - except Exception as e: - click.echo(str(e), err=True) - sys.exit(1) diff --git a/sdk/python/kfp/cli/components.py b/sdk/python/kfp/cli/components.py index 81f25df6ae1..8e9be563c9e 100644 --- a/sdk/python/kfp/cli/components.py +++ b/sdk/python/kfp/cli/components.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import configparser import contextlib import enum import pathlib @@ -50,9 +49,9 @@ # Location in which to write out shareable YAML for components. _COMPONENT_METADATA_DIR = 'component_metadata' -_DOCKERIGNORE_TEMPLATE = ''' -{}/ -'''.format(_COMPONENT_METADATA_DIR) +_DOCKERIGNORE_TEMPLATE = f''' +{_COMPONENT_METADATA_DIR}/ +''' # Location at which v2 Python function-based components will stored # in containerized components. @@ -81,17 +80,17 @@ class _Engine(str, enum.Enum): def _info(message: Any): info = typer.style('INFO', fg=typer.colors.GREEN) - typer.echo('{}: {}'.format(info, message)) + typer.echo(f'{info}: {message}') def _warning(message: Any): info = typer.style('WARNING', fg=typer.colors.YELLOW) - typer.echo('{}: {}'.format(info, message)) + typer.echo(f'{info}: {message}') def _error(message: Any): info = typer.style('ERROR', fg=typer.colors.RED) - typer.echo('{}: {}'.format(info, message)) + typer.echo(f'{info}: {message}') class _ComponentBuilder(): @@ -116,17 +115,17 @@ def __init__( self._context_directory = context_directory self._dockerfile = self._context_directory / _DOCKERFILE self._component_filepattern = component_filepattern - self._components: List[ - component_factory.component_factory.ComponentInfo] = [] + self._components: List[component_factory.ComponentInfo] = [] # This is only set if we need to install KFP from local copy. 
self._maybe_copy_kfp_package = '' if kfp_package_path is None: - self._kfp_package_path = 'kfp=={}'.format(kfp.__version__) + self._kfp_package_path = f'kfp=={kfp.__version__}' elif kfp_package_path.is_dir(): - _info('Building KFP package from local directory {}'.format( - typer.style(str(kfp_package_path), fg=typer.colors.CYAN))) + _info( + f'Building KFP package from local directory {typer.style(str(kfp_package_path), fg=typer.colors.CYAN)}' + ) temp_dir = pathlib.Path(tempfile.mkdtemp()) try: subprocess.run([ @@ -139,8 +138,7 @@ def __init__( cwd=kfp_package_path) wheel_files = list(temp_dir.glob('*.whl')) if len(wheel_files) != 1: - _error('Failed to find built KFP wheel under {}'.format( - temp_dir)) + _error(f'Failed to find built KFP wheel under {temp_dir}') raise typer.Exit(1) wheel_file = wheel_files[0] @@ -149,16 +147,17 @@ def __init__( self._maybe_copy_kfp_package = 'COPY {wheel_name} {wheel_name}'.format( wheel_name=self._kfp_package_path) except subprocess.CalledProcessError as e: - _error('Failed to build KFP wheel locally:\n{}'.format(e)) + _error(f'Failed to build KFP wheel locally:\n{e}') raise typer.Exit(1) finally: - _info('Cleaning up temporary directory {}'.format(temp_dir)) + _info(f'Cleaning up temporary directory {temp_dir}') shutil.rmtree(temp_dir) else: - self._kfp_package_path = kfp_package_path + self._kfp_package_path = str(kfp_package_path) - _info('Building component using KFP package path: {}'.format( - typer.style(str(self._kfp_package_path), fg=typer.colors.CYAN))) + _info( + f'Building component using KFP package path: {typer.style(self._kfp_package_path, fg=typer.colors.CYAN)}' + ) self._context_directory_files = [ file.name @@ -178,8 +177,8 @@ def __init__( def _load_components(self): if not self._component_files: _error( - 'No component files found matching pattern `{}` in directory {}' - .format(self._component_filepattern, self._context_directory)) + f'No component files found matching pattern `{self._component_filepattern}` in directory {self._context_directory}' + ) raise typer.Exit(1) for python_file in self._component_files: @@ -192,23 +191,25 @@ def _load_components(self): formatted_module_file = typer.style( str(python_file), fg=typer.colors.CYAN) if not component_modules: - _error('No KFP components found in file {}'.format( - formatted_module_file)) + _error( + f'No KFP components found in file {formatted_module_file}' + ) raise typer.Exit(1) - _info('Found {} component(s) in file {}:'.format( - len(component_modules), formatted_module_file)) + _info( + f'Found {len(component_modules)} component(s) in file {formatted_module_file}:' + ) for name, component in component_modules.items(): - _info('{}: {}'.format(name, component)) + _info(f'{name}: {component}') self._components.append(component) - base_images = set([info.base_image for info in self._components]) - target_images = set([info.target_image for info in self._components]) + base_images = {info.base_image for info in self._components} + target_images = {info.target_image for info in self._components} if len(base_images) != 1: - _error('Found {} unique base_image values {}. Components' - ' must specify the same base_image and target_image.'.format( - len(base_images), base_images)) + _error( + f'Found {len(base_images)} unique base_image values {base_images}. Components must specify the same base_image and target_image.' + ) raise typer.Exit(1) self._base_image = base_images.pop() @@ -217,13 +218,14 @@ def _load_components(self): ' components. 
A base_image must be specified in order to' ' build the component.') raise typer.Exit(1) - _info('Using base image: {}'.format( - typer.style(self._base_image, fg=typer.colors.YELLOW))) + _info( + f'Using base image: {typer.style(self._base_image, fg=typer.colors.YELLOW)}' + ) if len(target_images) != 1: - _error('Found {} unique target_image values {}. Components' - ' must specify the same base_image and' - ' target_image.'.format(len(target_images), target_images)) + _error( + f'Found {len(target_images)} unique target_image values {target_images}. Components must specify the same base_image and target_image.' + ) raise typer.Exit(1) self._target_image = target_images.pop() @@ -232,8 +234,9 @@ def _load_components(self): ' components. A target_image must be specified in order' ' to build the component.') raise typer.Exit(1) - _info('Using target image: {}'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Using target image: {typer.style(self._target_image, fg=typer.colors.YELLOW)}' + ) def _maybe_write_file(self, filename: str, @@ -241,22 +244,23 @@ def _maybe_write_file(self, overwrite: bool = False): formatted_filename = typer.style(filename, fg=typer.colors.CYAN) if filename in self._context_directory_files: - _info('Found existing file {} under {}.'.format( - formatted_filename, self._context_directory)) + _info( + f'Found existing file {formatted_filename} under {self._context_directory}.' + ) if not overwrite: _info('Leaving this file untouched.') return else: - _warning( - 'Overwriting existing file {}'.format(formatted_filename)) + _warning(f'Overwriting existing file {formatted_filename}') else: - _warning('{} not found under {}. Creating one.'.format( - formatted_filename, self._context_directory)) + _warning( + f'{formatted_filename} not found under {self._context_directory}. Creating one.' + ) filepath = self._context_directory / filename with open(filepath, 'w') as f: - f.write('# Generated by KFP.\n{}'.format(contents)) - _info('Generated file {}.'.format(filepath)) + f.write(f'# Generated by KFP.\n{contents}') + _info(f'Generated file {filepath}.') def maybe_generate_requirements_txt(self): self._maybe_write_file(_REQUIREMENTS_TXT, '') @@ -266,9 +270,8 @@ def maybe_generate_dockerignore(self): def write_component_files(self): for component_info in self._components: - filename = ( - component_info.output_component_file or - component_info.function_name + '.yaml') + filename = component_info.output_component_file or f'{component_info.function_name}.yaml' + container_filename = ( self._context_directory / _COMPONENT_METADATA_DIR / filename) container_filename.parent.mkdir(exist_ok=True, parents=True) @@ -295,8 +298,9 @@ def maybe_generate_dockerfile(self, overwrite_dockerfile: bool = False): overwrite_dockerfile) def build_image(self, push_image: bool = True): - _info('Building image {} using Docker...'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Building image {typer.style(self._target_image, fg=typer.colors.YELLOW)} using Docker...' 
+ ) client = docker.from_env() docker_log_prefix = typer.style('Docker', fg=typer.colors.CYAN) @@ -312,21 +316,22 @@ def build_image(self, push_image: bool = True): for log in logs: message = log.get('stream', '').rstrip('\n') if message: - _info('{}: {}'.format(docker_log_prefix, message)) + _info(f'{docker_log_prefix}: {message}') except docker.errors.BuildError as e: for log in e.build_log: message = log.get('message', '').rstrip('\n') if message: - _error('{}: {}'.format(docker_log_prefix, message)) - _error('{}: {}'.format(docker_log_prefix, e)) + _error(f'{docker_log_prefix}: {message}') + _error(f'{docker_log_prefix}: {e}') raise typer.Exit(1) if not push_image: return - _info('Pushing image {}...'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Pushing image {typer.style(self._target_image, fg=typer.colors.YELLOW)}...' + ) try: response = client.images.push( @@ -335,13 +340,14 @@ def build_image(self, push_image: bool = True): status = log.get('status', '').rstrip('\n') layer = log.get('id', '') if status: - _info('{}: {} {}'.format(docker_log_prefix, layer, status)) + _info(f'{docker_log_prefix}: {layer} {status}') except docker.errors.BuildError as e: - _error('{}: {}'.format(docker_log_prefix, e)) + _error(f'{docker_log_prefix}: {e}') raise e - _info('Built and pushed component container {}'.format( - typer.style(self._target_image, fg=typer.colors.YELLOW))) + _info( + f'Built and pushed component container {typer.style(self._target_image, fg=typer.colors.YELLOW)}' + ) @app.callback() @@ -374,22 +380,19 @@ def build(components_directory: pathlib.Path = typer.Argument( """Builds containers for KFP v2 Python-based components.""" components_directory = components_directory.resolve() if not components_directory.is_dir(): - _error('{} does not seem to be a valid directory.'.format( - components_directory)) + _error(f'{components_directory} does not seem to be a valid directory.') raise typer.Exit(1) if engine != _Engine.DOCKER: _error('Currently, only `docker` is supported for --engine.') raise typer.Exit(1) - if engine == _Engine.DOCKER: - if not _DOCKER_IS_PRESENT: - _error( - 'The `docker` Python package was not found in the current' - ' environment. Please run `pip install docker` to install it.' - ' Optionally, you can also install KFP with all of its' - ' optional dependencies by running `pip install kfp[all]`.') - raise typer.Exit(1) + if not _DOCKER_IS_PRESENT: + _error('The `docker` Python package was not found in the current' + ' environment. Please run `pip install docker` to install it.' + ' Optionally, you can also install KFP with all of its' + ' optional dependencies by running `pip install kfp[all]`.') + raise typer.Exit(1) builder = _ComponentBuilder( context_directory=components_directory, diff --git a/sdk/python/kfp/cli/components_test.py b/sdk/python/kfp/cli/components_test.py index bc76a765e1a..f34159435b8 100644 --- a/sdk/python/kfp/cli/components_test.py +++ b/sdk/python/kfp/cli/components_test.py @@ -13,7 +13,6 @@ # limitations under the License. 
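A hunk below renames the second of two test methods that were both called testTargetImageMustBeTheSameInAllComponents. The rename is not cosmetic: Python rebinds a repeated method name, so the first definition is silently discarded and never runs. A minimal sketch of the pitfall, with illustrative class and method names.

import unittest


class Demo(unittest.TestCase):

    def test_same_name(self):
        # Never executed: the definition below rebinds Demo.test_same_name.
        self.fail("unreachable")

    def test_same_name(self):
        self.assertTrue(True)


if __name__ == "__main__":
    unittest.main()  # discovers and runs only the second definition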
"""Tests for `components` command group in KFP CLI.""" import contextlib -import importlib import pathlib import sys import textwrap @@ -30,18 +29,6 @@ except ImportError: sys.modules['docker'] = mock.Mock() from kfp.cli import components -from kfp.deprecated.cli import components - -_COMPONENT_TEMPLATE = ''' -from kfp.dsl import * - -@component( - base_image={base_image}, - target_image={target_image}, - output_component_file={output_component_file}) -def {func_name}(): - pass -''' def _make_component(func_name: str, @@ -222,7 +209,7 @@ def testTargetImageMustBeTheSameInAllComponents(self): ) self.assertEqual(result.exit_code, 1) - def testTargetImageMustBeTheSameInAllComponents(self): + def testTargetImageMustBeTheSameInAllComponentsWithBaseImage(self): component_one = _make_component( func_name='one', base_image='image-1', target_image='target-image') component_two = _make_component( @@ -412,7 +399,7 @@ def testDockerfileIsCreatedCorrectly(self): COPY requirements.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txt - RUN pip install --no-cache-dir kfp==1.8.11 + RUN pip install --no-cache-dir kfp==1.2.3 COPY . . ''')) @@ -455,7 +442,7 @@ def testExistingDockerfileCanBeOverwritten(self): COPY requirements.txt requirements.txt RUN pip install --no-cache-dir -r requirements.txt - RUN pip install --no-cache-dir kfp==1.8.11 + RUN pip install --no-cache-dir kfp==1.2.3 COPY . . ''')) diff --git a/sdk/python/kfp/cli/diagnose_me/dev_env.py b/sdk/python/kfp/cli/diagnose_me/dev_env.py index e32dc85a012..0a3d5999866 100644 --- a/sdk/python/kfp/cli/diagnose_me/dev_env.py +++ b/sdk/python/kfp/cli/diagnose_me/dev_env.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/dev_env_test.py b/sdk/python/kfp/cli/diagnose_me/dev_env_test.py index f859e724c45..34ba85da387 100644 --- a/sdk/python/kfp/cli/diagnose_me/dev_env_test.py +++ b/sdk/python/kfp/cli/diagnose_me/dev_env_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/gcp.py b/sdk/python/kfp/cli/diagnose_me/gcp.py index 5dc47b1baae..7adcde813da 100644 --- a/sdk/python/kfp/cli/diagnose_me/gcp.py +++ b/sdk/python/kfp/cli/diagnose_me/gcp.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); @@ -98,9 +97,7 @@ def execute_gsutil_command( Returns: utility.ExecutorResponse with outputs from stdout,stderr and execution code. """ - command_list = ['gsutil'] - command_list.extend(gsutil_command_list) - + command_list = ['gsutil', *gsutil_command_list] if project_id is not None: command_list.extend(['-p', project_id]) diff --git a/sdk/python/kfp/cli/diagnose_me/gcp_test.py b/sdk/python/kfp/cli/diagnose_me/gcp_test.py index de441559868..7edeceec874 100644 --- a/sdk/python/kfp/cli/diagnose_me/gcp_test.py +++ b/sdk/python/kfp/cli/diagnose_me/gcp_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. 
# # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py index 9d0f4e0cc73..0dcae1595e9 100644 --- a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py +++ b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); @@ -58,8 +57,7 @@ def execute_kubectl_command( Returns: utility.ExecutorResponse with outputs from stdout,stderr and execution code. """ - command_list = ['kubectl'] - command_list.extend(kubectl_command_list) + command_list = ['kubectl', *kubectl_command_list] if not human_readable: command_list.extend(['-o', 'json']) diff --git a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py index 74999738b75..6b2376476aa 100644 --- a/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py +++ b/sdk/python/kfp/cli/diagnose_me/kubernetes_cluster_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me/utility.py b/sdk/python/kfp/cli/diagnose_me/utility.py index f83984a091f..13b666cf32f 100644 --- a/sdk/python/kfp/cli/diagnose_me/utility.py +++ b/sdk/python/kfp/cli/diagnose_me/utility.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); @@ -55,7 +54,7 @@ def execute_command(self, command_list: List[Text]): self._stderr = stderr.decode('utf-8') self._returncode = process.returncode except OSError as e: - self._stderr = e + self._stderr = str(e) self._stdout = '' self._returncode = e.errno self._parse_raw_input() diff --git a/sdk/python/kfp/cli/diagnose_me/utility_test.py b/sdk/python/kfp/cli/diagnose_me/utility_test.py index 0c3569de993..4430dfcfade 100644 --- a/sdk/python/kfp/cli/diagnose_me/utility_test.py +++ b/sdk/python/kfp/cli/diagnose_me/utility_test.py @@ -1,4 +1,3 @@ -# Lint as: python3 # Copyright 2019 The Kubeflow Authors. All Rights Reserved. # # Licensed under the Apache License,Version 2.0 (the "License"); diff --git a/sdk/python/kfp/cli/diagnose_me_cli.py b/sdk/python/kfp/cli/diagnose_me_cli.py index 975df4e0599..8a3564b0b33 100644 --- a/sdk/python/kfp/cli/diagnose_me_cli.py +++ b/sdk/python/kfp/cli/diagnose_me_cli.py @@ -1,23 +1,20 @@ -# Lint as: python3 """CLI interface for KFP diagnose_me tool.""" import json as json_library import sys -from typing import Dict, Text +from typing import Dict, List, Text, Union import click from kfp.cli.diagnose_me import dev_env, gcp +from kfp.cli.diagnose_me import kubernetes_cluster from kfp.cli.diagnose_me import kubernetes_cluster as k8 from kfp.cli.diagnose_me import utility +ResultsType = Dict[Union[gcp.Commands, dev_env.Commands, + kubernetes_cluster.Commands], utility.ExecutorResponse] -@click.group() -def diagnose_me(): - """Prints diagnoses information for KFP environment.""" - pass - -@diagnose_me.command() +@click.command() @click.option( '-j', '--json', @@ -37,7 +34,7 @@ def diagnose_me(): @click.pass_context def diagnose_me(ctx: click.Context, json: bool, project_id: str, namespace: str): - """Runs environment diagnostic with specified parameters. 
+ """Runs KFP environment diagnostic with specified parameters. Feature stage: [Alpha](https://github.com/kubeflow/pipelines/blob/07328e5094ac2981d3059314cc848fbb71437a76/docs/release/feature-stages.md#alpha) @@ -50,17 +47,18 @@ def diagnose_me(ctx: click.Context, json: bool, project_id: str, for app in ['Google Cloud SDK', 'gsutil', 'kubectl']: if app not in local_env_gcloud_sdk.json_output: raise RuntimeError( - '%s is not installed, gcloud, gsutil and kubectl are required ' - % app + 'for this app to run. Please follow instructions at ' + + f'{app} is not installed, gcloud, gsutil and kubectl are required ' + + 'for this app to run. Please follow instructions at ' + 'https://cloud.google.com/sdk/install to install the SDK.') click.echo('Collecting diagnostic information ...', file=sys.stderr) # default behaviour dump all configurations - results = {} - for gcp_command in gcp.Commands: - results[gcp_command] = gcp.get_gcp_configuration( + results: ResultsType = { + gcp_command: gcp.get_gcp_configuration( gcp_command, project_id=project_id, human_readable=not json) + for gcp_command in gcp.Commands + } for k8_command in k8.Commands: results[k8_command] = k8.get_kubectl_configuration( @@ -73,8 +71,7 @@ def diagnose_me(ctx: click.Context, json: bool, project_id: str, print_to_sdtout(results, not json) -def print_to_sdtout(results: Dict[str, utility.ExecutorResponse], - human_readable: bool): +def print_to_sdtout(results: ResultsType, human_readable: bool): """Viewer to print the ExecutorResponse results to stdout. Args: @@ -85,18 +82,18 @@ def print_to_sdtout(results: Dict[str, utility.ExecutorResponse], """ output_dict = {} - human_readable_result = [] + human_readable_result: List[str] = [] for key, val in results.items(): if val.has_error: output_dict[ key. - name] = 'Following error occurred during the diagnoses: %s' % val.stderr + name] = f'Following error occurred during the diagnoses: {val.stderr}' continue output_dict[key.name] = val.json_output - human_readable_result.append('================ %s ===================' % - (key.name)) - human_readable_result.append(val.parsed_output) + human_readable_result.extend( + (f'================ {key.name} ===================', + val.parsed_output)) if human_readable: result = '\n'.join(human_readable_result) diff --git a/sdk/python/kfp/cli/experiment.py b/sdk/python/kfp/cli/experiment.py index b896c029ea9..c8e292653a9 100644 --- a/sdk/python/kfp/cli/experiment.py +++ b/sdk/python/kfp/cli/experiment.py @@ -91,7 +91,7 @@ def delete(ctx: click.Context, experiment_id: str): client = ctx.obj["client"] client.delete_experiment(experiment_id) - click.echo("{} is deleted.".format(experiment_id)) + click.echo(f"{experiment_id} is deleted.") def _display_experiments(experiments: List[ApiExperiment], diff --git a/sdk/python/kfp/cli/output.py b/sdk/python/kfp/cli/output.py index aa1d7bf45ad..e818c857f1c 100644 --- a/sdk/python/kfp/cli/output.py +++ b/sdk/python/kfp/cli/output.py @@ -29,7 +29,7 @@ class OutputFormat(Enum): def print_output(data: Union[list, dict], headers: list, - output_format: str, + output_format: OutputFormat, table_format: str = "simple"): """Prints the output from the cli command execution based on the specified format. 
@@ -51,13 +51,7 @@ def print_output(data: Union[list, dict], if output_format == OutputFormat.table.name: click.echo(tabulate(data, headers=headers, tablefmt=table_format)) elif output_format == OutputFormat.json.name: - if not headers: - output = data - else: - output = [] - for row in data: - output.append(dict(zip(headers, row))) + output = [dict(zip(headers, row)) for row in data] if headers else data click.echo(json.dumps(output, indent=4)) else: - raise NotImplementedError( - "Unknown Output Format: {}".format(output_format)) + raise NotImplementedError(f"Unknown Output Format: {output_format}") diff --git a/sdk/python/kfp/cli/pipeline.py b/sdk/python/kfp/cli/pipeline.py index ba0dad49e51..f2e752cc72e 100644 --- a/sdk/python/kfp/cli/pipeline.py +++ b/sdk/python/kfp/cli/pipeline.py @@ -13,7 +13,7 @@ # limitations under the License. import json -from typing import List, Optional +from typing import Any, Dict, List, Optional, Union import click import kfp_server_api @@ -68,8 +68,8 @@ def upload_version(ctx: click.Context, if pipeline_name is not None: pipeline_id = client.get_pipeline_id(name=pipeline_name) if pipeline_id is None: - raise ValueError("Can't find a pipeline with name: %s" % - pipeline_name) + raise ValueError( + f"Can't find a pipeline with name: {pipeline_name}") version = client.pipeline_uploads.upload_pipeline_version( package_file, name=pipeline_version, pipelineid=pipeline_id) _display_pipeline_version(version, output_format) @@ -239,11 +239,9 @@ def _display_pipeline(pipeline: kfp_server_api.ApiPipeline, print_output(table, [], output_format, table_format="plain") print_output(data, headers, output_format, table_format="grid") elif output_format == OutputFormat.json.name: - output = dict() - output["Pipeline Details"] = dict(table) - params = [] - for item in data: - params.append(dict(zip(headers, item))) + OutputType = Dict[str, Union[Dict[str, str], List[Dict[str, Any]]]] + output: OutputType = {"Pipeline Details": dict(table)} + params = [dict(zip(headers, item)) for item in data] output["Pipeline Parameters"] = params print_output(output, [], output_format) diff --git a/sdk/python/kfp/cli/recurring_run.py b/sdk/python/kfp/cli/recurring_run.py index b9f61b8d990..3b66c42aa9b 100644 --- a/sdk/python/kfp/cli/recurring_run.py +++ b/sdk/python/kfp/cli/recurring_run.py @@ -89,7 +89,6 @@ def create(ctx: click.Context, end_time: Optional[str] = None, interval_second: Optional[int] = None, max_concurrency: Optional[int] = None, - params: Optional[dict] = None, pipeline_package_path: Optional[str] = None, pipeline_id: Optional[str] = None, start_time: Optional[str] = None, diff --git a/sdk/python/kfp/cli/run.py b/sdk/python/kfp/cli/run.py index 5282206abc3..e686db7def4 100644 --- a/sdk/python/kfp/cli/run.py +++ b/sdk/python/kfp/cli/run.py @@ -21,8 +21,8 @@ import click import kfp_server_api +from kfp import client from kfp.cli.output import OutputFormat, print_output -from kfp.client import Client @click.group() @@ -159,7 +159,7 @@ def get(ctx: click.Context, watch: bool, detail: bool, run_id: str): _display_run(client, namespace, run_id, watch, output_format, detail) -def _display_run(client: click.Context, +def _display_run(client: client.Client, namespace: str, run_id: str, watch: bool, @@ -200,8 +200,7 @@ def _display_run(client: click.Context, argo_workflow_name = manifest['metadata']['name'] break if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']: - click.echo('Run is finished with status {}.'.format( - run_detail.run.status)) + 
click.echo(f'Run is finished with status {run_detail.run.status}.')
         return
     if argo_workflow_name:
         subprocess.run(
@@ -209,7 +208,7 @@ def _display_run(client: click.Context,
     _print_runs([run], output_format)
 
 
-def _wait_for_run_completion(client: Client, run_id: str, timeout: int,
+def _wait_for_run_completion(client: client.Client, run_id: str, timeout: int,
                              output_format: OutputFormat):
     run_detail = client.wait_for_run_completion(run_id, timeout)
     _print_runs([run_detail.run], output_format)
diff --git a/sdk/python/requirements-dev.txt b/sdk/python/requirements-dev.txt
index d97a64fa2f2..9552a937755 100644
--- a/sdk/python/requirements-dev.txt
+++ b/sdk/python/requirements-dev.txt
@@ -4,4 +4,5 @@ pylint==2.12.2
 types-protobuf==3.19.15
 types-PyYAML==6.0.5
 types-requests==2.27.14
+types-tabulate==0.8.6
 yapf==0.32.0
\ No newline at end of file
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 782b46b56f4..93b0071a359 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -91,6 +91,6 @@ def find_version(*file_path_parts: str) -> str:
         'console_scripts': [
             'dsl-compile = kfp.compiler.main:main',
             'dsl-compile-deprecated = kfp.deprecated.compiler.main:main',
-            'kfp=kfp.cli.cli:main',
+            'kfp=kfp.cli.__main__:main',
         ]
     })

From 00eb97c9cb24a49f98dec42903449df2ae66c4e3 Mon Sep 17 00:00:00 2001
From: Xiaowen Xie
Date: Wed, 20 Apr 2022 11:23:16 -0700
Subject: [PATCH 10/17] feat(components): Create new import model evaluation component

PiperOrigin-RevId: 443146550
---
 .../aiplatform/evaluation/import.yaml         |  34 ++++++
 .../experimental/evaluation/__init__.py       |  14 +++
 .../evaluation/import_model_evaluation.py     | 112 ++++++++++++++++++
 3 files changed, 160 insertions(+)
 create mode 100644 components/google-cloud/google_cloud_pipeline_components/aiplatform/evaluation/import.yaml
 create mode 100644 components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/__init__.py
 create mode 100644 components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/import_model_evaluation.py

diff --git a/components/google-cloud/google_cloud_pipeline_components/aiplatform/evaluation/import.yaml b/components/google-cloud/google_cloud_pipeline_components/aiplatform/evaluation/import.yaml
new file mode 100644
index 00000000000..4ee11d9e1e4
--- /dev/null
+++ b/components/google-cloud/google_cloud_pipeline_components/aiplatform/evaluation/import.yaml
@@ -0,0 +1,34 @@
+name: import_model_evaluation
+description: |
+  Calls ModelService.ImportModelEvaluation to import a model evaluation file to Vertex
+
+  Args:
+    metrics (system.Metrics):
+      Path of metrics generated from an evaluation component.
+    explanation (system.Metrics):
+      Path for model explanation metrics generated from an evaluation component.
+    model (google.VertexModel):
+      Vertex model resource that will be the parent resource of the uploaded evaluation.
+    metrics_schema_uri (str):
+      GCS link to the schema URI for model evaluation metrics.
+inputs: + - {name: metrics, type: Metrics} + - {name: explanation, type: Metrics, optional: True} + - {name: model, type: google.VertexModel} + - {name: metrics_schema_uri, type: String} +implementation: + container: + image: gcr.io/ml-pipeline/google-cloud-pipeline-components:latest + command: [python3, -u, -m, google_cloud_pipeline_components.container.experimental.evaluation.import_model_evaluation] + args: + - --metrics + - {inputPath: metrics} + - if: + cond: {isPresent: explanation} + then: + - --explanation + - "{{$.inputs.artifacts['explanation'].metadata['explanation_gcs_path']}}" + - --metrics_schema_uri + - {inputValue: metrics_schema_uri} + - --model_name + - "{{$.inputs.artifacts['model'].metadata['resourceName']}}" diff --git a/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/__init__.py b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/__init__.py new file mode 100644 index 00000000000..e73b72155a1 --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2021 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Google Cloud Pipeline Evaluation Components root.""" diff --git a/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/import_model_evaluation.py b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/import_model_evaluation.py new file mode 100644 index 00000000000..c646d13a78b --- /dev/null +++ b/components/google-cloud/google_cloud_pipeline_components/container/experimental/evaluation/import_model_evaluation.py @@ -0,0 +1,112 @@ +# Copyright 2022 The Kubeflow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Module for importing a model evaluation to an existing Vertex model resource.""" + +import sys +import argparse +import json +import six + +from google.cloud import aiplatform +from google.api_core import gapic_v1 +from google.protobuf.struct_pb2 import Value, Struct, NULL_VALUE, ListValue + + +def main(argv): + """Calls ModelService.ImportModelEvaluation""" + parser = argparse.ArgumentParser( + prog='Vertex Model Service evaluation importer', description='') + parser.add_argument( + '--metrics', + dest='metrics', + type=str, + required=True, + default=argparse.SUPPRESS) + parser.add_argument( + '--explanation', dest='explanation', type=str, default=None) + parser.add_argument( + '--metrics_schema_uri', + dest='metrics_schema_uri', + type=str, + required=True, + default=argparse.SUPPRESS) + parser.add_argument( + '--model_name', + dest='model_name', + type=str, + required=True, + default=argparse.SUPPRESS) + + parsed_args, _ = parser.parse_known_args(argv) + + _, project_id, _, location, _, model_id = parsed_args.model_name.split('/') + + with open(parsed_args.metrics) as metrics_file: + model_evaluation = { + 'metrics': + to_value( + next( + iter( + json.loads(metrics_file.read())['slicedMetrics'][0] + ['metrics'].values()))), + 'metrics_schema_uri': + parsed_args.metrics_schema_uri, + } + + if parsed_args.explanation: + with open('/gcs' + parsed_args.explanation[4:]) as explanation_file: + model_evaluation['model_explanation'] = { + 'mean_attributions': [{ + 'feature_attributions': + to_value( + json.loads(explanation_file.read())['explanation'] + ['attributions'][0]['featureAttributions']) + }] + } + print(model_evaluation) + aiplatform.gapic.ModelServiceClient( + client_info=gapic_v1.client_info.ClientInfo( + user_agent='google-cloud-pipeline-components',), + client_options={ + 'api_endpoint': location + '-aiplatform.googleapis.com', + }).import_model_evaluation( + parent=parsed_args.model_name, + model_evaluation=model_evaluation, + ) + + +def to_value(value): + if value is None: + return Value(null_value=NULL_VALUE) + elif isinstance(value, bool): + # This check needs to happen before isinstance(value, int), + # isinstance(value, int) returns True when value is bool. + return Value(bool_value=value) + elif isinstance(value, six.integer_types) or isinstance(value, float): + return Value(number_value=value) + elif isinstance(value, six.string_types) or isinstance(value, six.text_type): + return Value(string_value=value) + elif isinstance(value, dict): + return Value( + struct_value=Struct(fields={k: to_value(v) for k, v in value.items()})) + elif isinstance(value, list): + return Value( + list_value=ListValue(values=[to_value(item) for item in value])) + else: + raise ValueError('Unsupported data type: {}'.format(type(value))) + + +if __name__ == '__main__': + print(sys.argv) + main(sys.argv[1:]) From 27d5404cda92577d4f380d84e1a358ddd0fee81a Mon Sep 17 00:00:00 2001 From: hilcj Date: Wed, 20 Apr 2022 13:47:54 -0700 Subject: [PATCH 11/17] feat(api): Update IR with RetryPolicy (#7581) --- api/v2alpha1/pipeline_spec.proto | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index 1e6bf932201..c5c8f45f7f7 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -484,6 +484,29 @@ message PipelineTaskSpec { // Iterator to iterate over a parameter input. ParameterIteratorSpec parameter_iterator = 10; } + + // User-configured task-level retry. 
+  message RetryPolicy {
+    // Number of retries before considering a task as failed. Set to 0 or
+    // unspecified to disallow retry.
+    int32 max_retry_count = 1;
+
+    // The time interval between retries. Defaults to zero (an immediate retry).
+    google.protobuf.Duration backoff_duration = 2;
+
+    // The exponential backoff factor applied to backoff_duration. If
+    // unspecified, will default to 2.
+    double backoff_factor = 3;
+
+    // The maximum duration during which the task will be retried according to
+    // the backoff strategy. Max allowed is 1 hour - higher value will be capped
+    // to this limit. If unspecified, will set to 1 hour.
+    google.protobuf.Duration backoff_max_duration = 4;
+  }
+
+  // User-configured task-level retry.
+  // Applicable only to component tasks.
+  RetryPolicy retry_policy = 11;
 }
 
 // The spec of an artifact iterator. It supports fan-out a workflow from a list

From 51c6c8c1b09379719090ef01e516cbb771fecadd Mon Sep 17 00:00:00 2001
From: Linchin
Date: Wed, 20 Apr 2022 20:50:17 +0000
Subject: [PATCH 12/17] fix static_loop_arguments

---
 samples/core/loop_static/loop_static_v2.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/samples/core/loop_static/loop_static_v2.py b/samples/core/loop_static/loop_static_v2.py
index a5c1eafe9c0..62f92b6e814 100644
--- a/samples/core/loop_static/loop_static_v2.py
+++ b/samples/core/loop_static/loop_static_v2.py
@@ -19,15 +19,12 @@ def concat_op(a: str, b: str) -> str:
     return a + b
 
 
-_DEFAULT_LOOP_ARGUMENTS = [{'a': '1', 'b': '2'}, {'a': '10', 'b': '20'}]
-
-
 @dsl.pipeline(name='pipeline-with-loop-static')
 def my_pipeline(
-    static_loop_arguments: List[dict] = _DEFAULT_LOOP_ARGUMENTS,
     greeting: str = 'this is a test for looping through parameters',
 ):
     print_task = print_op(text=greeting)
+    static_loop_arguments = [{'a': '1', 'b': '2'}, {'a': '10', 'b': '20'}]
 
     with dsl.ParallelFor(static_loop_arguments) as item:
         concat_task = concat_op(a=item.a, b=item.b)

From 02eb7b175385012aaba9782838604b1a2bdf01ee Mon Sep 17 00:00:00 2001
From: Lingqing Gan
Date: Wed, 20 Apr 2022 14:00:32 -0700
Subject: [PATCH 13/17] change the driver image

change the driver image back to the kfp container registry.
---
 backend/src/v2/compiler/argocompiler/argo.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go
index a00e24c6f4a..c4fb4868e6b 100644
--- a/backend/src/v2/compiler/argocompiler/argo.go
+++ b/backend/src/v2/compiler/argocompiler/argo.go
@@ -107,7 +107,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow,
 		wf:        wf,
 		templates: make(map[string]*wfapi.Template),
 		// TODO(chensun): release process and update the images.
- driverImage: "gcr.io/ling-kfp/dev/kfp-driver:latest", + driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", launcherImage: "gcr.io/ml-pipeline-test/dev/kfp-launcher-v2:latest", job: job, spec: spec, From 04d5b2ce03e57e364352cf6c992ee44353e6298b Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 20 Apr 2022 21:08:53 +0000 Subject: [PATCH 14/17] change variable declaration --- backend/src/v2/driver/driver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 34d622073eb..bb7f7deb799 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -445,7 +445,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E } // Check the items type of parameterIterator: // It can be "inputParameter" or "Raw" - value := structpb.NewNullValue() + var value *structpb.Value var ok bool switch iterator.GetItems().GetKind().(type) { case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: From ca6def8d01c9cfed01c0bfb1b5407d9978e56cf1 Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 20 Apr 2022 22:20:08 +0000 Subject: [PATCH 15/17] remove logs --- backend/src/v2/driver/driver.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index bb7f7deb799..33e69631f0d 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -449,13 +449,11 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E var ok bool switch iterator.GetItems().GetKind().(type) { case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter: - glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()] if !ok { return execution, report(fmt.Errorf("cannot find input parameter")) } case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw: - glog.Infof("ParameterIterator type: %T", iterator.GetItems().GetKind()) value_raw := iterator.GetItems().GetRaw() var unmarshalled_raw interface{} err = json.Unmarshal([]byte(value_raw), &unmarshalled_raw) From 4ebab0a8f85e946d144c7ed9fb0791c9312e43ed Mon Sep 17 00:00:00 2001 From: Linchin Date: Wed, 20 Apr 2022 22:22:41 +0000 Subject: [PATCH 16/17] remove log --- backend/src/v2/driver/driver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 33e69631f0d..1ca2c077e23 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -770,7 +770,6 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int, default: return nil, fmt.Errorf("bug: iteration_index>=0, but task iterator is empty") } - glog.Infof("inputs value %+v", inputs) return inputs, nil } // get executions in context on demand From 7f0160445c30b55e02261b7e0e5337f0a7245b0e Mon Sep 17 00:00:00 2001 From: Linchin Date: Fri, 22 Apr 2022 02:50:02 +0000 Subject: [PATCH 17/17] changed argo template --- backend/src/apiserver/template/v2_template.go | 3 +++ backend/src/v2/cmd/compiler/main.go | 4 ++++ backend/src/v2/compiler/argocompiler/argo.go | 15 ++++++++++++++- backend/src/v2/driver/driver.go | 1 + 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/backend/src/apiserver/template/v2_template.go b/backend/src/apiserver/template/v2_template.go index 538f0844da6..bea2afa5e7f 100644 --- a/backend/src/apiserver/template/v2_template.go +++ 
b/backend/src/apiserver/template/v2_template.go @@ -7,6 +7,7 @@ import ( structpb "github.com/golang/protobuf/ptypes/struct" "github.com/ghodss/yaml" + "github.com/golang/glog" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/common/util" @@ -36,6 +37,7 @@ func (t *V2Spec) ScheduledWorkflow(apiJob *api.Job) (*scheduledworkflow.Schedule } job.RuntimeConfig = jobRuntimeConfig wf, err := argocompiler.Compile(job, nil) + glog.Infof("wf value: %+v", wf) if err != nil { return nil, util.Wrap(err, "Failed to compile job") } @@ -147,6 +149,7 @@ func (t *V2Spec) RunWorkflow(apiRun *api.Run, options RunWorkflowOptions) (*util } job.RuntimeConfig = jobRuntimeConfig wf, err := argocompiler.Compile(job, nil) + glog.Infof("wf value: %+v", wf) if err != nil { return nil, util.Wrap(err, "Failed to compile job") } diff --git a/backend/src/v2/cmd/compiler/main.go b/backend/src/v2/cmd/compiler/main.go index 94f94f55912..9f755307be5 100644 --- a/backend/src/v2/cmd/compiler/main.go +++ b/backend/src/v2/cmd/compiler/main.go @@ -42,6 +42,9 @@ func main() { flag.Parse() noSpec := specPath == nil || *specPath == "" noJob := jobPath == nil || *jobPath == "" + glog.Info("Hiiiiiiiiiiiiiiii!!!!!!!!!!!!!!!!!!!!!!!!!!!") + noSpec = true + noJob = true if noSpec && noJob { glog.Exitf("spec or job must be specified") } @@ -70,6 +73,7 @@ func compile(job *pipelinespec.PipelineJob) error { LauncherImage: *launcher, PipelineRoot: *pipelineRoot, }) + glog.Infof("wf value: %+v", wf) if err != nil { return err } diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index c4fb4868e6b..eb798d0e2dd 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -19,6 +19,7 @@ import ( "strings" wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/golang/glog" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" "github.com/kubeflow/pipelines/backend/src/v2/compiler" "google.golang.org/protobuf/proto" @@ -73,6 +74,16 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow, } } } + // fill in exit handler + var exit_task string + all_root_tasks := spec.GetRoot().GetDag().GetTasks() + for task_name, task_spec := range all_root_tasks { + glog.Infof("task name, task spec: %s, %+v", task_name, task_spec) + if task_spec.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" { + exit_task = task_name + glog.Infof("exit task: %s", exit_task) + } + } // initialization wf := &wfapi.Workflow{ TypeMeta: k8smeta.TypeMeta{ @@ -101,13 +112,15 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow, }, ServiceAccountName: "pipeline-runner", Entrypoint: tmplEntrypoint, + OnExit: exit_task, }, } c := &workflowCompiler{ wf: wf, templates: make(map[string]*wfapi.Template), // TODO(chensun): release process and update the images. 
- driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", + // driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest", + driverImage: "gcr.io/ling-kfp/dev/kfp-driver:latest", launcherImage: "gcr.io/ml-pipeline-test/dev/kfp-launcher-v2:latest", job: job, spec: spec, diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 1ca2c077e23..d5d2b62f783 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -122,6 +122,7 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio } // TODO(Bobgy): fill in run resource. pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, opts.Namespace, "run-resource", pipelineRoot) + glog.Infof("metadata pipeline: %+v", pipeline) if err != nil { return nil, err }
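
A brief usage sketch for the to_value helper added in [PATCH 10/17] above. This is a minimal, hedged example: the import path matches the new module, but the metrics dict, its keys, and the assertions are hypothetical illustrations (not output from a real evaluation run), and running it assumes the patched google-cloud-pipeline-components package and protobuf are importable:

    # Convert a plain Python metrics dict into a google.protobuf.struct_pb2.Value,
    # as import_model_evaluation.py does for the 'metrics' field of the
    # ModelEvaluation it sends to ModelService.ImportModelEvaluation.
    from google_cloud_pipeline_components.container.experimental.evaluation.import_model_evaluation import to_value

    # Hypothetical sliced-metrics payload.
    metrics = {
        'auRoc': 0.97,                           # float -> number_value
        'labels': ['cat', 'dog'],                # list  -> list_value (recursive)
        'confusionMatrix': {'rows': [[40, 2]]},  # dict  -> struct_value (recursive)
        'calibration': None,                     # None  -> null_value
    }

    value = to_value(metrics)
    assert value.struct_value.fields['auRoc'].number_value == 0.97
    assert value.struct_value.fields['calibration'].WhichOneof('kind') == 'null_value'

Note that to_value deliberately tests bool before the numeric types, since isinstance(True, int) is True in Python. Similarly, reading the RetryPolicy comments in [PATCH 11/17] together: max_retry_count=3 with backoff_duration=10s and the default backoff_factor of 2 would presumably yield retries after roughly 10s, 20s, and 40s, with each wait capped by backoff_max_duration.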