diff --git a/go.mod b/go.mod index 4e3da09e99..a2dc634e0b 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,11 @@ require ( lukechampine.com/blake3 v1.1.7 // indirect ) +require ( + github.com/0xPolygon/go-ibft v0.0.0-20220810095021-e43142f8d267 + go.uber.org/atomic v1.10.0 +) + require ( github.com/0xPolygon/go-ibft v0.0.0-20220810095021-e43142f8d267 go.uber.org/atomic v1.10.0 diff --git a/network/discovery/discovery.go b/network/discovery/discovery.go index d200dca03e..2d5e899c3f 100644 --- a/network/discovery/discovery.go +++ b/network/discovery/discovery.go @@ -76,6 +76,9 @@ type networkingServer interface { // RemoveTemporaryDial removes a peer from the temporary dial map RemoveTemporaryDial(peerID peer.ID) + // TemporaryDialPeer dials the peer temporarily + TemporaryDialPeer(peerAddrInfo *peer.AddrInfo) + // CONNECTION INFORMATION // // HasFreeConnectionSlot checks if there is an available connection slot for the set direction [Thread safe] @@ -300,6 +303,7 @@ func (d *DiscoveryService) regularPeerDiscovery() { return } + d.logger.Debug("running regular peer discovery", "peer", peerID.String()) // Try to discover the peers connected to the reference peer if err := d.attemptToFindPeers(*peerID); err != nil { d.logger.Error( @@ -312,7 +316,7 @@ func (d *DiscoveryService) regularPeerDiscovery() { } } -// bootnodeDiscovery queries a random (unconnected) bootnode for new peers +// bootnodePeerDiscovery queries a random (unconnected) bootnode for new peers // and adds them to the routing table func (d *DiscoveryService) bootnodePeerDiscovery() { if !d.baseServer.HasFreeConnectionSlot(network.DirOutbound) { @@ -331,7 +335,6 @@ func (d *DiscoveryService) bootnodePeerDiscovery() { // Get a random unconnected bootnode from the bootnode set bootnode = d.baseServer.GetRandomBootnode() if bootnode == nil { - // No bootnodes available return } @@ -361,10 +364,16 @@ func (d *DiscoveryService) bootnodePeerDiscovery() { } }() + // Make sure we are peered with a bootnode + d.baseServer.TemporaryDialPeer(bootnode) + // Find peers from the referenced bootnode foundNodes, err := d.findPeersCall(bootnode.ID, true) if err != nil { - d.logger.Error("Unable to execute bootnode peer discovery, %w", err) + d.logger.Error("Unable to execute bootnode peer discovery", + "bootnode", bootnode.ID.String(), + "err", err.Error(), + ) return } diff --git a/network/server.go b/network/server.go index 9d1754261b..b181313b7d 100644 --- a/network/server.go +++ b/network/server.go @@ -46,8 +46,8 @@ const ( DefaultLibp2pPort int = 1478 - MinimumPeerConnections int64 = 1 MinimumBootNodes int = 1 + MinimumPeerConnections int64 = 1 ) var ( @@ -262,7 +262,7 @@ func (s *Server) Start() error { } go s.runDial() - go s.checkPeerConnections() + go s.keepAliveMinimumPeerConnections() // watch for disconnected peers s.host.Network().Notify(&network.NotifyBundle{ @@ -318,8 +318,9 @@ func (s *Server) setupBootnodes() error { return nil } -// checkPeerCount will attempt to make new connections if the active peer count is lesser than the specified limit. -func (s *Server) checkPeerConnections() { +// keepAliveMinimumPeerConnections will attempt to make new connections +// if the active peer count is lesser than the specified limit. +func (s *Server) keepAliveMinimumPeerConnections() { for { select { case <-time.After(10 * time.Second): @@ -329,10 +330,16 @@ func (s *Server) checkPeerConnections() { if s.numPeers() < MinimumPeerConnections { if s.config.NoDiscover || !s.bootnodes.hasBootnodes() { - // TODO: dial peers from the peerstore + // dial unconnected peer + randPeer := s.GetRandomPeer() + if randPeer != nil && !s.IsConnected(*randPeer) { + s.addToDialQueue(s.GetPeerInfo(*randPeer), common.PriorityRandomDial) + } } else { - randomNode := s.GetRandomBootnode() - s.addToDialQueue(randomNode, common.PriorityRandomDial) + // dial random unconnected bootnode + if randomNode := s.GetRandomBootnode(); randomNode != nil { + s.addToDialQueue(randomNode, common.PriorityRandomDial) + } } } } @@ -397,7 +404,7 @@ func (s *Server) runDial() { // the connection process is async because it involves connection (here) + // the handshake done in the identity service. if err := s.host.Connect(context.Background(), *peerInfo); err != nil { - s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err) + s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err.Error()) s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect) } diff --git a/network/server_discovery.go b/network/server_discovery.go index 605881ce0f..5d2475b63a 100644 --- a/network/server_discovery.go +++ b/network/server_discovery.go @@ -2,7 +2,6 @@ package network import ( "crypto/rand" - "errors" "fmt" "math/big" "time" @@ -17,10 +16,6 @@ import ( rawGrpc "google.golang.org/grpc" ) -var ( - errPeerDisconnected = errors.New("peer disconnected before the discovery client was initialized") -) - // GetRandomBootnode fetches a random bootnode that's currently // NOT connected, if any func (s *Server) GetRandomBootnode() *peer.AddrInfo { @@ -69,7 +64,8 @@ func (s *Server) NewDiscoveryClient(peerID peer.ID) (proto.DiscoveryClient, erro // Check if there is a peer connection at this point in time, // as there might have been a disconnection previously if !s.IsConnected(peerID) && !isTemporaryDial { - return nil, errPeerDisconnected + return nil, fmt.Errorf("could not initialize new discovery client - peer [%s] not connected", + peerID.String()) } // Check if there is an active stream connection already @@ -93,7 +89,7 @@ func (s *Server) NewDiscoveryClient(peerID peer.ID) (proto.DiscoveryClient, erro return proto.NewDiscoveryClient(protoStream), nil } -// saveProtocolStream saves the protocol stream to the peer +// SaveProtocolStream saves the protocol stream to the peer // protocol stream reference [Thread safe] func (s *Server) SaveProtocolStream( protocol string, @@ -246,6 +242,11 @@ func (s *Server) setupDiscovery() error { return nil } +func (s *Server) TemporaryDialPeer(peerAddrInfo *peer.AddrInfo) { + s.logger.Debug("creating new temporary dial to peer", "peer", peerAddrInfo.ID) + s.addToDialQueue(peerAddrInfo, common.PriorityRandomDial) +} + // registerDiscoveryService registers the discovery protocol to be available func (s *Server) registerDiscoveryService(discovery *discovery.DiscoveryService) { grpcStream := grpc.NewGrpcStream() diff --git a/network/testing/testing.go b/network/testing/testing.go index 6a1b2999c4..01d57176f4 100644 --- a/network/testing/testing.go +++ b/network/testing/testing.go @@ -42,6 +42,7 @@ type MockNetworkingServer struct { getRandomPeerFn getRandomPeerDelegate fetchAndSetTemporaryDialFn fetchAndSetTemporaryDialDelegate removeTemporaryDialFn removeTemporaryDialDelegate + temporaryDialPeerFn temporaryDialPeerDelegate } func NewMockNetworkingServer() *MockNetworkingServer { @@ -85,6 +86,17 @@ type getPeerInfoDelegate func(peer.ID) *peer.AddrInfo type getRandomPeerDelegate func() *peer.ID type fetchAndSetTemporaryDialDelegate func(peer.ID, bool) bool type removeTemporaryDialDelegate func(peer.ID) +type temporaryDialPeerDelegate func(peerAddrInfo *peer.AddrInfo) + +func (m *MockNetworkingServer) TemporaryDialPeer(peerAddrInfo *peer.AddrInfo) { + if m.temporaryDialPeerFn != nil { + m.temporaryDialPeerFn(peerAddrInfo) + } +} + +func (m *MockNetworkingServer) HookTemporaryDialPeer(fn temporaryDialPeerDelegate) { + m.temporaryDialPeerFn = fn +} func (m *MockNetworkingServer) NewIdentityClient(peerID peer.ID) (proto.IdentityClient, error) { if m.newIdentityClientFn != nil {