Skip to content

Commit

Permalink
fix(dot/state): inject mutex protected tries to states
Browse files Browse the repository at this point in the history
  • Loading branch information
qdm12 committed Feb 10, 2022
1 parent 9775246 commit 15d951a
Show file tree
Hide file tree
Showing 10 changed files with 33 additions and 48 deletions.
14 changes: 6 additions & 8 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
)

const (
pruneKeyBufferSize = 1000
blockPrefix = "block"
blockPrefix = "block"
)

var (
Expand Down Expand Up @@ -60,6 +59,7 @@ type BlockState struct {
genesisHash common.Hash
lastFinalised common.Hash
unfinalisedBlocks *sync.Map // map[common.Hash]*types.Block
tries *tries

// block notifiers
imported map[chan *types.Block]struct{}
Expand All @@ -69,21 +69,19 @@ type BlockState struct {
runtimeUpdateSubscriptionsLock sync.RWMutex
runtimeUpdateSubscriptions map[uint32]chan<- runtime.Version

pruneKeyCh chan *types.Header

telemetry telemetry.Client
}

// NewBlockState will create a new BlockState backed by the database located at basePath
func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState, error) {
func NewBlockState(db chaindb.Database, tries *tries, telemetry telemetry.Client) (*BlockState, error) {
bs := &BlockState{
dbPath: db.Path(),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
tries: tries,
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version),
telemetry: telemetry,
}
Expand All @@ -107,16 +105,16 @@ func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState

// NewBlockStateFromGenesis initialises a BlockState from a genesis header,
// saving it to the database located at basePath
func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header,
func NewBlockStateFromGenesis(db chaindb.Database, tries *tries, header *types.Header,
telemetryMailer telemetry.Client) (*BlockState, error) {
bs := &BlockState{
bt: blocktree.NewBlockTreeFromRoot(header),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
tries: tries,
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version),
genesisHash: header.Hash(),
lastFinalised: header.Hash(),
Expand Down
6 changes: 4 additions & 2 deletions dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
continue
}

bs.tries.delete(hash)

logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

// if nothing was previously finalised, set the first slot of the network to the
Expand Down Expand Up @@ -238,8 +239,9 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error {
continue
}

bs.tries.delete(hash)

logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

return batch.Flush()
Expand Down
2 changes: 1 addition & 1 deletion dot/state/block_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestConcurrencySetHeader(t *testing.T) {
go func(index int) {
defer pend.Done()

bs, err := NewBlockStateFromGenesis(dbs[index], testGenesisHeader, telemetryMock)
bs, err := NewBlockStateFromGenesis(dbs[index], nil, testGenesisHeader, telemetryMock)
require.NoError(t, err)

header := &types.Header{
Expand Down
2 changes: 1 addition & 1 deletion dot/state/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newTestBlockState(t *testing.T, header *types.Header) *BlockState {
header = testGenesisHeader
}

bs, err := NewBlockStateFromGenesis(db, header, telemetryMock)
bs, err := NewBlockStateFromGenesis(db, nil, header, telemetryMock)
require.NoError(t, err)
return bs
}
Expand Down
6 changes: 4 additions & 2 deletions dot/state/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ func (s *Service) Initialise(gen *genesis.Genesis, header *types.Header, t *trie
return fmt.Errorf("failed to write genesis values to database: %s", err)
}

tries := newTries(t)

// create block state from genesis block
blockState, err := NewBlockStateFromGenesis(db, header, s.Telemetry)
blockState, err := NewBlockStateFromGenesis(db, tries, header, s.Telemetry)
if err != nil {
return fmt.Errorf("failed to create block state from genesis: %s", err)
}

// create storage state from genesis trie
storageState, err := NewStorageState(db, blockState, t, pruner.Config{})
storageState, err := NewStorageState(db, blockState, tries, pruner.Config{})
if err != nil {
return fmt.Errorf("failed to create storage state from trie: %s", err)
}
Expand Down
6 changes: 4 additions & 2 deletions dot/state/offline_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ func NewOfflinePruner(inputDBPath, prunedDBPath string, bloomSize uint64,
return nil, fmt.Errorf("failed to load DB %w", err)
}

tries := newTries(trie.NewEmptyTrie())

// create blockState state
// NewBlockState on pruner execution does not use telemetry
blockState, err := NewBlockState(db, nil)
blockState, err := NewBlockState(db, tries, nil)
if err != nil {
return nil, fmt.Errorf("failed to create block state: %w", err)
}
Expand All @@ -60,7 +62,7 @@ func NewOfflinePruner(inputDBPath, prunedDBPath string, bloomSize uint64,
}

// load storage state
storageState, err := NewStorageState(db, blockState, trie.NewEmptyTrie(), pruner.Config{})
storageState, err := NewStorageState(db, blockState, tries, pruner.Config{})
if err != nil {
return nil, fmt.Errorf("failed to create new storage state %w", err)
}
Expand Down
9 changes: 4 additions & 5 deletions dot/state/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ func (s *Service) Start() error {
return nil
}

tries := newTries(trie.NewEmptyTrie())

var err error
// create block state
s.Block, err = NewBlockState(s.db, s.Telemetry)
s.Block, err = NewBlockState(s.db, tries, s.Telemetry)
if err != nil {
return fmt.Errorf("failed to create block state: %w", err)
}
Expand All @@ -136,7 +138,7 @@ func (s *Service) Start() error {
}

// create storage state
s.Storage, err = NewStorageState(s.db, s.Block, trie.NewEmptyTrie(), pr)
s.Storage, err = NewStorageState(s.db, s.Block, tries, pr)
if err != nil {
return fmt.Errorf("failed to create storage state: %w", err)
}
Expand Down Expand Up @@ -167,9 +169,6 @@ func (s *Service) Start() error {
", highest number " + num.String() +
" and genesis hash " + s.Block.genesisHash.String())

// Start background goroutine to GC pruned keys.
go s.Storage.pruneStorage(s.closeCh)

return nil
}

Expand Down
24 changes: 1 addition & 23 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,11 @@ type StorageState struct {

// NewStorageState creates a new StorageState backed by the given trie and database located at basePath.
func NewStorageState(db chaindb.Database, blockState *BlockState,
t *trie.Trie, onlinePruner pruner.Config) (*StorageState, error) {
tries *tries, onlinePruner pruner.Config) (*StorageState, error) {
if db == nil {
return nil, fmt.Errorf("cannot have nil database")
}

if t == nil {
return nil, fmt.Errorf("cannot have nil trie")
}

tries := newTries(t)

storageTable := chaindb.NewTable(db, storagePrefix)

var p pruner.Pruner
Expand All @@ -76,11 +70,6 @@ func NewStorageState(db chaindb.Database, blockState *BlockState,
}, nil
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash())
s.tries.delete(keyHeader.StateRoot)
}

// StoreTrie stores the given trie in the StorageState and writes it to the database
func (s *StorageState) StoreTrie(ts *rtstorage.TrieState, header *types.Header) error {
root := ts.MustRoot()
Expand Down Expand Up @@ -314,14 +303,3 @@ func (s *StorageState) LoadCodeHash(hash *common.Hash) (common.Hash, error) {
func (s *StorageState) GenerateTrieProof(stateRoot common.Hash, keys [][]byte) ([][]byte, error) {
return trie.GenerateProof(stateRoot[:], keys, s.db)
}

func (s *StorageState) pruneStorage(closeCh chan interface{}) {
for {
select {
case key := <-s.blockState.pruneKeyCh:
s.pruneKey(key)
case <-closeCh:
return
}
}
}
10 changes: 7 additions & 3 deletions dot/state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func newTestStorageState(t *testing.T) *StorageState {
db := NewInMemoryDB(t)
bs := newTestBlockState(t, testGenesisHeader)

s, err := NewStorageState(db, bs, trie.NewEmptyTrie(), pruner.Config{})
tries := newTries(trie.NewEmptyTrie())

s, err := NewStorageState(db, bs, tries, pruner.Config{})
require.NoError(t, err)
return s
}
Expand Down Expand Up @@ -179,7 +181,7 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
"0",
))

blockState, err := NewBlockStateFromGenesis(db, genHeader, telemetryMock)
blockState, err := NewBlockStateFromGenesis(db, nil, genHeader, telemetryMock)
require.NoError(t, err)

testChildTrie := trie.NewEmptyTrie()
Expand All @@ -188,7 +190,9 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
err = genTrie.PutChild([]byte("keyToChild"), testChildTrie)
require.NoError(t, err)

storage, err := NewStorageState(db, blockState, genTrie, pruner.Config{})
tries := newTries(genTrie)

storage, err := NewStorageState(db, blockState, tries, pruner.Config{})
require.NoError(t, err)

trieState, err := runtime.NewTrieState(genTrie)
Expand Down
2 changes: 1 addition & 1 deletion lib/grandpa/grandpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func newTestState(t *testing.T) *state.Service {
t.Cleanup(func() { db.Close() })

_, genTrie, _ := genesis.NewTestGenesisWithTrieAndHeader(t)
block, err := state.NewBlockStateFromGenesis(db, testGenesisHeader, telemetryMock)
block, err := state.NewBlockStateFromGenesis(db, nil, testGenesisHeader, telemetryMock)
require.NoError(t, err)

rtCfg := &wasmer.Config{}
Expand Down

0 comments on commit 15d951a

Please sign in to comment.