diff --git a/app/app.go b/app/app.go index 6d2a8b473d..a4961297df 100644 --- a/app/app.go +++ b/app/app.go @@ -151,7 +151,8 @@ const ( // NOTE: In the SDK, the default value is 255. AddrLen = 20 - FlagMemIAVL = "memiavl" + FlagMemIAVL = "store.memiavl" + FlagAsyncWAL = "store.async-wal" ) // this line is used by starport scaffolding # stargate/wasm/app/enabledProposals @@ -350,6 +351,7 @@ func New( // cms must be overridden before the other options, because they may use the cms, // FIXME we are assuming the cms won't be overridden by the other options, but we can't be sure. cms := rootmulti.NewStore(filepath.Join(homePath, "data", "memiavl.db"), logger) + cms.SetAsyncWAL(cast.ToBool(appOpts.Get(FlagAsyncWAL))) baseAppOptions = append([]func(*baseapp.BaseApp){setCMS(cms)}, baseAppOptions...) } diff --git a/integration_tests/configs/default.jsonnet b/integration_tests/configs/default.jsonnet index 00b82ae91b..9b763c954b 100644 --- a/integration_tests/configs/default.jsonnet +++ b/integration_tests/configs/default.jsonnet @@ -27,8 +27,8 @@ staked: '1000000000000000000stake', mnemonic: '${VALIDATOR1_MNEMONIC}', 'app-config': { - memiavl: true, store: { + memiavl: true, streamers: ['versiondb'], }, }, diff --git a/memiavl/db.go b/memiavl/db.go index 412a9a5254..8061ca5063 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -38,7 +38,11 @@ type DB struct { pruneSnapshotLock sync.Mutex // invariant: the LastIndex always match the current version of MultiTree - wal *wal.Log + wal *wal.Log + walChan chan *walEntry + walQuit chan error + + // pending store upgrades, will be written into WAL in next Commit call pendingUpgrades []*TreeNameUpgrade // The assumptions to concurrency: @@ -59,6 +63,8 @@ type Options struct { SnapshotKeepRecent uint32 // load the target version instead of latest version TargetVersion uint32 + // Write WAL asynchronously, it's ok in blockchain case because we can always replay the raw blocks. + AsyncWAL bool } const ( @@ -89,10 +95,36 @@ func Load(dir string, opts Options) (*DB, error) { return nil, err } + var ( + walChan chan *walEntry + walQuit chan error + ) + if opts.AsyncWAL { + walChan = make(chan *walEntry, 100) + walQuit = make(chan error) + go func() { + defer close(walQuit) + + for entry := range walChan { + bz, err := entry.data.Marshal() + if err != nil { + walQuit <- err + return + } + if err := wal.Write(entry.index, bz); err != nil { + walQuit <- err + return + } + } + }() + } + db := &DB{ MultiTree: *mtree, dir: dir, wal: wal, + walChan: walChan, + walQuit: walQuit, snapshotKeepRecent: opts.SnapshotKeepRecent, } @@ -142,13 +174,28 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error { return nil } -// cleanupSnapshotRewrite cleans up the old snapshots and switches to a new multitree -// if a snapshot rewrite is in progress. It returns true if a snapshot rewrite has completed -// and false otherwise, along with any error encountered during the cleanup process. -func (db *DB) cleanupSnapshotRewrite() (bool, error) { - if db.snapshotRewriteChan == nil { - return false, nil +// checkAsyncTasks checks the status of background tasks non-blocking-ly and process the result +func (db *DB) checkAsyncTasks() error { + return errors.Join( + db.checkAsyncWAL(), + db.checkBackgroundSnapshotRewrite(), + ) +} + +// checkAsyncWAL check the quit signal of async wal writing +func (db *DB) checkAsyncWAL() error { + select { + case err := <-db.walQuit: + // async wal writing failed, we need to abort the state machine + return fmt.Errorf("async wal writing goroutine quit unexpectedly: %w", err) + default: } + + return nil +} + +// checkBackgroundSnapshotRewrite check the result of background snapshot rewrite, cleans up the old snapshots and switches to a new multitree +func (db *DB) checkBackgroundSnapshotRewrite() error { // check the completeness of background snapshot rewriting select { case result := <-db.snapshotRewriteChan: @@ -156,15 +203,15 @@ func (db *DB) cleanupSnapshotRewrite() (bool, error) { if result.mtree == nil { // background snapshot rewrite failed - return true, fmt.Errorf("background snapshot rewriting failed: %w", result.err) + return fmt.Errorf("background snapshot rewriting failed: %w", result.err) } // snapshot rewrite succeeded, catchup and switch if err := result.mtree.CatchupWAL(db.wal, 0); err != nil { - return true, fmt.Errorf("catchup failed: %w", err) + return fmt.Errorf("catchup failed: %w", err) } if err := db.reloadMultiTree(result.mtree); err != nil { - return true, fmt.Errorf("switch multitree failed: %w", err) + return fmt.Errorf("switch multitree failed: %w", err) } // prune the old snapshots // wait until last prune finish @@ -176,12 +223,12 @@ func (db *DB) cleanupSnapshotRewrite() (bool, error) { if err == nil { for _, entry := range entries { if entry.IsDir() && strings.HasPrefix(entry.Name(), SnapshotPrefix) { - currentVersion, err := strconv.ParseUint(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32) + currentVersion, err := strconv.ParseInt(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32) if err != nil { fmt.Printf("failed when parse current version: %s\n", err) continue } - if result.version-uint32(currentVersion) > db.snapshotKeepRecent { + if result.mtree.metadata.CommitInfo.Version-currentVersion > int64(db.snapshotKeepRecent) { fullPath := filepath.Join(db.dir, entry.Name()) if err := os.RemoveAll(fullPath); err != nil { fmt.Printf("failed when remove old snapshot: %s\n", err) @@ -191,11 +238,11 @@ func (db *DB) cleanupSnapshotRewrite() (bool, error) { } } }() - return true, nil + return nil default: } - return false, nil + return nil } // Commit wraps `MultiTree.ApplyChangeSet` to add some db level operations: @@ -205,7 +252,7 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) { db.mtx.Lock() defer db.mtx.Unlock() - if _, err := db.cleanupSnapshotRewrite(); err != nil { + if err := db.checkAsyncTasks(); err != nil { return nil, 0, err } @@ -216,16 +263,21 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) { if db.wal != nil { // write write-ahead-log - entry := WALEntry{ + entry := walEntry{index: walIndex(v, db.initialVersion), data: &WALEntry{ Changesets: changeSets, Upgrades: db.pendingUpgrades, - } - bz, err := entry.Marshal() - if err != nil { - return nil, 0, err - } - if err := db.wal.Write(walIndex(v, db.initialVersion), bz); err != nil { - return nil, 0, err + }} + if db.walChan != nil { + // async wal writing + db.walChan <- &entry + } else { + bz, err := entry.data.Marshal() + if err != nil { + return nil, 0, err + } + if err := db.wal.Write(entry.index, bz); err != nil { + return nil, 0, err + } } } @@ -345,7 +397,16 @@ func (db *DB) Close() error { db.mtx.Lock() defer db.mtx.Unlock() - return errors.Join(db.MultiTree.Close(), db.wal.Close()) + var walErr error + if db.walChan != nil { + close(db.walChan) + walErr = <-db.walQuit + + db.walChan = nil + db.walQuit = nil + } + + return errors.Join(db.MultiTree.Close(), db.wal.Close(), walErr) } // TreeByName wraps MultiTree.TreeByName to add a lock. @@ -452,3 +513,8 @@ func updateCurrentSymlink(dir, snapshot string) error { // assuming file renaming operation is atomic return os.Rename(tmpPath, currentPath(dir)) } + +type walEntry struct { + index uint64 + data *WALEntry +} diff --git a/memiavl/db_test.go b/memiavl/db_test.go index f69797e7a2..850d793653 100644 --- a/memiavl/db_test.go +++ b/memiavl/db_test.go @@ -4,8 +4,8 @@ import ( "encoding/hex" "os" "strconv" - "strings" "testing" + "time" "github.com/cosmos/iavl" "github.com/stretchr/testify/require" @@ -40,7 +40,7 @@ func TestRewriteSnapshotBackground(t *testing.T) { db, err := Load(t.TempDir(), Options{ CreateIfMissing: true, InitialStores: []string{"test"}, - SnapshotKeepRecent: 1, + SnapshotKeepRecent: 0, // only a single snapshot is kept }) require.NoError(t, err) @@ -56,13 +56,12 @@ func TestRewriteSnapshotBackground(t *testing.T) { require.Equal(t, i+1, int(v)) require.Equal(t, RefHashes[i], db.lastCommitInfo.StoreInfos[0].CommitId.Hash) - err = db.RewriteSnapshotBackground() - require.NoError(t, err) - for { - if cleaned, _ := db.cleanupSnapshotRewrite(); cleaned { - break - } - } + _ = db.RewriteSnapshotBackground() + time.Sleep(time.Millisecond * 20) + } + + for db.snapshotRewriteChan != nil { + require.NoError(t, db.checkAsyncTasks()) } db.pruneSnapshotLock.Lock() @@ -70,15 +69,9 @@ func TestRewriteSnapshotBackground(t *testing.T) { entries, err := os.ReadDir(db.dir) require.NoError(t, err) - version := uint64(db.lastCommitInfo.Version) - for _, entry := range entries { - if entry.IsDir() && strings.HasPrefix(entry.Name(), SnapshotPrefix) { - currentVersion, err := strconv.ParseUint(strings.TrimPrefix(entry.Name(), SnapshotPrefix), 10, 32) - require.NoError(t, err) - require.GreaterOrEqual(t, currentVersion, version-uint64(db.snapshotKeepRecent)) - require.LessOrEqual(t, currentVersion, version) - } - } + + // three files: snapshot, current link, wal + require.Equal(t, 3, len(entries)) } func TestWAL(t *testing.T) { diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 8dc86b286e..b0b5e9c43e 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -49,6 +49,8 @@ type Store struct { listeners map[types.StoreKey][]types.WriteListener interBlockCache types.MultiStorePersistentCache + + asyncWAL bool } func NewStore(dir string, logger log.Logger) *Store { @@ -268,6 +270,7 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra CreateIfMissing: true, InitialStores: initialStores, TargetVersion: uint32(version), + AsyncWAL: rs.asyncWAL, }) if err != nil { return errors.Wrapf(err, "fail to load memiavl at %s", rs.dir) @@ -381,6 +384,10 @@ func (rs *Store) SetIAVLDisableFastNode(disable bool) { func (rs *Store) SetLazyLoading(lazyLoading bool) { } +func (rs *Store) SetAsyncWAL(async bool) { + rs.asyncWAL = async +} + // Implements interface CommitMultiStore func (rs *Store) RollbackToVersion(version int64) error { return stderrors.New("rootmulti store don't support rollback")