core,eth: add rpc method to trie flush frequency #24785

Merged · 5 commits · Dec 9, 2022
114 changes: 62 additions & 52 deletions core/blockchain.go
@@ -169,12 +169,14 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)

// txLookupLimit is the maximum number of blocks from head whose tx indices
// are reserved:
@@ -258,6 +260,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
flushInterval: int64(cacheConfig.TrieTimeLimit),
triegc: prque.New(nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
@@ -1248,8 +1251,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, nil
}

var lastWrite uint64

// writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
@@ -1311,53 +1312,55 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false, nil)
} else {
// Full but not archive node, do proper garbage collection
bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))

if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = bc.triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory

// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true, nil)
lastWrite = chosen
bc.gcproc = 0
}
}
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
bc.triedb.Dereference(root.(common.Hash))
}
// Full but not archive node, do proper garbage collection
bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))

current := block.NumberU64()
// Flush limits are not considered for the first TriesInMemory blocks.
if current <= TriesInMemory {
return nil
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = bc.triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory
flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval))
// If we exceeded time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
Comment on lines +1335 to +1337

Contributor:
The diff is large mainly due to un-indenting an if-clause (which is good). But just to verify: this change right here is the only real diff, and the heart of the change, right?
We compare the gcproc with the configurable flushInterval instead of cacheConfig.TrieTimeLimit.

Contributor Author:
Yes, see this commit: bea73e5

// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < bc.lastWrite+TriesInMemory && bc.gcproc >= 2*flushInterval {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true, nil)
bc.lastWrite = chosen
bc.gcproc = 0
}
}
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
bc.triedb.Dereference(root.(common.Hash))
}
return nil
}

@@ -2436,3 +2439,10 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro
bc.validator = v
bc.processor = p
}

// SetTrieFlushInterval configures how often in-memory tries are persisted to disk.
// The interval is in terms of block processing time, not wall clock.
// It is thread-safe and can be called repeatedly without side effects.
func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
atomic.StoreInt64(&bc.flushInterval, int64(interval))
}
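
For context, here is a minimal standalone sketch (not geth code) of the concurrency pattern this diff relies on: the interval is kept as an int64 nanosecond count so that the RPC goroutine calling SetTrieFlushInterval and the block-import goroutine reading it in writeBlockWithState can exchange the value through sync/atomic without a lock. The names chain, setTrieFlushInterval, and shouldFlush below are illustrative only.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// chain is a stand-in for BlockChain, holding only the field relevant here.
type chain struct {
	flushInterval int64 // interval in nanoseconds, accessed atomically
}

// setTrieFlushInterval mirrors the setter added in this diff: store the
// duration's nanosecond count atomically so concurrent readers never see
// a torn value.
func (c *chain) setTrieFlushInterval(d time.Duration) {
	atomic.StoreInt64(&c.flushInterval, int64(d))
}

// shouldFlush mirrors the check in writeBlockWithState: load the current
// interval atomically and compare it against accumulated processing time.
func (c *chain) shouldFlush(gcproc time.Duration) bool {
	return gcproc > time.Duration(atomic.LoadInt64(&c.flushInterval))
}

func main() {
	c := &chain{}
	c.setTrieFlushInterval(time.Hour)
	fmt.Println(c.shouldFlush(30 * time.Minute)) // false: still under the allowance
	fmt.Println(c.shouldFlush(2 * time.Hour))    // true: time to flush a trie
}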
11 changes: 11 additions & 0 deletions eth/api.go
@@ -590,3 +590,14 @@ func (api *DebugAPI) GetAccessibleState(from, to rpc.BlockNumber) (uint64, error
}
return 0, errors.New("no state found")
}

// SetTrieFlushInterval configures how often in-memory tries are persisted
// to disk. The value is in terms of block processing time, not wall clock.
func (api *DebugAPI) SetTrieFlushInterval(interval string) error {
t, err := time.ParseDuration(interval)
if err != nil {
return err
}
api.eth.blockchain.SetTrieFlushInterval(t)
return nil
}
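
As a rough usage sketch (not part of the PR), the new handler should be reachable over JSON-RPC as debug_setTrieFlushInterval once the debug namespace is exposed; the endpoint URL and the duration value below are assumptions.

package main

import (
	"log"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Assumes a local node with the debug API enabled over HTTP.
	client, err := rpc.Dial("http://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The argument is parsed server-side with time.ParseDuration, so any
	// Go duration string ("30m", "1h", "72h") should be accepted.
	if err := client.Call(nil, "debug_setTrieFlushInterval", "1h"); err != nil {
		log.Fatal(err)
	}
	log.Println("trie flush interval updated")
}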
5 changes: 5 additions & 0 deletions internal/web3ext/web3ext.go
@@ -490,6 +490,11 @@ web3._extend({
call: 'debug_dbAncients',
params: 0
}),
new web3._extend.Method({
name: 'setTrieFlushInterval',
call: 'debug_setTrieFlushInterval',
params: 1
}),
],
properties: []
});
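
With this registration in place, the method should also surface in the geth JavaScript console as debug.setTrieFlushInterval("1h") — one string argument, dispatched to debug_setTrieFlushInterval — mirroring the Go client call sketched above.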