Skip to content

Commit

Permalink
filter observed addrs
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed May 14, 2020
1 parent b42ba0f commit 2b5fbaa
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 16 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/libp2p/go-ws-transport v0.3.1
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/multiformats/go-multiaddr-net v0.1.4
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.5.1
Expand Down
103 changes: 89 additions & 14 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package identify

import (
"context"
"sort"
"sync"
"time"

Expand All @@ -10,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr-net"
)

Expand All @@ -30,6 +32,24 @@ var GCInterval = 10 * time.Minute
// for adding to an ObservedAddrManager.
var observedAddrManagerWorkerChannelSize = 16

// maxObservedAddrsPerIPAndTransport is the maximum number of observed addresses
// we will return for each (IPx/TCP or UDP) group.
var maxObservedAddrsPerIPAndTransport = 2

var (
ip4 = mafmt.Base(ma.P_IP4)
ip6 = mafmt.Base(ma.P_IP6)
udp = mafmt.Base(ma.P_UDP)
tcp = mafmt.Base(ma.P_TCP)

addrPatterns = []mafmt.Pattern{
mafmt.And(ip4, udp),
mafmt.And(ip4, tcp),
mafmt.And(ip6, udp),
mafmt.And(ip6, tcp),
}
)

type observation struct {
seenTime time.Time
connDirection network.Direction
Expand All @@ -46,13 +66,36 @@ type ObservedAddr struct {
LastSeen time.Time
}

func (oa *ObservedAddr) activated(ttl time.Duration) bool {
func (oa *ObservedAddr) activated() bool {

// We only activate if other peers observed the same address
// of ours at least 4 times. SeenBy peers are removed by GC if
// they say the address more than ttl*ActivationThresh
return len(oa.SeenBy) >= ActivationThresh
}

func (oa *ObservedAddr) nInbound() int {
count := 0
for obs := range oa.SeenBy {
if oa.SeenBy[obs].connDirection == network.DirInbound {
count++
}
}

return count
}

func (oa *ObservedAddr) GroupKey() string {
for i := range addrPatterns {
pat := addrPatterns[i]
if pat.Matches(oa.Addr) {
return pat.String()
}
}

return ""
}

type newObservation struct {
conn network.Conn
observed ma.Multiaddr
Expand Down Expand Up @@ -111,33 +154,65 @@ func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiadd
return
}

now := time.Now()
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}

return addrs
return oas.filter(observedAddrs)
}

// Addrs return all activated observed addresses
func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) {
func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
oas.mu.RLock()
defer oas.mu.RUnlock()

if len(oas.addrs) == 0 {
return nil
}

var allObserved []*ObservedAddr
for k := range oas.addrs {
allObserved = append(allObserved, oas.addrs[k]...)
}
return oas.filter(allObserved)
}

func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
pmap := make(map[string][]*ObservedAddr, len(addrPatterns))
now := time.Now()
for _, observedAddrs := range oas.addrs {
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)

for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.GroupKey()
if len(pat) != 0 {
pmap[pat] = append(pmap[pat], a)
} else {
log.Debugw("unable to group observed addr into IPx/(TCP or UDP) patterm", "address",
a.Addr.String())
}
}
}

addrs := make([]ma.Multiaddr, 0, len(observedAddrs))
for pat := range pmap {
s := pmap[pat]

// We prefer inbound connection observations over outbound.
// For ties, we prefer the ones with more votes.
sort.Slice(s, func(i int, j int) bool {
first := s[i]
second := s[j]

if first.nInbound() > second.nInbound() {
return true
}

return len(first.SeenBy) > len(second.SeenBy)
})

for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
addrs = append(addrs, s[i].Addr)
}
}

return addrs
}

Expand Down
106 changes: 104 additions & 2 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
p2putil "github.com/libp2p/go-libp2p-netutil"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"

identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
"github.com/stretchr/testify/require"
)

type harness struct {
Expand Down Expand Up @@ -245,3 +247,103 @@ func TestAddAddrsProfile(t *testing.T) {

wg.Wait()
}

func TestObservedAddrFiltering(t *testing.T) {
addrsMarch := func(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}

for _, aa := range a {
found := false
for _, bb := range b {
if aa.Equal(bb) {
found = true
break
}
}
if !found {
return false
}
}
return true
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should be empty")
}
// Also listen on UDP so udp addresses are accepted
require.NoError(t, harness.host.Network().Listen(ma.StringCast("/ip4/127.0.0.1/udp/5555")))

// IP4/TCP
it1 := ma.StringCast("/ip4/1.2.3.4/tcp/1231")
it2 := ma.StringCast("/ip4/1.2.3.4/tcp/1232")
it3 := ma.StringCast("/ip4/1.2.3.4/tcp/1233")
it4 := ma.StringCast("/ip4/1.2.3.4/tcp/1234")
it5 := ma.StringCast("/ip4/1.2.3.4/tcp/1235")

// IP4/UDP
iu1 := ma.StringCast("/ip4/1.2.3.4/udp/1231")
iu2 := ma.StringCast("/ip4/1.2.3.4/udp/1232")
iu3 := ma.StringCast("/ip4/1.2.3.4/udp/1233")
iu4 := ma.StringCast("/ip4/1.2.3.4/udp/1234")
iu5 := ma.StringCast("/ip4/1.2.3.4/udp/1235")

// observers
b1 := ma.StringCast("/ip4/1.2.3.6/tcp/1236")
b2 := ma.StringCast("/ip4/1.2.3.7/tcp/1237")
b3 := ma.StringCast("/ip4/1.2.3.8/tcp/1237")
b4 := ma.StringCast("/ip4/1.2.3.9/tcp/1237")
b5 := ma.StringCast("/ip4/1.2.3.10/tcp/1237")

// it1 will be inbound with one vote.
// it3 will have max votes.
// iu2 will be inbound with one vote.
// iu4 will have max votes
peers := []peer.ID{harness.add(b1), harness.add(b2), harness.add(b3), harness.add(b4), harness.add(b5)}
for i := 0; i < 4; i++ {
harness.observe(it1, peers[i])
harness.observe(it2, peers[i])
harness.observe(it3, peers[i])
harness.observe(it4, peers[i])
harness.observe(it5, peers[i])

harness.observe(iu1, peers[i])
harness.observe(iu2, peers[i])
harness.observe(iu3, peers[i])
harness.observe(iu4, peers[i])
harness.observe(iu5, peers[i])
}

harness.observe(it3, peers[4])
harness.observe(it4, peers[4])
harness.observe(iu2, peers[4])
harness.observe(iu5, peers[4])

addrs := harness.oas.Addrs()
require.Len(t, addrs, 4)
require.Contains(t, addrs, it3)
require.Contains(t, addrs, it4)
require.Contains(t, addrs, iu2)
require.Contains(t, addrs, iu5)
}

func TestObservedAddrGroupKey(t *testing.T) {
ip4 := mafmt.Base(ma.P_IP4)
udp := mafmt.Base(ma.P_UDP)
tcp := mafmt.Base(ma.P_TCP)

oa1 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
require.Equal(t, mafmt.And(ip4, tcp).String(), oa1.GroupKey())

oa2 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.5/tcp/2222")}
require.Equal(t, mafmt.And(ip4, tcp).String(), oa2.GroupKey())

oa3 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
require.Equal(t, mafmt.And(ip4, udp).String(), oa3.GroupKey())
oa4 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531")}
require.Equal(t, mafmt.And(ip4, udp).String(), oa4.GroupKey())
}

0 comments on commit 2b5fbaa

Please sign in to comment.