Skip to content

Commit

Permalink
feat(backend): Implement RuntimeConfig in backend (kubeflow#8085)
Browse files Browse the repository at this point in the history
* add runtime config model

* add run store and tests

* add runtime_config to model converter and tests

* add runtime_config to api converter and its tests

* change api server and related tests

* remove v2 runtime_config test

* add runtimeconfig to upgrade test

* fix test values

* upgrade test debug

* tests

* add more info for debug

* use NullString instead of String, remove debug

* fix type conversion

* change function and add unit tests

* run go fmt

* Add comments for model

* marshal params using v2 structpb values

* fix small bug

* Revert "run go fmt"

This reverts commit 251c3a9.

* No longer sort keys

* test values and explain comparison using .String()

* func toApiRuntimeConfig

* tests updates

* add api converter tests

* change test

* fix format

* change test

* simplify marshalling parameters
  • Loading branch information
Linchin authored and bokleung committed Aug 15, 2022
1 parent 9750922 commit d852e8b
Show file tree
Hide file tree
Showing 11 changed files with 491 additions and 42 deletions.
5 changes: 5 additions & 0 deletions backend/src/apiserver/model/pipeline_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,10 @@ type PipelineSpec struct {
WorkflowSpecManifest string `gorm:"column:WorkflowSpecManifest; not null; size:65535"`

// Store parameters key-value pairs as serialized string.
// This field is only used for V1 API. For V2, use the `Parameters` field in RuntimeConfig.
// At most one of the fields `Parameters` and `RuntimeConfig` can be non-empty
Parameters string `gorm:"column:Parameters; size:65535"`

// Runtime config of the pipeline, only used for v2 API.
RuntimeConfig
}
25 changes: 25 additions & 0 deletions backend/src/apiserver/model/runtime_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

type RuntimeConfig struct {
// Store parameters key-value pairs as serialized string.
Parameters string `gorm:"column:RuntimeParameters; size:65535"`

// A path in a object store bucket which will be treated as the root
// output directory of the pipeline. It is used by the system to
// generate the paths of output artifacts. Ref:(https://www.kubeflow.org/docs/components/pipelines/pipeline-root/)
PipelineRoot string `gorm:"column:PipelineRoot; size:65535"`
}
41 changes: 9 additions & 32 deletions backend/src/apiserver/resource/model_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package resource

import (
"encoding/json"
"fmt"

"github.com/kubeflow/pipelines/backend/src/apiserver/template"
"google.golang.org/protobuf/types/known/structpb"

api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
Expand Down Expand Up @@ -96,7 +96,7 @@ func (r *ResourceManager) ToModelRunDetail(run *api.Run, runId string, workflow
if templateType == template.V1 {
params, err := apiParametersToModelParameters(run.GetPipelineSpec().GetParameters())
if err != nil {
return nil, util.Wrap(err, "Unable to parse the parameter.")
return nil, util.Wrap(err, "Unable to parse the V1 parameter.")
}
runDetail.Parameters = params
runDetail.WorkflowSpecManifest = manifest
Expand All @@ -106,10 +106,11 @@ func (r *ResourceManager) ToModelRunDetail(run *api.Run, runId string, workflow
} else if templateType == template.V2 {
params, err := runtimeConfigToModelParameters(run.GetPipelineSpec().GetRuntimeConfig())
if err != nil {
return nil, util.Wrap(err, "Unable to parse the parameter.")
return nil, util.Wrap(err, "Unable to parse the V2 parameter.")
}
runDetail.Parameters = params
runDetail.PipelineSpecManifest = manifest
runDetail.PipelineSpec.RuntimeConfig.Parameters = params
runDetail.PipelineSpec.RuntimeConfig.PipelineRoot = run.GetPipelineSpec().GetRuntimeConfig().GetPipelineRoot()
return runDetail, nil

} else {
Expand Down Expand Up @@ -168,8 +169,9 @@ func (r *ResourceManager) ToModelJob(job *api.Job, swf *util.ScheduledWorkflow,
if err != nil {
return nil, util.Wrap(err, "Unable to parse the parameter.")
}
modelJob.Parameters = params
modelJob.PipelineSpecManifest = manifest
modelJob.PipelineSpec.RuntimeConfig.Parameters = params
modelJob.PipelineSpec.RuntimeConfig.PipelineRoot = job.GetPipelineSpec().GetRuntimeConfig().GetPipelineRoot()
return modelJob, nil

} else {
Expand Down Expand Up @@ -253,34 +255,9 @@ func runtimeConfigToModelParameters(runtimeConfig *api.PipelineSpec_RuntimeConfi
if runtimeConfig == nil {
return "", nil
}
var params util.SpecParameters
for k, v := range runtimeConfig.GetParameters() {
param := util.SpecParameter{
Name: k,
}
switch t := v.GetKind().(type) {
case *structpb.Value_StringValue:
param.Value = util.AnyStringPtr(v.GetStringValue())
case *structpb.Value_NumberValue:
param.Value = util.AnyStringPtr(v.GetNumberValue())
case *structpb.Value_BoolValue:
param.Value = util.AnyStringPtr(v.GetBoolValue())
// TODO: revisit if these are right
case *structpb.Value_StructValue:
param.Value = util.AnyStringPtr(v.GetStructValue())
case *structpb.Value_ListValue:
param.Value = util.AnyStringPtr(v.GetListValue())
case *structpb.Value_NullValue:
return "", fmt.Errorf("Unsupported property type in pipelineSpec runtimeConfig Parameters: %T", t)
default:
return "", fmt.Errorf("Unknown property type in pipelineSpec runtimeConfig Parameters: %T", t)
}

params = append(params, param)
}
paramsBytes, err := util.MarshalParameters(util.ArgoWorkflow, params)
paramsBytes, err := json.Marshal(runtimeConfig.GetParameters())
if err != nil {
return "", util.NewInternalServerError(err, "Failed to stream API parameter as string.")
return "", util.NewInternalServerError(err, "Failed to marshal RuntimeConfig API parameters as string.")
}
return string(paramsBytes), nil
}
Expand Down
31 changes: 26 additions & 5 deletions backend/src/apiserver/resource/model_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ func TestToModelRunDetail(t *testing.T) {
store, manager, experiment := initWithExperiment(t)
defer store.Close()

listParams := []interface{}{1, 2, 3}
v2RuntimeListParams, _ := structpb.NewList(listParams)

structParams := map[string]interface{}{"structParam1": "hello", "structParam2": 32}
v2RuntimeStructParams, _ := structpb.NewStruct(structParams)

// Test all parameters types converted to model.RuntimeConfig.Parameters, which is string type
v2RuntimeParams := map[string]*structpb.Value{
"param2": &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "world"}},
"param3": &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: true}},
"param4": &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: v2RuntimeListParams}},
"param5": &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: 12}},
"param6": &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: v2RuntimeStructParams}},
}

