diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index cca71d041f5..269870e2320 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -12,13 +12,14 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" - "github.com/dgraph-io/badger/v2/pb" + badgerstruct "github.com/dgraph-io/badger/v2/pb" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" logger "github.com/ipfs/go-log/v2" pool "github.com/libp2p/go-buffer-pool" "github.com/multiformats/go-base32" + "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/xerrors" @@ -242,7 +243,7 @@ func (b *Blockstore) unlockMove(state bsMoveState) { // are persisted to the new blockstore; if a failure occurs aboring the move, // then they must be peristed to the old blockstore. // In short, the blockstore must not lose data from new writes during the move. -func (b *Blockstore) movingGC() error { +func (b *Blockstore) movingGC(ctx context.Context) error { // this inlines moveLock/moveUnlock for the initial state check to prevent a second move // while one is in progress without clobbering state b.moveMx.Lock() @@ -327,7 +328,7 @@ func (b *Blockstore) movingGC() error { b.unlockMove(moveStateMoving) log.Info("copying blockstore") - err = b.doCopy(b.db, b.dbNext) + err = b.doCopy(ctx, b.db, b.dbNext) if err != nil { return fmt.Errorf("error moving badger blockstore to %s: %w", newPath, err) } @@ -389,37 +390,66 @@ func symlink(path, linkTo string) error { return os.Symlink(path, linkTo) } -// doCopy copies a badger blockstore to another, with an optional filter; if the filter -// is not nil, then only cids that satisfy the filter will be copied. -func (b *Blockstore) doCopy(from, to *badger.DB) error { - workers := runtime.NumCPU() / 2 - if workers < 2 { - workers = 2 - } - if workers > 8 { - workers = 8 - } - - stream := from.NewStream() - stream.NumGo = workers - stream.LogPrefix = "doCopy" - stream.Send = func(list *pb.KVList) error { - batch := to.NewWriteBatch() - defer batch.Cancel() +// doCopy copies a badger blockstore to another +func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr error) { + batch := to.NewWriteBatch() + defer func() { + if defErr == nil { + defErr = batch.Flush() + } + if defErr != nil { + batch.Cancel() + } + }() - for _, kv := range list.Kv { - if kv.Key == nil || kv.Value == nil { - continue - } + return iterateBadger(ctx, from, func(kvs []*badgerstruct.KV) error { + // check whether context is closed on every kv group + if err := ctx.Err(); err != nil { + return err + } + for _, kv := range kvs { if err := batch.Set(kv.Key, kv.Value); err != nil { return err } } + return nil + }) +} + +var IterateLSMWorkers int // defaults to between( 2, 8, runtime.NumCPU/2 ) - return batch.Flush() +func iterateBadger(ctx context.Context, db *badger.DB, iter func([]*badgerstruct.KV) error) error { + workers := IterateLSMWorkers + if workers == 0 { + workers = between(2, 8, runtime.NumCPU()/2) } - return stream.Orchestrate(context.Background()) + stream := db.NewStream() + stream.NumGo = workers + stream.LogPrefix = "iterateBadgerKVs" + stream.Send = func(kvl *badgerstruct.KVList) error { + kvs := make([]*badgerstruct.KV, 0, len(kvl.Kv)) + for _, kv := range kvl.Kv { + if kv.Key != nil && kv.Value != nil { + kvs = append(kvs, kv) + } + } + if len(kvs) == 0 { + return nil + } + return iter(kvs) + } + return stream.Orchestrate(ctx) +} + +func between(min, max, val int) int { + if val > max { + val = max + } + if val < min { + val = min + } + return val } func (b *Blockstore) deleteDB(path string) { @@ -500,7 +530,7 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.Bloc } if options.FullGC { - return b.movingGC() + return b.movingGC(ctx) } threshold := options.Threshold if threshold == 0 { @@ -627,7 +657,15 @@ func (b *Blockstore) Flush(context.Context) error { b.lockDB() defer b.unlockDB() - return b.db.Sync() + var nextErr error + if b.dbNext != nil { + nextErr = b.dbNext.Sync() + } + + return multierr.Combine( + nextErr, + b.db.Sync(), + ) } // Has implements Blockstore.Has.