diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index a34e42de584d..685d8c18ef49 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "time" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -15,7 +14,8 @@ import ( // AddConnectionHandler adds a callback function which handles the connection with a // newly added peer. It performs a handshake with that peer by sending a hello request // and validating the response from the peer. -func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error) { +func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error, + goodbyeFunc func(ctx context.Context, id peer.ID) error) { s.host.Network().Notify(&network.NotifyBundle{ ConnectedF: func(net network.Network, conn network.Conn) { log := log.WithField("peer", conn.RemotePeer().Pretty()) @@ -28,10 +28,15 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer } s.peers.Add(nil /* ENR */, conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction) if len(s.peers.Active()) >= int(s.cfg.MaxPeers) { - log.WithField("reason", "at peer limit").Trace("Ignoring connection request") - if err := s.Disconnect(conn.RemotePeer()); err != nil { - log.WithError(err).Error("Unable to disconnect from peer") - } + go func() { + log.WithField("reason", "at peer limit").Trace("Ignoring connection request") + if err := goodbyeFunc(context.Background(), conn.RemotePeer()); err != nil { + log.WithError(err).Error("Unable to send goodbye message to peer") + } + if err := s.Disconnect(conn.RemotePeer()); err != nil { + log.WithError(err).Error("Unable to disconnect from peer") + } + }() return } if s.peers.IsBad(conn.RemotePeer()) { @@ -52,9 +57,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer "activePeers": len(s.peers.Active()), }) s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := reqFunc(ctx, conn.RemotePeer()); err != nil && err != io.EOF { + if err := reqFunc(context.Background(), conn.RemotePeer()); err != nil && err != io.EOF { log.WithError(err).Trace("Handshake failed") if err.Error() == "protocol not supported" { // This is only to ensure the smooth running of our testnets. This will not be diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 953ca45f25b9..155469d9c9f0 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -37,7 +37,7 @@ type SetStreamHandler interface { // ConnectionHandler configures p2p to handle connections with a peer. type ConnectionHandler interface { - AddConnectionHandler(f func(ctx context.Context, id peer.ID) error) + AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, g func(context.Context, peer.ID) error) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) } diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 5885d9b3f4d0..3a0f5593041a 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -168,7 +168,8 @@ func (p *TestP2P) PeerID() peer.ID { } // AddConnectionHandler handles the connection with a newly connected peer. -func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error) { +func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, + g func(context.Context, peer.ID) error) { p.Host.Network().Notify(&network.NotifyBundle{ ConnectedF: func(net network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. diff --git a/beacon-chain/sync/rpc_goodbye.go b/beacon-chain/sync/rpc_goodbye.go index 8c31b212a7c2..4ed735521c37 100644 --- a/beacon-chain/sync/rpc_goodbye.go +++ b/beacon-chain/sync/rpc_goodbye.go @@ -6,6 +6,8 @@ import ( "time" libp2pcore "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" ) const ( @@ -41,6 +43,24 @@ func (r *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream return r.p2p.Disconnect(stream.Conn().RemotePeer()) } +func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.ID) error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + stream, err := r.p2p.Send(ctx, &code, p2p.RPCGoodByeTopic, id) + if err != nil { + return err + } + log := log.WithField("Reason", goodbyeMessage(code)) + log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer") + return nil +} + +// sends a goodbye message for a generic error +func (r *Service) sendGenericGoodbyeMessage(ctx context.Context, id peer.ID) error { + return r.sendGoodByeMessage(ctx, codeGenericError, id) +} + func goodbyeMessage(num uint64) string { reason, ok := goodByes[num] if ok { diff --git a/beacon-chain/sync/rpc_goodbye_test.go b/beacon-chain/sync/rpc_goodbye_test.go index 95e1d95ff34b..934c6c8ff22a 100644 --- a/beacon-chain/sync/rpc_goodbye_test.go +++ b/beacon-chain/sync/rpc_goodbye_test.go @@ -58,3 +58,52 @@ func TestGoodByeRPCHandler_Disconnects_With_Peer(t *testing.T) { t.Error("Peer is still not disconnected despite sending a goodbye message") } } + +func TestSendGoodbye_SendsMessage(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + if len(p1.Host.Network().Peers()) != 1 { + t.Error("Expected peers to be connected") + } + + // Set up a head state in the database with data we expect. + d := db.SetupDB(t) + defer db.TeardownDB(t, d) + + r := &Service{ + db: d, + p2p: p1, + } + failureCode := codeClientShutdown + + // Setup streams + pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz") + var wg sync.WaitGroup + wg.Add(1) + p2.Host.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + out := new(uint64) + if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + t.Fatal(err) + } + if *out != failureCode { + t.Fatalf("Wanted goodbye code of %d but got %d", failureCode, *out) + } + + }) + + err := r.sendGoodByeMessage(context.Background(), failureCode, p2.Host.ID()) + if err != nil { + t.Errorf("Unxpected error: %v", err) + } + + if testutil.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + + conns := p1.Host.Network().ConnsToPeer(p1.Host.ID()) + if len(conns) > 0 { + t.Error("Peer is still not disconnected despite sending a goodbye message") + } +} diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index d4d99b45b1d9..dc3ef54d92bd 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -18,7 +18,7 @@ func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2 log.WithError(err).Error("Failed to close stream") } }() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + _, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() setRPCStreamDeadlines(stream) diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index c9a81e1e0ba8..d6338d094def 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -384,7 +384,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) { } }) - p1.AddConnectionHandler(r.sendRPCStatusRequest) + p1.AddConnectionHandler(r.sendRPCStatusRequest, r.sendGenericGoodbyeMessage) p1.Connect(p2) if testutil.WaitTimeout(&wg, 1*time.Second) { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 362d1f74ddee..95a0af848e74 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -135,7 +135,7 @@ func (r *Service) Start() { panic(err) } - r.p2p.AddConnectionHandler(r.reValidatePeer) + r.p2p.AddConnectionHandler(r.reValidatePeer, r.sendGenericGoodbyeMessage) r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus) r.p2p.AddPingMethod(r.sendPingRequest) r.processPendingBlocksQueue()