Skip to content

Commit

Permalink
Experiment store (#292)
Browse files Browse the repository at this point in the history
* small update to docs

* initial experiment store commit

* single db

* updated experiment db

* update git ignore

* review updates

* fix db path
  • Loading branch information
ukclivecox authored Jun 18, 2022
1 parent 9dde04c commit a92bda9
Show file tree
Hide file tree
Showing 16 changed files with 293 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ spec:
- args:
- --pipeline-gateway-host=seldon-pipelinegateway
- --tracing-config-path=/mnt/tracing/tracing.json
- --pipeline-db-path=/mnt/scheduler/pipelinedb
- --db-path=/mnt/scheduler/db
command:
- /bin/scheduler
image: seldonio/seldon-scheduler:latest
Expand Down
2 changes: 1 addition & 1 deletion k8s/yaml/seldon-v2-components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ spec:
- args:
- --pipeline-gateway-host=seldon-pipelinegateway
- --tracing-config-path=/mnt/tracing/tracing.json
- --pipeline-db-path=/mnt/scheduler/pipelinedb
- --db-path=/mnt/scheduler/db
command:
- /bin/scheduler
image: seldonio/seldon-scheduler:latest
Expand Down
3 changes: 2 additions & 1 deletion scheduler/.gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.idea/
*~
/bin/*
/mnt/pipelinedb
/mnt/db/pipelinedb
/mnt/db/experimentdb
/mnt/mlserver
/mnt/triton
/apis-TEMP
Expand Down
15 changes: 7 additions & 8 deletions scheduler/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -502,22 +502,21 @@ stop-dataflow-engine-host:

SCHEDULER_USE_DB ?= true
ifeq ($(SCHEDULER_USE_DB),true)
PIPELINE_DB_PATH_LOCAL = ${PWD}/mnt/pipelinedb
export PIPELINE_DB_PATH_COMPOSE = /mnt/db/pipelinedb
DB_PATH_LOCAL = ${PWD}/mnt/db
export DB_PATH_COMPOSE = /mnt/db
else
PIPELINE_DB_PATH_LOCAL =
export PIPELINE_DB_PATH_COMPOSE =
DB_PATH_LOCAL =
export DB_PATH_COMPOSE =
endif

.PHONY: start-scheduler-local
start-scheduler-local:
mkdir -p ${PWD}/mnt/pipelinedb
./bin/scheduler --pipeline-db-path "${PIPELINE_DB_PATH}"
./bin/scheduler --db-path "${DB_PATH_LOCAL}"

.PHONY: clear-scheduler-state
clear-scheduler-state:
mkdir -p ${PWD}/mnt/pipelinedb
rm -r ${PWD}/mnt/pipelinedb/* || echo "no db files found"
rm -r ${PWD}/mnt/db/pipelinedb || echo "no pipeline db files found"
rm -r ${PWD}/mnt/db/experimentdb || echo "no experiment db files found"

.PHONY: start-agent-local-mlserver
start-agent-local-mlserver:
Expand Down
6 changes: 3 additions & 3 deletions scheduler/all-host-network.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ services:
- "debug"
- "--tracing-config-path"
- "/mnt/config/tracing-internal.json"
- --pipeline-db-path
- "${PIPELINE_DB_PATH_COMPOSE}"
- --db-path
- "${DB_PATH_COMPOSE}"
volumes:
- type: bind
source: ./config
target: /mnt/config
- type: bind
source: ./mnt
source: ./mnt/db
target: /mnt/db
network_mode: "host"

Expand Down
6 changes: 3 additions & 3 deletions scheduler/all-internal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ services:
- "pipelinegateway"
- "--tracing-config-path"
- "/mnt/config/tracing-internal.json"
- --pipeline-db-path
- ${PIPELINE_DB_PATH_COMPOSE}
- --db-path
- ${DB_PATH_COMPOSE}
volumes:
- type: bind
source: ./config
target: /mnt/config
- type: bind
source: ./mnt
source: ./mnt/db
target: /mnt/db
ports:
- "${SCHEDULER_XDS_PORT}:${SCHEDULER_XDS_PORT}"
Expand Down
23 changes: 16 additions & 7 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (
pipelineGatewayGrpcPort int
logLevel string
tracingConfigPath string
pipelineDbPath string
dbPath string
nodeID string
)

Expand Down Expand Up @@ -88,7 +88,7 @@ func init() {
flag.IntVar(&pipelineGatewayGrpcPort, "pipeline-gateway-grpc-port", 9011, "Pipeline gateway server grpc port")
flag.StringVar(&logLevel, "log-level", "debug", "Log level - examples: debug, info, error")
flag.StringVar(&tracingConfigPath, "tracing-config-path", "", "Tracing config path")
flag.StringVar(&pipelineDbPath, "pipeline-db-path", "", "Pipeline state Db")
flag.StringVar(&dbPath, "db-path", "", "State Db path")
}

func getNamespace() string {
Expand Down Expand Up @@ -139,18 +139,27 @@ func main() {
}

ps := pipeline.NewPipelineStore(logger, eventHub)
if pipelineDbPath != "" {
logger.Infof("Opening db at %s", pipelineDbPath)
err := ps.InitialiseDB(pipelineDbPath)
if dbPath != "" {
err := ps.InitialiseOrRestoreDB(dbPath)
if err != nil {
log.WithError(err).Fatalf("Failed to initialise pipeline db at %s", pipelineDbPath)
log.WithError(err).Fatalf("Failed to initialise pipeline db at %s", dbPath)
}
} else {
log.Warn("Not running with scheduler DB")
log.Warn("Not running with scheduler pipeline DB")
}

ss := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)

es := experiment.NewExperimentServer(logger, eventHub, ss)
if dbPath != "" {
err := es.InitialiseOrRestoreDB(dbPath)
if err != nil {
log.WithError(err).Fatalf("Failed to initialise experiment db at %s", dbPath)
}
} else {
log.Warn("Not running with scheduler experiment DB")
}

pipelineGatewayDetails := xdscache.PipelineGatewayDetails{
Host: pipelineGatewayHost,
HttpPort: pipelineGatewayHttpPort,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/k8s/scheduler/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ spec:
args:
- --pipeline-gateway-host=seldon-pipelinegateway
- --tracing-config-path=/mnt/tracing/tracing.json
- --pipeline-db-path=/mnt/scheduler/pipelinedb
- --db-path=/mnt/scheduler/db
image: scheduler:latest
imagePullPolicy: IfNotPresent
name: scheduler
Expand Down
Empty file added scheduler/mnt/db/.keep
Empty file.
78 changes: 78 additions & 0 deletions scheduler/pkg/store/experiment/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package experiment

import (
"github.com/dgraph-io/badger/v3"
"github.com/seldonio/seldon-core/scheduler/apis/mlops/scheduler"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

// ExperimentDBManager wraps the Badger database handle that this package uses
// to save, delete and restore experiments.
type ExperimentDBManager struct {
// db is the open Badger handle; Stop closes it.
db *badger.DB
}

// newExperimentDbManager opens a Badger database at path and wraps it in an
// ExperimentDBManager. Badger's log output is routed to logger with the field
// source=experimentDb. Any error from badger.Open is returned unchanged.
func newExperimentDbManager(path string, logger logrus.FieldLogger) (*ExperimentDBManager, error) {
	opts := badger.DefaultOptions(path)
	opts.Logger = logger.WithField("source", "experimentDb")

	handle, openErr := badger.Open(opts)
	if openErr != nil {
		return nil, openErr
	}

	manager := &ExperimentDBManager{db: handle}
	return manager, nil
}

// Stop closes the underlying Badger database and returns whatever error Close
// reports.
func (edb *ExperimentDBManager) Stop() error {
	closeErr := edb.db.Close()
	return closeErr
}

// save converts experiment to its protobuf form, marshals it, and stores the
// bytes under the experiment's name in a single Badger update transaction.
// It returns the marshalling error or the transaction error, if any.
func (edb *ExperimentDBManager) save(experiment *Experiment) error {
	payload, marshalErr := proto.Marshal(CreateExperimentProto(experiment))
	if marshalErr != nil {
		return marshalErr
	}

	write := func(txn *badger.Txn) error {
		// The key is the experiment name, so saving again overwrites the entry.
		return txn.Set([]byte(experiment.Name), payload)
	}
	return edb.db.Update(write)
}

// delete removes the entry keyed by the experiment's name inside a Badger
// update transaction and returns the transaction's error, if any.
func (edb *ExperimentDBManager) delete(experiment *Experiment) error {
	remove := func(txn *badger.Txn) error {
		return txn.Delete([]byte(experiment.Name))
	}
	return edb.db.Update(remove)
}

// restore iterates over every entry in the database inside a read-only
// transaction, unmarshals each value into a scheduler.Experiment, converts it
// with CreateExperimentFromRequest and passes the result to startExperimentCb.
//
// Iteration stops at the first error, whether from reading the value,
// unmarshalling it, or the callback, and that error is returned. An empty
// database results in no callback invocations and a nil error.
func (edb *ExperimentDBManager) restore(startExperimentCb func(*Experiment) error) error {
	return edb.db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			err := it.Item().Value(func(v []byte) error {
				snapshot := scheduler.Experiment{}
				if err := proto.Unmarshal(v, &snapshot); err != nil {
					return err
				}
				// CreateExperimentFromRequest returns no error, so the only
				// remaining failure point is the callback itself.
				return startExperimentCb(CreateExperimentFromRequest(&snapshot))
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
}
73 changes: 73 additions & 0 deletions scheduler/pkg/store/experiment/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package experiment

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
log "github.com/sirupsen/logrus"

. "github.com/onsi/gomega"
)

func TestSaveAndRestore(t *testing.T) {
g := NewGomegaWithT(t)
type test struct {
name string
experiments []*Experiment
}

tests := []test{
{
name: "basic experiment",
experiments: []*Experiment{
{
Name: "test1",
Candidates: []*Candidate{
{
ModelName: "model1",
Weight: 50,
},
{
ModelName: "model2",
Weight: 50,
},
},
Mirror: &Mirror{
ModelName: "model3",
Percent: 90,
},
Config: &Config{
StickySessions: true,
},
KubernetesMeta: &KubernetesMeta{
Namespace: "default",
Generation: 2,
},
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
logger := log.New()
db, err := newExperimentDbManager(getExperimentDbFolder(path), logger)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
err := db.save(p)
g.Expect(err).To(BeNil())
}
err = db.Stop()
g.Expect(err).To(BeNil())

es := NewExperimentServer(log.New(), nil, nil)
err = es.InitialiseOrRestoreDB(path)
g.Expect(err).To(BeNil())
for _, p := range test.experiments {
g.Expect(cmp.Equal(p, es.experiments[p.Name])).To(BeTrue())
}
})
}
}
41 changes: 41 additions & 0 deletions scheduler/pkg/store/experiment/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package experiment

import (
"os"
"path/filepath"
"sync"

"github.com/seldonio/seldon-core/scheduler/pkg/store"
Expand All @@ -15,6 +17,7 @@ const (
experimentStartEventSource = "experiment.store.start"
experimentStopEventSource = "experiment.store.stop"
modelEventHandlerName = "experiment.store.models"
experimentDbFolder = "experimentdb"
)

type ExperimentServer interface {
Expand All @@ -34,6 +37,7 @@ type ExperimentStore struct {
modelReferences map[string]map[string]*Experiment // modelName to experiments it appears in
eventHub *coordinator.EventHub
store store.ModelStore
db *ExperimentDBManager
}

func NewExperimentServer(logger logrus.FieldLogger, eventHub *coordinator.EventHub, store store.ModelStore) *ExperimentStore {
Expand All @@ -59,6 +63,31 @@ func NewExperimentServer(logger logrus.FieldLogger, eventHub *coordinator.EventH
return es
}

// getExperimentDbFolder returns the experiment database directory, which is
// the experimentDbFolder sub-directory of basePath.
func getExperimentDbFolder(basePath string) string {
	dbDir := filepath.Join(basePath, experimentDbFolder)
	return dbDir
}

// InitialiseOrRestoreDB opens (creating if necessary) the experiment database
// under path and replays every stored experiment through StartExperiment.
//
// The database lives in the experiment sub-folder of path, which is created
// with os.MkdirAll. If the database is empty the restore step visits nothing.
//
// On a restore failure the database is closed and detached from the store, so
// a failed initialisation does not leave an open Badger handle behind. The
// original restore error is returned.
func (es *ExperimentStore) InitialiseOrRestoreDB(path string) error {
	logger := es.logger.WithField("func", "initialiseDB")
	experimentDbPath := getExperimentDbFolder(path)
	logger.Infof("Initialise DB at %s", experimentDbPath)
	if err := os.MkdirAll(experimentDbPath, os.ModePerm); err != nil {
		return err
	}
	db, err := newExperimentDbManager(experimentDbPath, es.logger)
	if err != nil {
		return err
	}
	es.db = db
	// If database already existed we can restore else this is a noop
	if err := es.db.restore(es.StartExperiment); err != nil {
		// Release the handle; report a close failure without masking the
		// restore error that caused it.
		if closeErr := db.Stop(); closeErr != nil {
			logger.WithError(closeErr).Warn("Failed to close experiment db after restore error")
		}
		es.db = nil
		return err
	}
	return nil
}

func (es *ExperimentStore) publishModelEvent(experiment *Experiment) {
es.eventHub.PublishModelEvent(experimentStateEventSource, coordinator.ModelEventMsg{
ModelName: *experiment.DefaultModel,
Expand Down Expand Up @@ -225,6 +254,12 @@ func (es *ExperimentStore) startExperimentImpl(experiment *Experiment) (*coordin
}
}
es.updateExperimentState(experiment)
if es.db != nil {
err := es.db.save(experiment)
if err != nil {
return nil, nil, err
}
}
es.experiments[experiment.Name] = experiment
return es.createExperimentEventMsg(experiment, true), modelEvt, nil
}
Expand Down Expand Up @@ -260,6 +295,12 @@ func (es *ExperimentStore) stopExperimentImpl(experimentName string) (*coordinat
ModelName: *experiment.DefaultModel,
}
}
if es.db != nil {
err := es.db.delete(experiment)
if err != nil {
return nil, nil, err
}
}
return es.createExperimentEventMsg(experiment, true), modelEvt, nil
} else {
return nil, nil, &ExperimentNotFound{
Expand Down
Loading

0 comments on commit a92bda9

Please sign in to comment.