Skip to content

Commit

Permalink
feat: recover block curruption on poison error (#28)
Browse files Browse the repository at this point in the history
* refactor: extract height key related functions

* feat: rollback previous block on inject error

* style: add comments
  • Loading branch information
Jeff Woo authored Feb 11, 2022
1 parent bd24353 commit ecdf9bd
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 16 deletions.
20 changes: 9 additions & 11 deletions bin/v0.34.x/db/heleveldb/leveldb_batch.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,28 @@
package heleveldb

import (
"math"

tmdb "github.com/tendermint/tm-db"
"github.com/terra-money/mantlemint-provider-v0.34.x/db/hld"
"github.com/terra-money/mantlemint-provider-v0.34.x/lib"
"github.com/terra-money/mantlemint-provider-v0.34.x/db/rollbackable"
)

var _ hld.HeightLimitEnabledBatch = (*LevelBatch)(nil)
var _ rollbackable.HasRollbackBatch = (*LevelBatch)(nil)

type LevelBatch struct {
height int64
batch tmdb.Batch
batch *rollbackable.RollbackableBatch
mode int
}

func (b *LevelBatch) keyBytesWithHeight(key []byte) []byte {
if b.mode == DriverModeKeySuffixAsc {
return append(prefixDataWithHeightKey(key), lib.UintToBigEndian(uint64(b.height))...)
} else {
return append(prefixDataWithHeightKey(key), lib.UintToBigEndian(math.MaxUint64-uint64(b.height))...)
}

return append(prefixDataWithHeightKey(key), serializeHeight(b.mode, b.height)...)
}

func NewLevelDBBatch(atHeight int64, driver *Driver) *LevelBatch {
return &LevelBatch{
height: atHeight,
batch: driver.session.NewBatch(),
batch: rollbackable.NewRollbackableBatch(driver.session),
mode: driver.mode,
}
}
Expand Down Expand Up @@ -75,3 +69,7 @@ func (b *LevelBatch) WriteSync() error {
func (b *LevelBatch) Close() error {
return b.batch.Close()
}

func (b *LevelBatch) RollbackBatch() tmdb.Batch {
return b.batch.RollbackBatch
}
23 changes: 23 additions & 0 deletions bin/v0.34.x/db/heleveldb/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package heleveldb

import (
"encoding/binary"
"math"

"github.com/terra-money/mantlemint-provider-v0.34.x/lib"
)

const (
DriverModeKeySuffixAsc = iota
DriverModeKeySuffixDesc
Expand All @@ -25,3 +32,19 @@ func prefixDataWithHeightKey(key []byte) []byte {
result = append(result, key...)
return result
}

func serializeHeight(mode int, height int64) []byte {
if mode == DriverModeKeySuffixAsc {
return lib.UintToBigEndian(uint64(height))
} else {
return lib.UintToBigEndian(math.MaxUint64 - uint64(height))
}
}

func deserializeHeight(mode int, data []byte) int64 {
if mode == DriverModeKeySuffixAsc {
return int64(binary.BigEndian.Uint64(data))
} else {
return int64(math.MaxUint64 - binary.BigEndian.Uint64(data))
}
}
53 changes: 53 additions & 0 deletions bin/v0.34.x/db/rollbackable/rollbackable_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package rollbackable

import (
tmdb "github.com/tendermint/tm-db"
)

type HasRollbackBatch interface {
RollbackBatch() tmdb.Batch
}

var _ tmdb.Batch = (*RollbackableBatch)(nil)

type RollbackableBatch struct {
tmdb.Batch

db tmdb.DB
RollbackBatch tmdb.Batch
}

func NewRollbackableBatch(db tmdb.DB) *RollbackableBatch {
return &RollbackableBatch{
db: db,
Batch: db.NewBatch(),
RollbackBatch: db.NewBatch(),
}
}

// revert value for key to previous state
func (b *RollbackableBatch) backup(key []byte) error {
data, err := b.db.Get(key)
if err != nil {
return err
}
if data == nil {
return b.RollbackBatch.Delete(key)
} else {
return b.RollbackBatch.Set(key, data)
}
}

func (b *RollbackableBatch) Set(key, value []byte) error {
if err := b.backup(key); err != nil {
return err
}
return b.Batch.Set(key, value)
}

func (b *RollbackableBatch) Delete(key []byte) error {
if err := b.backup(key); err != nil {
return err
}
return b.Batch.Delete(key)
}
13 changes: 10 additions & 3 deletions bin/v0.34.x/db/safe_batch/safe_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

tmdb "github.com/tendermint/tm-db"
"github.com/terra-money/mantlemint-provider-v0.34.x/db/rollbackable"
)

var _ tmdb.DB = (*SafeBatchDB)(nil)
Expand All @@ -12,7 +13,7 @@ var _ SafeBatchDBCloser = (*SafeBatchDB)(nil)
type SafeBatchDBCloser interface {
tmdb.DB
Open()
Flush() error
Flush() (tmdb.Batch, error)
}

type SafeBatchDB struct {
Expand All @@ -25,14 +26,20 @@ func (s *SafeBatchDB) Open() {
s.batch = s.db.NewBatch()
}

func (s *SafeBatchDB) Flush() error {
// flush batch and return rollback batch if rollbackable
func (s *SafeBatchDB) Flush() (tmdb.Batch, error) {
defer func() {
if s.batch != nil {
s.batch.Close()
}
s.batch = nil
}()
return s.batch.WriteSync()

if batch, ok := s.batch.(rollbackable.HasRollbackBatch); ok {
return batch.RollbackBatch(), s.batch.WriteSync()
} else {
return nil, s.batch.WriteSync()
}
}

func NewSafeBatchDB(db tmdb.DB) tmdb.DB {
Expand Down
25 changes: 23 additions & 2 deletions bin/v0.34.x/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/terra-money/mantlemint-provider-v0.34.x/mantlemint"
"github.com/terra-money/mantlemint-provider-v0.34.x/rpc"
"github.com/terra-money/mantlemint-provider-v0.34.x/store/rootmulti"

tmdb "github.com/tendermint/tm-db"
)

// initialize mantlemint for v0.34.x
Expand Down Expand Up @@ -142,9 +144,11 @@ func main() {
}

// flush to db; panic upon error (can't proceed)
if flushErr := batchedOrigin.Flush(); flushErr != nil {
if rollback, flushErr := batchedOrigin.Flush(); flushErr != nil {
debug.PrintStack()
panic(flushErr)
} else if rollback != nil {
rollback.Close()
}

// load initial state to mantlemint
Expand Down Expand Up @@ -208,21 +212,38 @@ func main() {
} else if cBlockFeed, blockFeedErr := blockFeed.Subscribe(0); blockFeedErr != nil {
panic(blockFeedErr)
} else {
var rollbackBatch tmdb.Batch
for {
feed := <-cBlockFeed

// open db batch
hldb.SetWriteHeight(feed.Block.Height)
batchedOrigin.Open()
if injectErr := mm.Inject(feed.Block); injectErr != nil {
// rollback last block
if rollbackBatch != nil {
fmt.Println("rollback previous block")
rollbackBatch.WriteSync()
rollbackBatch.Close()
}

debug.PrintStack()
panic(injectErr)
}

// last block is okay -> dispose rollback batch
if rollbackBatch != nil {
rollbackBatch.Close()
rollbackBatch = nil
}

// flush db batch
if flushErr := batchedOrigin.Flush(); flushErr != nil {
// returns rollback batch that reverts current block injection
if rollback, flushErr := batchedOrigin.Flush(); flushErr != nil {
debug.PrintStack()
panic(flushErr)
} else {
rollbackBatch = rollback
}

hldb.ClearWriteHeight()
Expand Down

0 comments on commit ecdf9bd

Please sign in to comment.