Skip to content

Commit

Permalink
store in memory tries in BlockState instead of StorageState
Browse files Browse the repository at this point in the history
  • Loading branch information
qdm12 committed Feb 11, 2022
1 parent 9775246 commit 42dcd64
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 68 deletions.
15 changes: 7 additions & 8 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/runtime"
"github.com/ChainSafe/gossamer/lib/trie"
"github.com/ChainSafe/gossamer/pkg/scale"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -27,8 +28,7 @@ import (
)

const (
pruneKeyBufferSize = 1000
blockPrefix = "block"
blockPrefix = "block"
)

var (
Expand Down Expand Up @@ -60,6 +60,7 @@ type BlockState struct {
genesisHash common.Hash
lastFinalised common.Hash
unfinalisedBlocks *sync.Map // map[common.Hash]*types.Block
tries *tries

// block notifiers
imported map[chan *types.Block]struct{}
Expand All @@ -69,21 +70,19 @@ type BlockState struct {
runtimeUpdateSubscriptionsLock sync.RWMutex
runtimeUpdateSubscriptions map[uint32]chan<- runtime.Version

pruneKeyCh chan *types.Header

telemetry telemetry.Client
}

// NewBlockState will create a new BlockState backed by the database located at basePath
func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState, error) {
func NewBlockState(db chaindb.Database, trs *tries, telemetry telemetry.Client) (*BlockState, error) {
bs := &BlockState{
dbPath: db.Path(),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
tries: trs,
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version),
telemetry: telemetry,
}
Expand All @@ -107,16 +106,16 @@ func NewBlockState(db chaindb.Database, telemetry telemetry.Client) (*BlockState

// NewBlockStateFromGenesis initialises a BlockState from a genesis header,
// saving it to the database located at basePath
func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header,
func NewBlockStateFromGenesis(db chaindb.Database, trie *trie.Trie, header *types.Header,
telemetryMailer telemetry.Client) (*BlockState, error) {
bs := &BlockState{
bt: blocktree.NewBlockTreeFromRoot(header),
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
unfinalisedBlocks: new(sync.Map),
tries: newTries(trie),
imported: make(map[chan *types.Block]struct{}),
finalised: make(map[chan *types.FinalisationInfo]struct{}),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
runtimeUpdateSubscriptions: make(map[uint32]chan<- runtime.Version),
genesisHash: header.Hash(),
lastFinalised: header.Hash(),
Expand Down
8 changes: 5 additions & 3 deletions dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,16 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
bs.notifyFinalized(hash, round, setID)
}

pruned := bs.bt.Prune(hash)
pruned := bs.bt.Prune(hash) // TODO this is always 0
for _, hash := range pruned {
block, has := bs.getAndDeleteUnfinalisedBlock(hash)
if !has {
continue
}

bs.tries.delete(block.Header.StateRoot)

logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

// if nothing was previously finalised, set the first slot of the network to the
Expand Down Expand Up @@ -238,8 +239,9 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error {
continue
}

bs.tries.delete(block.Header.StateRoot)

logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

return batch.Flush()
Expand Down
4 changes: 3 additions & 1 deletion dot/state/block_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func TestConcurrencySetHeader(t *testing.T) {
dbs[i] = NewInMemoryDB(t)
}

someTrie := trie.NewEmptyTrie() // not used in this test

pend := new(sync.WaitGroup)
pend.Add(threads)
for i := 0; i < threads; i++ {
go func(index int) {
defer pend.Done()

bs, err := NewBlockStateFromGenesis(dbs[index], testGenesisHeader, telemetryMock)
bs, err := NewBlockStateFromGenesis(dbs[index], someTrie, testGenesisHeader, telemetryMock)
require.NoError(t, err)

header := &types.Header{
Expand Down
2 changes: 1 addition & 1 deletion dot/state/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newTestBlockState(t *testing.T, header *types.Header) *BlockState {
header = testGenesisHeader
}

bs, err := NewBlockStateFromGenesis(db, header, telemetryMock)
bs, err := NewBlockStateFromGenesis(db, trie.NewEmptyTrie(), header, telemetryMock)
require.NoError(t, err)
return bs
}
Expand Down
4 changes: 2 additions & 2 deletions dot/state/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func (s *Service) Initialise(gen *genesis.Genesis, header *types.Header, t *trie
}

// create block state from genesis block
blockState, err := NewBlockStateFromGenesis(db, header, s.Telemetry)
blockState, err := NewBlockStateFromGenesis(db, t, header, s.Telemetry)
if err != nil {
return fmt.Errorf("failed to create block state from genesis: %s", err)
}

// create storage state from genesis trie
storageState, err := NewStorageState(db, blockState, t, pruner.Config{})
storageState, err := NewStorageState(db, blockState, pruner.Config{})
if err != nil {
return fmt.Errorf("failed to create storage state from trie: %s", err)
}
Expand Down
6 changes: 4 additions & 2 deletions dot/state/offline_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ func NewOfflinePruner(inputDBPath, prunedDBPath string, bloomSize uint64,
return nil, fmt.Errorf("failed to load DB %w", err)
}

tries := newTries(trie.NewEmptyTrie())

// create blockState state
// NewBlockState on pruner execution does not use telemetry
blockState, err := NewBlockState(db, nil)
blockState, err := NewBlockState(db, tries, nil)
if err != nil {
return nil, fmt.Errorf("failed to create block state: %w", err)
}
Expand All @@ -60,7 +62,7 @@ func NewOfflinePruner(inputDBPath, prunedDBPath string, bloomSize uint64,
}

// load storage state
storageState, err := NewStorageState(db, blockState, trie.NewEmptyTrie(), pruner.Config{})
storageState, err := NewStorageState(db, blockState, pruner.Config{})
if err != nil {
return nil, fmt.Errorf("failed to create new storage state %w", err)
}
Expand Down
9 changes: 4 additions & 5 deletions dot/state/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ func (s *Service) Start() error {
return nil
}

tries := newTries(trie.NewEmptyTrie())

var err error
// create block state
s.Block, err = NewBlockState(s.db, s.Telemetry)
s.Block, err = NewBlockState(s.db, tries, s.Telemetry)
if err != nil {
return fmt.Errorf("failed to create block state: %w", err)
}
Expand All @@ -136,7 +138,7 @@ func (s *Service) Start() error {
}

// create storage state
s.Storage, err = NewStorageState(s.db, s.Block, trie.NewEmptyTrie(), pr)
s.Storage, err = NewStorageState(s.db, s.Block, pr)
if err != nil {
return fmt.Errorf("failed to create storage state: %w", err)
}
Expand Down Expand Up @@ -167,9 +169,6 @@ func (s *Service) Start() error {
", highest number " + num.String() +
" and genesis hash " + s.Block.genesisHash.String())

// Start background goroutine to GC pruned keys.
go s.Storage.pruneStorage(s.closeCh)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion dot/state/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestService_PruneStorage(t *testing.T) {
time.Sleep(1 * time.Second)

for _, v := range prunedArr {
tr := serv.Storage.tries.get(v.hash)
tr := serv.Storage.blockState.tries.get(v.hash)
require.Nil(t, tr)
}
}
Expand Down
41 changes: 9 additions & 32 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func errTrieDoesNotExist(hash common.Hash) error {
// StorageState is the struct that holds the trie, db and lock
type StorageState struct {
blockState *BlockState
tries *tries

db chaindb.Database
sync.RWMutex
Expand All @@ -41,19 +40,14 @@ type StorageState struct {
pruner pruner.Pruner
}

// NewStorageState creates a new StorageState backed by the given trie and database located at basePath.
// NewStorageState creates a new StorageState backed by the given block state
// and database located at basePath.
func NewStorageState(db chaindb.Database, blockState *BlockState,
t *trie.Trie, onlinePruner pruner.Config) (*StorageState, error) {
onlinePruner pruner.Config) (*StorageState, error) {
if db == nil {
return nil, fmt.Errorf("cannot have nil database")
}

if t == nil {
return nil, fmt.Errorf("cannot have nil trie")
}

tries := newTries(t)

storageTable := chaindb.NewTable(db, storagePrefix)

var p pruner.Pruner
Expand All @@ -69,23 +63,17 @@ func NewStorageState(db chaindb.Database, blockState *BlockState,

return &StorageState{
blockState: blockState,
tries: tries,
db: storageTable,
observerList: []Observer{},
pruner: p,
}, nil
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash())
s.tries.delete(keyHeader.StateRoot)
}

// StoreTrie stores the given trie in the StorageState and writes it to the database
func (s *StorageState) StoreTrie(ts *rtstorage.TrieState, header *types.Header) error {
root := ts.MustRoot()

s.tries.softSet(root, ts.Trie())
s.blockState.tries.softSet(root, ts.Trie())

if _, ok := s.pruner.(*pruner.FullNode); header == nil && ok {
return fmt.Errorf("block cannot be empty for Full node pruner")
Expand Down Expand Up @@ -126,15 +114,15 @@ func (s *StorageState) TrieState(root *common.Hash) (*rtstorage.TrieState, error
root = &sr
}

t := s.tries.get(*root)
t := s.blockState.tries.get(*root)
if t == nil {
var err error
t, err = s.LoadFromDB(*root)
if err != nil {
return nil, err
}

s.tries.softSet(*root, t)
s.blockState.tries.softSet(*root, t)
} else if t.MustHash() != *root {
panic("trie does not have expected root")
}
Expand All @@ -157,7 +145,7 @@ func (s *StorageState) LoadFromDB(root common.Hash) (*trie.Trie, error) {
return nil, err
}

s.tries.softSet(t.MustHash(), t)
s.blockState.tries.softSet(t.MustHash(), t)
return t, nil
}

Expand All @@ -170,7 +158,7 @@ func (s *StorageState) loadTrie(root *common.Hash) (*trie.Trie, error) {
root = &sr
}

t := s.tries.get(*root)
t := s.blockState.tries.get(*root)
if t != nil {
return t, nil
}
Expand Down Expand Up @@ -201,7 +189,7 @@ func (s *StorageState) GetStorage(root *common.Hash, key []byte) ([]byte, error)
root = &sr
}

t := s.tries.get(*root)
t := s.blockState.tries.get(*root)
if t != nil {
val := t.Get(key)
return val, nil
Expand Down Expand Up @@ -314,14 +302,3 @@ func (s *StorageState) LoadCodeHash(hash *common.Hash) (common.Hash, error) {
func (s *StorageState) GenerateTrieProof(stateRoot common.Hash, keys [][]byte) ([][]byte, error) {
return trie.GenerateProof(stateRoot[:], keys, s.db)
}

func (s *StorageState) pruneStorage(closeCh chan interface{}) {
for {
select {
case key := <-s.blockState.pruneKeyCh:
s.pruneKey(key)
case <-closeCh:
return
}
}
}
22 changes: 11 additions & 11 deletions dot/state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func newTestStorageState(t *testing.T) *StorageState {
db := NewInMemoryDB(t)
bs := newTestBlockState(t, testGenesisHeader)

s, err := NewStorageState(db, bs, trie.NewEmptyTrie(), pruner.Config{})
s, err := NewStorageState(db, bs, pruner.Config{})
require.NoError(t, err)
return s
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestStorage_TrieState(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// get trie from db
storage.tries.delete(root)
storage.blockState.tries.delete(root)
ts3, err := storage.TrieState(&root)
require.NoError(t, err)
require.Equal(t, ts.Trie().MustHash(), ts3.Trie().MustHash())
Expand Down Expand Up @@ -131,19 +131,19 @@ func TestStorage_LoadFromDB(t *testing.T) {
require.NoError(t, err)

// Clear trie from cache and fetch data from disk.
storage.tries.delete(root)
storage.blockState.tries.delete(root)

data, err := storage.GetStorage(&root, trieKV[0].key)
require.NoError(t, err)
require.Equal(t, trieKV[0].value, data)

storage.tries.delete(root)
storage.blockState.tries.delete(root)

prefixKeys, err := storage.GetKeysWithPrefix(&root, []byte("ke"))
require.NoError(t, err)
require.Equal(t, 2, len(prefixKeys))

storage.tries.delete(root)
storage.blockState.tries.delete(root)

entries, err := storage.Entries(&root)
require.NoError(t, err)
Expand All @@ -161,7 +161,7 @@ func TestStorage_StoreTrie_NotSyncing(t *testing.T) {

err = storage.StoreTrie(ts, nil)
require.NoError(t, err)
require.Equal(t, 2, storage.tries.len())
require.Equal(t, 2, storage.blockState.tries.len())
}

func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
Expand All @@ -179,16 +179,16 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
"0",
))

blockState, err := NewBlockStateFromGenesis(db, genHeader, telemetryMock)
require.NoError(t, err)

testChildTrie := trie.NewEmptyTrie()
testChildTrie.Put([]byte("keyInsidechild"), []byte("voila"))

err = genTrie.PutChild([]byte("keyToChild"), testChildTrie)
require.NoError(t, err)

storage, err := NewStorageState(db, blockState, genTrie, pruner.Config{})
blockState, err := NewBlockStateFromGenesis(db, genTrie, genHeader, telemetryMock)
require.NoError(t, err)

storage, err := NewStorageState(db, blockState, pruner.Config{})
require.NoError(t, err)

trieState, err := runtime.NewTrieState(genTrie)
Expand All @@ -208,7 +208,7 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
require.NoError(t, err)

// Clear trie from cache and fetch data from disk.
storage.tries.delete(rootHash)
storage.blockState.tries.delete(rootHash)

_, err = storage.GetStorageChild(&rootHash, []byte("keyToChild"))
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 42dcd64

Please sign in to comment.