Skip to content

Commit

Permalink
Extracted cache as a standalone package.
Browse files Browse the repository at this point in the history
  • Loading branch information
ScrapCodes committed Feb 17, 2022
1 parent 80d0450 commit ebe62af
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 120 deletions.
32 changes: 32 additions & 0 deletions tekton-catalog/cache/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Cache: Reuse the results from previous execution for custom tasks.

### How To

1. Setup.
```go
taskCacheStore := TaskCacheStore{}
err := taskCacheStore.Connect(db.ConnectionParams{DbDriver: "sqlite3", DbName: ":memory:"})
// Currently, mysql and sqlite3 are supported driver.
```

2. Store an entry to cache.
```go
taskCache := &model.TaskCache{
TaskHashKey: cacheKey,
TaskOutput: cacheOutput,
}
taskCacheStore.Put(taskCache)
```

3. Fetch an entry from cache.
```go
cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey)
if err != nil {
fmt.Printf("%v", err)
}

```
4. Prune entries older than a day using:
```go
taskCacheStore.PruneOlderThan(time.Now().Add(-24*time.Hour))
```
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,15 @@
package db

import (
"flag"
"fmt"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
"go.uber.org/zap"

"github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/cache/model"
)

const (
mysqlDBDriverDefault = "mysql"
mysqlDBHostDefault = "mysql.kubeflow.svc.cluster.local"
mysqlDBPortDefault = "3306"
mysqlDBGroupConcatMaxLenDefault = "4194304"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
)

