From 79ead33e327c99cc95589202e1164e391c320455 Mon Sep 17 00:00:00 2001
From: Aarsh Shah
Date: Wed, 20 May 2020 11:39:45 +0530
Subject: [PATCH] Filter observed addresses (#917)

* filter observed addrs
---
 p2p/protocol/identify/obsaddr.go      | 95 ++++++++++++++++++++++++----
 p2p/protocol/identify/obsaddr_test.go | 91 +++++++++++++++++++++++--
 2 files changed, 168 insertions(+), 18 deletions(-)

diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go
index 3822ba8daa..5d339a806d 100644
--- a/p2p/protocol/identify/obsaddr.go
+++ b/p2p/protocol/identify/obsaddr.go
@@ -2,6 +2,7 @@ package identify
 
 import (
 	"context"
+	"sort"
 	"sync"
 	"time"
 
@@ -30,6 +31,10 @@ var GCInterval = 10 * time.Minute
 // for adding to an ObservedAddrManager.
 var observedAddrManagerWorkerChannelSize = 16
 
+// maxObservedAddrsPerIPAndTransport is the maximum number of observed addresses
+// we will return for each (IPx/TCP or UDP) group.
+var maxObservedAddrsPerIPAndTransport = 2
+
 type observation struct {
 	seenTime      time.Time
 	connDirection network.Direction
@@ -46,13 +51,41 @@ type ObservedAddr struct {
 	LastSeen time.Time
 }
 
-func (oa *ObservedAddr) activated(ttl time.Duration) bool {
+func (oa *ObservedAddr) activated() bool {
+
 	// We only activate if other peers observed the same address
 	// of ours at least 4 times. SeenBy peers are removed by GC if
 	// they say the address more than ttl*ActivationThresh
 	return len(oa.SeenBy) >= ActivationThresh
 }
 
+// numInbound returns how many of the observations in SeenBy were made over
+// inbound connections.
+func (oa *ObservedAddr) numInbound() int {
+	count := 0
+	for obs := range oa.SeenBy {
+		if oa.SeenBy[obs].connDirection == network.DirInbound {
+			count++
+		}
+	}
+
+	return count
+}
+
+// GroupKey returns the key used to group observed addresses: the names of the
+// address's protocols joined with "/" (for example "/ip4/tcp"), ignoring the
+// protocol values such as the IP address and port.
+func (oa *ObservedAddr) GroupKey() string {
+	key := ""
+	protos := oa.Addr.Protocols()
+
+	for i := range protos {
+		key = key + "/" + protos[i].Name
+	}
+
+	return key
+}
+
 type newObservation struct {
 	conn     network.Conn
 	observed ma.Multiaddr
@@ -111,18 +144,11 @@ func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiadd
 		return
 	}
 
-	now := time.Now()
-	for _, a := range observedAddrs {
-		if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
-			addrs = append(addrs, a.Addr)
-		}
-	}
-
-	return addrs
+	return oas.filter(observedAddrs)
 }
 
 // Addrs return all activated observed addresses
-func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) {
+func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
 	oas.mu.RLock()
 	defer oas.mu.RUnlock()
 
@@ -130,14 +156,55 @@ func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) {
 		return nil
 	}
 
+	var allObserved []*ObservedAddr
+	for k := range oas.addrs {
+		allObserved = append(allObserved, oas.addrs[k]...)
+	}
+	return oas.filter(allObserved)
+}
+
+// filter returns the addresses of the entries in observedAddrs that were seen
+// within the TTL and are activated, keeping at most
+// maxObservedAddrsPerIPAndTransport addresses per GroupKey group.
+func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
+	pmap := make(map[string][]*ObservedAddr)
 	now := time.Now()
-	for _, observedAddrs := range oas.addrs {
-		for _, a := range observedAddrs {
-			if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
-				addrs = append(addrs, a.Addr)
+
+	for i := range observedAddrs {
+		a := observedAddrs[i]
+		if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
+			// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
+			pat := a.GroupKey()
+			pmap[pat] = append(pmap[pat], a)
+
+		}
+	}
+
+	addrs := make([]ma.Multiaddr, 0, len(observedAddrs))
+	for pat := range pmap {
+		s := pmap[pat]
+
+		// We prefer inbound connection observations over outbound.
+		// For ties, we prefer the ones with more votes.
+		sort.Slice(s, func(i int, j int) bool {
+			first := s[i]
+			second := s[j]
+
+			// Only fall through to the vote count on a tie; otherwise an
+			// address with fewer inbound observations but more votes would
+			// compare as "less" in both directions.
+			if in1, in2 := first.numInbound(), second.numInbound(); in1 != in2 {
+				return in1 > in2
 			}
+
+			return len(first.SeenBy) > len(second.SeenBy)
+		})
+
+		for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
+			addrs = append(addrs, s[i].Addr)
 		}
 	}
