Skip to content

Commit

Permalink
Problem: no command to fix corrupted data in versiondb (#1685)
Browse files Browse the repository at this point in the history
* Problem: no command to fix corrupted data in versiondb

Closes: #1683

Solution:
- add fix command to fix corrupted data in versiondb

* rename

* support SkipVersionZero

* support SkipVersionZero

* cleanup

* cleanup

* cleanup

* destroy

* fix data manually

* Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

* Update versiondb/tsrocksdb/store.go

Co-authored-by: mmsqe <[email protected]>
Signed-off-by: yihuang <[email protected]>

* cli

* cleanup

* Update versiondb/client/fixdata.go

Signed-off-by: yihuang <[email protected]>

* rename

* fix test

* check nil

* don't return nil as empty slice

* add dryrun mode

* cleanup

* separete read from iteration

* store name flag

* validate timestamp

* skip non-zero version

* update gomod2nix

* fix build

* flush after fix

* fix

* revert

---------

Signed-off-by: yihuang <[email protected]>
Co-authored-by: mmsqe <[email protected]>
  • Loading branch information
yihuang and mmsqe authored Nov 12, 2024
1 parent 0c5e998 commit 6dc645a
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bug Fixes

* [#1679](https://github.com/crypto-org-chain/cronos/pull/1679) Include no trace detail on insufficient balance fix.
* [#1685](https://github.com/crypto-org-chain/cronos/pull/1685) Add command to fix versiondb corrupted data.

### Improvements

Expand Down
4 changes: 4 additions & 0 deletions app/versiondb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (app *App) setupVersionDB(
for _, key := range keys {
exposedKeys = append(exposedKeys, key)
}

// see: https://github.com/crypto-org-chain/cronos/issues/1683
versionDB.SetSkipVersionZero(true)

app.CommitMultiStore().AddListeners(exposedKeys)

// register in app streaming manager
Expand Down
2 changes: 1 addition & 1 deletion gomod2nix.toml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command {
ChangeSetToVersionDBCmd(),
RestoreAppDBCmd(opts),
RestoreVersionDBCmd(),
FixDataCmd(opts.DefaultStores),
)
return cmd
}
59 changes: 59 additions & 0 deletions versiondb/client/fixdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package client

import (
"github.com/crypto-org-chain/cronos/versiondb/tsrocksdb"
"github.com/linxGnu/grocksdb"
"github.com/spf13/cobra"
)

const (
FlagDryRun = "dry-run"
FlagStore = "store-name"
)

func FixDataCmd(defaultStores []string) *cobra.Command {
cmd := &cobra.Command{
Use: "fixdata <dir>",
Args: cobra.ExactArgs(1),
Short: "Fix wrong data in versiondb, see: https://github.com/crypto-org-chain/cronos/issues/1683",
RunE: func(cmd *cobra.Command, args []string) error {
dir := args[0]
dryRun, err := cmd.Flags().GetBool(FlagDryRun)
if err != nil {
return err
}
stores, err := cmd.Flags().GetStringArray(FlagStore)
if err != nil {
return err
}
if len(stores) == 0 {
stores = defaultStores
}

var (
db *grocksdb.DB
cfHandle *grocksdb.ColumnFamilyHandle
)

if dryRun {
db, cfHandle, err = tsrocksdb.OpenVersionDBForReadOnly(dir, false)
} else {
db, cfHandle, err = tsrocksdb.OpenVersionDB(dir)
}
if err != nil {
return err
}

versionDB := tsrocksdb.NewStoreWithDB(db, cfHandle)
if err := versionDB.FixData(stores, dryRun); err != nil {
return err
}

return nil
},
}

cmd.Flags().Bool(FlagDryRun, false, "Dry run, do not write to the database, open the database in read-only mode.")
cmd.Flags().StringArray(FlagStore, []string{}, "Store names to fix, if not specified, all stores will be fixed.")
return cmd
}
40 changes: 32 additions & 8 deletions versiondb/tsrocksdb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsrocksdb

import (
"bytes"
"encoding/binary"

"github.com/crypto-org-chain/cronos/versiondb"
"github.com/linxGnu/grocksdb"
Expand All @@ -12,11 +13,14 @@ type rocksDBIterator struct {
prefix, start, end []byte
isReverse bool
isInvalid bool

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

var _ versiondb.Iterator = (*rocksDBIterator)(nil)

func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool) *rocksDBIterator {
func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool, skipVersionZero bool) *rocksDBIterator {
if isReverse {
if end == nil {
source.SeekToLast()
Expand All @@ -39,14 +43,18 @@ func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, is
source.Seek(start)
}
}
return &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
it := &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
skipVersionZero: skipVersionZero,
}

it.trySkipZeroVersion()
return it
}

// Domain implements Iterator.
Expand Down Expand Up @@ -120,6 +128,22 @@ func (itr rocksDBIterator) Next() {
} else {
itr.source.Next()
}

itr.trySkipZeroVersion()
}

func (itr rocksDBIterator) timestamp() uint64 {
ts := itr.source.Timestamp()
defer ts.Free()
return binary.LittleEndian.Uint64(ts.Data())
}

func (itr rocksDBIterator) trySkipZeroVersion() {
if itr.skipVersionZero {
for itr.Valid() && itr.timestamp() == 0 {
itr.Next()
}
}
}

// Error implements Iterator.
Expand Down
14 changes: 14 additions & 0 deletions versiondb/tsrocksdb/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ func OpenVersionDB(dir string) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, erro
return db, cfHandles[1], nil
}

// OpenVersionDBForReadOnly open versiondb in readonly mode
func OpenVersionDBForReadOnly(dir string, errorIfWalFileExists bool) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
db, cfHandles, err := grocksdb.OpenDbForReadOnlyColumnFamilies(
opts, dir, []string{"default", VersionDBCFName},
[]*grocksdb.Options{opts, NewVersionDBOpts(false)},
errorIfWalFileExists,
)
if err != nil {
return nil, nil, err
}
return db, cfHandles[1], nil
}

