Skip to content

Commit

Permalink
Problem: memiavl api don't work with FinalizeBlock
Browse files Browse the repository at this point in the history
FinalizeBlock in abci 2.0 need the WorkingHash API.

Solution:
- redesign the internals APIs to support WorkingHash.
  instead of exposing a single `Commit` API, now we exposes separate APIs for `ApplyChangeSets` and `Commit`.
  • Loading branch information
yihuang committed Aug 31, 2023
1 parent 05b31c6 commit 51a49c5
Show file tree
Hide file tree
Showing 13 changed files with 279 additions and 120 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/cosmos/cosmos-sdk v0.46.15-0.20230807104542-537257060180
github.com/cosmos/gogoproto v1.4.8
github.com/cosmos/ibc-go/v6 v6.2.0
github.com/crypto-org-chain/cronos/store v0.0.2
github.com/crypto-org-chain/cronos/store v0.0.4
github.com/crypto-org-chain/cronos/versiondb v0.0.0-00010101000000-000000000000
github.com/ethereum/go-ethereum v1.10.26
github.com/evmos/ethermint v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -77,7 +77,7 @@ require (
github.com/cosmos/ibc-go/v5 v5.2.1 // indirect
github.com/cosmos/ledger-cosmos-go v0.12.2 // indirect
github.com/creachadair/taskgroup v0.3.2 // indirect
github.com/crypto-org-chain/cronos/memiavl v0.0.3 // indirect
github.com/crypto-org-chain/cronos/memiavl v0.0.4 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
Expand Down
143 changes: 107 additions & 36 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"

storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/iavl"
"github.com/tendermint/tendermint/libs/log"
"github.com/tidwall/wal"
)
Expand Down Expand Up @@ -63,8 +65,8 @@ type DB struct {
walChan chan *walEntry
walQuit chan error

// pending store upgrades, will be written into WAL in next Commit call
pendingUpgrades []*TreeNameUpgrade
// pending changes, will be written into WAL in next Commit call
pendingLog WALEntry

// The assumptions to concurrency:
// - The methods on DB are protected by a mutex
Expand Down Expand Up @@ -281,19 +283,23 @@ func (db *DB) SetInitialVersion(initialVersion int64) error {
db.mtx.Lock()
defer db.mtx.Unlock()

if err := db.MultiTree.SetInitialVersion(initialVersion); err != nil {
return err
if db.readOnly {
return errReadOnly
}

if err := initEmptyDB(db.dir, db.initialVersion); err != nil {
if db.lastCommitInfo.Version > 0 {
return errors.New("initial version can only be set before any commit")
}

if err := db.MultiTree.SetInitialVersion(initialVersion); err != nil {
return err
}

return db.reload()
return initEmptyDB(db.dir, db.initialVersion)
}

// ApplyUpgrades wraps MultiTree.ApplyUpgrades, it also append the upgrades in a temporary field,
// and include in the WAL entry in next Commit call.
// ApplyUpgrades wraps MultiTree.ApplyUpgrades, it also append the upgrades in a pending log,
// which will be persisted to the WAL in next Commit call.
func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
db.mtx.Lock()
defer db.mtx.Unlock()
Expand All @@ -306,7 +312,74 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
return err
}

db.pendingUpgrades = append(db.pendingUpgrades, upgrades...)
db.pendingLog.Upgrades = append(db.pendingLog.Upgrades, upgrades...)
return nil
}

// ApplyChangeSets wraps MultiTree.ApplyChangeSets, it also append the changesets in the pending log,
// which will be persisted to the WAL in next Commit call.
func (db *DB) ApplyChangeSets(changeSets []*NamedChangeSet) error {
if len(changeSets) == 0 {
return nil
}

db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return errReadOnly
}

if err := db.MultiTree.ApplyChangeSets(changeSets); err != nil {
return err
}

if len(db.pendingLog.Changesets) == 0 {
db.pendingLog.Changesets = changeSets
} else {
panic("TODO")
// TODO merge change sets
}

return nil
}

// ApplyChangeSet wraps MultiTree.ApplyChangeSet, it also append the changesets in the pending log,
// which will be persisted to the WAL in next Commit call.
func (db *DB) ApplyChangeSet(name string, changeSet iavl.ChangeSet) error {
if len(changeSet.Pairs) == 0 {
return nil
}

db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return errReadOnly
}

