Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recording database updates #177

Merged
merged 15 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 160 additions & 13 deletions arbitrum/recordingdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package arbitrum

import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -25,7 +28,7 @@ type RecordingKV struct {
enableBypass bool
}

func NewRecordingKV(inner *trie.Database) *RecordingKV {
func newRecordingKV(inner *trie.Database) *RecordingKV {
return &RecordingKV{inner, make(map[common.Hash][]byte), false}
}

Expand Down Expand Up @@ -126,7 +129,7 @@ type RecordingChainContext struct {
initialBlockNumber uint64
}

func NewRecordingChainContext(inner core.ChainContext, blocknumber uint64) *RecordingChainContext {
func newRecordingChainContext(inner core.ChainContext, blocknumber uint64) *RecordingChainContext {
return &RecordingChainContext{
bc: inner,
minBlockNumberAccessed: blocknumber,
Expand All @@ -149,9 +152,83 @@ func (r *RecordingChainContext) GetMinBlockNumberAccessed() uint64 {
return r.minBlockNumberAccessed
}

func PrepareRecording(blockchain *core.BlockChain, lastBlockHeader *types.Header) (*state.StateDB, core.ChainContext, *RecordingKV, error) {
rawTrie := blockchain.StateCache().TrieDB()
recordingKeyValue := NewRecordingKV(rawTrie)
// RecordingDatabase wraps a state database together with the blockchain it
// belongs to, and keeps count of the state roots it currently holds
// references to.
type RecordingDatabase struct {
// db is the state database used to open states and to reference,
// dereference and commit trie roots.
db state.Database
// bc is used to look up headers and blocks and to re-execute blocks when
// a missing state has to be recreated.
bc *core.BlockChain
mutex sync.Mutex // protects StateFor and Dereference
// references is incremented for every root referenced through
// referenceRootLockHeld and decremented in dereferenceRoot.
references int64
}

// NewRecordingDatabase builds a RecordingDatabase on top of ethdb, using a
// dedicated state database with a small trie cache, and remembers blockchain
// for later header/block lookups.
func NewRecordingDatabase(ethdb ethdb.Database, blockchain *core.BlockChain) *RecordingDatabase {
	trieConfig := &trie.Config{Cache: 16} //TODO cache needed? configurable?
	recordingDb := new(RecordingDatabase)
	recordingDb.db = state.NewDatabaseWithConfig(ethdb, trieConfig)
	recordingDb.bc = blockchain
	return recordingDb
}

// StateFor opens the state at header.Root and, on success, references that
// root in the trie database.
//
// Normal geth state.New + Reference is not atomic vs Dereference. Here both
// steps happen while r.mutex is held, so they are.
// This function does not recreate a missing state; it returns the error from
// opening the state instead.
func (r *RecordingDatabase) StateFor(header *types.Header) (*state.StateDB, error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	root := header.Root
	stateDb, err := state.NewDeterministic(root, r.db)
	if err != nil {
		return stateDb, err
	}
	r.referenceRootLockHeld(root)
	return stateDb, err
}

// Dereference drops one reference to the state root of header.
// A nil header is ignored.
func (r *RecordingDatabase) Dereference(header *types.Header) {
	if header == nil {
		return
	}
	r.dereferenceRoot(header.Root)
}

// WriteStateToDatabase commits the trie rooted at header.Root through the
// trie database and returns the commit error, if any.
// A nil header is a no-op and returns nil.
func (r *RecordingDatabase) WriteStateToDatabase(header *types.Header) error {
	if header == nil {
		return nil
	}
	trieDb := r.db.TrieDB()
	return trieDb.Commit(header.Root, true, nil)
}

// referenceRootLockHeld bumps the reference counter and references root in
// the trie database (with an empty parent hash).
// The caller must already hold r.mutex.
func (r *RecordingDatabase) referenceRootLockHeld(root common.Hash) {
	r.references++
	var noParent common.Hash
	trieDb := r.db.TrieDB()
	trieDb.Reference(root, noParent)
}

// dereferenceRoot takes r.mutex, lowers the reference counter and
// dereferences root in the trie database.
func (r *RecordingDatabase) dereferenceRoot(root common.Hash) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	r.references--
	trieDb := r.db.TrieDB()
	trieDb.Dereference(root)
}

// addStateVerify commits statedb and checks that the resulting root equals
// expected. On success the committed root is referenced; if the commit fails
// or the root differs, an error is returned and nothing is referenced.
// The whole operation runs under r.mutex.
func (r *RecordingDatabase) addStateVerify(statedb *state.StateDB, expected common.Hash) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	committedRoot, commitErr := statedb.Commit(true)
	switch {
	case commitErr != nil:
		return commitErr
	case committedRoot != expected:
		return fmt.Errorf("bad root hash expected: %v got: %v", expected, committedRoot)
	}
	r.referenceRootLockHeld(committedRoot)
	return nil
}

// StateBuildingLogFunction is a progress callback used while a missing state
// is being rebuilt. targetHeader is the header whose state was requested and
// header is the one currently being handled. hasState is false while walking
// back through ancestors looking for an available state, and true for blocks
// that are being re-executed to recreate state.
type StateBuildingLogFunction func(targetHeader, header *types.Header, hasState bool)

func (r *RecordingDatabase) PrepareRecording(ctx context.Context, lastBlockHeader *types.Header, logFunc StateBuildingLogFunction) (*state.StateDB, core.ChainContext, *RecordingKV, error) {
_, err := r.GetOrRecreateState(ctx, lastBlockHeader, logFunc)
if err != nil {
return nil, nil, nil, err
}
finalDereference := lastBlockHeader // dereference in case of error
defer func() { r.Dereference(finalDereference) }()
recordingKeyValue := newRecordingKV(r.db.TrieDB())

recordingStateDatabase := state.NewDatabase(rawdb.NewDatabase(recordingKeyValue))
var prevRoot common.Hash
Expand All @@ -167,29 +244,99 @@ func PrepareRecording(blockchain *core.BlockChain, lastBlockHeader *types.Header
if !lastBlockHeader.Number.IsUint64() {
return nil, nil, nil, errors.New("block number not uint64")
}
recordingChainContext = NewRecordingChainContext(blockchain, lastBlockHeader.Number.Uint64())
recordingChainContext = newRecordingChainContext(r.bc, lastBlockHeader.Number.Uint64())
}
finalDereference = nil
return recordingStateDb, recordingChainContext, recordingKeyValue, nil
}

func PreimagesFromRecording(chainContextIf core.ChainContext, recordingDb *RecordingKV) (map[common.Hash][]byte, error) {
func (r *RecordingDatabase) PreimagesFromRecording(chainContextIf core.ChainContext, recordingDb *RecordingKV) (map[common.Hash][]byte, error) {
entries := recordingDb.GetRecordedEntries()
recordingChainContext, ok := chainContextIf.(*RecordingChainContext)
if (recordingChainContext == nil) || (!ok) {
return nil, errors.New("recordingChainContext invalid")
}
blockchain, ok := recordingChainContext.bc.(*core.BlockChain)
if (blockchain == nil) || (!ok) {
return nil, errors.New("blockchain invalid")
}

for i := recordingChainContext.GetMinBlockNumberAccessed(); i <= recordingChainContext.initialBlockNumber; i++ {
header := blockchain.GetHeaderByNumber(i)
header := r.bc.GetHeaderByNumber(i)
hash := header.Hash()
bytes, err := rlp.EncodeToBytes(header)
if err != nil {
panic(fmt.Sprintf("Error RLP encoding header: %v\n", err))
return nil, fmt.Errorf("Error RLP encoding header: %v\n", err)
}
entries[hash] = bytes
}
return entries, nil
}

func (r *RecordingDatabase) GetOrRecreateState(ctx context.Context, header *types.Header, logFunc StateBuildingLogFunction) (*state.StateDB, error) {
stateDb, err := r.StateFor(header)
if err == nil {
return stateDb, nil
}
returnedBlockNumber := header.Number.Uint64()
genesis := r.bc.Config().ArbitrumChainParams.GenesisBlockNum
currentHeader := header
var lastRoot common.Hash
for ctx.Err() == nil {
if logFunc != nil {
logFunc(header, currentHeader, false)
}
if currentHeader.Number.Uint64() <= genesis {
return nil, fmt.Errorf("moved beyond genesis looking for state looking for %d, genesis %d, err %w", returnedBlockNumber, genesis, err)
}
currentHeader = r.bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)
if currentHeader == nil {
return nil, fmt.Errorf("chain doesn't contain parent of block %d hash %v", currentHeader.Number, currentHeader.Hash())
}
stateDb, err = r.StateFor(currentHeader)
if err == nil {
lastRoot = currentHeader.Root
break
}
}
defer func() {
if (lastRoot != common.Hash{}) {
r.dereferenceRoot(lastRoot)
}
}()
blockToRecreate := currentHeader.Number.Uint64() + 1
prevHash := currentHeader.Hash()
for ctx.Err() == nil {
block := r.bc.GetBlockByNumber(blockToRecreate)
if block == nil {
return nil, fmt.Errorf("block not found while recreating: %d", blockToRecreate)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could check here if block.ParentHash() matches what we'd expect to provide early protection against reorgs, though I suppose checking that the state root matches is also good enough

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Worth noting, though, that most reorgs will only be found after we reach the block number we're originally looking for and comparing it with the param header

}
if block.ParentHash() != prevHash {
return nil, fmt.Errorf("reorg detected: number %d expectedPrev: %v foundPrev: %v", blockToRecreate, prevHash, block.ParentHash())
}
prevHash = block.Hash()
if logFunc != nil {
logFunc(header, block.Header(), true)
}
_, _, _, err := r.bc.Processor().Process(block, stateDb, vm.Config{})
if err != nil {
return nil, fmt.Errorf("failed recreating state for block %d : %w", blockToRecreate, err)
}
err = r.addStateVerify(stateDb, block.Root())
if err != nil {
return nil, fmt.Errorf("failed commiting state for block %d : %w", blockToRecreate, err)
}
r.dereferenceRoot(lastRoot)
lastRoot = block.Root()
if blockToRecreate >= returnedBlockNumber {
if block.Hash() != header.Hash() {
return nil, fmt.Errorf("blockHash doesn't match when recreating number: %d expected: %v got: %v", blockToRecreate, header.Hash(), block.Hash())
}
// don't dereference this one
lastRoot = common.Hash{}
return stateDb, nil
}
blockToRecreate++
}
return nil, ctx.Err()
}

// ReferenceCount returns the number of state roots currently referenced
// through this RecordingDatabase.
//
// The counter is only ever modified while r.mutex is held, so the read takes
// the same lock to avoid a data race with concurrent StateFor / Dereference
// calls.
func (r *RecordingDatabase) ReferenceCount() int64 {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	return r.references
}
10 changes: 9 additions & 1 deletion core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"errors"
"fmt"
"runtime"

"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -118,11 +119,13 @@ func NewDatabase(db ethdb.Database) Database {
// large memory cache.
func NewDatabaseWithConfig(db ethdb.Database, config *trie.Config) Database {
csc, _ := lru.New(codeSizeCacheSize)
return &cachingDB{
cdb := &cachingDB{
db: trie.NewDatabaseWithConfig(db, config),
codeSizeCache: csc,
codeCache: fastcache.New(codeCacheSize),
}
runtime.SetFinalizer(cdb, (*cachingDB).finalizer)
tsahee marked this conversation as resolved.
Show resolved Hide resolved
return cdb
}

type cachingDB struct {
Expand All @@ -140,6 +143,11 @@ func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
return tr, nil
}

// finalizer is registered with runtime.SetFinalizer in NewDatabaseWithConfig
// and resets the code cache once the cachingDB becomes unreachable.
// fastcache chunks are not managed by the GC, so they have to be released
// explicitly.
func (db *cachingDB) finalizer() {
db.codeCache.Reset()
}

// OpenStorageTrie opens the storage trie of an account.
func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
tr, err := trie.NewSecure(addrHash, root, db.db)
Expand Down
8 changes: 8 additions & 0 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func NewDatabaseWithConfig(diskdb ethdb.KeyValueStore, config *Config) *Database
if config == nil || config.Preimages { // TODO(karalabe): Flip to default off in the future
db.preimages = make(map[common.Hash][]byte)
}
runtime.SetFinalizer(db, (*Database).finalizer)
return db
}

Expand All @@ -305,6 +306,13 @@ func (db *Database) DiskDB() ethdb.KeyValueStore {
return db.diskdb
}

// finalizer is installed via runtime.SetFinalizer when the Database is
// created. Memory used by fastcache is only reclaimed by calling Reset(), so
// the clean cache, if there is one, is reset here.
func (db *Database) finalizer() {
	if db.cleans == nil {
		return
	}
	db.cleans.Reset()
}

// insert inserts a collapsed trie node into the memory database.
// The blob size must be specified to allow proper size tracking.
// All nodes inserted by this function will be reference tracked
Expand Down