// OpenVersionDBAndTrimHistory opens versiondb similar to `OpenVersionDB`,
// but it also trim the versions newer than target one, can be used for rollback.
func OpenVersionDBAndTrimHistory(dir string, version int64) (*grocksdb.DB, *grocksdb.ColumnFamilyHandle, error) {
Expand Down
134 changes: 122 additions & 12 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsrocksdb

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -38,6 +39,9 @@ func init() {
type Store struct {
db *grocksdb.DB
cfHandle *grocksdb.ColumnFamilyHandle

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

func NewStore(dir string) (Store, error) {
Expand All @@ -58,6 +62,10 @@ func NewStoreWithDB(db *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) Stor
}
}

func (s *Store) SetSkipVersionZero(skip bool) {
s.skipVersionZero = skip
}

func (s Store) SetLatestVersion(version int64) error {
var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
Expand Down Expand Up @@ -86,11 +94,23 @@ func (s Store) PutAtVersion(version int64, changeSet []*types.StoreKVPair) error
}

func (s Store) GetAtVersionSlice(storeKey string, key []byte, version *int64) (*grocksdb.Slice, error) {
return s.db.GetCF(
value, ts, err := s.db.GetCFWithTS(
newTSReadOptions(version),
s.cfHandle,
prependStoreKey(storeKey, key),
)
if err != nil {
return nil, err
}
defer ts.Free()

if value.Exists() && s.skipVersionZero {
if binary.LittleEndian.Uint64(ts.Data()) == 0 {
return grocksdb.NewSlice(nil, 0), nil
}
}

return value, err
}

// GetAtVersion implements VersionStore interface
Expand Down Expand Up @@ -128,28 +148,24 @@ func (s Store) GetLatestVersion() (int64, error) {

// IteratorAtVersion implements VersionStore interface
func (s Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) (versiondb.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, false), nil
return s.iteratorAtVersion(storeKey, start, end, version, false)
}

// ReverseIteratorAtVersion implements VersionStore interface
func (s Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (versiondb.Iterator, error) {
return s.iteratorAtVersion(storeKey, start, end, version, true)
}

func (s Store) iteratorAtVersion(storeKey string, start, end []byte, version *int64, reverse bool) (versiondb.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(storePrefix(storeKey), start, end)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, true), nil
return newRocksDBIterator(itr, prefix, start, end, reverse, s.skipVersionZero), nil
}

// FeedChangeSet is used to migrate legacy change sets into versiondb
Expand Down Expand Up @@ -216,6 +232,100 @@ func (s Store) Flush() error {
)
}

// FixData fixes wrong data written in versiondb due to rocksdb upgrade, the operation is idempotent.
// see: https://github.com/crypto-org-chain/cronos/issues/1683
// call this before `SetSkipVersionZero(true)`.
func (s Store) FixData(storeNames []string, dryRun bool) error {
for _, storeName := range storeNames {
if err := s.fixDataStore(storeName, dryRun); err != nil {
return err
}
}

return s.Flush()
}

// fixDataStore iterate the wrong data at version 0, parse the timestamp from the key and write it again.
func (s Store) fixDataStore(storeName string, dryRun bool) error {
pairs, err := s.loadWrongData(storeName)
if err != nil {
return err
}

batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

prefix := storePrefix(storeName)
readOpts := grocksdb.NewDefaultReadOptions()
defer readOpts.Destroy()
for _, pair := range pairs {
realKey := cloneAppend(prefix, pair.Key)

readOpts.SetTimestamp(pair.Timestamp)
oldValue, err := s.db.GetCF(readOpts, s.cfHandle, realKey)
if err != nil {
return err
}

clean := bytes.Equal(oldValue.Data(), pair.Value)
oldValue.Free()

if clean {
continue
}

if dryRun {
fmt.Printf("fix data: %s, key: %X, ts: %X\n", storeName, pair.Key, pair.Timestamp)
} else {
batch.PutCFWithTS(s.cfHandle, realKey, pair.Timestamp, pair.Value)
}
}

if !dryRun {
return s.db.Write(defaultSyncWriteOpts, batch)
}

return nil
}

type KVPairWithTS struct {
Key []byte
Value []byte
Timestamp []byte
}

func (s Store) loadWrongData(storeName string) ([]KVPairWithTS, error) {
var version int64
iter, err := s.IteratorAtVersion(storeName, nil, nil, &version)
if err != nil {
return nil, err
}
defer iter.Close()

var pairs []KVPairWithTS
for ; iter.Valid(); iter.Next() {
if binary.LittleEndian.Uint64(iter.Timestamp()) != 0 {
// FIXME: https://github.com/crypto-org-chain/cronos/issues/1689
continue
}

key := iter.Key()
if len(key) < TimestampSize {
return nil, fmt.Errorf("invalid key length: %X, store: %s", key, storeName)
}

ts := key[len(key)-TimestampSize:]
key = key[:len(key)-TimestampSize]
pairs = append(pairs, KVPairWithTS{
Key: key,
Value: iter.Value(),
Timestamp: ts,
})
}

return pairs, nil
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
Loading

0 comments on commit 6dc645a

Please sign in to comment.