diff --git a/.changelog/4010.bugfix.2.md b/.changelog/4010.bugfix.2.md new file mode 100644 index 00000000000..d6744549111 --- /dev/null +++ b/.changelog/4010.bugfix.2.md @@ -0,0 +1 @@ +go/common/badger: Fix v2->v3 migration for managed mode diff --git a/go/common/badger/helpers.go b/go/common/badger/helpers.go index 4d1d8e759c8..2d5c0381991 100644 --- a/go/common/badger/helpers.go +++ b/go/common/badger/helpers.go @@ -149,7 +149,7 @@ func openWithMigrations(opts badger.Options, managed bool) (*badger.DB, error) { } // Perform the migration. - if err := migrateDatabase(opts); err != nil { + if err := migrateDatabase(opts, managed); err != nil { return nil, fmt.Errorf("migration failed: %w", err) } @@ -157,7 +157,7 @@ func openWithMigrations(opts badger.Options, managed bool) (*badger.DB, error) { return openFn(opts) } -func migrateDatabase(opts badger.Options) error { +func migrateDatabase(opts badger.Options, managed bool) error { var logger *logging.Logger adapter, _ := opts.Logger.(*badgerLogger) if logger != nil { @@ -180,12 +180,19 @@ func migrateDatabase(opts badger.Options) error { return fmt.Errorf("failed to remove temporary destination '%s': %w", temporaryDbName, err) } + openFnV2 := badgerV2.Open + openFnV3 := badger.Open + if managed { + openFnV2 = badgerV2.OpenManaged + openFnV3 = badger.OpenManaged + } + // Open the database as Badger v2. optsV2 := badgerV2.DefaultOptions(opts.Dir) optsV2 = optsV2.WithNumVersionsToKeep(math.MaxInt32) optsV2 = optsV2.WithLogger(nil) - dbV2, err := badgerV2.Open(optsV2) + dbV2, err := openFnV2(optsV2) if err != nil { return fmt.Errorf("failed to open source database: %w", err) } @@ -196,7 +203,7 @@ func migrateDatabase(opts badger.Options) error { optsV3 = optsV3.WithNumVersionsToKeep(math.MaxInt32) optsV3 = optsV3.WithLogger(NewLogAdapter(logger)) - dbV3, err := badger.Open(optsV3) + dbV3, err := openFnV3(optsV3) if err != nil { return fmt.Errorf("failed to open destination database: %w", err) } @@ -213,7 +220,7 @@ func migrateDatabase(opts badger.Options) error { defer w.Close() defer bw.Flush() - _, errBackup := dbV2.Backup(bw, 0) + _, errBackup := backup(dbV2, bw, managed) backupCh <- errBackup }() diff --git a/go/common/badger/migrate.go b/go/common/badger/migrate.go new file mode 100644 index 00000000000..2e238b66e28 --- /dev/null +++ b/go/common/badger/migrate.go @@ -0,0 +1,86 @@ +package badger + +import ( + "bytes" + "context" + "encoding/binary" + "io" + "math" + + badgerV2 "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/pb" + "github.com/golang/protobuf/proto" +) + +// Adapted from Badger v2 which is Copyright 2017 Dgraph Labs, Inc. and Contributors, released +// under the Apache-2 license. + +func backup(db *badgerV2.DB, w io.Writer, managed bool) (uint64, error) { + var stream *badgerV2.Stream + switch managed { + case true: + stream = db.NewStreamAt(math.MaxUint64) + case false: + stream = db.NewStream() + } + + stream.LogPrefix = "migration" + stream.KeyToList = func(key []byte, itr *badgerV2.Iterator) (*pb.KVList, error) { + list := &pb.KVList{} + for ; itr.Valid(); itr.Next() { + item := itr.Item() + if !bytes.Equal(item.Key(), key) { + return list, nil + } + + var valCopy []byte + var meta byte + switch item.IsDeletedOrExpired() { + case true: + // No need to copy value, if item is deleted or expired. + // Set delete bit. + meta = 1 << 0 // bitDelete + case false: + var err error + valCopy, err = item.ValueCopy(nil) + if err != nil { + return nil, err + } + } + + kv := &pb.KV{ + Key: item.KeyCopy(nil), + Value: valCopy, + UserMeta: []byte{item.UserMeta()}, + Version: item.Version(), + ExpiresAt: item.ExpiresAt(), + Meta: []byte{meta}, + } + list.Kv = append(list.Kv, kv) + } + return list, nil + } + + var maxVersion uint64 + stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + if maxVersion < kv.Version { + maxVersion = kv.Version + } + } + if err := binary.Write(w, binary.LittleEndian, uint64(proto.Size(list))); err != nil { + return err + } + buf, err := proto.Marshal(list) + if err != nil { + return err + } + _, err = w.Write(buf) + return err + } + + if err := stream.Orchestrate(context.Background()); err != nil { + return 0, err + } + return maxVersion, nil +}