Skip to content

Commit

Permalink
Configurable Peer Filtering (libp2p#471)
Browse files Browse the repository at this point in the history
Allows specifying peer filter functions on query and on adding peers to the routing table. This patch also includes some reasonable default functions for a public-only and private-only DHT.

Co-authored-by: Will Scott <[email protected]>
Co-authored-by: Steven Allen <[email protected]>
  • Loading branch information
3 people committed Apr 3, 2020
1 parent fa52ddd commit 0543571
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 21 deletions.
35 changes: 19 additions & 16 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type IpfsDHT struct {
alpha int // The concurrency parameter per path
beta int // The number of peers closest to a target that must have responded for a query path to terminate

queryPeerFilter QueryFilterFunc
routingTablePeerFilter RouteTableFilterFunc

autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
Expand Down Expand Up @@ -198,7 +201,6 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
}

func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {

protocols := []protocol.ID{cfg.protocolPrefix + kad2}
serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}

Expand All @@ -213,19 +215,21 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}

dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
}

// construct routing table
Expand Down Expand Up @@ -265,7 +269,7 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
return false
}

return b
return b && cfg.routingTable.peerFilter(dht, dht.Host().Network().ConnsToPeer(p))
}

rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
Expand Down Expand Up @@ -418,7 +422,6 @@ func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerDisconnected", p)
dht.routingTable.HandlePeerDisconnect(p)

}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Expand Down
139 changes: 139 additions & 0 deletions dht_filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package dht

import (
"bytes"
"net"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
netroute "github.com/libp2p/go-netroute"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// QueryFilterFunc is a filter applied when considering peers to dial when querying
type QueryFilterFunc func(dht *IpfsDHT, ai peer.AddrInfo) bool

// RouteTableFilterFunc is a filter applied when considering connections to keep in
// the local route table.
type RouteTableFilterFunc func(dht *IpfsDHT, conns []network.Conn) bool

// PublicQueryFilter returns true if the peer is suspected of being publicly accessible
func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
if len(ai.Addrs) == 0 {
return false
}

var hasPublicAddr bool
for _, a := range ai.Addrs {
if !isRelayAddr(a) && manet.IsPublicAddr(a) {
hasPublicAddr = true
}
}
return hasPublicAddr
}

var _ QueryFilterFunc = PublicQueryFilter

// PublicRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a public network
func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
if len(conns) == 0 {
return false
}

// Do we have a public address for this peer?
id := conns[0].RemotePeer()
known := dht.peerstore.PeerInfo(id)
for _, a := range known.Addrs {
if !isRelayAddr(a) && manet.IsPublicAddr(a) {
return true
}
}

return false
}

var _ RouteTableFilterFunc = PublicRoutingTableFilter

// PrivateQueryFilter doens't currently restrict which peers we are willing to query from the local DHT.
func PrivateQueryFilter(dht *IpfsDHT, ai peer.AddrInfo) bool {
return len(ai.Addrs) > 0
}

var _ QueryFilterFunc = PrivateQueryFilter

// PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a private network
func PrivateRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
router, _ := netroute.New()
myAdvertisedIPs := make([]net.IP, 0)
for _, a := range dht.Host().Addrs() {
if manet.IsPublicAddr(a) && !isRelayAddr(a) {
ip, err := manet.ToIP(a)
if err != nil {
continue
}
myAdvertisedIPs = append(myAdvertisedIPs, ip)
}
}

for _, c := range conns {
ra := c.RemoteMultiaddr()
if manet.IsPrivateAddr(ra) && !isRelayAddr(ra) {
return true
}

if manet.IsPublicAddr(ra) {
ip, err := manet.ToIP(ra)
if err != nil {
continue
}

// if the ip is the same as one of the local host's public advertised IPs - then consider it local
for _, i := range myAdvertisedIPs {
if i.Equal(ip) {
return true
}
if ip.To4() == nil {
if i.To4() == nil && isEUI(ip) && sameV6Net(i, ip) {
return true
}
}
}

// if there's no gateway - a direct host in the OS routing table - then consider it local
// This is relevant in particular to ipv6 networks where the addresses may all be public,
// but the nodes are aware of direct links between each other.
if router != nil {
_, gw, _, err := router.Route(ip)
if gw == nil && err == nil {
return true
}
}
}
}

