Skip to content

Commit

Permalink
Filter observed addresses (#917)
Browse files Browse the repository at this point in the history
* filter observed addrs
  • Loading branch information
aarshkshah1992 authored May 20, 2020
1 parent 9370679 commit 79ead33
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 18 deletions.
84 changes: 70 additions & 14 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package identify

import (
"context"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -30,6 +31,10 @@ var GCInterval = 10 * time.Minute
// for adding to an ObservedAddrManager.
var observedAddrManagerWorkerChannelSize = 16

// maxObservedAddrsPerIPAndTransport is the maximum number of observed addresses
// we will return for each (IPx/TCP or UDP) group.
var maxObservedAddrsPerIPAndTransport = 2

type observation struct {
seenTime time.Time
connDirection network.Direction
Expand All @@ -46,13 +51,36 @@ type ObservedAddr struct {
LastSeen time.Time
}

func (oa *ObservedAddr) activated(ttl time.Duration) bool {
func (oa *ObservedAddr) activated() bool {

// We only activate if other peers observed the same address
// of ours at least 4 times. SeenBy peers are removed by GC if
// they say the address more than ttl*ActivationThresh
return len(oa.SeenBy) >= ActivationThresh
}

func (oa *ObservedAddr) numInbound() int {
count := 0
for obs := range oa.SeenBy {
if oa.SeenBy[obs].connDirection == network.DirInbound {
count++
}
}

return count
}

func (oa *ObservedAddr) GroupKey() string {
key := ""
protos := oa.Addr.Protocols()

for i := range protos {
key = key + "/" + protos[i].Name
}

return key
}

type newObservation struct {
conn network.Conn
observed ma.Multiaddr
Expand Down Expand Up @@ -111,33 +139,61 @@ func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiadd
return
}

now := time.Now()
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}

return addrs
return oas.filter(observedAddrs)
}

// Addrs return all activated observed addresses
func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) {
func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
oas.mu.RLock()
defer oas.mu.RUnlock()

if len(oas.addrs) == 0 {
return nil
}

var allObserved []*ObservedAddr
for k := range oas.addrs {
allObserved = append(allObserved, oas.addrs[k]...)
}
return oas.filter(allObserved)
}

func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
pmap := make(map[string][]*ObservedAddr)
now := time.Now()
for _, observedAddrs := range oas.addrs {
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)

for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.GroupKey()
pmap[pat] = append(pmap[pat], a)

}
}

addrs := make([]ma.Multiaddr, 0, len(observedAddrs))
for pat := range pmap {
s := pmap[pat]

// We prefer inbound connection observations over outbound.
// For ties, we prefer the ones with more votes.
sort.Slice(s, func(i int, j int) bool {
first := s[i]
second := s[j]

if first.numInbound() > second.numInbound() {
return true
}

return len(first.SeenBy) > len(second.SeenBy)
})

for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
addrs = append(addrs, s[i].Addr)
}
}

return addrs
}

Expand Down
91 changes: 87 additions & 4 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
p2putil "github.com/libp2p/go-libp2p-netutil"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"

identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

type harness struct {
Expand Down Expand Up @@ -51,10 +52,11 @@ func (h *harness) conn(observer peer.ID) network.Conn {
return c
}

func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) {
func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) network.Conn {
c := h.conn(observer)
h.oas.Record(c, observed)
time.Sleep(1 * time.Millisecond) // let the worker run
time.Sleep(50 * time.Millisecond) // let the worker run
return c
}

func newHarness(ctx context.Context, t *testing.T) harness {
Expand Down Expand Up @@ -245,3 +247,84 @@ func TestAddAddrsProfile(t *testing.T) {

wg.Wait()
}

func TestObservedAddrFiltering(t *testing.T) {
addrsMarch := func(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}

for _, aa := range a {
found := false
for _, bb := range b {
if aa.Equal(bb) {
found = true
break
}
}
if !found {
return false
}
}
return true
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should be empty")
}

// IP4/TCP
it1 := ma.StringCast("/ip4/1.2.3.4/tcp/1231")
it2 := ma.StringCast("/ip4/1.2.3.4/tcp/1232")
it3 := ma.StringCast("/ip4/1.2.3.4/tcp/1233")
it4 := ma.StringCast("/ip4/1.2.3.4/tcp/1234")
it5 := ma.StringCast("/ip4/1.2.3.4/tcp/1235")
it6 := ma.StringCast("/ip4/1.2.3.4/tcp/1236")
it7 := ma.StringCast("/ip4/1.2.3.4/tcp/1237")

// observers
b1 := ma.StringCast("/ip4/1.2.3.6/tcp/1236")
b2 := ma.StringCast("/ip4/1.2.3.7/tcp/1237")
b3 := ma.StringCast("/ip4/1.2.3.8/tcp/1237")
b4 := ma.StringCast("/ip4/1.2.3.9/tcp/1237")
b5 := ma.StringCast("/ip4/1.2.3.10/tcp/1237")

peers := []peer.ID{harness.add(b1), harness.add(b2), harness.add(b3), harness.add(b4), harness.add(b5)}
for i := 0; i < 4; i++ {
harness.observe(it1, peers[i])
harness.observe(it2, peers[i])
harness.observe(it3, peers[i])
harness.observe(it4, peers[i])
harness.observe(it5, peers[i])
harness.observe(it6, peers[i])
harness.observe(it7, peers[i])
}

harness.observe(it1, peers[4])
harness.observe(it7, peers[4])

addrs := harness.oas.Addrs()
require.Len(t, addrs, 2)
require.Contains(t, addrs, it1)
require.Contains(t, addrs, it7)

}

func TestObservedAddrGroupKey(t *testing.T) {
oa1 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
require.Equal(t, "/ip4/tcp", oa1.GroupKey())

oa2 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.5/tcp/2222")}
require.Equal(t, "/ip4/tcp", oa2.GroupKey())

oa3 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
require.Equal(t, "/ip4/udp", oa3.GroupKey())
oa4 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531")}
require.Equal(t, "/ip4/udp", oa4.GroupKey())

oa5 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531/quic")}
require.Equal(t, "/ip4/udp/quic", oa5.GroupKey())
}

0 comments on commit 79ead33

Please sign in to comment.