Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to handle newly subscribed peers #190

Merged
merged 12 commits into from
Aug 7, 2019
Merged
147 changes: 147 additions & 0 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,150 @@ func TestImproperlySignedMessageRejected(t *testing.T) {
)
}
}

func TestSubscriptionJoinNotification(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numLateSubscribers = 10
const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Have some peers subscribe earlier than other peers.
// This exercises whether we get subscription notifications from
// existing peers.
for i, ps := range psubs[numLateSubscribers:] {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

// Have the rest subscribe
for i, ps := range psubs[:numLateSubscribers] {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i+numLateSubscribers] = subch
}

wg := sync.WaitGroup{}
for i := 0; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-1 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PEER_JOIN {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound {
if len(peersFound) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
}
}

func TestSubscriptionLeaveNotification(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Subscribe all peers and wait until they've all been found
for i, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

wg := sync.WaitGroup{}
for i := 0; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-1 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PEER_JOIN {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound {
if len(peersFound) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
}

// Test removing peers and verifying that they cause events
msgs[1].Cancel()
hosts[2].Close()
psubs[0].BlacklistPeer(hosts[3].ID())

leavingPeers := make(map[peer.ID]struct{})
for len(leavingPeers) < 3 {
event, err := msgs[0].NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PEER_LEAVE {
leavingPeers[event.Peer] = struct{}{}
}
}

if _, ok := leavingPeers[hosts[1].ID()]; !ok {
t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event"))
}
if _, ok := leavingPeers[hosts[2].ID()]; !ok {
t.Fatal(fmt.Errorf("closing host did not cause a leave event"))
}
if _, ok := leavingPeers[hosts[3].ID()]; !ok {
t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event"))
}
}
60 changes: 53 additions & 7 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,11 @@ func (p *PubSub) processLoop(ctx context.Context) {
}

delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}

p.rt.RemovePeer(pid)
Expand Down Expand Up @@ -392,8 +395,11 @@ func (p *PubSub) processLoop(ctx context.Context) {
if ok {
close(ch)
delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}
p.rt.RemovePeer(pid)
}
Expand All @@ -417,7 +423,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) {
}

sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()")
close(sub.ch)
sub.close()
delete(subs, sub)

if len(subs) == 0 {
Expand Down Expand Up @@ -447,9 +453,21 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
subs = p.myTopics[sub.topic]
}

tmap := p.topics[sub.topic]
inboundBufSize := len(tmap)
if inboundBufSize < 32 {
inboundBufSize = 32
}

sub.ch = make(chan *Message, 32)
sub.joinCh = make(chan peer.ID, inboundBufSize)
sub.leaveCh = make(chan peer.ID, 32)
sub.cancelCh = p.cancelCh

for pid := range tmap {
sub.joinCh <- pid
}

p.myTopics[sub.topic][sub] = struct{}{}

req.resp <- sub
Expand Down Expand Up @@ -560,6 +578,18 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
return false
}

func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
if subs, ok := p.myTopics[topic]; ok {
for s := range subs {
select {
case s.leaveCh <- pid:
default:
log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic)
}
}
}
}

func (p *PubSub) handleIncomingRPC(rpc *RPC) {
for _, subopt := range rpc.GetSubscriptions() {
t := subopt.GetTopicid()
Expand All @@ -570,13 +600,29 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
p.topics[t] = tmap
}

tmap[rpc.from] = struct{}{}
if _, ok = tmap[rpc.from]; !ok {
tmap[rpc.from] = struct{}{}
if subs, ok := p.myTopics[t]; ok {
peer := rpc.from
for s := range subs {
select {
case s.joinCh <- peer:
default:
log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t)
}
}
}
}
} else {
tmap, ok := p.topics[t]
if !ok {
continue
}
delete(tmap, rpc.from)

if _, ok := tmap[rpc.from]; ok {
delete(tmap, rpc.from)
p.notifyLeave(t, rpc.from)
}
}
}

Expand Down
47 changes: 47 additions & 0 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,36 @@ package pubsub

import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
)

type EventType int8

const (
UNKNOWN EventType = iota
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
PEER_JOIN
PEER_LEAVE
)

type Subscription struct {
topic string
ch chan *Message
cancelCh chan<- *Subscription
joinCh chan peer.ID
leaveCh chan peer.ID
err error
}

type PeerEvent struct {
Type EventType
Peer peer.ID
}

func (sub *Subscription) Topic() string {
return sub.topic
}

// Next returns the next message in our subscription
func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
select {
case msg, ok := <-sub.ch:
Expand All @@ -31,3 +48,33 @@ func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
func (sub *Subscription) Cancel() {
sub.cancelCh <- sub
}

func (sub *Subscription) close() {
close(sub.ch)
close(sub.joinCh)
close(sub.leaveCh)
}

// NextPeerEvent returns the next event regarding subscribed peers
// Note: There is no guarantee that the Peer Join event will fire before
// the related Peer Leave event for a given peer
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
select {
case newPeer, ok := <-sub.joinCh:
event := PeerEvent{Type: PEER_JOIN, Peer: newPeer}
if !ok {
return event, sub.err
}

return event, nil
case leavingPeer, ok := <-sub.leaveCh:
event := PeerEvent{Type: PEER_LEAVE, Peer: leavingPeer}
if !ok {
return event, sub.err
}

return event, nil
case <-ctx.Done():
return PeerEvent{}, ctx.Err()
}
}