Skip to content

Commit

Permalink
Problem: WAL writes are heavy during block replay (#1004)
Browse files Browse the repository at this point in the history
* Problem: WAL writes are heavy during block replay

Solution:
- introduce the `store.async-wal` option to write the WAL asynchronously
- rename the `memiavl` flag to `store.memiavl`.

fix unit test

* rename

* do the marshaling in the goroutine
  • Loading branch information
yihuang authored Apr 25, 2023
1 parent 73af001 commit b262fa8
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 44 deletions.
4 changes: 3 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ const (
// NOTE: In the SDK, the default value is 255.
AddrLen = 20

FlagMemIAVL = "memiavl"
FlagMemIAVL = "store.memiavl"
FlagAsyncWAL = "store.async-wal"
)

// this line is used by starport scaffolding # stargate/wasm/app/enabledProposals
Expand Down Expand Up @@ -350,6 +351,7 @@ func New(
// cms must be overridden before the other options, because they may use the cms,
// FIXME we are assuming the cms won't be overridden by the other options, but we can't be sure.
cms := rootmulti.NewStore(filepath.Join(homePath, "data", "memiavl.db"), logger)
cms.SetAsyncWAL(cast.ToBool(appOpts.Get(FlagAsyncWAL)))
baseAppOptions = append([]func(*baseapp.BaseApp){setCMS(cms)}, baseAppOptions...)
}

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/configs/default.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
staked: '1000000000000000000stake',
mnemonic: '${VALIDATOR1_MNEMONIC}',
'app-config': {
memiavl: true,
store: {
memiavl: true,
streamers: ['versiondb'],
},
},
Expand Down
114 changes: 90 additions & 24 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ type DB struct {
pruneSnapshotLock sync.Mutex

// invariant: the LastIndex always matches the current version of MultiTree
wal *wal.Log
wal *wal.Log
walChan chan *walEntry
walQuit chan error

// pending store upgrades, will be written into WAL in next Commit call
pendingUpgrades []*TreeNameUpgrade

// The assumptions to concurrency:
Expand All @@ -59,6 +63,8 @@ type Options struct {
SnapshotKeepRecent uint32
// load the target version instead of latest version
TargetVersion uint32
// Write the WAL asynchronously; this is OK in the blockchain case because we can always replay the raw blocks.
AsyncWAL bool
}

const (
Expand Down Expand Up @@ -89,10 +95,36 @@ func Load(dir string, opts Options) (*DB, error) {
return nil, err
}

var (
walChan chan *walEntry
walQuit chan error
)
if opts.AsyncWAL {
walChan = make(chan *walEntry, 100)
walQuit = make(chan error)
go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
if err := wal.Write(entry.index, bz); err != nil {
walQuit <- err
return
}
}
}()
}

db := &DB{
MultiTree: *mtree,
dir: dir,
wal: wal,
walChan: walChan,
walQuit: walQuit,
snapshotKeepRecent: opts.SnapshotKeepRecent,
}

Expand Down Expand Up @@ -142,29 +174,44 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
return nil
}

