From 94c4e7a515e12f8543d8373035125504859fbbb9 Mon Sep 17 00:00:00 2001 From: noot Date: Fri, 14 Jan 2022 14:24:04 -0500 Subject: [PATCH 1/9] actually prune finalized tries from memory; exchange BlockAnnounceHandshake on peer connect --- dot/network/service.go | 19 +++++++++++++++++++ dot/state/block_finalisation.go | 13 +++++++++++-- dot/state/storage.go | 1 + lib/blocktree/blocktree.go | 8 ++++---- 4 files changed, 35 insertions(+), 6 deletions(-) diff --git a/dot/network/service.go b/dot/network/service.go index 25464f4b02..404da18e7b 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -430,6 +430,25 @@ func (s *Service) sentBlockIntervalTelemetry() { func (s *Service) handleConn(conn libp2pnetwork.Conn) { // TODO: currently we only have one set so setID is 0, change this once we have more set in peerSet. s.host.cm.peerSetHandler.Incoming(0, conn.RemotePeer()) + + // exchange BlockAnnounceHandshake with peer so we can start to + // sync if necessary. + prtl, has := s.notificationsProtocols[BlockAnnounceMsgType] + if !has { + return + } + + hs, err := prtl.getHandshake() + if err != nil { + return + } + + stream, err := s.sendHandshake(conn.RemotePeer(), hs, prtl) + if err != nil { + return + } + + _ = stream.Close() } // Stop closes running instances of the host and network services as well as diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index 8557bf4721..0ad32b1aa7 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -235,8 +235,17 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error { return err } - // the block will be deleted from the unfinalisedBlockMap in the pruning loop - // in `SetFinalisedHash()`, which calls this function + // delete from the unfinalisedBlockMap and delete reference to in-memory trie + block, has = bs.getAndDeleteUnfinalisedBlock(hash) + if !has { + continue + } + + logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash) + + go func(header *types.Header) { + bs.pruneKeyCh <- header + }(&block.Header) } return batch.Flush() diff --git a/dot/state/storage.go b/dot/state/storage.go index 5b71fa4592..5e3c749efa 100644 --- a/dot/state/storage.go +++ b/dot/state/storage.go @@ -84,6 +84,7 @@ func (s *StorageState) SetSyncing(syncing bool) { } func (s *StorageState) pruneKey(keyHeader *types.Header) { + logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash()) s.tries.Delete(keyHeader.StateRoot) } diff --git a/lib/blocktree/blocktree.go b/lib/blocktree/blocktree.go index f3aaf88da7..87cdbb9a13 100644 --- a/lib/blocktree/blocktree.go +++ b/lib/blocktree/blocktree.go @@ -133,20 +133,20 @@ func (bt *BlockTree) getNode(h Hash) (ret *node) { // Prune sets the given hash as the new blocktree root, // removing all nodes that are not the new root node or its descendant // It returns an array of hashes that have been pruned -func (bt *BlockTree) Prune(finalised Hash) (pruned []Hash) { +func (bt *BlockTree) Prune(finalised Hash) []Hash { bt.Lock() defer bt.Unlock() if finalised == bt.root.hash { - return pruned + return []Hash{} } n := bt.getNode(finalised) if n == nil { - return pruned + return []Hash{} } - pruned = bt.root.prune(n, nil) + pruned := bt.root.prune(n, nil) bt.root = n bt.root.parent = nil From 53eea32970919c894708513b84bd0cc196cfee2b Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 17 Jan 2022 15:01:32 -0500 Subject: [PATCH 2/9] fix dot/network tests --- dot/network/helpers_test.go | 9 ++++++--- dot/network/notifications.go | 1 + dot/network/notifications_test.go | 11 +++++++++-- dot/network/service.go | 4 ++-- dot/network/service_test.go | 12 ++++++++---- 5 files changed, 26 insertions(+), 11 deletions(-) diff --git a/dot/network/helpers_test.go b/dot/network/helpers_test.go index dc374d6e50..7449deb5fa 100644 --- a/dot/network/helpers_test.go +++ b/dot/network/helpers_test.go @@ -19,14 +19,16 @@ import ( type testStreamHandler struct { messages map[peer.ID][]Message - decoder messageDecoder - exit bool + //handshakes map[peer.ID][]Message + decoder messageDecoder + exit bool } func newTestStreamHandler(decoder messageDecoder) *testStreamHandler { return &testStreamHandler{ messages: make(map[peer.ID][]Message), - decoder: decoder, + //handshakes: make(map[peer.ID][]Message), + decoder: decoder, } } @@ -44,6 +46,7 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) { func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error { msgs := s.messages[stream.Conn().RemotePeer()] s.messages[stream.Conn().RemotePeer()] = append(msgs, msg) + announceHandshake := &BlockAnnounceHandshake{ BestBlockNumber: 0, } diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 281006265f..84e5f32525 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -290,6 +290,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) { // message has already been sent + logger.Tracef("not sending message, already sent: peer=%s msg=%s", peer, msg) return } diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index d23b712533..946fbbdf19 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -274,13 +274,20 @@ func Test_HandshakeTimeout(t *testing.T) { } require.NoError(t, err) + // clear handshake data from connection handler + time.Sleep(time.Millisecond * 100) + info.outboundHandshakeData.Delete(nodeB.host.id()) + connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) + for _, stream := range connAToB[0].GetStreams() { + _ = stream.Close() + } + testHandshakeMsg := &BlockAnnounceHandshake{ Roles: 4, BestBlockNumber: 77, BestBlockHash: common.Hash{1}, GenesisHash: common.Hash{2}, } - nodeA.GossipMessage(testHandshakeMsg) info.outboundHandshakeMutexes.Store(nodeB.host.id(), new(sync.Mutex)) go nodeA.sendData(nodeB.host.id(), testHandshakeMsg, info, nil) @@ -292,7 +299,7 @@ func Test_HandshakeTimeout(t *testing.T) { require.False(t, ok) // a stream should be open until timeout - connAToB := nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) + connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id()) require.Len(t, connAToB, 1) require.Len(t, connAToB[0].GetStreams(), 1) diff --git a/dot/network/service.go b/dot/network/service.go index 404da18e7b..4f4cada0d5 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -443,12 +443,12 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { return } - stream, err := s.sendHandshake(conn.RemotePeer(), hs, prtl) + _, err = s.sendHandshake(conn.RemotePeer(), hs, prtl) if err != nil { return } - _ = stream.Close() + // leave stream open if there's no error } // Stop closes running instances of the host and network services as well as diff --git a/dot/network/service_test.go b/dot/network/service_test.go index d15fdf93ee..3ce9aac8b5 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -100,7 +100,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { Port: availablePort(t), NoBootstrap: true, NoMDNS: true, - LogLvl: 4, + LogLvl: 5, SlotDuration: time.Second, } } @@ -285,6 +285,7 @@ func TestBroadcastDuplicateMessage(t *testing.T) { nodeB := createTestService(t, configB) nodeB.noGossip = true + // TODO: create a decoder that handles both handshakes and messages handler := newTestStreamHandler(testBlockAnnounceHandshakeDecoder) nodeB.host.registerStreamHandler(nodeB.host.protocolID+blockAnnounceID, handler.handleStream) @@ -313,14 +314,16 @@ func TestBroadcastDuplicateMessage(t *testing.T) { Digest: types.NewDigest(), } + delete(handler.messages, nodeA.host.id()) + // Only one message will be sent. for i := 0; i < 5; i++ { nodeA.GossipMessage(announceMessage) time.Sleep(time.Millisecond * 10) } - time.Sleep(time.Millisecond * 200) - require.Equal(t, 1, len(handler.messages[nodeA.host.id()])) + time.Sleep(time.Millisecond * 500) + require.Equal(t, 2, len(handler.messages[nodeA.host.id()])) nodeA.host.messageCache = nil @@ -329,7 +332,8 @@ func TestBroadcastDuplicateMessage(t *testing.T) { nodeA.GossipMessage(announceMessage) time.Sleep(time.Millisecond * 10) } - require.Equal(t, 6, len(handler.messages[nodeA.host.id()])) + + require.Equal(t, 7, len(handler.messages[nodeA.host.id()])) } func TestService_NodeRoles(t *testing.T) { From fe4eb40795fe2f16e59fd1230f4a70d8bb3b83b5 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 17 Jan 2022 15:01:47 -0500 Subject: [PATCH 3/9] revert blocktree changes --- lib/blocktree/blocktree.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/blocktree/blocktree.go b/lib/blocktree/blocktree.go index 87cdbb9a13..f3aaf88da7 100644 --- a/lib/blocktree/blocktree.go +++ b/lib/blocktree/blocktree.go @@ -133,20 +133,20 @@ func (bt *BlockTree) getNode(h Hash) (ret *node) { // Prune sets the given hash as the new blocktree root, // removing all nodes that are not the new root node or its descendant // It returns an array of hashes that have been pruned -func (bt *BlockTree) Prune(finalised Hash) []Hash { +func (bt *BlockTree) Prune(finalised Hash) (pruned []Hash) { bt.Lock() defer bt.Unlock() if finalised == bt.root.hash { - return []Hash{} + return pruned } n := bt.getNode(finalised) if n == nil { - return []Hash{} + return pruned } - pruned := bt.root.prune(n, nil) + pruned = bt.root.prune(n, nil) bt.root = n bt.root.parent = nil From 8a6b7fad06e5c101f6bb8a1e01e9579b6aba7ef4 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 17 Jan 2022 15:04:46 -0500 Subject: [PATCH 4/9] log errors in conn handler --- dot/network/service.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dot/network/service.go b/dot/network/service.go index 4f4cada0d5..55b034ea00 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -440,11 +440,19 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { hs, err := prtl.getHandshake() if err != nil { + logger.Warnf("failed to get handshake for protocol %s: %s", + prtl.protocolID, + err, + ) return } _, err = s.sendHandshake(conn.RemotePeer(), hs, prtl) if err != nil { + logger.Debugf("failed to send handshake to peer on connection, peer=%s: %s", + conn.RemotePeer(), + err, + ) return } From 217a7099ba6e47fda754a3926ced828810ba7b00 Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 17 Jan 2022 15:06:44 -0500 Subject: [PATCH 5/9] make write to pruneKeyCh synchronous --- dot/state/block_finalisation.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index 0ad32b1aa7..eed7fafa91 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -152,10 +152,7 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er } logger.Tracef("pruned block number %s with hash %s", block.Header.Number, hash) - - go func(header *types.Header) { - bs.pruneKeyCh <- header - }(&block.Header) + bs.pruneKeyCh <- &block.Header } // if nothing was previously finalised, set the first slot of the network to the From 195a877c6d20aa2da54f0b8e45d61903fd9ecb5d Mon Sep 17 00:00:00 2001 From: noot Date: Mon, 17 Jan 2022 20:22:38 -0500 Subject: [PATCH 6/9] cleanup --- dot/network/helpers_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dot/network/helpers_test.go b/dot/network/helpers_test.go index 7449deb5fa..e063c82717 100644 --- a/dot/network/helpers_test.go +++ b/dot/network/helpers_test.go @@ -19,16 +19,14 @@ import ( type testStreamHandler struct { messages map[peer.ID][]Message - //handshakes map[peer.ID][]Message - decoder messageDecoder - exit bool + decoder messageDecoder + exit bool } func newTestStreamHandler(decoder messageDecoder) *testStreamHandler { return &testStreamHandler{ messages: make(map[peer.ID][]Message), - //handshakes: make(map[peer.ID][]Message), - decoder: decoder, + decoder: decoder, } } From c51414c1fa2d66c48d509bc0b3861f7b46a28972 Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 18 Jan 2022 15:48:04 -0500 Subject: [PATCH 7/9] update logs --- dot/network/notifications.go | 3 +-- dot/network/service.go | 2 +- dot/network/service_test.go | 3 ++- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 84e5f32525..a3b0006557 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -289,8 +289,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc } if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) { - // message has already been sent - logger.Tracef("not sending message, already sent: peer=%s msg=%s", peer, msg) + logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg) return } diff --git a/dot/network/service.go b/dot/network/service.go index 55b034ea00..d266aad6dd 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -449,7 +449,7 @@ func (s *Service) handleConn(conn libp2pnetwork.Conn) { _, err = s.sendHandshake(conn.RemotePeer(), hs, prtl) if err != nil { - logger.Debugf("failed to send handshake to peer on connection, peer=%s: %s", + logger.Debugf("failed to send handshake to peer %s on connection: %s", conn.RemotePeer(), err, ) diff --git a/dot/network/service_test.go b/dot/network/service_test.go index 3ce9aac8b5..d695545f55 100644 --- a/dot/network/service_test.go +++ b/dot/network/service_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/utils" ) @@ -100,7 +101,7 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) { Port: availablePort(t), NoBootstrap: true, NoMDNS: true, - LogLvl: 5, + LogLvl: log.Warn, SlotDuration: time.Second, } } From b1e9f9c0de910b27b9d316ae8f09f35fb16182ff Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 19 Jan 2022 12:34:30 -0500 Subject: [PATCH 8/9] fix --- dot/state/block_finalisation.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index eed7fafa91..31e242103f 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -239,10 +239,7 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error { } logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash) - - go func(header *types.Header) { - bs.pruneKeyCh <- header - }(&block.Header) + bs.pruneKeyCh <- header } return batch.Flush() From 3ffbe68e76b075b9bc3cd9d629e39ed8038f04fd Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 19 Jan 2022 12:35:08 -0500 Subject: [PATCH 9/9] fix --- dot/state/block_finalisation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index 31e242103f..ac19f469bc 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -239,7 +239,7 @@ func (bs *BlockState) handleFinalisedBlock(curr common.Hash) error { } logger.Tracef("cleaned out finalised block from memory; block number %s with hash %s", block.Header.Number, hash) - bs.pruneKeyCh <- header + bs.pruneKeyCh <- &block.Header } return batch.Flush()