Skip to content

Commit

Permalink
Pipeline db fixes (#283)
Browse files Browse the repository at this point in the history
* small update to docs

* fix pipeline updates and logging
  • Loading branch information
ukclivecox authored Jun 10, 2022
1 parent 2d1a27d commit ec665ea
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
7 changes: 5 additions & 2 deletions scheduler/pkg/store/pipeline/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package pipeline
import (
"github.com/dgraph-io/badger/v3"
"github.com/seldonio/seldon-core/scheduler/apis/mlops/scheduler"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

type PipelineDB struct {
db *badger.DB
}

func NewPipelineDb(path string) (*PipelineDB, error) {
db, err := badger.Open(badger.DefaultOptions(path))
func NewPipelineDb(path string, logger logrus.FieldLogger) (*PipelineDB, error) {
options := badger.DefaultOptions(path)
options.Logger = logger.WithField("source", "pipelineDb")
db, err := badger.Open(options)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion scheduler/pkg/store/pipeline/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func TestSaveAndRestore(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
path := fmt.Sprintf("%s/db", t.TempDir())
db, err := NewPipelineDb(path)
logger := log.New()
db, err := NewPipelineDb(path, logger)
g.Expect(err).To(BeNil())
for _, p := range test.pipelines {
err := db.save(p)
Expand Down
6 changes: 3 additions & 3 deletions scheduler/pkg/store/pipeline/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type PipelineStore struct {

func NewPipelineStore(logger logrus.FieldLogger, eventHub *coordinator.EventHub) *PipelineStore {
ps := &PipelineStore{
logger: logger,
logger: logger.WithField("source", "pipelineStore"),
eventHub: eventHub,
pipelines: make(map[string]*Pipeline),
db: nil,
Expand All @@ -49,7 +49,7 @@ func (ps *PipelineStore) InitialiseDB(path string) error {
if err != nil {
return err
}
db, err := NewPipelineDb(path)
db, err := NewPipelineDb(path, ps.logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func (ps *PipelineStore) setPipelineStateImpl(name string, versionNumber uint32,
if status == PipelineReady {
evts = append(evts, ps.terminateOldUnterminatedPipelinesIfNeeded(pipeline)...)
}
if ps.db != nil {
if !pipeline.Deleted && ps.db != nil {
err := ps.db.save(pipeline)
if err != nil {
return evts, err
Expand Down

0 comments on commit ec665ea

Please sign in to comment.