diff --git a/tekton-catalog/cache/README.md b/tekton-catalog/cache/README.md new file mode 100644 index 0000000000..fbcfe3d3d4 --- /dev/null +++ b/tekton-catalog/cache/README.md @@ -0,0 +1,41 @@ +# Cache: Reuse the results from previous execution for custom tasks. + +### How To + +1. Setup. +```go + +import ( + "fmt" + "time" + + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db" + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model" +) + +taskCacheStore := TaskCacheStore{Params: db.ConnectionParams{DbDriver: "sqlite3", DbName: "example.db"}} + err := taskCacheStore.Connect() + // Currently, mysql and sqlite3 are supported driver. +``` + +2. Store an entry to cache. +```go + taskCache := &model.TaskCache{ + TaskHashKey: cacheKey, + TaskOutput: cacheOutput, + } + taskCacheStore.Put(taskCache) +``` + +3. Fetch an entry from cache. +```go + cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey) + if err != nil { + fmt.Printf("%v", err) + } + +``` +4. Prune entries older than a day using: +```go + taskCacheStore.PruneOlderThan(time.Now().Add(-24*time.Hour)) +``` \ No newline at end of file diff --git a/tekton-catalog/cache/go.mod b/tekton-catalog/cache/go.mod new file mode 100644 index 0000000000..8728d1c25d --- /dev/null +++ b/tekton-catalog/cache/go.mod @@ -0,0 +1,10 @@ +module github.com/kubeflow/kfp-tekton/tekton-catalog/cache + +go 1.13 + +require ( + github.com/cenkalti/backoff v2.2.1+incompatible + github.com/go-sql-driver/mysql v1.6.0 + github.com/jinzhu/gorm v1.9.16 + github.com/mattn/go-sqlite3 v1.14.0 +) diff --git a/tekton-catalog/cache/pkg/db/db_conn_manager.go b/tekton-catalog/cache/pkg/db/db_conn_manager.go new file mode 100644 index 0000000000..e72610d882 --- /dev/null +++ b/tekton-catalog/cache/pkg/db/db_conn_manager.go @@ -0,0 +1,67 @@ +// Copyright 2022 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package db + +import ( + "fmt" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/jinzhu/gorm" + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model" +) + +type ConnectionParams struct { + DbDriver string + DbHost string + DbPort string + DbName string + DbUser string + DbPwd string + DbGroupConcatMaxLen string + DbExtraParams string + Timeout time.Duration +} + +func InitDBClient(params ConnectionParams, initConnectionTimeout time.Duration) (*gorm.DB, error) { + driverName := params.DbDriver + var arg string + var err error + + switch driverName { + case mysqlDBDriverDefault: + arg, err = initMysql(params, initConnectionTimeout) + if err != nil { + return nil, err + } + case sqliteDriverDefault: + arg = initSqlite(params.DbName) + default: + return nil, fmt.Errorf("driver %v is not supported", driverName) + } + + // db is safe for concurrent use by multiple goroutines + // and maintains its own pool of idle connections. 
+ db, err := gorm.Open(driverName, arg) + if err != nil { + return nil, err + } + // Create table + response := db.AutoMigrate(&model.TaskCache{}) + if response.Error != nil { + return nil, fmt.Errorf("failed to initialize the databases: Error: %w", response.Error) + } + return db, nil +} diff --git a/tekton-catalog/cache/pkg/db/mysql.go b/tekton-catalog/cache/pkg/db/mysql.go new file mode 100644 index 0000000000..f1609e6f08 --- /dev/null +++ b/tekton-catalog/cache/pkg/db/mysql.go @@ -0,0 +1,139 @@ +// Copyright 2022 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package db + +import ( + "database/sql" + "encoding/json" + "fmt" + "time" + + "github.com/cenkalti/backoff" + + "github.com/go-sql-driver/mysql" +) + +const ( + mysqlDBDriverDefault = "mysql" + mysqlDBHostDefault = "mysql.kubeflow.svc.cluster.local" + mysqlDBPortDefault = "3306" + mysqlDBGroupConcatMaxLenDefault = "4194304" + DefaultConnectionTimeout = time.Minute * 6 +) + +func setDefault(field *string, defaultVal string) { + if *field == "" { + *field = defaultVal + } +} + +func (params *ConnectionParams) LoadMySQLDefaults() { + setDefault(¶ms.DbDriver, mysqlDBDriverDefault) + setDefault(¶ms.DbHost, mysqlDBHostDefault) + setDefault(¶ms.DbPort, mysqlDBPortDefault) + setDefault(¶ms.DbName, "cachedb") + setDefault(¶ms.DbUser, "root") + setDefault(¶ms.DbPwd, "") + setDefault(¶ms.DbGroupConcatMaxLen, mysqlDBGroupConcatMaxLenDefault) + if params.Timeout == 0 { + params.Timeout = DefaultConnectionTimeout + } +} + +func initMysql(params ConnectionParams, initConnectionTimeout time.Duration) (string, error) { + var mysqlExtraParams = map[string]string{} + data := []byte(params.DbExtraParams) + _ = json.Unmarshal(data, &mysqlExtraParams) + mysqlConfig := CreateMySQLConfig( + params.DbUser, + params.DbPwd, + params.DbHost, + params.DbPort, + "", + params.DbGroupConcatMaxLen, + mysqlExtraParams, + ) + + var db *sql.DB + var err error + var operation = func() error { + db, err = sql.Open(params.DbDriver, mysqlConfig.FormatDSN()) + if err != nil { + return err + } + return nil + } + b := backoff.NewExponentialBackOff() + b.MaxElapsedTime = initConnectionTimeout + err = backoff.Retry(operation, b) + if err != nil { + return "", err + } + defer db.Close() + + // Create database if not exist + dbName := params.DbName + operation = func() error { + _, err = db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName)) + if err != nil { + return err + } + return nil + } + b = backoff.NewExponentialBackOff() + b.MaxElapsedTime = initConnectionTimeout + err = 
backoff.Retry(operation, b) + + operation = func() error { + _, err = db.Exec(fmt.Sprintf("USE %s", dbName)) + if err != nil { + return err + } + return nil + } + b = backoff.NewExponentialBackOff() + b.MaxElapsedTime = initConnectionTimeout + err = backoff.Retry(operation, b) + + mysqlConfig.DBName = dbName + // Config reference: https://github.com/go-sql-driver/mysql#clientfoundrows + mysqlConfig.ClientFoundRows = true + return mysqlConfig.FormatDSN(), nil +} + +func CreateMySQLConfig(user, password, mysqlServiceHost, mysqlServicePort, dbName, mysqlGroupConcatMaxLen string, + mysqlExtraParams map[string]string) *mysql.Config { + + params := map[string]string{ + "charset": "utf8", + "parseTime": "True", + "loc": "Local", + "group_concat_max_len": mysqlGroupConcatMaxLen, + } + + for k, v := range mysqlExtraParams { + params[k] = v + } + + return &mysql.Config{ + User: user, + Passwd: password, + Net: "tcp", + Addr: fmt.Sprintf("%s:%s", mysqlServiceHost, mysqlServicePort), + Params: params, + DBName: dbName, + AllowNativePasswords: true, + } +} diff --git a/tekton-catalog/cache/pkg/db/sqlite.go b/tekton-catalog/cache/pkg/db/sqlite.go new file mode 100644 index 0000000000..5c2e84a33a --- /dev/null +++ b/tekton-catalog/cache/pkg/db/sqlite.go @@ -0,0 +1,29 @@ +// Copyright 2022 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package db + +const sqliteDriverDefault = "sqlite3" + +func (params *ConnectionParams) LoadSqliteDefaults() { + setDefault(¶ms.DbDriver, sqliteDriverDefault) + setDefault(¶ms.DbName, ":memory:") +} + +func initSqlite(dbName string) string { + if dbName == "" { + dbName = ":memory:" // default db. + } + return dbName +} diff --git a/tekton-catalog/cache/pkg/model/task_cache.go b/tekton-catalog/cache/pkg/model/task_cache.go new file mode 100644 index 0000000000..4e7ff5417c --- /dev/null +++ b/tekton-catalog/cache/pkg/model/task_cache.go @@ -0,0 +1,26 @@ +// Copyright 2022 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "time" +) + +type TaskCache struct { + ID int64 `gorm:"column:ID; not null; primary_key; AUTO_INCREMENT"` + TaskHashKey string `gorm:"column:TaskHashKey; not null; index:idx_cache_key"` + TaskOutput string `gorm:"column:TaskOutput; type:longtext; not null"` + CreatedAt time.Time `gorm:"column:CreatedAt; autoCreateTime:nano; not null"` +} diff --git a/tekton-catalog/cache/pkg/task_cache_store.go b/tekton-catalog/cache/pkg/task_cache_store.go new file mode 100644 index 0000000000..6cb2fbaac6 --- /dev/null +++ b/tekton-catalog/cache/pkg/task_cache_store.go @@ -0,0 +1,91 @@ +// Copyright 2022 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "fmt" + "time" + + "github.com/jinzhu/gorm" + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db" + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model" +) + + +type TaskCacheStore struct { + db *gorm.DB + Disabled bool + Params db.ConnectionParams +} + +func (t *TaskCacheStore) Connect() error { + if t.db != nil || t.Disabled { + return nil + } + var err error + t.db, err = db.InitDBClient(t.Params, t.Params.Timeout) + return err +} + +func (t *TaskCacheStore) Get(taskHashKey string) (*model.TaskCache, error) { + if t.Disabled || t.db == nil { + return nil, nil + } + entry := &model.TaskCache{} + d := t.db.Table("task_caches").Where("TaskHashKey = ?", taskHashKey). + Order("CreatedAt DESC").First(entry) + if d.Error != nil { + return nil, fmt.Errorf("failed to get entry from cache: %q. 
Error: %v", taskHashKey, d.Error) + } + return entry, nil +} + +func (t *TaskCacheStore) Put(entry *model.TaskCache) (*model.TaskCache, error) { + if t.Disabled || t.db == nil { + return nil, nil + } + ok := t.db.NewRecord(entry) + if !ok { + return nil, fmt.Errorf("failed to create a new cache entry, %#v, Error: %v", entry, t.db.Error) + } + rowInsert := &model.TaskCache{} + d := t.db.Create(entry).Scan(rowInsert) + if d.Error != nil { + return nil, d.Error + } + return rowInsert, nil +} + +func (t *TaskCacheStore) Delete(id string) error { + if t.Disabled || t.db == nil { + return nil + } + d := t.db.Delete(&model.TaskCache{}, "ID = ?", id) + if d.Error != nil { + return d.Error + } + return nil +} + +func (t *TaskCacheStore) PruneOlderThan(timestamp time.Time) error { + if t.Disabled || t.db == nil { + return nil + } + d := t.db.Delete(&model.TaskCache{}, "CreatedAt <= ?", timestamp) + if d.Error != nil { + return d.Error + } + return nil +} diff --git a/tekton-catalog/cache/pkg/task_cache_store_test.go b/tekton-catalog/cache/pkg/task_cache_store_test.go new file mode 100644 index 0000000000..6c9f31d79e --- /dev/null +++ b/tekton-catalog/cache/pkg/task_cache_store_test.go @@ -0,0 +1,167 @@ +// Copyright 2022 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package cache + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db" + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model" + _ "github.com/mattn/go-sqlite3" +) + +func newTestingCacheStore(disabled bool) (*TaskCacheStore, error) { + t := TaskCacheStore{ + Disabled: disabled, + Params: db.ConnectionParams{DbDriver: "sqlite3", DbName: ":memory:"}, + } + err := t.Connect() + return &t, err +} + +func createTaskCache(cacheKey string, cacheOutput string) *model.TaskCache { + return &model.TaskCache{ + TaskHashKey: cacheKey, + TaskOutput: cacheOutput, + } +} + +func TestPut(t *testing.T) { + taskCacheStore, err := newTestingCacheStore(false) + if err != nil { + t.Fatal(err) + } + entry := createTaskCache("x", "y") + taskCache, err := taskCacheStore.Put(entry) + if err != nil { + t.Fatal(err) + } + if taskCache.TaskHashKey != entry.TaskHashKey { + t.Errorf("Mismatached key. Expected %s Found: %s", entry.TaskHashKey, + taskCache.TaskHashKey) + } + if taskCache.TaskOutput != entry.TaskOutput { + t.Errorf("Mismatached output. Expected : %s Found: %s", + entry.TaskOutput, + taskCache.TaskOutput) + } +} + +func TestGet(t *testing.T) { + taskCacheStore, err := newTestingCacheStore(false) + if err != nil { + t.Fatal(err) + } + entry := createTaskCache("x", "y") + taskCache, err := taskCacheStore.Put(entry) + if err != nil { + t.Fatal(err) + } + cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey) + if err != nil { + t.Error(err) + } + if cacheResult.TaskHashKey != entry.TaskHashKey { + t.Errorf("Mismatached key. Expected %s Found: %s", entry.TaskHashKey, + cacheResult.TaskHashKey) + } + if cacheResult.TaskOutput != entry.TaskOutput { + t.Errorf("Mismatached output. Expected : %s Found: %s", + entry.TaskOutput, + cacheResult.TaskOutput) + } +} + +// Get should get the latest entry each time. 
+func TestGetLatest(t *testing.T) { + taskCacheStore, err := newTestingCacheStore(false) + if err != nil { + t.Fatal(err) + } + for i := 1; i < 10; i++ { + entry := createTaskCache("x", fmt.Sprintf("y%d", i)) + taskCache, err := taskCacheStore.Put(entry) + if err != nil { + t.Fatal(err) + } + cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey) + if err != nil { + t.Error(err) + } + if cacheResult.TaskHashKey != entry.TaskHashKey { + t.Errorf("Mismatched key. Expected %s Found: %s", entry.TaskHashKey, + cacheResult.TaskHashKey) + } + if cacheResult.TaskOutput != entry.TaskOutput { + t.Errorf("Mismatched output. Expected : %s Found: %s", + entry.TaskOutput, + cacheResult.TaskOutput) + } + } +} + +func TestDisabledCache(t *testing.T) { + taskCacheStore, err := newTestingCacheStore(true) + if err != nil { + t.Fatal(err) + } + taskCache, err := taskCacheStore.Get("random") + if err != nil { + t.Errorf("a disabled cache returned non nil error: %v", err) + } + if taskCache != nil { + t.Errorf("a disabled cache should return nil") + } +} + +func TestPruneOlderThan(t *testing.T) { + taskCacheStore, err := newTestingCacheStore(false) + if err != nil { + t.Fatal(err) + } + hashKey := "cacheKey" + for i := 1; i < 10000000; i *= 100 { + t1 := &model.TaskCache{ + TaskHashKey: hashKey, + TaskOutput: "cacheOutput", + CreatedAt: time.UnixMicro(int64(i * 100)), + } + _, err = taskCacheStore.Put(t1) + if err != nil { + t.Fatal(err) + } + } + taskCache, err := taskCacheStore.Get(hashKey) + if err != nil { + t.Error(err) + } + if taskCache == nil { + t.Error("TaskCache should be not nil.") + } + err = taskCacheStore.PruneOlderThan(time.UnixMicro(100000000)) + if err != nil { + t.Fatal(err) + } + taskCache, err = taskCacheStore.Get(hashKey) + if err == nil { + t.Fatalf("Expected error to be not nil") + } + if !strings.HasPrefix(err.Error(), "failed to get entry from cache") { + t.Error("Should fail with entry not found in cache.") + } +} diff --git 
a/tekton-catalog/pipeline-loops/examples/loop-example-basic.yaml b/tekton-catalog/pipeline-loops/examples/loop-example-basic.yaml index a56f221e14..955448c42d 100644 --- a/tekton-catalog/pipeline-loops/examples/loop-example-basic.yaml +++ b/tekton-catalog/pipeline-loops/examples/loop-example-basic.yaml @@ -32,7 +32,7 @@ metadata: tekton.dev/example-loop-pipeline: '{"spec":{"pipelineSpec":{"params":[{"name":"message","type":"string"}],"tasks":[{"name":"echo-loop-task","params":[{"name":"message","value":"$(params.message)"}],"taskSpec":{"params":[{"name":"message","type":"string"}],"steps":[{"name":"echo","image":"ubuntu","imagePullPolicy":"IfNotPresent","script":"#!/usr/bin/env bash\necho \"$(params.message)\"\n"}]}}]},"iterateParam":"message"}}' name: pr-loop-example labels: - mylooplabels: mylooplabels + pipelines.kubeflow.org/cache_enabled: "true" spec: pipelineSpec: tasks: diff --git a/tekton-catalog/pipeline-loops/go.mod b/tekton-catalog/pipeline-loops/go.mod index 337c5b1f67..e754391d56 100644 --- a/tekton-catalog/pipeline-loops/go.mod +++ b/tekton-catalog/pipeline-loops/go.mod @@ -5,7 +5,9 @@ go 1.13 require ( github.com/google/go-cmp v0.5.6 github.com/hashicorp/go-multierror v1.1.1 + github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0 github.com/kubeflow/kfp-tekton/tekton-catalog/objectstorelogger v0.0.0 + github.com/mattn/go-sqlite3 v1.14.0 github.com/tektoncd/pipeline v0.30.0 go.uber.org/zap v1.19.1 gomodules.xyz/jsonpatch/v2 v2.2.0 @@ -16,5 +18,6 @@ require ( ) replace ( + github.com/kubeflow/kfp-tekton/tekton-catalog/cache => ../cache/ github.com/kubeflow/kfp-tekton/tekton-catalog/objectstorelogger => ../objectstorelogger/ ) diff --git a/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/pipelineloop_types.go b/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/pipelineloop_types.go index 38644d99d7..2fb8ec1e80 100644 --- a/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/pipelineloop_types.go +++ 
b/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/pipelineloop_types.go @@ -91,6 +91,9 @@ const ( // PipelineLoopRunReasonStarted is the reason set when the Run has just started PipelineLoopRunReasonStarted PipelineLoopRunReason = "Started" + // PipelineLoopRunReasonCacheHit indicates that the Run result was fetched from cache instead of performing an actual run. + PipelineLoopRunReasonCacheHit PipelineLoopRunReason = "CacheHit" + // PipelineLoopRunReasonRunning indicates that the Run is in progress PipelineLoopRunReasonRunning PipelineLoopRunReason = "Running" diff --git a/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/zz_generated.deepcopy.go b/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/zz_generated.deepcopy.go index 86bd52f970..ab715c6c7e 100644 --- a/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/zz_generated.deepcopy.go +++ b/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated /* diff --git a/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/controller.go b/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/controller.go index e6bb24f10f..0e732ca9ea 100644 --- a/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/controller.go +++ b/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/controller.go @@ -22,8 +22,11 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" + taskCache "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg" + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db" cl "github.com/kubeflow/kfp-tekton/tekton-catalog/objectstorelogger/pkg/objectstorelogger" "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop" pipelineloopv1alpha1 "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1" @@ -66,6 +69,8 @@ func load(ctx context.Context, kubeClientSet 
kubernetes.Interface, o *cl.ObjectS return nil } +var params db.ConnectionParams + // NewController instantiates a new controller.Impl from knative.dev/pkg/controller func NewController(namespace string) func(context.Context, configmap.Watcher) *controller.Impl { return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { @@ -76,7 +81,18 @@ func NewController(namespace string) func(context.Context, configmap.Watcher) *c runInformer := runinformer.Get(ctx) pipelineLoopInformer := pipelineloopinformer.Get(ctx) pipelineRunInformer := pipelineruninformer.Get(ctx) - + params.LoadMySQLDefaults() + cacheStore := &taskCache.TaskCacheStore{Params: params} + if strings.EqualFold(os.Getenv("CACHE_STORE_DISABLED"), "true") { + cacheStore.Disabled = true + } + if !cacheStore.Disabled { + err := cacheStore.Connect() + if err != nil { + cacheStore.Disabled = true + logger.Errorf("Failed to connect to cache store backend, cache store disabled. err: %v", err) + } + } c := &Reconciler{ KubeClientSet: kubeclientset, pipelineClientSet: pipelineclientset, @@ -84,6 +100,7 @@ func NewController(namespace string) func(context.Context, configmap.Watcher) *c runLister: runInformer.Lister(), pipelineLoopLister: pipelineLoopInformer.Lister(), pipelineRunLister: pipelineRunInformer.Lister(), + cacheStore: cacheStore, } loggerConfig := cl.ObjectStoreLogConfig{} objectStoreLogger := cl.Logger{ diff --git a/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun.go b/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun.go index 53b9daf3aa..3bbbb4bbdf 100644 --- a/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun.go +++ b/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun.go @@ -18,6 +18,7 @@ package pipelinelooprun import ( "context" + "crypto/md5" "encoding/json" "fmt" "log" @@ -30,7 +31,8 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "github.com/hashicorp/go-multierror" - + 
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg" + "github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model" "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop" pipelineloopv1alpha1 "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1" pipelineloopclientset "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/client/clientset/versioned" @@ -93,6 +95,7 @@ type Reconciler struct { runLister listersalpha.RunLister pipelineLoopLister listerspipelineloop.PipelineLoopLister pipelineRunLister listers.PipelineRunLister + cacheStore *cache.TaskCacheStore } var ( @@ -114,6 +117,10 @@ func init() { } } +func isCachingEnabled(run *v1alpha1.Run) bool { + return run.ObjectMeta.Labels["pipelines.kubeflow.org/cache_enabled"] == "true" +} + // ReconcileKind compares the actual state with the desired, and attempts to converge the two. // It then updates the Status block of the Run resource with the current status of the resource. 
func (c *Reconciler) ReconcileKind(ctx context.Context, run *v1alpha1.Run) pkgreconciler.Event { @@ -122,6 +129,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, run *v1alpha1.Run) pkgre logger.Infof("Reconciling Run %s/%s at %v", run.Namespace, run.Name, time.Now()) if run.Spec.Ref != nil && run.Spec.Spec != nil { logger.Errorf("Run %s/%s can provide one of Run.Spec.Ref/Run.Spec.Spec", run.Namespace, run.Name) + return nil } if run.Spec.Spec == nil && run.Spec.Ref == nil { logger.Errorf("Run %s/%s does not provide a spec or ref.", run.Namespace, run.Name) @@ -161,11 +169,6 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, run *v1alpha1.Run) pkgre events.Emit(ctx, nil, afterCondition, run) } - if run.IsDone() { - logger.Infof("Run %s/%s is done", run.Namespace, run.Name) - return nil - } - // Store the condition before reconcile beforeCondition := run.Status.GetCondition(apis.ConditionSucceeded) @@ -176,6 +179,26 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, run *v1alpha1.Run) pkgre logger.Errorf("DecodeExtraFields error: %v", err.Error()) } + if run.IsDone() { + if run.IsSuccessful() && !c.cacheStore.Disabled && isCachingEnabled(run) { + marshal, err := json.Marshal(status.PipelineLoopSpec) + if err == nil { + hashSum := fmt.Sprintf("%x", md5.Sum(marshal)) + resultBytes, err := json.Marshal(run.Status.Results) + _, err = c.cacheStore.Put(&model.TaskCache{ + TaskHashKey: hashSum, + TaskOutput: string(resultBytes), + }) + if err != nil { + logger.Errorf("Error while adding result to cache for run: %s, Error: %v", run.Name, err) + return fmt.Errorf("error while adding result to cache for run: %s, %w", run.Name, err) + } + logger.Infof("cached the results of successful run %s, with key: %s", run.Name, hashSum) + } + } + logger.Infof("Run %s/%s is done", run.Namespace, run.Name) + return nil + } // Reconcile the Run if err := c.reconcile(ctx, run, status); err != nil { logger.Errorf("Reconcile error: %v", err.Error()) @@ -285,7 +308,7 
@@ func (c *Reconciler) setMaxNestedStackDepth(ctx context.Context, pipelineLoopSpe func (c *Reconciler) reconcile(ctx context.Context, run *v1alpha1.Run, status *pipelineloopv1alpha1.PipelineLoopRunStatus) error { ctx = EnableCustomTaskFeatureFlag(ctx) logger := logging.FromContext(ctx) - + var hashSum string // Get the PipelineLoop referenced by the Run pipelineLoopMeta, pipelineLoopSpec, err := c.getPipelineLoop(ctx, run) if err != nil { @@ -305,7 +328,26 @@ func (c *Reconciler) reconcile(ctx context.Context, run *v1alpha1.Run, status *p pipelineLoopMeta.Namespace, pipelineLoopMeta.Name, err) return nil } - + if !c.cacheStore.Disabled && isCachingEnabled(run) { + marshal, err := json.Marshal(pipelineLoopSpec) + if marshal != nil && err == nil { + hashSum = fmt.Sprintf("%x", md5.Sum(marshal)) + taskCache, err := c.cacheStore.Get(hashSum) + if err == nil && taskCache != nil { + logger.Infof("Found a cached entry, for run: %s, with key: %s", run.Name, hashSum) + err := json.Unmarshal([]byte(taskCache.TaskOutput), &run.Status.Results) + if err != nil { + logger.Errorf("error while unmarshal of task output. %v", err) + } + run.Status.MarkRunSucceeded(pipelineloopv1alpha1.PipelineLoopRunReasonCacheHit.String(), + "A cached result of the previous run was found.") + return nil + } + } + if err != nil { + logger.Warnf("failed marshalling the spec, for pipelineloop: %s", pipelineLoopMeta.Name) + } + } // Determine how many iterations of the Task will be done. 
totalIterations, iterationElements, err := computeIterations(run, pipelineLoopSpec) if err != nil { diff --git a/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun_test.go b/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun_test.go index 58dc99af50..a750a6c76f 100644 --- a/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun_test.go +++ b/tekton-catalog/pipeline-loops/pkg/reconciler/pipelinelooprun/pipelinelooprun_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "os" "sort" "strings" "testing" @@ -32,6 +33,7 @@ import ( fakeclient "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/client/injection/client/fake" fakepipelineloopinformer "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/client/injection/informers/pipelineloop/v1alpha1/pipelineloop/fake" "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/test" + _ "github.com/mattn/go-sqlite3" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" @@ -57,6 +59,13 @@ var ( trueB = true ) +func init() { + tmp := os.TempDir() + params.DbDriver = "sqlite3" + params.DbName = tmp + "/testing.db" + params.Timeout = 2 * time.Second +} + func getRunName(run *v1alpha1.Run) string { return strings.Join([]string{run.Namespace, run.Name}, "/") } @@ -68,6 +77,13 @@ func loopRunning(run *v1alpha1.Run) *v1alpha1.Run { return runWithStatus } +func loopSucceeded(run *v1alpha1.Run) *v1alpha1.Run { + runWithStatus := run.DeepCopy() + runWithStatus.Status.InitializeConditions() + runWithStatus.Status.MarkRunSucceeded(pipelineloopv1alpha1.PipelineLoopRunReasonSucceeded.String(), "") + return runWithStatus +} + func successful(pr *v1beta1.PipelineRun) *v1beta1.PipelineRun { prWithStatus := pr.DeepCopy() prWithStatus.Status.SetCondition(&apis.Condition{ @@ -114,11 +130,6 @@ func 
setDeleted(pr *v1beta1.PipelineRun) *v1beta1.PipelineRun { return pr } -func setPrName(pr *v1beta1.PipelineRun, name string) *v1beta1.PipelineRun { - pr.Name = name - return pr -} - // getPipelineLoopController returns an instance of the PipelineLoop controller/reconciler that has been seeded with // d, where d represents the state of the system (existing resources) needed for the test. func getPipelineLoopController(t *testing.T, d test.Data, pipelineloops []*pipelineloopv1alpha1.PipelineLoop) (test.Assets, func()) { @@ -1686,3 +1697,85 @@ func TestReconcilePipelineLoopRunFailures(t *testing.T) { }) } } + +func enableCacheForRun(run *v1alpha1.Run) *v1alpha1.Run { + run.ObjectMeta.Labels["pipelines.kubeflow.org/cache_enabled"] = "true" + return run +} + +func enableCacheForPr(pr *v1beta1.PipelineRun) *v1beta1.PipelineRun { + pr.ObjectMeta.Labels["pipelines.kubeflow.org/cache_enabled"] = "true" + return pr +} + +func TestReconcilePipelineLoopRunCachedRun(t *testing.T) { + testcases := []struct { + name string + pipeline *v1beta1.Pipeline + pipelineloop *pipelineloopv1alpha1.PipelineLoop + run *v1alpha1.Run + pipelineruns []*v1beta1.PipelineRun + expectedStatus corev1.ConditionStatus + expectedReason pipelineloopv1alpha1.PipelineLoopRunReason + expectedEvents []string + }{{ + name: "Reconcile a run successfully", + pipeline: aPipeline, + pipelineloop: aPipelineLoop, + run: enableCacheForRun(loopSucceeded(runPipelineLoop)), + pipelineruns: []*v1beta1.PipelineRun{successful(enableCacheForPr(expectedPipelineRunIteration1)), successful(enableCacheForPr(expectedPipelineRunIteration2))}, + expectedStatus: corev1.ConditionTrue, + expectedReason: pipelineloopv1alpha1.PipelineLoopRunReasonSucceeded, + expectedEvents: []string{}, + }, { + name: "Test fetch from cache for previously successful Run.", + pipeline: aPipeline, + pipelineloop: aPipelineLoop, + run: enableCacheForRun(runPipelineLoop), + expectedStatus: corev1.ConditionTrue, + expectedReason: 
pipelineloopv1alpha1.PipelineLoopRunReasonCacheHit, + expectedEvents: []string{"Normal Started ", "Normal Succeeded A cached result of the previous run was found."}, + }} + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + names.TestingSeed() + optionalPipeline := []*v1beta1.Pipeline{tc.pipeline} + status := &pipelineloopv1alpha1.PipelineLoopRunStatus{} + tc.pipelineloop.Spec.SetDefaults(ctx) + status.PipelineLoopSpec = &tc.pipelineloop.Spec + err := tc.run.Status.EncodeExtraFields(status) + if err != nil { + t.Fatal("Failed to encode spec in the pipelineSpec:", err) + } + if tc.pipeline == nil { + optionalPipeline = nil + } + + d := test.Data{ + Runs: []*v1alpha1.Run{tc.run}, + Pipelines: optionalPipeline, + PipelineRuns: tc.pipelineruns, + } + + testAssets, _ := getPipelineLoopController(t, d, []*pipelineloopv1alpha1.PipelineLoop{tc.pipelineloop}) + c := testAssets.Controller + clients := testAssets.Clients + + if err := c.Reconciler.Reconcile(ctx, getRunName(tc.run)); err != nil { + t.Fatalf("Error reconciling: %s", err) + } + // Fetch the updated Run + reconciledRun, err := clients.Pipeline.TektonV1alpha1().Runs(tc.run.Namespace).Get(ctx, tc.run.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Error getting reconciled run from fake client: %s", err) + } + // Verify that the Run has the expected status and reason. + checkRunCondition(t, reconciledRun, tc.expectedStatus, tc.expectedReason) + // Verify expected events were created. + if err := checkEvents(testAssets.Recorder, tc.name, tc.expectedEvents); err != nil { + t.Errorf(err.Error()) + } + }) + } +}