Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Networking Fixes #5349

Merged
merged 4 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type PeerManager interface {
PeerID() peer.ID
RefreshENR(epoch uint64)
FindPeersWithSubnet(index uint64) (bool, error)
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}

// Sender abstracts the sending functionality from libp2p.
Expand Down
16 changes: 9 additions & 7 deletions beacon-chain/p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (

// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
func (s *Service) Send(ctx context.Context, message interface{}, topic string, pid peer.ID) (network.Stream, error) {
func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error) {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
topic = topic + s.Encoding().ProtocolSuffix()
topic := baseTopic + s.Encoding().ProtocolSuffix()
span.AddAttributes(trace.StringAttribute("topic", topic))

// TTFB_TIME (5s) + RESP_TIMEOUT (10s).
Expand All @@ -38,11 +38,13 @@ func (s *Service) Send(ctx context.Context, message interface{}, topic string, p
return nil, err
}
// do not encode anything if we are sending a metadata request
if topic != RPCMetaDataTopic {
if _, err := s.Encoding().EncodeWithLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
}
if baseTopic == RPCMetaDataTopic {
return stream, nil
}

if _, err := s.Encoding().EncodeWithLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
}

// Close stream for writing.
Expand Down
22 changes: 22 additions & 0 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Service struct {
genesisValidatorsRoot []byte
metaData *pb.MetaData
stateNotifier statefeed.Notifier
pingMethod func(ctx context.Context, id peer.ID) error
}

// NewService initializes a new p2p service compatible with shared.Service interface. No
Expand Down Expand Up @@ -383,6 +384,8 @@ func (s *Service) RefreshENR(epoch uint64) {
return
}
s.updateSubnetRecordWithMetadata(bitV)
// ping all peers to inform them of new metadata
s.pingPeers()
}

// FindPeersWithSubnet performs a network search for peers
Expand Down Expand Up @@ -429,6 +432,25 @@ func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
return exists, nil
}

// AddPingMethod adds the metadata ping rpc method to the p2p service, so that it can
// be used to refresh ENR.
func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) {
s.pingMethod = reqFunc
}

func (s *Service) pingPeers() {
if s.pingMethod == nil {
return
}
for _, pid := range s.peers.Connected() {
go func(id peer.ID) {
if err := s.pingMethod(s.ctx, id); err != nil {
log.WithField("peer", id).WithError(err).Error("Failed to ping peer")
}
}(pid)
}
}

// Waits for the beacon state to be initialized, important
// for initializing the p2p service as p2p needs to be aware
// of genesis information for peering.
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,8 @@ func (p *TestP2P) Metadata() *pb.MetaData {
func (p *TestP2P) MetadataSeq() uint64 {
return p.LocalMetadata.SeqNumber
}

// AddPingMethod mocks the p2p func.
func (p *TestP2P) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) {
// no-op
}
4 changes: 4 additions & 0 deletions beacon-chain/sync/rpc_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (r *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
if err != nil {
return nil, err
}
// we close the stream outside of `send` because
// metadata requests send no payload, so closing the
// stream early leads it to a reset.
defer stream.Close()
code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (r *Service) Start() {

r.p2p.AddConnectionHandler(r.reValidatePeer)
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
r.p2p.AddPingMethod(r.sendPingRequest)
r.processPendingBlocksQueue()
r.processPendingAttsQueue()
r.maintainPeerStatuses()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
)

func (r *Service) committeeIndexBeaconAttestationSubscriber(ctx context.Context, msg proto.Message) error {
Expand Down Expand Up @@ -56,7 +57,7 @@ func (r *Service) aggregatorCommitteeIndices(currentSlot uint64) []uint64 {
for i := currentSlot; i <= endSlot; i++ {
commIds = append(commIds, cache.CommitteeIDs.GetAggregatorCommitteeIDs(i)...)
}
return commIds
return sliceutil.SetUint64(commIds)
}

func (r *Service) attesterCommitteeIndices(currentSlot uint64) []uint64 {
Expand All @@ -66,5 +67,5 @@ func (r *Service) attesterCommitteeIndices(currentSlot uint64) []uint64 {
for i := currentSlot; i <= endSlot; i++ {
commIds = append(commIds, cache.CommitteeIDs.GetAttesterCommitteeIDs(i)...)
}
return commIds
return sliceutil.SetUint64(commIds)
}