Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added subscription specific configurations to PubSub #171

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
}
}

func (fs *FloodSubRouter) Join(topic string) {}
func (fs *FloodSubRouter) Join(topic string, proto protocol.ID) {}

func (fs *FloodSubRouter) Leave(topic string) {}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/libp2p/go-libp2p-net v0.0.1
github.com/libp2p/go-libp2p-peer v0.0.1
github.com/libp2p/go-libp2p-protocol v0.0.1
github.com/libp2p/go-libp2p-record v0.0.1
github.com/libp2p/go-libp2p-swarm v0.0.1
github.com/multiformats/go-multiaddr v0.0.1
github.com/multiformats/go-multistream v0.0.1
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46U
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 h1:vhC1OXXiT9R2pczegwz6moDvuRpggaroAXhPIseh57A=
Expand Down Expand Up @@ -91,6 +93,8 @@ github.com/libp2p/go-libp2p-peerstore v0.0.1 h1:twKovq8YK5trLrd3nB7PD2Zu9JcyAIdm
github.com/libp2p/go-libp2p-peerstore v0.0.1/go.mod h1:RabLyPVJLuNQ+GFyoEkfi8H4Ti6k/HtZJ7YKgtSq+20=
github.com/libp2p/go-libp2p-protocol v0.0.1 h1:+zkEmZ2yFDi5adpVE3t9dqh/N9TbpFWywowzeEzBbLM=
github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s=
github.com/libp2p/go-libp2p-record v0.0.1 h1:zN7AS3X46qmwsw5JLxdDuI43cH5UYwovKxHPjKBYQxw=
github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q=
github.com/libp2p/go-libp2p-secio v0.0.1 h1:CqE/RdsizOwItdgLe632iyft/w0tshDLmZGAiKDcUAI=
github.com/libp2p/go-libp2p-secio v0.0.1/go.mod h1:IdG6iQybdcYmbTzxp4J5dwtUEDTOvZrT0opIDVNPrJs=
github.com/libp2p/go-libp2p-swarm v0.0.1 h1:Vne+hjaDwXqzgNwQ2vb2YKbnbOTyXjtS47stT66Apc4=
Expand Down Expand Up @@ -126,6 +130,7 @@ github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XC
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
Expand Down
83 changes: 83 additions & 0 deletions gossipconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package pubsub

import (
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

type MessageCacher interface {
MessageCacheReader
Put(msg *pb.Message)
}

type GetFilteredPeers func(count int, filter func(peer.ID) bool) []peer.ID
type EmittingStrategy interface {
GetEmitPeers(topicPeers GetFilteredPeers, meshPeers map[peer.ID]struct{}) map[peer.ID]struct{}
}

type noMeshPeersStrategy struct {
numPeers int
}

func NewNoMeshPeersStrategy(numPeers int) EmittingStrategy {
return &noMeshPeersStrategy{numPeers: numPeers}
}

func (s *noMeshPeersStrategy) GetEmitPeers(topicPeers GetFilteredPeers, meshPeers map[peer.ID]struct{}) map[peer.ID]struct{} {
gpeers := topicPeers(s.numPeers, func(peer.ID) bool { return true })
emitPeers := make(map[peer.ID]struct{})
for _, p := range gpeers {
// skip mesh peers
_, ok := meshPeers[p]
if !ok {
emitPeers[p] = struct{}{}
}
}
return emitPeers
}

type ClassicGossipSubStrategy struct {
mcache MessageCacher
emitter EmittingStrategy
supportedProtocols []protocol.ID
protocol protocol.ID
}

func (gs *ClassicGossipSubStrategy) GetCacher() MessageCacheReader {
return gs.mcache
}

func (gs *ClassicGossipSubStrategy) SupportedProtocols() []protocol.ID {
return gs.supportedProtocols
}
func (gs *ClassicGossipSubStrategy) Protocol() protocol.ID {
return gs.protocol
}

func (gs *ClassicGossipSubStrategy) Publish(rt *GossipSubRouter, from peer.ID, msg *pb.Message) {
gs.mcache.Put(msg)

tosend := make(map[peer.ID]struct{})
rt.AddGossipPeers(tosend, msg.GetTopicIDs(), true)

_, ok := tosend[from]
if ok {
delete(tosend, from)
}

calculatedFrom := peer.ID(msg.GetFrom())
_, ok = tosend[calculatedFrom]
if ok {
delete(tosend, calculatedFrom)
}

rt.PropagateMSG(tosend, msg)
}

func (gs *ClassicGossipSubStrategy) GetEmitPeers(topicPeers GetFilteredPeers, meshPeers map[peer.ID]struct{}) map[peer.ID]struct{} {
return gs.emitter.GetEmitPeers(topicPeers, meshPeers)
}

func (gs *ClassicGossipSubStrategy) OnGraft(rt *GossipSubRouter, topic string, peer peer.ID) {}
93 changes: 93 additions & 0 deletions gossiplww.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package pubsub

import (
"context"
record "github.com/libp2p/go-libp2p-record"

host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

type LWWMessageCache struct {
validator record.Validator
IDMsgMap map[string]*pb.Message
}

func NewLWWMessageCache(validator record.Validator) *LWWMessageCache {
return &LWWMessageCache{
validator: validator,
IDMsgMap: make(map[string]*pb.Message),
}
}

func (mc *LWWMessageCache) Put(msg *pb.Message) {
for _, topic := range msg.TopicIDs {
lastMsg, ok := mc.IDMsgMap[topic]
if !ok {
mc.IDMsgMap[topic] = msg
continue
}

msgBytes := [][]byte{msg.Data, lastMsg.Data}
winningIndex, err := mc.validator.Select("", msgBytes)
if err != nil {
panic(err)
}

if winningIndex == 0 {
mc.IDMsgMap[topic] = msg
}
}
}

func (mc *LWWMessageCache) Get(mid string) (*pb.Message, bool) {
m, ok := mc.IDMsgMap[mid]
return m, ok
}

func (mc *LWWMessageCache) GetGossipIDs(topic string) []string {
_, ok := mc.IDMsgMap[topic]
if ok {
return []string{topic}
}
return []string{}
}

func (mc *LWWMessageCache) Shift() {}

var _ MessageCacheReader = (*LWWMessageCache)(nil)

type randomPeersStrategy struct {
numPeers int
}

func NewRandomPeersStrategy(numPeers int) EmittingStrategy {
return &randomPeersStrategy{numPeers: numPeers}
}

func (s *randomPeersStrategy) GetEmitPeers(topicPeers GetFilteredPeers, _ map[peer.ID]struct{}) map[peer.ID]struct{} {
gpeers := topicPeers(s.numPeers, func(peer.ID) bool { return true })
return peerListToMap(gpeers)
}

type LWWGossipSubConfig struct {
ClassicGossipSubStrategy
}

func (cfg *LWWGossipSubConfig) OnGraft(rt *GossipSubRouter, topic string, peer peer.ID) {
rt.RequestMessage(topic, peer)
}

// NewGossipBaseSub returns a new PubSub object using GossipSubRouter as the router.
func NewGossipSyncLWW(ctx context.Context, h host.Host, mcache *LWWMessageCache, protocolID protocol.ID, opts ...Option) (*PubSub, error) {
rt := NewGossipSubRouterWithStrategies(&LWWGossipSubConfig{ClassicGossipSubStrategy{
mcache: mcache,
emitter: NewRandomPeersStrategy(GossipSubD),
supportedProtocols: []protocol.ID{protocolID},
protocol: protocolID,
}})
return NewPubSub(ctx, h, rt, opts...)
}
Loading