feat: added namespace as cache scope #6342

Merged
2 changes: 1 addition & 1 deletion backend/api/go_client/task.pb.go

2 changes: 1 addition & 1 deletion backend/api/swagger/task.swagger.json

2 changes: 1 addition & 1 deletion backend/api/task.proto
@@ -25,7 +25,7 @@ message Task {
   // Output. Unique task ID. Generated by API server.
   string id = 1;

-  // Optional input field. The Namespace to which this pipeline task belongs.
+  // Required input field. The Namespace to which this pipeline task belongs.
   string namespace = 2;

   // Required input field. The PipelineName to which this pipeline task belongs.
14 changes: 14 additions & 0 deletions backend/src/apiserver/server/task_server.go
@@ -6,6 +6,7 @@ import (
 	"github.com/kubeflow/pipelines/backend/src/apiserver/model"
 	"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
 	"github.com/kubeflow/pipelines/backend/src/common/util"
+	"strings"
 )

 type TaskServer struct {
@@ -40,6 +41,19 @@ func (s *TaskServer) validateCreateTaskRequest(request *api.CreateTaskRequest) e
 	if task.GetPipelineName() == "" {
 		return errMustSpecify("PipelineName")
 	}
+	if task.GetNamespace() == "" {
+		return errMustSpecify("Namespace")
+	}
Contributor

This PR will require a coordinated release with the SDK, because 1.7.0 SDK launchers will no longer work.

I think we can go with this, but it might be slightly better to keep namespace optional: people usually won't expect the 1.7.1 SDK to work with the backend when the 1.7.0 SDK doesn't. We can make it required after SDK 1.8.0.

Contributor Author

Why won't it work with the SDK? The launcher is the only caller of these APIs.

Contributor Author

done
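
For readers following the thread, the staged rollout suggested above (namespace optional now, required after SDK 1.8.0) might look roughly like the sketch below. It is not taken from the PR: the `task` struct is a stand-in for `api.Task`, and `validateTaskNamespace` and its `requireNamespace` switch are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a stand-in for api.Task with only the fields this sketch needs.
type task struct {
	PipelineName string
	Namespace    string
}

// validateTaskNamespace is a hypothetical helper. With requireNamespace set to
// false, a task without a namespace (as sent by older launchers) is accepted;
// with it set to true, an empty namespace is rejected.
func validateTaskNamespace(t task, requireNamespace bool) error {
	if t.Namespace == "" && requireNamespace {
		return errors.New("must specify Namespace")
	}
	return nil
}

func main() {
	legacy := task{PipelineName: "train"} // no namespace, like an older launcher

	// Optional mode: the legacy task passes validation.
	fmt.Println("optional mode:", validateTaskNamespace(legacy, false))

	// Required mode: the same task is rejected.
	fmt.Println("required mode:", validateTaskNamespace(legacy, true))
}
```

Flipping a single switch keeps the compatibility decision in one place, so making the field required later would be a one-line change.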

+	if strings.HasPrefix(task.GetPipelineName(), "namespace/") {
+		s := strings.Split(task.GetPipelineName(), "/")
+		if len(s) != 4 {
+			return util.NewInvalidInputError("invalid PipelineName for namespaced pipelines, need to follow 'namespace/${namespace}/pipeline/${pipelineName}': %s", task.GetPipelineName())
+		}
Contributor

Note: we don't have any restrictions on pipeline names so far, so a name may contain "/".
I recommend using strings.SplitN (https://pkg.go.dev/strings#SplitN) instead.

Contributor Author

done
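
To make the `strings.SplitN` suggestion concrete, here is a minimal sketch of parsing the `namespace/${namespace}/pipeline/${pipelineName}` format while allowing "/" inside the pipeline name. The helper name `parseNamespacedPipelineName` is hypothetical and is not the code that landed in this PR. It is also slightly stricter than the hunk above, which only checks the part count: this sketch additionally assumes the third segment must be the literal `pipeline` and that neither name may be empty.

```go
package main

import (
	"fmt"
	"strings"
)

// parseNamespacedPipelineName is a hypothetical helper. It splits the name
// into at most four parts, so every "/" after the third one stays inside the
// pipeline name instead of producing extra parts.
func parseNamespacedPipelineName(name string) (namespace, pipeline string, err error) {
	parts := strings.SplitN(name, "/", 4)
	if len(parts) != 4 || parts[0] != "namespace" || parts[2] != "pipeline" ||
		parts[1] == "" || parts[3] == "" {
		return "", "", fmt.Errorf("invalid PipelineName for namespaced pipelines, need to follow 'namespace/${namespace}/pipeline/${pipelineName}': %s", name)
	}
	return parts[1], parts[3], nil
}

func main() {
	names := []string{
		"namespace/team-a/pipeline/train",         // plain pipeline name
		"namespace/team-a/pipeline/nightly/train", // pipeline name containing "/"
		"namespace/team-a",                        // too few parts, rejected
	}
	for _, name := range names {
		ns, p, err := parseNamespacedPipelineName(name)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("namespace=%q pipeline=%q\n", ns, p)
	}
}
```

With plain `strings.Split`, the second name would yield five parts and fail the `len(s) != 4` check even though it is a legitimate pipeline name.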

+		namespace := s[1]
+		if namespace != task.GetNamespace() {
+			return util.NewInvalidInputError("the namespace %s extracted from pipelineName is not equal to the namespace %s in task", namespace, task.GetNamespace())
+		}
+	}
 	if task.GetRunId() == "" {
 		return errMustSpecify("RunId")
 	}
9 changes: 7 additions & 2 deletions v2/cacheutils/cache.go
@@ -144,7 +144,7 @@ func cacheDefaultEndpoint() string {
 	return defaultKfpApiEndpoint
 }

-func (c *Client) GetExecutionCache(fingerPrint, pipelineName string) (string, error) {
+func (c *Client) GetExecutionCache(fingerPrint, pipelineName, namespace string) (string, error) {
 	fingerPrintPredicate := &api.Predicate{
 		Op: api.Predicate_EQUALS,
 		Key: "fingerprint",
@@ -155,7 +155,12 @@ func (c *Client) GetExecutionCache(fingerPrint, pipelineName string) (string, er
 		Key: "pipelineName",
 		Value: &api.Predicate_StringValue{StringValue: pipelineName},
 	}
-	filter := api.Filter{Predicates: []*api.Predicate{fingerPrintPredicate, pipelineNamePredicate}}
+	namespacePredicate := &api.Predicate{
+		Op: api.Predicate_EQUALS,
+		Key: "namespace",
+		Value: &api.Predicate_StringValue{StringValue: namespace},
+	}
+	filter := api.Filter{Predicates: []*api.Predicate{fingerPrintPredicate, pipelineNamePredicate, namespacePredicate}}

 	taskFilterJson, err := protojson.Marshal(&filter)
 	if err != nil {
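
As a rough illustration of the three-predicate filter built in the hunk above, the sketch below assembles an equivalent structure with stand-in types. The `predicate` and `filter` structs and the `buildCacheFilter` helper are assumptions made for this example; the real code uses the generated `api.Predicate` and `api.Filter` types with `protojson.Marshal`, whose JSON output may differ from what `encoding/json` prints here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// predicate and filter are simplified stand-ins for api.Predicate and
// api.Filter; the field names and JSON tags are assumptions for this sketch.
type predicate struct {
	Op          string `json:"op"`
	Key         string `json:"key"`
	StringValue string `json:"stringValue"`
}

type filter struct {
	Predicates []predicate `json:"predicates"`
}

// buildCacheFilter is a hypothetical helper that combines the fingerprint,
// pipeline name, and namespace into one conjunction of EQUALS predicates.
func buildCacheFilter(fingerPrint, pipelineName, namespace string) filter {
	equals := func(key, value string) predicate {
		return predicate{Op: "EQUALS", Key: key, StringValue: value}
	}
	return filter{Predicates: []predicate{
		equals("fingerprint", fingerPrint),
		equals("pipelineName", pipelineName),
		equals("namespace", namespace),
	}}
}

func main() {
	f := buildCacheFilter("abc123", "namespace/team-a/pipeline/train", "team-a")
	out, err := json.MarshalIndent(f, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Because every predicate must match, adding the namespace predicate can only narrow the set of candidate cache entries.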
13 changes: 12 additions & 1 deletion v2/component/launcher.go
@@ -106,6 +106,16 @@ func (o *LauncherOptions) validate() error {
 	if empty(o.MLMDServerPort) {
 		return err("MLMDServerPort")
 	}
+	if strings.HasPrefix(o.PipelineName, "namespace/") {
+		s := strings.Split(o.PipelineName, "/")
+		if len(s) != 4 {
+			return fmt.Errorf("invalid PipelineName options for namespaced pipelines, need to follow 'namespace/${namespace}/pipeline/${pipelineName}': %s", o.PipelineName)
+		}
+		namespace := s[1]
+		if namespace != o.Namespace {
+			return fmt.Errorf("the namespace %s extracted from pipelineName is not equal to the namespace %s in launcher options", namespace, o.Namespace)
+		}
+	}
Contributor

For discussion: is it helpful to add this validation in the launcher? I feel the launcher can stay unaware of the pipeline name format; if there's a problem, it will be reported by the task server.

Contributor Author

Info such as "PipelineName" and "Namespace" is recorded in MLMD when caching is not enabled, so I think the launcher should also run this validation.

 	return nil
 }

@@ -230,7 +240,7 @@ func (l *Launcher) executeWithCacheEnabled(ctx context.Context, executorInput *p
 		return fmt.Errorf("failure while generating CacheKey: %w", err)
 	}
 	fingerPrint, err := cacheutils.GenerateFingerPrint(cacheKey)
-	cachedMLMDExecutionID, err := l.cacheClient.GetExecutionCache(fingerPrint, l.options.PipelineName)
+	cachedMLMDExecutionID, err := l.cacheClient.GetExecutionCache(fingerPrint, l.options.PipelineName, l.options.Namespace)
 	if err != nil {
 		return fmt.Errorf("failure while getting executionCache: %w", err)
 	}
@@ -399,6 +409,7 @@ func (l *Launcher) executeWithoutCacheHit(ctx context.Context, executorInput *pi
 	}
 	task := &api.Task{
 		PipelineName: l.options.PipelineName,
+		Namespace: l.options.Namespace,
 		RunId: l.options.RunID,
 		MlmdExecutionID: strconv.FormatInt(id, 10),
 		CreatedAt: &timestamp.Timestamp{Seconds: executedStartedTime},
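
The net effect of passing the namespace on both the write path (`api.Task`) and the read path (`GetExecutionCache`) is that cache lookups become namespace-scoped. The sketch below models that behaviour with an in-memory map; `cacheKey`, `taskStore`, `record`, and `lookup` are hypothetical names, and the real system stores tasks behind the API server rather than in a Go map.

```go
package main

import "fmt"

// cacheKey mirrors the three values the lookup filters on after this change.
type cacheKey struct {
	Fingerprint  string
	PipelineName string
	Namespace    string
}

// taskStore is a hypothetical in-memory stand-in for the task table.
type taskStore struct {
	executions map[cacheKey]string // value: MLMD execution ID
}

func newTaskStore() *taskStore {
	return &taskStore{executions: make(map[cacheKey]string)}
}

// record stores the execution ID produced by a run that missed the cache.
func (s *taskStore) record(k cacheKey, executionID string) {
	s.executions[k] = executionID
}

// lookup reports a hit only when fingerprint, pipeline name, and namespace
// all match a recorded task.
func (s *taskStore) lookup(k cacheKey) (string, bool) {
	id, ok := s.executions[k]
	return id, ok
}

func main() {
	store := newTaskStore()
	store.record(cacheKey{Fingerprint: "abc123", PipelineName: "train", Namespace: "team-a"}, "42")

	// Same fingerprint and pipeline name, same namespace: cache hit.
	id, ok := store.lookup(cacheKey{Fingerprint: "abc123", PipelineName: "train", Namespace: "team-a"})
	fmt.Println("team-a:", id, ok)

	// Same fingerprint and pipeline name, different namespace: cache miss.
	id, ok = store.lookup(cacheKey{Fingerprint: "abc123", PipelineName: "train", Namespace: "team-b"})
	fmt.Println("team-b:", id, ok)
}
```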