Skip to content

Commit

Permalink
fixes: genesis TD, triedb writes, snapshot initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
darioush committed Oct 24, 2024
1 parent 6da51cf commit 25f4f0f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 203 deletions.
51 changes: 27 additions & 24 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ type cacheConfig struct {

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it

// ADDED
SnapshotVerify bool // Whether to verify the snapshot on startup
SnapshotDelayInit bool // Whether to delay the snapshot initialization
TrieRefCountingDisabled bool // Whether to disable trie node reference counting (i.e., it will be handled externally)
}

// defaultCacheConfig are the default caching values if none are specified by the
Expand Down Expand Up @@ -422,25 +427,8 @@ func newBlockChain(db ethdb.Database, triedb *triedb.Database, cacheConfig *cach
}

// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
// var recover bool

head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
// recover = true
}
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
// Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Hash(), head.Root)
if !bc.cacheConfig.SnapshotDelayInit {
bc.initSnapshot(bc.CurrentBlock())
}

// Start future block processor.
Expand Down Expand Up @@ -922,7 +910,7 @@ func (bc *blockChain) writeHeadBlock(block *types.Block) {
rawdb.WriteHeadHeaderHash(batch, block.Hash())
rawdb.WriteHeadFastBlockHash(batch, block.Hash())
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteTxLookupEntriesByBlock(batch, block)
// rawdb.WriteTxLookupEntriesByBlock(batch, block)
rawdb.WriteHeadBlockHash(batch, block.Hash())

// Flush the whole batch into the disk, exit the node if failed
Expand Down Expand Up @@ -1364,7 +1352,7 @@ func (bc *blockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if bc.triedb.Scheme() == rawdb.PathScheme || bc.cacheConfig.TrieRefCountingDisabled {
return nil
}
// If we're running an archive node, always flush
Expand Down Expand Up @@ -1534,6 +1522,12 @@ func (bc *blockChain) InsertChain(chain types.Blocks) (int, error) {
return bc.insertChain(chain, true)
}

type InsertOption int

const (
NoWrites InsertOption = iota
)

// insertChain is the internal implementation of InsertChain, which assumes that
// 1) chains are contiguous, and 2) The chain mutex is held.
//
Expand All @@ -1542,7 +1536,7 @@ func (bc *blockChain) InsertChain(chain types.Blocks) (int, error) {
// racey behaviour. If a sidechain import is in progress, and the historic state
// is imported, but then new canon-head is added before the actual sidechain
// completes, then the historic state could be pruned again
func (bc *blockChain) insertChain(chain types.Blocks, setHead bool) (int, error) {
func (bc *blockChain) insertChain(chain types.Blocks, setHead bool, opts ...InsertOption) (int, error) {
// If the chain is terminating, don't even bother starting up.
if bc.insertStopped() {
return 0, nil
Expand Down Expand Up @@ -1793,9 +1787,18 @@ func (bc *blockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// Write the block to the chain and get the status.
var (
wstart = time.Now()
status WriteStatus
wstart = time.Now()
status WriteStatus
skipWrites = false
)
for _, opt := range opts {
if opt == NoWrites {
skipWrites = true
}
}
if skipWrites {
continue
}
if !setHead {
// Don't set the head, only insert the block
err = bc.writeBlockWithState(block, receipts, statedb)
Expand Down
187 changes: 44 additions & 143 deletions core/blockchain_ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,18 @@ func (c *CacheConfig) cacheConfig() *cacheConfig {
TrieDirtyDisabled: defaultCacheConfig.TrieDirtyDisabled,
TrieTimeLimit: defaultCacheConfig.TrieTimeLimit,

// XXX: blockChain shouldn't initialize snapshot while block processing
// is still handled in BlockChain. Otherwise both may try to initialize snapshots.
SnapshotLimit: 0,

Preimages: c.Preimages,
StateHistory: c.StateHistory,
StateScheme: c.StateScheme,

SnapshotNoBuild: c.SnapshotNoBuild,
SnapshotLimit: c.SnapshotLimit,
SnapshotWait: c.SnapshotWait,

// ADDED
SnapshotVerify: c.SnapshotVerify,
SnapshotDelayInit: true, // Always initialize snapshots in blockchain_ext.go if they are enabled
TrieRefCountingDisabled: true, // Always disable trie ref counting in blockchain_ext.go
}
}

Expand Down Expand Up @@ -207,19 +209,12 @@ type BlockChain struct {
cacheConfig *CacheConfig // Cache configuration for pruning

db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triedb *triedb.Database // The database handler for maintaining trie nodes.
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
stateManager TrieWriter

rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
chainAcceptedFeed event.Feed
logsFeed event.Feed
logsAcceptedFeed event.Feed
blockProcFeed event.Feed
txAcceptedFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
Expand Down Expand Up @@ -1202,9 +1197,6 @@ func (bc *BlockChain) InsertBlock(block *types.Block) error {
}

func (bc *BlockChain) InsertBlockManual(block *types.Block, writes bool) error {
bc.blockProcFeed.Send(true)
defer bc.blockProcFeed.Send(false)

bc.chainmu.Lock()
err := bc.insertBlock(block, writes)
bc.chainmu.Unlock()
Expand All @@ -1213,146 +1205,54 @@ func (bc *BlockChain) InsertBlockManual(block *types.Block, writes bool) error {
}

func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
start := time.Now()
bc.senderCacher.Recover(types.MakeSigner(bc.chainConfig, block.Number(), block.Time()), block.Transactions())

substart := time.Now()
err := bc.engine.VerifyHeader(bc, block.Header())
if err == nil {
err = bc.validator.ValidateBody(block)
}

switch {
case errors.Is(err, ErrKnownBlock):
// even if the block is already known, we still need to generate the
// snapshot layer and add a reference to the triedb, so we re-execute
// the block. Note that insertBlock should only be called on a block
// once if it returns nil
if bc.newTip(block) {
log.Debug("Setting head to be known block", "number", block.Number(), "hash", block.Hash())
} else {
log.Debug("Reprocessing already known block", "number", block.Number(), "hash", block.Hash())
}

// If an ancestor has been pruned, then this block cannot be acceptable.
case errors.Is(err, consensus.ErrPrunedAncestor):
return errors.New("side chain insertion is not supported")

// Future blocks are not supported, but should not be reported, so we return an error
// early here
case errors.Is(err, consensus.ErrFutureBlock):
return errFutureBlockUnsupported

// Some other error occurred, abort
case err != nil:
bc.reportBlock(block, nil, err)
return err
var opts []InsertOption
if !writes {
opts = append(opts, NoWrites)
}
blockContentValidationTimer.Inc(time.Since(substart).Milliseconds())

// No validation errors for the block
var activeState *state.StateDB
defer func() {
// The chain importer is starting and stopping trie prefetchers. If a bad
// block or other error is hit however, an early return may not properly
// terminate the background threads. This defer ensures that we clean up
// and dangling prefetcher, without deferring each and holding on live refs.
if activeState != nil {
activeState.StopPrefetcher()
}
}()

// Retrieve the parent block to determine which root to build state on
substart = time.Now()
parent := bc.GetHeader(block.ParentHash(), block.NumberU64()-1)

// Instantiate the statedb to use for processing transactions
//
// NOTE: Flattening a snapshot during block execution requires fetching state
// entries directly from the trie (much slower).
bc.flattenLock.Lock()
defer bc.flattenLock.Unlock()
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
// If snapshots are enabled, WithBlockHashes must be called as snapshot layers
// are stored by block hash.
if bc.snaps != nil {
bc.snaps.WithBlockHashes(block.Hash(), block.ParentHash())
}
_, err := bc.blockChain.insertChain([]*types.Block{block}, true, opts...)
if err != nil {
return err
}
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
activeState = statedb

// Process block using the parent state as reference point
pstart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, parent, statedb, bc.vmConfig)
if serr := statedb.Error(); serr != nil {
log.Error("statedb error encountered", "err", serr, "number", block.Number(), "hash", block.Hash())
parent := bc.GetHeaderByHash(block.ParentHash())
// Upstream does not perform a snapshot update if the root is the same as the
// parent root, however here the snapshots are based on the block hash, so
// this update is necessary.
if bc.snaps != nil && block.Root() == parent.Root {
if err := bc.snaps.Update(block.Root(), parent.Root, nil, nil, nil); err != nil {
return err
}
}
if err != nil {
bc.reportBlock(block, receipts, err)
return err
// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}
ptime := time.Since(pstart)

// Validate the state using the default validator
vstart := time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
return err
}
vtime := time.Since(vstart)

// Update the metrics touched during block processing and validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete(in processing)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete(in processing)
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
triehash := statedb.AccountHashes + statedb.StorageHashes // The time spent on tries hashing
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // The time spent on account read
trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // The time spent on storage read
blockExecutionTimer.Update((ptime - trieRead)) // The time spent on EVM processing
blockValidationTimer.Update((vtime - (triehash + trieUpdate))) // The time spent on block validation
blockTrieOpsTimer.Inc((triehash + trieUpdate + trieRead).Milliseconds()) // The time spent on trie operations

// If [writes] are disabled, skip [writeBlockWithState] so that we do not write the block
// or the state trie to disk.
// Note: in pruning mode, this prevents us from generating a reference to the state root.
if !writes {
// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if !writes || bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}

// Write the block to the chain and get the status.
// writeBlockWithState (called within writeBlockAndSethead) creates a reference that
// will be cleaned up in Accept/Reject so we need to ensure an error cannot occur
// later in verification, since that would cause the referenced root to never be dereferenced.
wstart := time.Now()
if err := bc.writeBlockAndSetHead(block, parent.Root, receipts, logs, statedb); err != nil {
// Note: if InsertTrie must be the last step in verification that can return an error.
// This allows [stateManager] to assume that if it inserts a trie without returning an
// error then the block has passed verification and either AcceptTrie/RejectTrie will
// eventually be called on [root] unless a fatal error occurs. It does not assume that
// the node will not shutdown before either AcceptTrie/RejectTrie is called.
if err := bc.stateManager.InsertTrie(block); err != nil {
if bc.snaps != nil {
discardErr := bc.snaps.Discard(block.Hash())
if discardErr != nil {
log.Debug("failed to discard snapshot after being unable to insert block trie", "block", block.Hash(), "root", block.Root())
}
}
return err
}
// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them
blockWriteTimer.Update((time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits))
blockInsertTimer.Update(time.Since(start))

log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
"parentHash", block.ParentHash(),
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"elapsed", common.PrettyDuration(time.Since(start)),
"root", block.Root(), "baseFeePerGas", block.BaseFee(), "blockGasCost", block.BlockGasCost(),
)

processedBlockGasUsedCounter.Inc(int64(block.GasUsed()))
processedTxsCounter.Inc(int64(block.Transactions().Len()))
processedLogsCounter.Inc(int64(len(logs)))
blockInsertCount.Inc(1)
return nil
}

Expand Down Expand Up @@ -1683,7 +1583,7 @@ func (bc *BlockChain) commitWithSnap(
}

// initSnapshot instantiates a Snapshot instance and adds it to [bc]
func (bc *BlockChain) initSnapshot(b *types.Header) {
func (bc *blockChain) initSnapshot(b *types.Header) {
if bc.cacheConfig.SnapshotLimit <= 0 || bc.snaps != nil {
return
}
Expand Down Expand Up @@ -2030,6 +1930,7 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
if err := bc.batchBlockAcceptedIndices(batch, block); err != nil {
return err
}
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), block.Difficulty())
rawdb.WriteHeadBlockHash(batch, block.Hash())
rawdb.WriteHeadHeaderHash(batch, block.Hash())
rawdb.WriteSnapshotBlockHash(batch, block.Hash())
Expand Down
Loading

0 comments on commit 25f4f0f

Please sign in to comment.