Skip to content

Commit

Permalink
fix(dot/state,dot/network): improve memory usage when syncing (#1491)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored Mar 25, 2021
1 parent 3880a40 commit 3b2ad8d
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 9 deletions.
2 changes: 2 additions & 0 deletions dot/core/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ func (s *mockSyncer) IsSynced() bool {
return false
}

func (s *mockSyncer) SetSyncing(bool) {}

type mockDigestItem struct { //nolint
i int
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestService_Health(t *testing.T) {
require.Equal(t, s.Health().IsSyncing, true)
mockSync := s.syncer.(*mockSyncer)

mockSync.setSyncedState(true)
mockSync.SetSyncing(false)
require.Equal(t, s.Health().IsSyncing, false)
}

Expand Down
2 changes: 2 additions & 0 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Syncer interface {

// IsSynced exposes the internal synced state // TODO: use syncQueue for this
IsSynced() bool

SetSyncing(bool)
}

// TransactionHandler is the interface used by the transactions sub-protocol
Expand Down
17 changes: 12 additions & 5 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ func (q *syncQueue) syncAtHead() {
return
}

q.s.syncer.SetSyncing(true)

for {
select {
// sleep for average block time TODO: make this configurable from slot duration
Expand All @@ -183,6 +185,9 @@ func (q *syncQueue) syncAtHead() {
continue
}

logger.Info("set syncing to false")
q.s.syncer.SetSyncing(false)

// we have received new blocks since the last check, sleep
if prev.Number.Int64() < curr.Number.Int64() {
prev = curr
Expand All @@ -192,10 +197,7 @@ func (q *syncQueue) syncAtHead() {
prev = curr
start := uint64(curr.Number.Int64()) + 1
logger.Debug("haven't received new blocks since last check, pushing request", "start", start)
q.requestData.Store(start, requestData{
sent: true,
received: false,
})
q.requestData.Delete(start)
q.pushRequest(start, 1, "")
}
}
Expand Down Expand Up @@ -248,7 +250,12 @@ func (q *syncQueue) handleResponseQueue() {
// prune peers with low score and connect to new peers
func (q *syncQueue) prunePeers() {
for {
time.Sleep(time.Second * 30)
select {
case <-time.After(time.Second * 30):
case <-q.ctx.Done():
return
}

logger.Debug("✂️ pruning peers w/ low score...")

peers := q.getSortedPeers()
Expand Down
4 changes: 2 additions & 2 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (s *mockSyncer) IsSynced() bool {
return s.synced
}

func (s *mockSyncer) setSyncedState(newState bool) {
s.synced = newState
func (s *mockSyncer) SetSyncing(syncing bool) {
s.synced = !syncing
}

type testStreamHandler struct {
Expand Down
8 changes: 8 additions & 0 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"os/signal"
"path"
"runtime/debug"
"sync"
"syscall"

Expand Down Expand Up @@ -158,6 +159,13 @@ func NodeInitialized(basepath string, expected bool) bool {

// NewNode creates a new dot node from a dot node configuration
func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, error) {
// set garbage collection percent to 10%
// can be overwritten by setting the GOGC env veriable, which defaults to 100
prev := debug.SetGCPercent(10)
if prev != 100 {
debug.SetGCPercent(prev)
}

setupLogger(cfg)

// if authority node, should have at least 1 key in keystore
Expand Down
2 changes: 2 additions & 0 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (s *mockSyncer) IsSynced() bool {
return false
}

func (s *mockSyncer) SetSyncing(_ bool) {}

type mockBlockState struct{}

func (s *mockBlockState) BestBlockHeader() (*types.Header, error) {
Expand Down
14 changes: 14 additions & 0 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type StorageState struct {
// change notifiers
changedLock sync.RWMutex
subscriptions map[byte]*StorageSubscription

syncing bool
}

// NewStorageState creates a new StorageState backed by the given trie and database located at basePath.
Expand All @@ -76,6 +78,11 @@ func NewStorageState(db chaindb.Database, blockState *BlockState, t *trie.Trie)
}, nil
}

// SetSyncing sets whether the node is currently syncing or not
func (s *StorageState) SetSyncing(syncing bool) {
s.syncing = syncing
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
s.lock.Lock()
defer s.lock.Unlock()
Expand All @@ -93,6 +100,12 @@ func (s *StorageState) pruneKey(keyHeader *types.Header) {
func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error {
s.lock.Lock()
root := ts.MustRoot()
if s.syncing {
// keep only the trie at the head of the chain when syncing
for key := range s.tries {
delete(s.tries, key)
}
}
s.tries[root] = ts.Trie()
s.lock.Unlock()

Expand All @@ -108,6 +121,7 @@ func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error {
logger.Warn("failed to notify storage subscriptions", "error", err)
}
}()

return nil
}

Expand Down
30 changes: 30 additions & 0 deletions dot/state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,33 @@ func TestStorage_LoadFromDB(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(entries))
}

func TestStorage_StoreTrie_Syncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
require.NoError(t, err)

key := []byte("testkey")
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(true)
err = storage.StoreTrie(ts)
require.NoError(t, err)
require.Equal(t, 1, len(storage.tries))
}

func TestStorage_StoreTrie_NotSyncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
require.NoError(t, err)

key := []byte("testkey")
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(false)
err = storage.StoreTrie(ts)
require.NoError(t, err)
require.Equal(t, 2, len(storage.tries))
}
1 change: 1 addition & 0 deletions dot/sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type StorageState interface {
TrieState(root *common.Hash) (*rtstorage.TrieState, error)
StoreTrie(ts *rtstorage.TrieState) error
LoadCodeHash(*common.Hash) (common.Hash, error)
SetSyncing(bool)
}

// TransactionState is the interface for transaction queue methods
Expand Down
6 changes: 6 additions & 0 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,9 @@ func (s *Service) handleDigests(header *types.Header) {
func (s *Service) IsSynced() bool {
return s.synced
}

// SetSyncing sets whether the node is currently syncing or not
func (s *Service) SetSyncing(syncing bool) {
s.synced = !syncing
s.storageState.SetSyncing(syncing)
}
2 changes: 1 addition & 1 deletion tests/rpc/rpc_01-system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestSystemRPC(t *testing.T) {
expected: modules.SystemHealthResponse{

Peers: 2,
IsSyncing: false,
IsSyncing: true,
ShouldHavePeers: true,
},
params: "{}",
Expand Down

0 comments on commit 3b2ad8d

Please sign in to comment.