tests := []struct {
name string
apiRun *api.Run
Expand Down Expand Up @@ -217,9 +232,10 @@ func TestToModelRunDetail(t *testing.T) {
Id: "run1",
Name: "name1",
Description: "this is a run",
PipelineSpec: &api.PipelineSpec{RuntimeConfig: &api.PipelineSpec_RuntimeConfig{Parameters: map[string]*structpb.Value{
"param2": &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: "world"}},
}}},
PipelineSpec: &api.PipelineSpec{
RuntimeConfig: &api.PipelineSpec_RuntimeConfig{
Parameters: v2RuntimeParams,
}},
ResourceReferences: []*api.ResourceReference{
{
Key: &api.ResourceKey{Type: api.ResourceType_EXPERIMENT, Id: experiment.UUID},
Expand All @@ -241,7 +257,10 @@ func TestToModelRunDetail(t *testing.T) {
Description: "this is a run",
PipelineSpec: model.PipelineSpec{
PipelineSpecManifest: "pipeline spec",
Parameters: `[{"name":"param2","value":"world"}]`,
RuntimeConfig: model.RuntimeConfig{
// Note: for some versions of structpb.Value.MarshalJSON(), there is a trailing space after array items or struct items
Parameters: "{\"param2\":\"world\",\"param3\":true,\"param4\":[1,2,3],\"param5\":12,\"param6\":{\"structParam1\":\"hello\",\"structParam2\":32}}",
},
},
ResourceReferences: []*model.ResourceReference{
{
Expand Down Expand Up @@ -385,7 +404,9 @@ func TestToModelJob(t *testing.T) {
PipelineId: pipeline.UUID,
PipelineName: pipeline.Name,
PipelineSpecManifest: "pipeline spec",
Parameters: `[{"name":"param2","value":"world"}]`,
RuntimeConfig: model.RuntimeConfig{
Parameters: "{\"param2\":\"world\"}",
},
},
ResourceReferences: []*model.ResourceReference{
{
Expand Down
29 changes: 29 additions & 0 deletions backend/src/apiserver/server/api_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
package server

import (
"encoding/json"
"fmt"

"github.com/golang/protobuf/ptypes/timestamp"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/common/util"
"google.golang.org/protobuf/types/known/structpb"
)

func ToApiExperiment(experiment *model.Experiment) *api.Experiment {
Expand Down Expand Up @@ -148,14 +152,38 @@ func toApiParameters(paramsString string) ([]*api.Parameter, error) {
return apiParams, nil
}

func toApiRuntimeConfig(modelRuntime model.RuntimeConfig) (*api.PipelineSpec_RuntimeConfig, error) {
var runtimeParams map[string]*structpb.Value
if modelRuntime.Parameters != "" {
err := json.Unmarshal([]byte(modelRuntime.Parameters), &runtimeParams)
if err != nil {
return nil, util.NewInternalServerError(err, fmt.Sprintf("Cannot unmarshal RuntimeConfig Parameter to map[string]*structpb.Value, string value: %+v", modelRuntime.Parameters))
}
}
apiRuntimeConfig := &api.PipelineSpec_RuntimeConfig{
Parameters: runtimeParams,
PipelineRoot: modelRuntime.PipelineRoot,
}
return apiRuntimeConfig, nil
}

func toApiRun(run *model.Run) *api.Run {
// v1 parameters
params, err := toApiParameters(run.Parameters)
if err != nil {
return &api.Run{
Id: run.UUID,
Error: err.Error(),
}
}
// v2 RuntimeConfig
runtimeConfig, err := toApiRuntimeConfig(run.PipelineSpec.RuntimeConfig)
if err != nil {
return &api.Run{
Id: run.UUID,
Error: err.Error(),
}
}
var metrics []*api.RunMetric
if run.Metrics != nil {
for _, metric := range run.Metrics {
Expand All @@ -179,6 +207,7 @@ func toApiRun(run *model.Run) *api.Run {
WorkflowManifest: run.WorkflowSpecManifest,
PipelineManifest: run.PipelineSpecManifest,
Parameters: params,
RuntimeConfig: runtimeConfig,
},
ResourceReferences: toApiResourceReferences(run.ResourceReferences),
}
Expand Down
Loading

0 comments on commit d852e8b

Please sign in to comment.