Skip to content

Commit

Permalink
Make trie flush interval settable via rpc method
Browse files Browse the repository at this point in the history
  • Loading branch information
s1na committed Dec 7, 2022
1 parent 9f78416 commit bea73e5
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
33 changes: 21 additions & 12 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,14 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)

// txLookupLimit is the maximum number of blocks from head whose tx indices
// are reserved:
Expand Down Expand Up @@ -259,6 +260,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
flushInterval: int64(cacheConfig.TrieTimeLimit),
triegc: prque.New(nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
Expand Down Expand Up @@ -1330,9 +1332,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory

// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval))
log.Info("Deciding to flush", "interval", flushInterval, "chosen", chosen, "time", bc.gcproc, "lastWrite", bc.lastWrite)
// If we exceeded time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
Expand All @@ -1341,8 +1344,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < bc.lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory)
if chosen < bc.lastWrite+TriesInMemory && bc.gcproc >= 2*flushInterval {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true, nil)
Expand Down Expand Up @@ -2437,3 +2440,9 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro
bc.validator = v
bc.processor = p
}

// SetTrieFlushInterval configures how often in-memory tries are persisted to disk.
// It is thread-safe and can be called repeatedly without side effects.
func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
atomic.StoreInt64(&bc.flushInterval, int64(interval))
}
11 changes: 11 additions & 0 deletions eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,3 +590,14 @@ func (api *DebugAPI) GetAccessibleState(from, to rpc.BlockNumber) (uint64, error
}
return 0, errors.New("no state found")
}

// SetTrieFlushInterval configures how often in-memory tries are persisted
// to disk. The value is
func (api *DebugAPI) SetTrieFlushInterval(interval string) error {
t, err := time.ParseDuration(interval)
if err != nil {
return err
}
api.eth.blockchain.SetTrieFlushInterval(t)
return nil
}
5 changes: 5 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,11 @@ web3._extend({
call: 'debug_dbAncients',
params: 0
}),
new web3._extend.Method({
name: 'setTrieFlushInterval',
call: 'debug_setTrieFlushInterval',
params: 1
}),
],
properties: []
});
Expand Down

0 comments on commit bea73e5

Please sign in to comment.