type DBConnectionParameters struct {
type ConnectionParams struct {
DbDriver string
DbHost string
DbPort string
Expand All @@ -44,28 +34,19 @@ type DBConnectionParameters struct {
DbExtraParams string
}

func (params *DBConnectionParameters) LoadDefaults() {
flag.StringVar(&params.DbDriver, "db_driver", mysqlDBDriverDefault, "Database driver name, mysql is the default value")
flag.StringVar(&params.DbHost, "db_host", mysqlDBHostDefault, "Database host name.")
flag.StringVar(&params.DbPort, "db_port", mysqlDBPortDefault, "Database port number.")
flag.StringVar(&params.DbName, "db_name", "cachedb", "Database name.")
flag.StringVar(&params.DbUser, "db_user", "root", "Database user name.")
flag.StringVar(&params.DbPwd, "db_password", "", "Database password.")
flag.StringVar(&params.DbGroupConcatMaxLen, "db_group_concat_max_len", mysqlDBGroupConcatMaxLenDefault, "Database group concat max length.")
flag.Parse()
}

func InitDBClient(params DBConnectionParameters, initConnectionTimeout time.Duration, log *zap.SugaredLogger) (*gorm.DB, error) {
func InitDBClient(params ConnectionParams, initConnectionTimeout time.Duration) (*gorm.DB, error) {
driverName := params.DbDriver
var arg string
var err error

switch driverName {
case mysqlDBDriverDefault:
arg, err = initMysql(params, initConnectionTimeout, log)
arg, err = initMysql(params, initConnectionTimeout)
if err != nil {
return nil, err
}
case sqliteDriverDefault:
arg = initSqlite(params.DbName)
default:
return nil, fmt.Errorf("driver %v is not supported", driverName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,37 @@ import (
"database/sql"
"encoding/json"
"fmt"
"github.com/cenkalti/backoff"
"go.uber.org/zap"
"time"

"github.com/cenkalti/backoff"

"github.com/go-sql-driver/mysql"
)

func initMysql(params DBConnectionParameters, initConnectionTimeout time.Duration, log *zap.SugaredLogger) (string, error) {
const (
mysqlDBDriverDefault = "mysql"
mysqlDBHostDefault = "mysql.kubeflow.svc.cluster.local"
mysqlDBPortDefault = "3306"
mysqlDBGroupConcatMaxLenDefault = "4194304"
)

func setDefault(field *string, defaultVal string) {
if *field == "" {
*field = defaultVal
}
}

func (params *ConnectionParams) LoadMySQLDefaults() {
setDefault(&params.DbDriver, mysqlDBDriverDefault)
setDefault(&params.DbHost, mysqlDBHostDefault)
setDefault(&params.DbPort, mysqlDBPortDefault)
setDefault(&params.DbName, "cachedb")
setDefault(&params.DbUser, "root")
setDefault(&params.DbPwd, "")
setDefault(&params.DbGroupConcatMaxLen, mysqlDBGroupConcatMaxLenDefault)
}

func initMysql(params ConnectionParams, initConnectionTimeout time.Duration) (string, error) {
var mysqlExtraParams = map[string]string{}
data := []byte(params.DbExtraParams)
_ = json.Unmarshal(data, &mysqlExtraParams)
Expand Down Expand Up @@ -64,7 +86,6 @@ func initMysql(params DBConnectionParameters, initConnectionTimeout time.Duratio
if err != nil {
return err
}
log.Infof("Backing database for cache is created")
return nil
}
b = backoff.NewExponentialBackOff()
Expand All @@ -88,8 +109,8 @@ func initMysql(params DBConnectionParameters, initConnectionTimeout time.Duratio
return mysqlConfig.FormatDSN(), nil
}

func CreateMySQLConfig(user, password string, mysqlServiceHost string,
mysqlServicePort string, dbName string, mysqlGroupConcatMaxLen string, mysqlExtraParams map[string]string) *mysql.Config {
func CreateMySQLConfig(user, password, mysqlServiceHost, mysqlServicePort, dbName, mysqlGroupConcatMaxLen string,
mysqlExtraParams map[string]string) *mysql.Config {

params := map[string]string{
"charset": "utf8",
Expand Down
29 changes: 29 additions & 0 deletions tekton-catalog/cache/pkg/db/sqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package db

const sqliteDriverDefault = "sqlite3"

func (params *ConnectionParams) LoadSqliteDefaults() {
setDefault(&params.DbDriver, sqliteDriverDefault)
setDefault(&params.DbName, ":memory:")
}

func initSqlite(dbName string) string {
if dbName == "" {
dbName = ":memory:" // default db.
}
return dbName
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
)

type TaskCache struct {
ID int64 `gorm:"column:ID; not null; primary_key; AUTO_INCREMENT"`
TaskHashKey string `gorm:"column:TaskHashKey; not null; index:idx_cache_key"`
TaskOutput string `gorm:"column:TaskOutput; type:longtext; not null"`
CreatedAt time.Time `gorm:"column:CreatedAt; autoCreateTime:nano; not null"`
ID int64 `gorm:"column:ID; not null; primary_key; AUTO_INCREMENT"`
TaskHashKey string `gorm:"column:TaskHashKey; not null; index:idx_cache_key"`
TaskOutput string `gorm:"column:TaskOutput; type:longtext; not null"`
CreatedAt time.Time `gorm:"column:CreatedAt; autoCreateTime:nano; not null"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,33 @@ package cache

import (
"fmt"
"github.com/jinzhu/gorm"
"github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/cache/db"
"github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/cache/model"
"go.uber.org/zap"
"time"

"github.com/jinzhu/gorm"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
)

const (
DefaultConnectionTimeout = time.Minute * 6
)

type TaskCacheStore struct {
db *gorm.DB
Enabled bool
db *gorm.DB
Disabled bool
}

func (t *TaskCacheStore) Connect(params db.DBConnectionParameters, logger *zap.SugaredLogger) error {
if t.db != nil {
func (t *TaskCacheStore) Connect(params db.ConnectionParams) error {
if t.db != nil || t.Disabled {
return nil
}
var err error
t.db, err = db.InitDBClient(params, DefaultConnectionTimeout, logger)
if err == nil {
t.Enabled = true
}
t.db, err = db.InitDBClient(params, DefaultConnectionTimeout)
return err
}

func (t *TaskCacheStore) Get(taskHashKey string) (*model.TaskCache, error) {
if !t.Enabled {
if t.Disabled || t.db == nil {
return nil, nil
}
entry := &model.TaskCache{}
Expand All @@ -58,7 +55,7 @@ func (t *TaskCacheStore) Get(taskHashKey string) (*model.TaskCache, error) {
}

func (t *TaskCacheStore) Put(entry *model.TaskCache) (*model.TaskCache, error) {
if !t.Enabled {
if t.Disabled || t.db == nil {
return nil, nil
}
ok := t.db.NewRecord(entry)
Expand All @@ -74,7 +71,7 @@ func (t *TaskCacheStore) Put(entry *model.TaskCache) (*model.TaskCache, error) {
}

func (t *TaskCacheStore) Delete(id string) error {
if !t.Enabled {
if t.Disabled || t.db == nil {
return nil
}
d := t.db.Delete(&model.TaskCache{}, "ID = ?", id)
Expand All @@ -85,7 +82,7 @@ func (t *TaskCacheStore) Delete(id string) error {
}

func (t *TaskCacheStore) PruneOlderThan(timestamp time.Time) error {
if !t.Enabled {
if t.Disabled || t.db == nil {
return nil
}
d := t.db.Delete(&model.TaskCache{}, "CreatedAt <= ?", timestamp)
Expand Down
Loading

0 comments on commit ebe62af

Please sign in to comment.