// cleanupSnapshotRewrite cleans up the old snapshots and switches to a new multitree
// if a snapshot rewrite is in progress. It returns true if a snapshot rewrite has completed
// and false otherwise, along with any error encountered during the cleanup process.
func (db *DB) cleanupSnapshotRewrite() (bool, error) {
if db.snapshotRewriteChan == nil {
return false, nil
// checkAsyncTasks polls each background task without blocking and processes
// whatever result is available.
//
// The async WAL writer is checked first, then the background snapshot rewrite.
// Both checks always run; their errors are combined with errors.Join, so the
// result is nil only when neither check reports a failure.
func (db *DB) checkAsyncTasks() error {
	walErr := db.checkAsyncWAL()
	rewriteErr := db.checkBackgroundSnapshotRewrite()
	return errors.Join(walErr, rewriteErr)
}

// checkAsyncWAL polls the quit signal of the async WAL writing goroutine
// without blocking.
//
// It returns nil while nothing is pending on db.walQuit; that includes a nil
// channel, since a receive from a nil channel is never ready and the select
// falls through to default. It returns a non-nil error when the goroutine has
// reported a failure, or when the channel is already closed because the
// goroutine has exited.
func (db *DB) checkAsyncWAL() error {
	select {
	case err, ok := <-db.walQuit:
		// async wal writing failed, we need to abort the state machine
		if !ok || err == nil {
			// The channel is closed (or delivered no cause): there is no
			// underlying error to wrap, so don't feed nil to %w, which would
			// render as "%!w(<nil>)".
			return errors.New("async wal writing goroutine quit unexpectedly")
		}
		return fmt.Errorf("async wal writing goroutine quit unexpectedly: %w", err)
	default:
		return nil
	}
}

// checkBackgroundSnapshotRewrite checks the result of the background snapshot rewrite without blocking; on success it switches to the new multitree and cleans up the old snapshots.
func (db *DB) checkBackgroundSnapshotRewrite() error {
// check the completeness of background snapshot rewriting
select {
case result := <-db.snapshotRewriteChan:
db.snapshotRewriteChan = nil

if result.mtree == nil {
// background snapshot rewrite failed
return true, fmt.Errorf("background snapshot rewriting failed: %w", result.err)
return fmt.Errorf("background snapshot rewriting failed: %w", result.err)
}

// snapshot rewrite succeeded, catchup and switch
if err := result.mtree.CatchupWAL(db.wal, 0); err != nil {
return true, fmt.Errorf("catchup failed: %w", err)
return fmt.Errorf("catchup failed: %w", err)
}
if err := db.reloadMultiTree(result.mtree); err != nil {
return true, fmt.Errorf("switch multitree failed: %w", err)
return fmt.Errorf("switch multitree failed: %w", err)
}
// prune the old snapshots
// wait until last prune finish
Expand All @@ -176,12 +223,12 @@ func (db *DB) cleanupSnapshotRewrite() (bool, error) {
if err == nil {
for _, entry := range entries {
if entry.IsDir() && strings.HasPrefix(entry.Name(), SnapshotPrefix) {
currentVersion, err := strconv.ParseUint(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32)
currentVersion, err := strconv.ParseInt(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32)
if err != nil {
fmt.Printf("failed when parse current version: %s\n", err)
continue
}
if result.version-uint32(currentVersion) > db.snapshotKeepRecent {
if result.mtree.metadata.CommitInfo.Version-currentVersion > int64(db.snapshotKeepRecent) {
fullPath := filepath.Join(db.dir, entry.Name())
if err := os.RemoveAll(fullPath); err != nil {
fmt.Printf("failed when remove old snapshot: %s\n", err)
Expand All @@ -191,11 +238,11 @@ func (db *DB) cleanupSnapshotRewrite() (bool, error) {
}
}
}()
return true, nil
return nil

default:
}
return false, nil
return nil
}

// Commit wraps `MultiTree.ApplyChangeSet` to add some db level operations:
Expand All @@ -205,7 +252,7 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
db.mtx.Lock()
defer db.mtx.Unlock()

if _, err := db.cleanupSnapshotRewrite(); err != nil {
if err := db.checkAsyncTasks(); err != nil {
return nil, 0, err
}

Expand All @@ -216,16 +263,21 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {

if db.wal != nil {
// write write-ahead-log
entry := WALEntry{
entry := walEntry{index: walIndex(v, db.initialVersion), data: &WALEntry{
Changesets: changeSets,
Upgrades: db.pendingUpgrades,
}
bz, err := entry.Marshal()
if err != nil {
return nil, 0, err
}
if err := db.wal.Write(walIndex(v, db.initialVersion), bz); err != nil {
return nil, 0, err
}}
if db.walChan != nil {
// async wal writing
db.walChan <- &entry
} else {
bz, err := entry.data.Marshal()
if err != nil {
return nil, 0, err
}
if err := db.wal.Write(entry.index, bz); err != nil {
return nil, 0, err
}
}
}

Expand Down Expand Up @@ -345,7 +397,16 @@ func (db *DB) Close() error {
db.mtx.Lock()
defer db.mtx.Unlock()

return errors.Join(db.MultiTree.Close(), db.wal.Close())
var walErr error
if db.walChan != nil {
close(db.walChan)
walErr = <-db.walQuit

db.walChan = nil
db.walQuit = nil
}

return errors.Join(db.MultiTree.Close(), db.wal.Close(), walErr)
}

// TreeByName wraps MultiTree.TreeByName to add a lock.
Expand Down Expand Up @@ -452,3 +513,8 @@ func updateCurrentSymlink(dir, snapshot string) error {
// assuming file renaming operation is atomic
return os.Rename(tmpPath, currentPath(dir))
}

// walEntry pairs a WAL entry with the WAL index it is to be written at, so
// that marshaling and writing can happen later than the point where the
// entry is built.
type walEntry struct {
// index is the WAL index passed to the log's Write call.
index uint64
// data is the not-yet-marshaled entry; it is marshaled right before writing.
data *WALEntry
}
29 changes: 11 additions & 18 deletions memiavl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"encoding/hex"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/cosmos/iavl"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -40,7 +40,7 @@ func TestRewriteSnapshotBackground(t *testing.T) {
db, err := Load(t.TempDir(), Options{
CreateIfMissing: true,
InitialStores: []string{"test"},
SnapshotKeepRecent: 1,
SnapshotKeepRecent: 0, // only a single snapshot is kept
})
require.NoError(t, err)

Expand All @@ -56,29 +56,22 @@ func TestRewriteSnapshotBackground(t *testing.T) {
require.Equal(t, i+1, int(v))
require.Equal(t, RefHashes[i], db.lastCommitInfo.StoreInfos[0].CommitId.Hash)

err = db.RewriteSnapshotBackground()
require.NoError(t, err)
for {
if cleaned, _ := db.cleanupSnapshotRewrite(); cleaned {
break
}
}
_ = db.RewriteSnapshotBackground()
time.Sleep(time.Millisecond * 20)
}

for db.snapshotRewriteChan != nil {
require.NoError(t, db.checkAsyncTasks())
}

db.pruneSnapshotLock.Lock()
defer db.pruneSnapshotLock.Unlock()

entries, err := os.ReadDir(db.dir)
require.NoError(t, err)
version := uint64(db.lastCommitInfo.Version)
for _, entry := range entries {
if entry.IsDir() && strings.HasPrefix(entry.Name(), SnapshotPrefix) {
currentVersion, err := strconv.ParseUint(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32)
require.NoError(t, err)
require.GreaterOrEqual(t, currentVersion, version-uint64(db.snapshotKeepRecent))
require.LessOrEqual(t, currentVersion, version)
}
}

// three files: snapshot, current link, wal
require.Equal(t, 3, len(entries))
}

func TestWAL(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Store struct {
listeners map[types.StoreKey][]types.WriteListener

interBlockCache types.MultiStorePersistentCache

asyncWAL bool
}

func NewStore(dir string, logger log.Logger) *Store {
Expand Down Expand Up @@ -268,6 +270,7 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra
CreateIfMissing: true,
InitialStores: initialStores,
TargetVersion: uint32(version),
AsyncWAL: rs.asyncWAL,
})
if err != nil {
return errors.Wrapf(err, "fail to load memiavl at %s", rs.dir)
Expand Down Expand Up @@ -381,6 +384,10 @@ func (rs *Store) SetIAVLDisableFastNode(disable bool) {
// SetLazyLoading is a no-op: the lazyLoading argument is ignored.
func (rs *Store) SetLazyLoading(lazyLoading bool) {
}

// SetAsyncWAL records whether the write-ahead log should be written
// asynchronously. The value is only stored here; it is forwarded as the
// AsyncWAL option when LoadVersionAndUpgrade loads the underlying db.
func (rs *Store) SetAsyncWAL(async bool) {
rs.asyncWAL = async
}

// Implements interface CommitMultiStore
func (rs *Store) RollbackToVersion(version int64) error {
return stderrors.New("rootmulti store don't support rollback")
Expand Down

0 comments on commit b262fa8

Please sign in to comment.