Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter observed addresses #917

Merged
merged 4 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 70 additions & 14 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package identify

import (
"context"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -30,6 +31,10 @@ var GCInterval = 10 * time.Minute
// for adding to an ObservedAddrManager.
var observedAddrManagerWorkerChannelSize = 16

// maxObservedAddrsPerIPAndTransport is the maximum number of observed addresses
// we will return for each (IPx/TCP or UDP) group.
var maxObservedAddrsPerIPAndTransport = 2

type observation struct {
seenTime time.Time
connDirection network.Direction
Expand All @@ -46,13 +51,36 @@ type ObservedAddr struct {
LastSeen time.Time
}

func (oa *ObservedAddr) activated(ttl time.Duration) bool {
func (oa *ObservedAddr) activated() bool {

// We only activate if other peers observed the same address
// of ours at least 4 times. SeenBy peers are removed by GC if
// they say the address more than ttl*ActivationThresh
return len(oa.SeenBy) >= ActivationThresh
}

func (oa *ObservedAddr) numInbound() int {
count := 0
for obs := range oa.SeenBy {
if oa.SeenBy[obs].connDirection == network.DirInbound {
count++
}
}

return count
}

func (oa *ObservedAddr) GroupKey() string {
key := ""
protos := oa.Addr.Protocols()

for i := range protos {
key = key + "/" + protos[i].Name
}

return key
}
Stebalien marked this conversation as resolved.
Show resolved Hide resolved

type newObservation struct {
conn network.Conn
observed ma.Multiaddr
Expand Down Expand Up @@ -111,33 +139,61 @@ func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiadd
return
}

now := time.Now()
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}

return addrs
return oas.filter(observedAddrs)
}

// Addrs return all activated observed addresses
func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) {
func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
oas.mu.RLock()
defer oas.mu.RUnlock()

if len(oas.addrs) == 0 {
return nil
}

var allObserved []*ObservedAddr
for k := range oas.addrs {
allObserved = append(allObserved, oas.addrs[k]...)
}
return oas.filter(allObserved)
}

func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
pmap := make(map[string][]*ObservedAddr)
now := time.Now()
for _, observedAddrs := range oas.addrs {
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)

for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.GroupKey()
pmap[pat] = append(pmap[pat], a)

}
}

addrs := make([]ma.Multiaddr, 0, len(observedAddrs))
for pat := range pmap {
s := pmap[pat]

// We prefer inbound connection observations over outbound.
// For ties, we prefer the ones with more votes.
sort.Slice(s, func(i int, j int) bool {
first := s[i]
second := s[j]

if first.numInbound() > second.numInbound() {
return true
}

return len(first.SeenBy) > len(second.SeenBy)
})

for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
addrs = append(addrs, s[i].Addr)
}
}

return addrs
}

Expand Down
91 changes: 87 additions & 4 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
p2putil "github.com/libp2p/go-libp2p-netutil"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
ma "github.com/multiformats/go-multiaddr"

identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

type harness struct {
Expand Down Expand Up @@ -51,10 +52,11 @@ func (h *harness) conn(observer peer.ID) network.Conn {
return c
}

func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) {
func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) network.Conn {
c := h.conn(observer)
h.oas.Record(c, observed)
time.Sleep(1 * time.Millisecond) // let the worker run
time.Sleep(50 * time.Millisecond) // let the worker run
return c
}

func newHarness(ctx context.Context, t *testing.T) harness {
Expand Down Expand Up @@ -245,3 +247,84 @@ func TestAddAddrsProfile(t *testing.T) {

wg.Wait()
}

func TestObservedAddrFiltering(t *testing.T) {
addrsMarch := func(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}

for _, aa := range a {
found := false
for _, bb := range b {
if aa.Equal(bb) {
found = true
break
}
}
if !found {
return false
}
}
return true
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
if !addrsMarch(harness.oas.Addrs(), nil) {
t.Error("addrs should be empty")
}

// IP4/TCP
it1 := ma.StringCast("/ip4/1.2.3.4/tcp/1231")
it2 := ma.StringCast("/ip4/1.2.3.4/tcp/1232")
it3 := ma.StringCast("/ip4/1.2.3.4/tcp/1233")
it4 := ma.StringCast("/ip4/1.2.3.4/tcp/1234")
it5 := ma.StringCast("/ip4/1.2.3.4/tcp/1235")
it6 := ma.StringCast("/ip4/1.2.3.4/tcp/1236")
it7 := ma.StringCast("/ip4/1.2.3.4/tcp/1237")

// observers
b1 := ma.StringCast("/ip4/1.2.3.6/tcp/1236")
b2 := ma.StringCast("/ip4/1.2.3.7/tcp/1237")
b3 := ma.StringCast("/ip4/1.2.3.8/tcp/1237")
b4 := ma.StringCast("/ip4/1.2.3.9/tcp/1237")
b5 := ma.StringCast("/ip4/1.2.3.10/tcp/1237")

peers := []peer.ID{harness.add(b1), harness.add(b2), harness.add(b3), harness.add(b4), harness.add(b5)}
for i := 0; i < 4; i++ {
harness.observe(it1, peers[i])
harness.observe(it2, peers[i])
harness.observe(it3, peers[i])
harness.observe(it4, peers[i])
harness.observe(it5, peers[i])
harness.observe(it6, peers[i])
harness.observe(it7, peers[i])
}

harness.observe(it1, peers[4])
harness.observe(it7, peers[4])

addrs := harness.oas.Addrs()
require.Len(t, addrs, 2)
require.Contains(t, addrs, it1)
require.Contains(t, addrs, it7)

}

func TestObservedAddrGroupKey(t *testing.T) {
oa1 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
require.Equal(t, "/ip4/tcp", oa1.GroupKey())

oa2 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.5/tcp/2222")}
require.Equal(t, "/ip4/tcp", oa2.GroupKey())

oa3 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
require.Equal(t, "/ip4/udp", oa3.GroupKey())
oa4 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531")}
require.Equal(t, "/ip4/udp", oa4.GroupKey())

oa5 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531/quic")}
require.Equal(t, "/ip4/udp/quic", oa5.GroupKey())
}