+
 	return addrs
 }
 
diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go
index 9c0d2183d4..2beab1a674 100644
--- a/p2p/protocol/identify/obsaddr_test.go
+++ b/p2p/protocol/identify/obsaddr_test.go
@@ -12,9 +12,10 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
 	p2putil "github.com/libp2p/go-libp2p-netutil"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
-	ma "github.com/multiformats/go-multiaddr"
-
 	identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
+
+	ma "github.com/multiformats/go-multiaddr"
+	"github.com/stretchr/testify/require"
 )
 
 type harness struct {
@@ -51,10 +52,11 @@ func (h *harness) conn(observer peer.ID) network.Conn {
 	return c
 }
 
-func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) {
+func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) network.Conn {
 	c := h.conn(observer)
 	h.oas.Record(c, observed)
-	time.Sleep(1 * time.Millisecond) // let the worker run
+	time.Sleep(50 * time.Millisecond) // let the worker run
+	return c
 }
 
 func newHarness(ctx context.Context, t *testing.T) harness {
@@ -245,3 +247,84 @@ func TestAddAddrsProfile(t *testing.T) {
 
 	wg.Wait()
 }
+
+func TestObservedAddrFiltering(t *testing.T) {
+	addrsMatch := func(a, b []ma.Multiaddr) bool {
+		if len(a) != len(b) {
+			return false
+		}
+
+		for _, aa := range a {
+			found := false
+			for _, bb := range b {
+				if aa.Equal(bb) {
+					found = true
+					break
+				}
+			}
+			if !found {
+				return false
+			}
+		}
+		return true
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	harness := newHarness(ctx, t)
+	if !addrsMatch(harness.oas.Addrs(), nil) {
+		t.Error("addrs should be empty")
+	}
+
+	// IP4/TCP
+	it1 := ma.StringCast("/ip4/1.2.3.4/tcp/1231")
+	it2 := ma.StringCast("/ip4/1.2.3.4/tcp/1232")
+	it3 := ma.StringCast("/ip4/1.2.3.4/tcp/1233")
+	it4 := ma.StringCast("/ip4/1.2.3.4/tcp/1234")
+	it5 := ma.StringCast("/ip4/1.2.3.4/tcp/1235")
+	it6 := ma.StringCast("/ip4/1.2.3.4/tcp/1236")
+	it7 := ma.StringCast("/ip4/1.2.3.4/tcp/1237")
+
+	// observers
+	b1 := ma.StringCast("/ip4/1.2.3.6/tcp/1236")
+	b2 := ma.StringCast("/ip4/1.2.3.7/tcp/1237")
+	b3 := ma.StringCast("/ip4/1.2.3.8/tcp/1237")
+	b4 := ma.StringCast("/ip4/1.2.3.9/tcp/1237")
+	b5 := ma.StringCast("/ip4/1.2.3.10/tcp/1237")
+
+	peers := []peer.ID{harness.add(b1), harness.add(b2), harness.add(b3), harness.add(b4), harness.add(b5)}
+	for i := 0; i < 4; i++ {
+		harness.observe(it1, peers[i])
+		harness.observe(it2, peers[i])
+		harness.observe(it3, peers[i])
+		harness.observe(it4, peers[i])
+		harness.observe(it5, peers[i])
+		harness.observe(it6, peers[i])
+		harness.observe(it7, peers[i])
+	}
+
+	harness.observe(it1, peers[4])
+	harness.observe(it7, peers[4])
+
+	addrs := harness.oas.Addrs()
+	require.Len(t, addrs, 2)
+	require.Contains(t, addrs, it1)
+	require.Contains(t, addrs, it7)
+
+}
+
+func TestObservedAddrGroupKey(t *testing.T) {
+	oa1 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
+	require.Equal(t, "/ip4/tcp", oa1.GroupKey())
+
+	oa2 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.5/tcp/2222")}
+	require.Equal(t, "/ip4/tcp", oa2.GroupKey())
+
+	oa3 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
+	require.Equal(t, "/ip4/udp", oa3.GroupKey())
+	oa4 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531")}
+	require.Equal(t, "/ip4/udp", oa4.GroupKey())
+
+	oa5 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.3.3.4/udp/1531/quic")}
+	require.Equal(t, "/ip4/udp/quic", oa5.GroupKey())
+}