Skip to content

Commit

Permalink
Merge pull request #21047 from holiman/improve_updates_2
Browse files Browse the repository at this point in the history
core: improve trie updates (part 2)
  • Loading branch information
karalabe authored Jan 20, 2021
2 parents 81bf9f9 + 42f9f1f commit ddadc3d
Show file tree
Hide file tree
Showing 12 changed files with 545 additions and 81 deletions.
3 changes: 1 addition & 2 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ func (b *SimulatedBackend) Rollback() {

func (b *SimulatedBackend) rollback() {
blocks, _ := core.GenerateChain(b.config, b.blockchain.CurrentBlock(), ethash.NewFaker(), b.database, 1, func(int, *core.BlockGen) {})
stateDB, _ := b.blockchain.State()

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
b.pendingState, _ = state.New(b.pendingBlock.Root(), b.blockchain.StateCache(), nil)
}

// stateByBlockNumber retrieves a state by a given blocknumber.
Expand Down
15 changes: 9 additions & 6 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ type BlockChain struct {
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher // Block state prefetcher interface
processor Processor // Block transaction processor interface
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
Expand Down Expand Up @@ -1860,12 +1860,17 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if err != nil {
return it.index, err
}
// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
defer statedb.StopPrefetcher() // stopped on write anyway, defer meant to catch early error returns

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)

go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

Expand All @@ -1891,8 +1896,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them

triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates

Expand Down Expand Up @@ -1920,7 +1924,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if err != nil {
return it.index, err
}

// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
Expand Down
12 changes: 10 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,20 @@ type cachingDB struct {

// OpenTrie opens the main account trie at a specific root hash.
func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// OpenStorageTrie opens the storage trie of an account.
func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// CopyTrie returns an independent copy of the given trie.
Expand Down
84 changes: 62 additions & 22 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,20 @@ func (s *stateObject) touch() {

func (s *stateObject) getTrie(db Database) Trie {
if s.trie == nil {
var err error
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
if err != nil {
s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{})
s.setError(fmt.Errorf("can't create storage trie: %v", err))
// Try fetching from prefetcher first
// We don't prefetch empty tries
if s.data.Root != emptyRoot && s.db.prefetcher != nil {
// When the miner is creating the pending state, there is no
// prefetcher
s.trie = s.db.prefetcher.trie(s.data.Root)
}
if s.trie == nil {
var err error
s.trie, err = db.OpenStorageTrie(s.addrHash, s.data.Root)
if err != nil {
s.trie, _ = db.OpenStorageTrie(s.addrHash, common.Hash{})
s.setError(fmt.Errorf("can't create storage trie: %v", err))
}
}
}
return s.trie
Expand Down Expand Up @@ -197,12 +206,24 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If no live objects are available, attempt to use snapshots
var (
enc []byte
err error
enc []byte
err error
meter *time.Duration
)
readStart := time.Now()
if metrics.EnabledExpensive {
// If the snap is 'under construction', the first lookup may fail. If that
// happens, we don't want to double-count the time elapsed. Thus this
// dance with the metering.
defer func() {
if meter != nil {
*meter += time.Since(readStart)
}
}()
}
if s.db.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now())
meter = &s.db.SnapshotStorageReads
}
// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
Expand All @@ -217,8 +238,14 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If snapshot unavailable or reading from it failed, load from the database
if s.db.snap == nil || err != nil {
if meter != nil {
// If we already spent time checking the snapshot, account for it
// and reset the readStart
*meter += time.Since(readStart)
readStart = time.Now()
}
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now())
meter = &s.db.StorageReads
}
if enc, err = s.getTrie(db).TryGet(key.Bytes()); err != nil {
s.setError(err)
Expand Down Expand Up @@ -282,9 +309,16 @@ func (s *stateObject) setState(key, value common.Hash) {

// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise() {
func (s *stateObject) finalise(prefetch bool) {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
s.pendingStorage[key] = value
if value != s.originStorage[key] {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch)
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
Expand All @@ -295,26 +329,21 @@ func (s *stateObject) finalise() {
// It will return nil if the trie has not been loaded and no changes have been made
func (s *stateObject) updateTrie(db Database) Trie {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise()
s.finalise(false) // Don't prefetch any more, pull directly if need be
if len(s.pendingStorage) == 0 {
return s.trie
}
// Track the amount of time wasted on updating the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
}
// Retrieve the snapshot storage map for the object
// The snapshot storage map for the object
var storage map[common.Hash][]byte
if s.db.snap != nil {
// Retrieve the old storage map, if available, create a new one otherwise
storage = s.db.snapStorage[s.addrHash]
if storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
// Insert all the pending updates into the trie
tr := s.getTrie(db)
hasher := s.db.hasher

usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
Expand All @@ -331,9 +360,20 @@ func (s *stateObject) updateTrie(db Database) Trie {
s.setError(tr.TryUpdate(key[:], v))
}
// If state snapshotting is active, cache the data til commit
if storage != nil {
storage[crypto.Keccak256Hash(key[:])] = v // v will be nil if value is 0x00
if s.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = s.db.snapStorage[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00
}
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
}
if s.db.prefetcher != nil {
s.db.prefetcher.used(s.data.Root, usedStorage)
}
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
Expand Down
2 changes: 1 addition & 1 deletion core/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestSnapshot2(t *testing.T) {
state.setStateObject(so0)

root, _ := state.Commit(false)
state.Reset(root)
state, _ = New(root, state.db, state.snaps)

// and one with deleted == true
so1 := state.getStateObject(stateobjaddr1)
Expand Down
Loading

0 comments on commit ddadc3d

Please sign in to comment.