Skip to content

Commit

Permalink
go/common/badger: Fix v2->v3 migration for managed mode
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jun 9, 2021
1 parent b6da118 commit 284c0b4
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 5 deletions.
1 change: 1 addition & 0 deletions .changelog/4010.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/common/badger: Fix v2->v3 migration for managed mode
17 changes: 12 additions & 5 deletions go/common/badger/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func openWithMigrations(opts badger.Options, managed bool) (*badger.DB, error) {
}

// Perform the migration.
if err := migrateDatabase(opts); err != nil {
if err := migrateDatabase(opts, managed); err != nil {
return nil, fmt.Errorf("migration failed: %w", err)
}

// Retry opening the database.
return openFn(opts)
}

func migrateDatabase(opts badger.Options) error {
func migrateDatabase(opts badger.Options, managed bool) error {
var logger *logging.Logger
adapter, _ := opts.Logger.(*badgerLogger)
if logger != nil {
Expand All @@ -180,12 +180,19 @@ func migrateDatabase(opts badger.Options) error {
return fmt.Errorf("failed to remove temporary destination '%s': %w", temporaryDbName, err)
}

openFnV2 := badgerV2.Open
openFnV3 := badger.Open
if managed {
openFnV2 = badgerV2.OpenManaged
openFnV3 = badger.OpenManaged
}

// Open the database as Badger v2.
optsV2 := badgerV2.DefaultOptions(opts.Dir)
optsV2 = optsV2.WithNumVersionsToKeep(math.MaxInt32)
optsV2 = optsV2.WithLogger(nil)

dbV2, err := badgerV2.Open(optsV2)
dbV2, err := openFnV2(optsV2)
if err != nil {
return fmt.Errorf("failed to open source database: %w", err)
}
Expand All @@ -196,7 +203,7 @@ func migrateDatabase(opts badger.Options) error {
optsV3 = optsV3.WithNumVersionsToKeep(math.MaxInt32)
optsV3 = optsV3.WithLogger(NewLogAdapter(logger))

dbV3, err := badger.Open(optsV3)
dbV3, err := openFnV3(optsV3)
if err != nil {
return fmt.Errorf("failed to open destination database: %w", err)
}
Expand All @@ -213,7 +220,7 @@ func migrateDatabase(opts badger.Options) error {
defer w.Close()
defer bw.Flush()

_, errBackup := dbV2.Backup(bw, 0)
_, errBackup := backup(dbV2, bw, managed)
backupCh <- errBackup
}()

Expand Down
86 changes: 86 additions & 0 deletions go/common/badger/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package badger

import (
"bytes"
"context"
"encoding/binary"
"io"
"math"

badgerV2 "github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/pb"
"github.com/golang/protobuf/proto"
)

// Adapted from Badger v2 which is Copyright 2017 Dgraph Labs, Inc. and Contributors, released
// under the Apache-2 license.

func backup(db *badgerV2.DB, w io.Writer, managed bool) (uint64, error) {
var stream *badgerV2.Stream
switch managed {
case true:
stream = db.NewStreamAt(math.MaxUint64)
case false:
stream = db.NewStream()
}

stream.LogPrefix = "migration"
stream.KeyToList = func(key []byte, itr *badgerV2.Iterator) (*pb.KVList, error) {
list := &pb.KVList{}
for ; itr.Valid(); itr.Next() {
item := itr.Item()
if !bytes.Equal(item.Key(), key) {
return list, nil
}

var valCopy []byte
var meta byte
switch item.IsDeletedOrExpired() {
case true:
// No need to copy value, if item is deleted or expired.
// Set delete bit.
meta = 1 << 0 // bitDelete
case false:
var err error
valCopy, err = item.ValueCopy(nil)
if err != nil {
return nil, err
}
}

kv := &pb.KV{
Key: item.KeyCopy(nil),
Value: valCopy,
UserMeta: []byte{item.UserMeta()},
Version: item.Version(),
ExpiresAt: item.ExpiresAt(),
Meta: []byte{meta},
}
list.Kv = append(list.Kv, kv)
}
return list, nil
}

var maxVersion uint64
stream.Send = func(list *pb.KVList) error {
for _, kv := range list.Kv {
if maxVersion < kv.Version {
maxVersion = kv.Version
}
}
if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil {
return err
}
buf, err := proto.Marshal(list)
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}

if err := stream.Orchestrate(context.Background()); err != nil {
return 0, err
}
return maxVersion, nil
}

0 comments on commit 284c0b4

Please sign in to comment.