Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/state): actually prune finalized tries from memory #2196

Merged
merged 9 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) {
func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {
msgs := s.messages[stream.Conn().RemotePeer()]
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg)

announceHandshake := &BlockAnnounceHandshake{
BestBlockNumber: 0,
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
}

if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) {
// message has already been sent
logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg)
return
}

Expand Down
11 changes: 9 additions & 2 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,20 @@ func Test_HandshakeTimeout(t *testing.T) {
}
require.NoError(t, err)

// clear handshake data from connection handler
time.Sleep(time.Millisecond * 100)
info.outboundHandshakeData.Delete(nodeB.host.id())
connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
for _, stream := range connAToB[0].GetStreams() {
_ = stream.Close()
}

testHandshakeMsg := &BlockAnnounceHandshake{
Roles: 4,
BestBlockNumber: 77,
BestBlockHash: common.Hash{1},
GenesisHash: common.Hash{2},
}
nodeA.GossipMessage(testHandshakeMsg)

info.outboundHandshakeMutexes.Store(nodeB.host.id(), new(sync.Mutex))
go nodeA.sendData(nodeB.host.id(), testHandshakeMsg, info, nil)
Expand All @@ -292,7 +299,7 @@ func Test_HandshakeTimeout(t *testing.T) {
require.False(t, ok)

// a stream should be open until timeout
connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
require.Len(t, connAToB, 1)
require.Len(t, connAToB[0].GetStreams(), 1)

Expand Down
27 changes: 27 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,33 @@ func (s *Service) sentBlockIntervalTelemetry() {
func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// TODO: currently we only have one set so setID is 0, change this once we have more set in peerSet.
s.host.cm.peerSetHandler.Incoming(0, conn.RemotePeer())

// exchange BlockAnnounceHandshake with peer so we can start to
// sync if necessary.
prtl, has := s.notificationsProtocols[BlockAnnounceMsgType]
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
if !has {
return
}

hs, err := prtl.getHandshake()
if err != nil {
logger.Warnf("failed to get handshake for protocol %s: %s",
prtl.protocolID,
err,
)
return
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}

_, err = s.sendHandshake(conn.RemotePeer(), hs, prtl)
if err != nil {
logger.Debugf("failed to send handshake to peer %s on connection: %s",
conn.RemotePeer(),
err,
)
return
}

// leave stream open if there's no error
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}

// Stop closes running instances of the host and network services as well as
Expand Down
13 changes: 9 additions & 4 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/utils"
)
Expand Down Expand Up @@ -100,7 +101,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
LogLvl: 4,
LogLvl: log.Warn,
SlotDuration: time.Second,
}
}
Expand Down Expand Up @@ -285,6 +286,7 @@ func TestBroadcastDuplicateMessage(t *testing.T) {
nodeB := createTestService(t, configB)
nodeB.noGossip = true

// TODO: create a decoder that handles both handshakes and messages
handler := newTestStreamHandler(testBlockAnnounceHandshakeDecoder)
nodeB.host.registerStreamHandler(nodeB.host.protocolID+blockAnnounceID, handler.handleStream)

Expand Down Expand Up @@ -313,14 +315,16 @@ func TestBroadcastDuplicateMessage(t *testing.T) {
Digest: types.NewDigest(),
}

delete(handler.messages, nodeA.host.id())

// Only one message will be sent.
for i := 0; i < 5; i++ {
nodeA.GossipMessage(announceMessage)
time.Sleep(time.Millisecond * 10)
}

time.Sleep(time.Millisecond * 200)
require.Equal(t, 1, len(handler.messages[nodeA.host.id()]))
time.Sleep(time.Millisecond * 500)
require.Equal(t, 2, len(handler.messages[nodeA.host.id()]))

nodeA.host.messageCache = nil

Expand All @@ -329,7 +333,8 @@ func TestBroadcastDuplicateMessage(t *testing.T) {
nodeA.GossipMessage(announceMessage)
time.Sleep(time.Millisecond * 10)
}
require.Equal(t, 6, len(handler.messages[nodeA.host.id()]))

require.Equal(t, 7, len(handler.messages[nodeA.host.id()]))
}

func TestService_NodeRoles(t *testing.T) {
Expand Down
15 changes: 9 additions & 6 deletions dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
}

logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash)

go func(header *types.Header) {
bs.pruneKeyCh <- header
}(&block.Header)
bs.pruneKeyCh <- &block.Header
}

// if nothing was previously finalised, set the first slot of the network to the
Expand Down Expand Up @@ -235,8 +232,14 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error {
return err
}

// the block will be deleted from the unfinalisedBlockMap in the pruning loop
// in `SetFinalisedHash()`, which calls this function
// delete from the unfinalisedBlockMap and delete reference to in-memory trie
block, has = bs.getAndDeleteUnfinalisedBlock(hash)
if !has {
continue
}

logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash)
bs.pruneKeyCh <- &block.Header
}

return batch.Flush()
Expand Down
1 change: 1 addition & 0 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (s *StorageState) SetSyncing(syncing bool) {
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash())
s.tries.Delete(keyHeader.StateRoot)
}

Expand Down