-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gossipsub v1.1: prune peer exchange #234
Changes from all commits
be049a5
9d280a2
f68a02b
e2ebf99
60f6b2f
152ebc5
a9fdc41
9519116
47b221d
f42ce48
0e5a1ed
1a261fe
7c03aa0
895bcf6
8186906
22faa75
c1831f0
6a9e6d8
082b8b9
7a80d74
c65bd30
e3bd9fa
1e152d2
a198f5f
3d943c9
2066fcd
480e48f
e86314f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,12 +9,16 @@ import ( | |
pb "github.com/libp2p/go-libp2p-pubsub/pb" | ||
|
||
"github.com/libp2p/go-libp2p-core/host" | ||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
"github.com/libp2p/go-libp2p-core/peerstore" | ||
"github.com/libp2p/go-libp2p-core/protocol" | ||
"github.com/libp2p/go-libp2p-core/record" | ||
) | ||
|
||
const ( | ||
GossipSubID = protocol.ID("/meshsub/1.0.0") | ||
GossipSubID_v10 = protocol.ID("/meshsub/1.0.0") | ||
GossipSubID_v11 = protocol.ID("/meshsub/1.1.0") | ||
) | ||
|
||
var ( | ||
|
@@ -33,6 +37,21 @@ var ( | |
|
||
// fanout ttl | ||
GossipSubFanoutTTL = 60 * time.Second | ||
|
||
// number of peers to include in prune Peer eXchange | ||
GossipSubPrunePeers = 16 | ||
|
||
// backoff time for pruned peers | ||
GossipSubPruneBackoff = time.Minute | ||
|
||
// number of active connection attempts for peers obtained through px | ||
GossipSubConnectors = 16 | ||
|
||
// maximum number of pending connections for peers attempted through px | ||
GossipSubMaxPendingConnections = 1024 | ||
|
||
// timeout for connection attempts | ||
GossipSubConnectionTimeout = 30 * time.Second | ||
) | ||
|
||
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router. | ||
|
@@ -44,6 +63,8 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er | |
lastpub: make(map[string]int64), | ||
gossip: make(map[peer.ID][]*pb.ControlIHave), | ||
control: make(map[peer.ID]*pb.ControlMessage), | ||
backoff: make(map[string]map[peer.ID]time.Time), | ||
connect: make(chan connectInfo, GossipSubMaxPendingConnections), | ||
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), | ||
} | ||
return NewPubSub(ctx, h, rt, opts...) | ||
|
@@ -58,18 +79,25 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er | |
// messages to their topic for GossipSubFanoutTTL. | ||
type GossipSubRouter struct { | ||
p *PubSub | ||
peers map[peer.ID]protocol.ID // peer protocols | ||
mesh map[string]map[peer.ID]struct{} // topic meshes | ||
fanout map[string]map[peer.ID]struct{} // topic fanout | ||
lastpub map[string]int64 // last publish time for fanout topics | ||
gossip map[peer.ID][]*pb.ControlIHave // pending gossip | ||
control map[peer.ID]*pb.ControlMessage // pending control messages | ||
peers map[peer.ID]protocol.ID // peer protocols | ||
mesh map[string]map[peer.ID]struct{} // topic meshes | ||
fanout map[string]map[peer.ID]struct{} // topic fanout | ||
lastpub map[string]int64 // last publish time for fanout topics | ||
gossip map[peer.ID][]*pb.ControlIHave // pending gossip | ||
control map[peer.ID]*pb.ControlMessage // pending control messages | ||
backoff map[string]map[peer.ID]time.Time // prune backoff | ||
vyzo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
connect chan connectInfo // px connection requests | ||
mcache *MessageCache | ||
tracer *pubsubTracer | ||
} | ||
|
||
type connectInfo struct { | ||
p peer.ID | ||
spr *record.Envelope | ||
} | ||
|
||
func (gs *GossipSubRouter) Protocols() []protocol.ID { | ||
return []protocol.ID{GossipSubID, FloodSubID} | ||
return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID} | ||
} | ||
|
||
func (gs *GossipSubRouter) Attach(p *PubSub) { | ||
|
@@ -78,6 +106,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) { | |
// start using the same msg ID function as PubSub for caching messages. | ||
gs.mcache.SetMsgIdFn(p.msgID) | ||
go gs.heartbeatTimer() | ||
for i := 0; i < GossipSubConnectors; i++ { | ||
go gs.connector() | ||
} | ||
} | ||
|
||
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { | ||
|
@@ -226,7 +257,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. | |
|
||
cprune := make([]*pb.ControlPrune, 0, len(prune)) | ||
for _, topic := range prune { | ||
cprune = append(cprune, &pb.ControlPrune{TopicID: &topic}) | ||
cprune = append(cprune, gs.makePrune(p, topic)) | ||
} | ||
|
||
return cprune | ||
|
@@ -241,6 +272,103 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { | |
gs.tracer.Prune(p, topic) | ||
delete(peers, p) | ||
gs.untagPeer(p, topic) | ||
gs.addBackoff(p, topic) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 great! However, we're not currently doing anything about a peer continuously chasing away other peers by sending a GRAFT. If during a GRAFT we could check if we're already over the limit and send pack a PRUNE that would largely resolve this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't want to do that because we will reject all new peers and they may not be able to form a connected mesh. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to be clear, sending a PRUNE if we are over the limit is the wrong thing to do, because then the mesh can become full and fail to accept new peers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I get that it used to be that way, but is that still true now that we have peer exchange? If I send back a PRUNE only when I have >Dhi peers and I prune (Dhi-D) peers and tell them about each other won't they always be able to join the mesh? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They probably will be able to do that, but I'd rather reshuffle the mesh in its entirety instead of rejecting new GRAFTs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Think about a fully connected mesh where all peers are at There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, so IIUC this is really a timing problem right? Because if B is connected to C and then A tries to connect to B it's possible that when B PRUNEs A and C that A will send a GRAFT to C before C receives the PRUNE from B. My concern with the current approach is that if A wants to GRAFT to B it can always keep spamming B and eventually it will get through, even though B has given A plenty of other peers to connect to. This is annoying since if a peer is "better" in some way (e.g. they are the primary publisher) then nodes might be selfish, however it's certainly not a deal-breaker and wasn't even resolvable until the Peer Exchange changes. Since fixing this would likely require some protocol thought and is less important than the rest of the episub work, seems reasonable to push this further down the road. Would you like me to create an issue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It could be an attack -- I don't think it can happen naturally. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, we can create an issue to discuss further instead of it being lost in oblivion after the pr merges. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a recent win here! The decisions of which peers to retain and which to eject is now performed by evaluating the peer's score! 🎉 |
||
px := prune.GetPeers() | ||
if len(px) > 0 { | ||
gs.pxConnect(px) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have two concerns:
Ideally, we'd stick these peers in a set of known peers for the topic, connecting to them as-needed whenever we're pruned or we disconnect from a peer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For 2 I added a check that limits the number of connections to at most There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. SGTM. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I slept on it and there is a DoS vector: A malicious peer could send us in sequence GRAFT/PRUNE and cause us to spawn a goroutine; it could simply be sitting there sending GRAFT/PRUNE ad infinum, causing us to spawn a goroutine for each pair. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented a connection scheduler, which limits the number of max pending connections too. |
||
} | ||
} | ||
} | ||
} | ||
|
||
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) { | ||
backoff, ok := gs.backoff[topic] | ||
if !ok { | ||
backoff = make(map[peer.ID]time.Time) | ||
gs.backoff[topic] = backoff | ||
} | ||
backoff[p] = time.Now().Add(GossipSubPruneBackoff) | ||
} | ||
|
||
func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) { | ||
if len(peers) > GossipSubPrunePeers { | ||
shufflePeerInfo(peers) | ||
peers = peers[:GossipSubPrunePeers] | ||
vyzo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
toconnect := make([]connectInfo, 0, len(peers)) | ||
|
||
for _, pi := range peers { | ||
p := peer.ID(pi.PeerID) | ||
|
||
_, connected := gs.peers[p] | ||
if connected { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So we'll connect to PEX peers but we'll wait until the next heartbeat to rebalance the mesh. That's why we can safely skip topic members that we're already connected to, because we'll anyway consider them in the next heartbeat (as long as we remain connected to them). I think that's correct. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yup, do you want a comment here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally ;-) |
||
continue | ||
} | ||
|
||
var spr *record.Envelope | ||
if pi.SignedPeerRecord != nil { | ||
// the peer sent us a signed record; ensure that it is valid | ||
envelope, r, err := record.ConsumeEnvelope(pi.SignedPeerRecord, peer.PeerRecordEnvelopeDomain) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We recently added https://godoc.org/github.com/libp2p/go-libp2p-core/record#ConsumeTypedEnvelope, which is much nicer on the allocator (can re-use a single |
||
if err != nil { | ||
log.Warnf("error unmarshalling peer record obtained through px: %s", err) | ||
continue | ||
} | ||
rec, ok := r.(*peer.PeerRecord) | ||
if !ok { | ||
log.Warnf("bogus peer record obtained through px: envelope payload is not PeerRecord") | ||
continue | ||
} | ||
if rec.PeerID != p { | ||
log.Warnf("bogus peer record obtained through px: peer ID %s doesn't match expected peer %s", rec.PeerID, p) | ||
continue | ||
} | ||
spr = envelope | ||
} | ||
|
||
toconnect = append(toconnect, connectInfo{p, spr}) | ||
} | ||
|
||
if len(toconnect) == 0 { | ||
return | ||
} | ||
|
||
for _, ci := range toconnect { | ||
select { | ||
case gs.connect <- ci: | ||
default: | ||
log.Debugf("ignoring peer connection attempt; too many pending connections") | ||
break | ||
} | ||
} | ||
} | ||
|
||
func (gs *GossipSubRouter) connector() { | ||
for { | ||
select { | ||
case ci := <-gs.connect: | ||
if gs.p.host.Network().Connectedness(ci.p) == network.Connected { | ||
continue | ||
} | ||
|
||
log.Debugf("connecting to %s", ci.p) | ||
cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if it is all that expensive, it's just a cast isn't it? |
||
if ok && ci.spr != nil { | ||
_, err := cab.ConsumePeerRecord(ci.spr, peerstore.TempAddrTTL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: we had to roll back the change in the peerstore that clears all addresses, and blocks subsequent uncertified address additions, once a peer record is seen. It caused downstream regressions (IPFS, and others). For now we're just accumulating all addresses. As a definitive solution, I'm enhancing the peerstore with "address labels" such that certified addresses will be tagged as such, and in the With this solution, the peerstore tracks all addresses, and it's up to the application to filter which ones it wants to actually use. |
||
if err != nil { | ||
log.Debugf("error processing peer record: %s", err) | ||
} | ||
} | ||
|
||
ctx, cancel := context.WithTimeout(gs.p.ctx, GossipSubConnectionTimeout) | ||
err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p}) | ||
cancel() | ||
if err != nil { | ||
log.Debugf("error connecting to %s: %s", ci.p, err) | ||
} | ||
|
||
case <-gs.p.ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
|
@@ -360,7 +488,7 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { | |
} | ||
|
||
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) { | ||
prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}} | ||
prune := []*pb.ControlPrune{gs.makePrune(p, topic)} | ||
out := rpcWithControl(nil, nil, nil, nil, prune) | ||
gs.sendRPC(p, out) | ||
} | ||
|
@@ -443,16 +571,21 @@ func (gs *GossipSubRouter) heartbeat() { | |
tograft := make(map[peer.ID][]string) | ||
toprune := make(map[peer.ID][]string) | ||
|
||
// clean up expired backoffs | ||
gs.clearBackoff() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is probably fine, but we should monitor this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is, walking through each of these every heartbeat should be fine, but could get expensive in some edge cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could do it every few heartbeats instead of every heartbeat. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Every heartbeat is probably fine, I'm just calling it out so we keep it in mind. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A way more efficient manner would be to use a ring buffer of peer slices, with |
||
|
||
// maintain the mesh for topics we have joined | ||
for topic, peers := range gs.mesh { | ||
|
||
// do we have enough peers? | ||
if len(peers) < GossipSubDlo { | ||
backoff := gs.backoff[topic] | ||
ineed := GossipSubD - len(peers) | ||
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { | ||
// filter our current peers | ||
_, ok := peers[p] | ||
return !ok | ||
// filter our current peers and peers we are backing off | ||
_, inMesh := peers[p] | ||
_, doBackoff := backoff[p] | ||
return !inMesh && !doBackoff | ||
}) | ||
|
||
for _, p := range plst { | ||
|
@@ -531,6 +664,20 @@ func (gs *GossipSubRouter) heartbeat() { | |
gs.mcache.Shift() | ||
} | ||
|
||
func (gs *GossipSubRouter) clearBackoff() { | ||
now := time.Now() | ||
for topic, backoff := range gs.backoff { | ||
for p, expire := range backoff { | ||
if expire.Before(now) { | ||
delete(backoff, p) | ||
} | ||
} | ||
if len(backoff) == 0 { | ||
delete(gs.backoff, topic) | ||
} | ||
} | ||
} | ||
|
||
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) { | ||
for p, topics := range tograft { | ||
graft := make([]*pb.ControlGraft, 0, len(topics)) | ||
|
@@ -544,7 +691,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) | |
delete(toprune, p) | ||
prune = make([]*pb.ControlPrune, 0, len(pruning)) | ||
for _, topic := range pruning { | ||
prune = append(prune, &pb.ControlPrune{TopicID: &topic}) | ||
prune = append(prune, gs.makePrune(p, topic)) | ||
} | ||
} | ||
|
||
|
@@ -555,7 +702,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) | |
for p, topics := range toprune { | ||
prune := make([]*pb.ControlPrune, 0, len(topics)) | ||
for _, topic := range topics { | ||
prune = append(prune, &pb.ControlPrune{TopicID: &topic}) | ||
prune = append(prune, gs.makePrune(p, topic)) | ||
} | ||
|
||
out := rpcWithControl(nil, nil, nil, nil, prune) | ||
|
@@ -673,6 +820,40 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control | |
} | ||
} | ||
|
||
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string) *pb.ControlPrune { | ||
if gs.peers[p] == GossipSubID_v10 { | ||
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway | ||
return &pb.ControlPrune{TopicID: &topic} | ||
} | ||
|
||
// select peers for Peer eXchange | ||
peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool { | ||
return p != xp | ||
}) | ||
|
||
cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore()) | ||
px := make([]*pb.PeerInfo, 0, len(peers)) | ||
for _, p := range peers { | ||
// see if we have a signed peer record to send back; if we don't, just send | ||
// the peer ID and let the pruned peer find them in the DHT -- we can't trust | ||
// unsigned address records through px anyway. | ||
var recordBytes []byte | ||
if ok { | ||
spr := cab.GetPeerRecord(p) | ||
var err error | ||
if spr != nil { | ||
recordBytes, err = spr.Marshal() | ||
if err != nil { | ||
log.Warnf("error marshaling signed peer record for %s: %s", p, err) | ||
} | ||
} | ||
} | ||
px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes}) | ||
} | ||
|
||
return &pb.ControlPrune{TopicID: &topic, Peers: px} | ||
} | ||
|
||
func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID { | ||
tmap, ok := gs.p.topics[topic] | ||
if !ok { | ||
|
@@ -681,7 +862,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID | |
|
||
peers := make([]peer.ID, 0, len(tmap)) | ||
for p := range tmap { | ||
if gs.peers[p] == GossipSubID && filter(p) { | ||
if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) { | ||
peers = append(peers, p) | ||
} | ||
} | ||
|
@@ -731,3 +912,10 @@ func shufflePeers(peers []peer.ID) { | |
peers[i], peers[j] = peers[j], peers[i] | ||
} | ||
} | ||
|
||
func shufflePeerInfo(peers []*pb.PeerInfo) { | ||
for i := range peers { | ||
j := rand.Intn(i + 1) | ||
peers[i], peers[j] = peers[j], peers[i] | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this too much? 16 conn attempts at once, i.e. we'll try to connect to all peers recommended by the pruning one (as this value is equal to
GossipSubPrunePeers
.I'd prefer if connect was a buffered channel, and we'd have less concurrent goroutines. Right now it's a bit hit or miss, e.g. if we got pruned from two topics at once, the connector goroutines will be saturated with the first batch, and all peers from the second batch will be dropped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a buffered channel! But yes, we can certainly lower from 16, how about 8 or 4?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Max inflight dials FD limit is 160 by default. Assuming each peer has an average of 3 addresses, 16 connectors could make use 30% of our FD allowance. I'd scale this down to 8 by default, but I'd add an option so the app can increase/decrease it (it may also touch the swarm limits!).
Also, we will need some fairness heuristic here. If we're pruned from two or three topics at once, the first topic will get all slots, and the other two will be queued. Instead, we might want to balance peers from topics 1, 2, and 3 for quicker healing. Some form of priority queue could work well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the hardening branch reduces this to 8.