if err := db.MultiTree.ApplyChangeSet(name, changeSet); err != nil {
return err
}

merged := false
for _, cs := range db.pendingLog.Changesets {
if cs.Name == name {
cs.Changeset.Pairs = append(cs.Changeset.Pairs, changeSet.Pairs...)
merged = true
break
}
}
if !merged {
db.pendingLog.Changesets = append(db.pendingLog.Changesets, &NamedChangeSet{
Name: name,
Changeset: changeSet,
})
sort.SliceStable(db.pendingLog.Changesets, func(i, j int) bool {
return db.pendingLog.Changesets[i].Name < db.pendingLog.Changesets[j].Name
})
}

return nil
}

Expand Down Expand Up @@ -441,32 +514,23 @@ func (db *DB) pruneSnapshots() {
}()
}

// Commit wraps `MultiTree.ApplyChangeSet` to add some db level operations:
// - manage background snapshot rewriting
// - write WAL
func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
// Commit wraps SaveVersion to bump the version and writes the pending changes into log files to persist on disk
func (db *DB) Commit() ([]byte, int64, error) {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return nil, 0, errReadOnly
}

if err := db.checkAsyncTasks(); err != nil {
return nil, 0, err
}

hash, v, err := db.MultiTree.ApplyChangeSet(changeSets, true)
hash, v, err := db.MultiTree.SaveVersion(true)
if err != nil {
return nil, 0, err
}

// write logs if enabled
if db.wal != nil {
// write write-ahead-log
entry := walEntry{index: walIndex(v, db.initialVersion), data: &WALEntry{
Changesets: changeSets,
Upgrades: db.pendingUpgrades,
}}
entry := walEntry{index: walIndex(v, db.initialVersion), data: db.pendingLog}
if db.walChanSize >= 0 {
if db.walChan == nil {
db.initAsyncCommit()
Expand All @@ -485,8 +549,11 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
}
}

db.pendingUpgrades = db.pendingUpgrades[:0]
db.pendingLog = WALEntry{}

if err := db.checkAsyncTasks(); err != nil {
return nil, 0, err
}
db.rewriteIfApplicable(v)

return hash, v, nil
Expand Down Expand Up @@ -607,14 +674,8 @@ func (db *DB) reloadMultiTree(mtree *MultiTree) error {
}

db.MultiTree = *mtree

if len(db.pendingUpgrades) > 0 {
if err := db.MultiTree.ApplyUpgrades(db.pendingUpgrades); err != nil {
return err
}
}

return nil
// catch-up the pending changes
return db.MultiTree.applyWALEntry(db.pendingLog)
}

// rewriteIfApplicable execute the snapshot rewrite strategy according to current height
Expand Down Expand Up @@ -734,23 +795,33 @@ func (db *DB) LastCommitInfo() *storetypes.CommitInfo {
return db.MultiTree.LastCommitInfo()
}

// ApplyChangeSet wraps MultiTree.ApplyChangeSet to add a lock.
func (db *DB) ApplyChangeSet(changeSets []*NamedChangeSet, updateCommitInfo bool) ([]byte, int64, error) {
func (db *DB) SaveVersion(updateCommitInfo bool) ([]byte, int64, error) {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return nil, 0, errReadOnly
}

return db.MultiTree.ApplyChangeSet(changeSets, updateCommitInfo)
return db.MultiTree.SaveVersion(updateCommitInfo)
}

func (db *DB) WorkingHash() []byte {
db.mtx.Lock()
defer db.mtx.Unlock()

return db.MultiTree.WorkingHash()
}

// UpdateCommitInfo wraps MultiTree.UpdateCommitInfo to add a lock.
func (db *DB) UpdateCommitInfo() []byte {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
panic("can't update commit info in read-only mode")
}

return db.MultiTree.UpdateCommitInfo()
}

Expand Down Expand Up @@ -936,7 +1007,7 @@ func createDBIfNotExist(dir string, initialVersion uint32) error {

type walEntry struct {
index uint64
data *WALEntry
data WALEntry
}

func isSnapshotName(name string) bool {
Expand Down
Loading

0 comments on commit 51a49c5

Please sign in to comment.