return false
}

var _ RouteTableFilterFunc = PrivateRoutingTableFilter

func isEUI(ip net.IP) bool {
// per rfc 2373
return ip[11] == 0xff && ip[12] == 0xfe
}

func sameV6Net(a, b net.IP) bool {
return bytes.Equal(a[0:8], b[0:8])
}

func isRelayAddr(a ma.Multiaddr) bool {
for _, p := range a.Protocols() {
if p.Code == ma.P_CIRCUIT {
return true
}
}
return false
}
23 changes: 23 additions & 0 deletions dht_filters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dht

import (
"testing"

"github.com/multiformats/go-multiaddr"
)

func TestIsRelay(t *testing.T) {
a, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/5002/p2p/QmdPU7PfRyKehdrP5A3WqmjyD6bhVpU1mLGKppa2FjGDjZ/p2p-circuit/p2p/QmVT6GYwjeeAF5TR485Yc58S3xRF5EFsZ5YAF4VcP3URHt")
if !isRelayAddr(a) {
t.Fatalf("thought %s was not a relay", a)
}
a, _ = multiaddr.NewMultiaddr("/p2p-circuit/p2p/QmVT6GYwjeeAF5TR485Yc58S3xRF5EFsZ5YAF4VcP3URHt")
if !isRelayAddr(a) {
t.Fatalf("thought %s was not a relay", a)
}
a, _ = multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/5002/p2p/QmdPU7PfRyKehdrP5A3WqmjyD6bhVpU1mLGKppa2FjGDjZ")
if isRelayAddr(a) {
t.Fatalf("thought %s was a relay", a)
}

}
28 changes: 27 additions & 1 deletion dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-record"
record "github.com/libp2p/go-libp2p-record"
)

