Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create peer filter option #451

Merged
merged 1 commit into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ func assertReceive(t *testing.T, ch *Subscription, exp []byte) {
}
}

func assertNeverReceives(t *testing.T, ch *Subscription, timeout time.Duration) {
select {
case msg := <-ch.ch:
t.Logf("%#v\n", ch)
t.Fatal("got unexpected message: ", string(msg.GetData()))
case <-time.After(timeout):
}
}

func TestBasicFloodsub(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
15 changes: 14 additions & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,10 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
continue
}

if !gs.p.peerFilter(p, topic) {
continue
}

for _, mid := range ihave.GetMessageIDs() {
if gs.p.seenMessage(mid) {
continue
Expand Down Expand Up @@ -692,6 +696,10 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
continue
}

if !gs.p.peerFilter(p, msg.GetTopic()) {
continue
}

if count > gs.params.GossipRetransmission {
log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid)
continue
Expand Down Expand Up @@ -724,6 +732,11 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.

for _, graft := range ctl.GetGraft() {
topic := graft.GetTopicID()

if !gs.p.peerFilter(p, topic) {
continue
}

peers, ok := gs.mesh[topic]
if !ok {
// don't do PX when there is an unknown topic to avoid leaking our peers
Expand Down Expand Up @@ -1857,7 +1870,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID

peers := make([]peer.ID, 0, len(tmap))
for p := range tmap {
if gs.feature(GossipSubFeatureMesh, gs.peers[p]) && filter(p) {
if gs.feature(GossipSubFeatureMesh, gs.peers[p]) && filter(p) && gs.p.peerFilter(p, topic) {
peers = append(peers, p)
}
}
Expand Down
41 changes: 41 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,47 @@ func TestGossipsubDirectPeers(t *testing.T) {
}
}

func TestGossipSubPeerFilter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h := getNetHosts(t, ctx, 3)
psubs := []*PubSub{
getGossipsub(ctx, h[0], WithPeerFilter(func(pid peer.ID, topic string) bool {
return pid == h[1].ID()
})),
getGossipsub(ctx, h[1], WithPeerFilter(func(pid peer.ID, topic string) bool {
return pid == h[0].ID()
})),
getGossipsub(ctx, h[2]),
}

connect(t, h[0], h[1])
connect(t, h[0], h[2])

// Join all peers
var subs []*Subscription
for _, ps := range psubs {
sub, err := ps.Subscribe("test")
if err != nil {
t.Fatal(err)
}
subs = append(subs, sub)
}

time.Sleep(time.Second)

msg := []byte("message")

psubs[0].Publish("test", msg)
assertReceive(t, subs[1], msg)
assertNeverReceives(t, subs[2], time.Second)

psubs[1].Publish("test", msg)
assertReceive(t, subs[0], msg)
assertNeverReceives(t, subs[2], time.Second)
}

func TestGossipsubDirectPeersFanout(t *testing.T) {
// regression test for #371
ctx, cancel := context.WithCancel(context.Background())
Expand Down
27 changes: 27 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type PubSub struct {

tracer *pubsubTracer

peerFilter PeerFilter

// maxMessageSize is the maximum message size; it applies globally to all
// topics.
maxMessageSize int
Expand Down Expand Up @@ -235,6 +237,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
ctx: ctx,
rt: rt,
val: newValidation(),
peerFilter: DefaultPeerFilter,
disc: &discover{},
maxMessageSize: DefaultMaxMessageSize,
peerOutboundQueueSize: 32,
Expand Down Expand Up @@ -332,6 +335,21 @@ func WithMessageIdFn(fn MsgIdFunction) Option {
}
}

// PeerFilter is used to filter pubsub peers. It should return true for peers that are accepted for
// a given topic. PubSub can be customized to use any implementation of this function by configuring
// it with the Option from WithPeerFilter.
type PeerFilter func(pid peer.ID, topic string) bool

// WithPeerFilter is an option to set a filter for pubsub peers.
// The default peer filter is DefaultPeerFilter (which always returns true), but it can be customized
// to any custom implementation.
func WithPeerFilter(filter PeerFilter) Option {
return func(p *PubSub) error {
p.peerFilter = filter
return nil
}
}

// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
// We start dropping messages to a peer if the outbound queue if full
func WithPeerOutboundQueueSize(size int) Option {
Expand Down Expand Up @@ -983,6 +1001,10 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
for _, subopt := range subs {
t := subopt.GetTopicid()

if !p.peerFilter(rpc.from, t) {
continue
}

if subopt.GetSubscribe() {
tmap, ok := p.topics[t]
if !ok {
Expand Down Expand Up @@ -1042,6 +1064,11 @@ func DefaultMsgIdFn(pmsg *pb.Message) string {
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
}

// DefaultPeerFilter accepts all peers on all topics
func DefaultPeerFilter(pid peer.ID, topic string) bool {
return true
}

// pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(msg *Message) {
src := msg.ReceivedFrom
Expand Down