diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 8de65adf4e41..1ba65008aba3 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -58,6 +58,7 @@ type PeerManager interface { PeerID() peer.ID RefreshENR(epoch uint64) FindPeersWithSubnet(index uint64) (bool, error) + AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) } // Sender abstracts the sending functionality from libp2p. diff --git a/beacon-chain/p2p/sender.go b/beacon-chain/p2p/sender.go index 89677e617521..d24302056e53 100644 --- a/beacon-chain/p2p/sender.go +++ b/beacon-chain/p2p/sender.go @@ -13,10 +13,10 @@ import ( // Send a message to a specific peer. The returned stream may be used for reading, but has been // closed for writing. -func (s *Service) Send(ctx context.Context, message interface{}, topic string, pid peer.ID) (network.Stream, error) { +func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error) { ctx, span := trace.StartSpan(ctx, "p2p.Send") defer span.End() - topic = topic + s.Encoding().ProtocolSuffix() + topic := baseTopic + s.Encoding().ProtocolSuffix() span.AddAttributes(trace.StringAttribute("topic", topic)) // TTFB_TIME (5s) + RESP_TIMEOUT (10s). @@ -38,11 +38,13 @@ func (s *Service) Send(ctx context.Context, message interface{}, topic string, p return nil, err } // do not encode anything if we are sending a metadata request - if topic != RPCMetaDataTopic { - if _, err := s.Encoding().EncodeWithLength(stream, message); err != nil { - traceutil.AnnotateError(span, err) - return nil, err - } + if baseTopic == RPCMetaDataTopic { + return stream, nil + } + + if _, err := s.Encoding().EncodeWithLength(stream, message); err != nil { + traceutil.AnnotateError(span, err) + return nil, err } // Close stream for writing. diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 377de3509963..f2fd6caacd3c 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -77,6 +77,7 @@ type Service struct { genesisValidatorsRoot []byte metaData *pb.MetaData stateNotifier statefeed.Notifier + pingMethod func(ctx context.Context, id peer.ID) error } // NewService initializes a new p2p service compatible with shared.Service interface. No @@ -383,6 +384,8 @@ func (s *Service) RefreshENR(epoch uint64) { return } s.updateSubnetRecordWithMetadata(bitV) + // ping all peers to inform them of new metadata + s.pingPeers() } // FindPeersWithSubnet performs a network search for peers @@ -429,6 +432,25 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) { return exists, nil } +// AddPingMethod adds the metadata ping rpc method to the p2p service, so that it can +// be used to refresh ENR. +func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) { + s.pingMethod = reqFunc +} + +func (s *Service) pingPeers() { + if s.pingMethod == nil { + return + } + for _, pid := range s.peers.Connected() { + go func(id peer.ID) { + if err := s.pingMethod(s.ctx, id); err != nil { + log.WithField("peer", id).WithError(err).Error("Failed to ping peer") + } + }(pid) + } +} + // Waits for the beacon state to be initialized, important // for initializing the p2p service as p2p needs to be aware // of genesis information for peering. diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index c1d300d72c1f..40435de7ad37 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -263,3 +263,8 @@ func (p *TestP2P) Metadata() *pb.MetaData { func (p *TestP2P) MetadataSeq() uint64 { return p.LocalMetadata.SeqNumber } + +// AddPingMethod mocks the p2p func. +func (p *TestP2P) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) { + // no-op +} diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index 634d5403b0fa..20a26d28d1cc 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -33,6 +33,10 @@ func (r *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta if err != nil { return nil, err } + // we close the stream outside of `send` because + // metadata requests send no payload, so closing the + // stream early leads it to a reset. + defer stream.Close() code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding()) if err != nil { return nil, err diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 36fa80fca35a..bc14e292940e 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -136,6 +136,7 @@ func (r *Service) Start() { r.p2p.AddConnectionHandler(r.reValidatePeer) r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus) + r.p2p.AddPingMethod(r.sendPingRequest) r.processPendingBlocksQueue() r.processPendingAttsQueue() r.maintainPeerStatuses() diff --git a/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go b/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go index 6568aa5a2dcf..39dfe999aff2 100644 --- a/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/sliceutil" ) func (r *Service) committeeIndexBeaconAttestationSubscriber(ctx context.Context, msg proto.Message) error { @@ -56,7 +57,7 @@ func (r *Service) aggregatorCommitteeIndices(currentSlot uint64) []uint64 { for i := currentSlot; i <= endSlot; i++ { commIds = append(commIds, cache.CommitteeIDs.GetAggregatorCommitteeIDs(i)...) } - return commIds + return sliceutil.SetUint64(commIds) } func (r *Service) attesterCommitteeIndices(currentSlot uint64) []uint64 { @@ -66,5 +67,5 @@ func (r *Service) attesterCommitteeIndices(currentSlot uint64) []uint64 { for i := currentSlot; i <= endSlot; i++ { commIds = append(commIds, cache.CommitteeIDs.GetAttesterCommitteeIDs(i)...) } - return commIds + return sliceutil.SetUint64(commIds) }