// ModeOpt describes what mode the dht should operate in
Expand Down Expand Up @@ -37,13 +39,15 @@ type config struct {
maxRecordAge time.Duration
enableProviders bool
enableValues bool
queryPeerFilter QueryFilterFunc

routingTable struct {
refreshQueryTimeout time.Duration
refreshPeriod time.Duration
autoRefresh bool
latencyTolerance time.Duration
checkInterval time.Duration
peerFilter RouteTableFilterFunc
}

// internal parameters, not publicly exposed
Expand All @@ -53,6 +57,9 @@ type config struct {
testProtocols []protocol.ID
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
func emptyRTFilter(_ *IpfsDHT, conns []network.Conn) bool { return true }

// apply applies the given options to this Option
func (c *config) apply(opts ...Option) error {
for i, opt := range opts {
Expand All @@ -78,11 +85,13 @@ var defaults = func(o *config) error {
o.protocolPrefix = DefaultPrefix
o.enableProviders = true
o.enableValues = true
o.queryPeerFilter = emptyQueryFilter

o.routingTable.latencyTolerance = time.Minute
o.routingTable.refreshQueryTimeout = 30 * time.Second
o.routingTable.refreshPeriod = 10 * time.Minute
o.routingTable.autoRefresh = true
o.routingTable.peerFilter = emptyRTFilter
o.maxRecordAge = time.Hour * 36

o.bucketSize = defaultBucketSize
Expand Down Expand Up @@ -292,6 +301,23 @@ func DisableValues() Option {
}
}

// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
return func(c *config) error {
c.queryPeerFilter = filter
return nil
}
}

// RoutingTableFilter sets a function that approves which peers may be added to the routing table. The host should
// already have at least one connection to the peer under consideration.
func RoutingTableFilter(filter RouteTableFilterFunc) Option {
return func(c *config) error {
c.routingTable.peerFilter = filter
return nil
}
}

// customProtocols is only to be used for testing. It sets the protocols that the DHT listens on and queries with to be
// the ones passed in. The custom protocols are still augmented by the Prefix.
func customProtocols(protos ...protocol.ID) Option {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ require (
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-msgio v0.0.4
github.com/libp2p/go-netroute v0.1.0
github.com/mr-tron/base58 v1.1.3
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-net v0.1.3
github.com/multiformats/go-multihash v0.0.13
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.5.1
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbBY=
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
Expand Down Expand Up @@ -268,6 +270,8 @@ github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-nat v0.0.4 h1:KbizNnq8YIf7+Hn7+VFL/xE0eDrkPru2zIO9NMwL8UQ=
github.com/libp2p/go-nat v0.0.4/go.mod h1:Nmw50VAvKuk38jUBcmNh6p9lUJLoODbJRvYAa/+KSDo=
github.com/libp2p/go-netroute v0.1.0 h1:29isad7URkbMp3GMIVrxCPq3yo6UKJyYCLDFCa2+cTk=
github.com/libp2p/go-netroute v0.1.0/go.mod h1:SuYDERn9+Jpg1hdHgY2ucfmuN4bMaRgRCSemKOJ9u60=
github.com/libp2p/go-openssl v0.0.2 h1:9pP2d3Ubaxkv7ZisLjx9BFwgOGnQdQYnfcH29HNY3ls=
github.com/libp2p/go-openssl v0.0.2/go.mod h1:v8Zw2ijCSWBQi8Pq5GAixw6DbFfa9u6VIYDXnvOXkc0=
github.com/libp2p/go-openssl v0.0.3 h1:wjlG7HvQkt4Fq4cfH33Ivpwp0omaElYEi9z26qaIkIk=
Expand All @@ -278,6 +282,8 @@ github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FW
github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA=
github.com/libp2p/go-reuseport-transport v0.0.2 h1:WglMwyXyBu61CMkjCCtnmqNqnjib0GIEjMiHTwR/KN4=
github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA20obEtsW9BLkuOUyQAOCbs=
github.com/libp2p/go-sockaddr v0.0.1 h1:yM3fy0n5oommws7FmCeOh+IoyEFpmlO77+2eXiaWIko=
github.com/libp2p/go-sockaddr v0.0.1/go.mod h1:EBeYKMYs5LFgoaBSN2nA2jPQVmnA4gv7WkY64CBqYqQ=
github.com/libp2p/go-stream-muxer v0.0.1 h1:Ce6e2Pyu+b5MC1k3eeFtAax0pW4gc6MosYSLV05UeLw=
github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14=
github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg=
Expand Down Expand Up @@ -520,6 +526,7 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpbl
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e h1:ZytStCyV048ZqDsWHiYDdoI2Vd4msMcrDECFxS+tL9c=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
4 changes: 2 additions & 2 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"time"

ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-record"
dht "github.com/libp2p/go-libp2p-kad-dht"
record "github.com/libp2p/go-libp2p-record"
)

type Option = dht.Option
Expand Down
10 changes: 8 additions & 2 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,15 @@ func (q *query) queryPeer(ch chan<- *queryUpdate, p peer.ID) {
continue
}

// add any other know addresses for the candidate peer.
curInfo := q.dht.peerstore.PeerInfo(next.ID)
next.Addrs = append(next.Addrs, curInfo.Addrs...)

// add their addresses to the dialer's peerstore
q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
saw = append(saw, next.ID)
if q.dht.queryPeerFilter(q.dht, *next) {
q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
saw = append(saw, next.ID)
}
}

ch <- &queryUpdate{seen: saw, queried: []peer.ID{p}}
Expand Down

0 comments on commit 0543571

Please sign in to comment.