From e76c8a9c388c0b94a1feb126ee8d47844a2d7ebc Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 9 Apr 2019 12:13:30 +0300 Subject: [PATCH 1/7] don't eagerly update the observed address set in Addrs allocates and eats a lot of cpu time under the lock; let a background worker gc periodically. --- p2p/protocol/identify/obsaddr.go | 26 +++++--------------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index af153edc60..374722b1ac 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -70,21 +70,11 @@ func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) { } now := time.Now() - filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs)) for _, a := range observedAddrs { - // leave only alive observed addresses - if now.Sub(a.LastSeen) <= oas.ttl { - filteredAddrs = append(filteredAddrs, a) - if a.activated(oas.ttl) { - addrs = append(addrs, a.Addr) - } + if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) { + addrs = append(addrs, a.Addr) } } - if len(filteredAddrs) > 0 { - oas.addrs[key] = filteredAddrs - } else { - delete(oas.addrs, key) - } return addrs } @@ -100,18 +90,12 @@ func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { } now := time.Now() - for local, observedAddrs := range oas.addrs { - filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs)) + for _, observedAddrs := range oas.addrs { for _, a := range observedAddrs { - // leave only alive observed addresses - if now.Sub(a.LastSeen) <= oas.ttl { - filteredAddrs = append(filteredAddrs, a) - if a.activated(oas.ttl) { - addrs = append(addrs, a.Addr) - } + if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) { + addrs = append(addrs, a.Addr) } } - oas.addrs[local] = filteredAddrs } return addrs } From b8a152f593afe50043c4a12607f1642284aaa8ec Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 9 Apr 2019 12:44:13 +0300 Subject: [PATCH 2/7] use a background worker for updating/gcing the observed address set --- p2p/host/basic/basic_host.go | 2 +- p2p/protocol/identify/id.go | 9 +++-- p2p/protocol/identify/obsaddr.go | 65 ++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 5 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 6127d34001..0725fa702d 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -132,7 +132,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, h.ids = opts.IdentifyService } else { // we can't set this as a default above because it depends on the *BasicHost. - h.ids = identify.NewIDService(h) + h.ids = identify.NewIDService(ctx, h) } if uint64(opts.NegotiationTimeout) != 0 { diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 01fc7a7b2c..f2e118b466 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -53,15 +53,16 @@ type IDService struct { // our own observed addresses. // TODO: instead of expiring, remove these when we disconnect - observedAddrs ObservedAddrSet + observedAddrs *ObservedAddrSet } // NewIDService constructs a new *IDService and activates it by // attaching its stream handler to the given host.Host. -func NewIDService(h host.Host) *IDService { +func NewIDService(ctx context.Context, h host.Host) *IDService { s := &IDService{ - Host: h, - currid: make(map[inet.Conn]chan struct{}), + Host: h, + currid: make(map[inet.Conn]chan struct{}), + observedAddrs: NewObservedAddrSet(ctx), } h.SetStreamHandler(ID, s.requestHandler) h.SetStreamHandler(IDPush, s.pushHandler) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 374722b1ac..f693763ab5 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -1,6 +1,7 @@ package identify import ( + "context" "sync" "time" @@ -11,6 +12,8 @@ import ( const ActivationThresh = 4 +var GCInterval = 10 * time.Minute + type observation struct { seenTime time.Time connDirection net.Direction @@ -42,6 +45,11 @@ func (oa *ObservedAddr) activated(ttl time.Duration) bool { return len(oa.SeenBy) >= ActivationThresh } +type newObservation struct { + observed, local, observer ma.Multiaddr + direction net.Direction +} + // ObservedAddrSet keeps track of a set of ObservedAddrs // the zero-value is ready to be used. type ObservedAddrSet struct { @@ -50,6 +58,18 @@ type ObservedAddrSet struct { // local(internal) address -> list of observed(external) addresses addrs map[string][]*ObservedAddr ttl time.Duration + + // this is the worker channel + wch chan newObservation +} + +func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet { + oas := &ObservedAddrSet{ + addrs: make(map[string][]*ObservedAddr), + wch: make(chan newObservation, 1), + } + go oas.worker(ctx) + return oas } // AddrsFor return all activated observed addresses associated with the given @@ -102,6 +122,51 @@ func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr, direction net.Direction) { + select { + case oas.wch <- newObservation{observed: observed, local: local, observer: observer, direction: direction}: + default: + log.Debugf("dropping address observation of %s; buffer full", observed) + } +} + +func (oas *ObservedAddrSet) worker(ctx context.Context) { + ticker := time.NewTicker(GCInterval) + defer ticker.Stop() + + for { + select { + case obs := <-oas.wch: + oas.doAdd(obs.observed, obs.local, obs.observer, obs.direction) + + case <-ticker.C: + oas.gc() + + case <-ctx.Done(): + return + } + } +} + +func (oas *ObservedAddrSet) gc() { + oas.Lock() + defer oas.Unlock() + + now := time.Now() + for local, observedAddrs := range oas.addrs { + // TODO we can do this without allocating by compacting the array in place + filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs)) + for _, a := range observedAddrs { + // leave only alive observed addresses + if now.Sub(a.LastSeen) <= oas.ttl { + filteredAddrs = append(filteredAddrs, a) + } + } + oas.addrs[local] = filteredAddrs + } +} + +func (oas *ObservedAddrSet) doAdd(observed, local, observer ma.Multiaddr, + direction net.Direction) { now := time.Now() observerString := observerGroup(observer) From 9b8192b7d265723e985d1f3ecdef0bee03490dd2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 9 Apr 2019 13:19:54 +0300 Subject: [PATCH 3/7] initialize ttl in ObservedAddrSet --- p2p/protocol/identify/obsaddr.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index f693763ab5..5c7e0f99f1 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -66,6 +66,7 @@ type ObservedAddrSet struct { func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet { oas := &ObservedAddrSet{ addrs: make(map[string][]*ObservedAddr), + ttl: pstore.OwnObservedAddrTTL, wch: make(chan newObservation, 1), } go oas.worker(ctx) @@ -78,7 +79,6 @@ func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) { oas.Lock() defer oas.Unlock() - // for zero-value. if len(oas.addrs) == 0 { return nil } @@ -104,7 +104,6 @@ func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { oas.Lock() defer oas.Unlock() - // for zero-value. if len(oas.addrs) == 0 { return nil } @@ -179,12 +178,6 @@ func (oas *ObservedAddrSet) doAdd(observed, local, observer ma.Multiaddr, oas.Lock() defer oas.Unlock() - // for zero-value. - if oas.addrs == nil { - oas.addrs = make(map[string][]*ObservedAddr) - oas.ttl = pstore.OwnObservedAddrTTL - } - observedAddrs := oas.addrs[localString] // check if observed address seen yet, if so, update it for i, previousObserved := range observedAddrs { From f4af8d493df64fdcb4f0e3d32c9a23686e9924ec Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 9 Apr 2019 13:20:38 +0300 Subject: [PATCH 4/7] fix identify tests --- p2p/protocol/identify/id_test.go | 4 ++-- p2p/protocol/identify/obsaddr_test.go | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index 621ce1aa0c..fd56f28ca5 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -26,8 +26,8 @@ func subtestIDService(t *testing.T) { h1p := h1.ID() h2p := h2.ID() - ids1 := identify.NewIDService(h1) - ids2 := identify.NewIDService(h2) + ids1 := identify.NewIDService(ctx, h1) + ids2 := identify.NewIDService(ctx, h2) testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index 12bdb54a8f..440b009b46 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -1,6 +1,7 @@ package identify import ( + "context" "sync" "testing" "time" @@ -52,7 +53,9 @@ func TestObsAddrSet(t *testing.T) { b4 := m("/ip4/1.2.3.9/tcp/1237") b5 := m("/ip4/1.2.3.10/tcp/1237") - oas := &ObservedAddrSet{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + oas := NewObservedAddrSet(ctx) if !addrsMarch(oas.Addrs(), nil) { t.Error("addrs should be empty") @@ -63,6 +66,7 @@ func TestObsAddrSet(t *testing.T) { dummyDirection := net.DirOutbound oas.Add(observed, dummyLocal, observer, dummyDirection) + time.Sleep(1 * time.Millisecond) // let the worker run } add(oas, a1, a4) @@ -131,13 +135,17 @@ func TestAddAddrsProfile(b *testing.T) { } return m } - oas := &ObservedAddrSet{} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + oas := NewObservedAddrSet(ctx) add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) { dummyLocal := m("/ip4/127.0.0.1/tcp/10086") dummyDirection := net.DirOutbound oas.Add(observed, dummyLocal, observer, dummyDirection) + time.Sleep(1 * time.Millisecond) // let the worker run } a1 := m("/ip4/1.2.3.4/tcp/1231") From 17001b24ddaed5d4ba59abc9481cc9b1622ef0d9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 9 Apr 2019 13:40:46 +0300 Subject: [PATCH 5/7] use a read/write lock for observed address set --- p2p/protocol/identify/obsaddr.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 5c7e0f99f1..53a5f9c5b9 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -53,7 +53,7 @@ type newObservation struct { // ObservedAddrSet keeps track of a set of ObservedAddrs // the zero-value is ready to be used. type ObservedAddrSet struct { - sync.Mutex // guards whole datastruct. + sync.RWMutex // guards whole datastruct. // local(internal) address -> list of observed(external) addresses addrs map[string][]*ObservedAddr @@ -76,8 +76,8 @@ func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet { // AddrsFor return all activated observed addresses associated with the given // (resolved) listen address. func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) { - oas.Lock() - defer oas.Unlock() + oas.RLock() + defer oas.RUnlock() if len(oas.addrs) == 0 { return nil @@ -101,8 +101,8 @@ func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) { // Addrs return all activated observed addresses func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { - oas.Lock() - defer oas.Unlock() + oas.RLock() + defer oas.RUnlock() if len(oas.addrs) == 0 { return nil @@ -220,11 +220,7 @@ func (oas *ObservedAddrSet) SetTTL(ttl time.Duration) { } func (oas *ObservedAddrSet) TTL() time.Duration { - oas.Lock() - defer oas.Unlock() - // for zero-value. - if oas.addrs == nil { - oas.ttl = pstore.OwnObservedAddrTTL - } + oas.RLock() + defer oas.RUnlock() return oas.ttl } From fe7ba05e38c294f2163d0a2c1ea226712e20201b Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 9 Apr 2019 14:00:03 +0300 Subject: [PATCH 6/7] delete empty address sets on observed address set gc --- p2p/protocol/identify/obsaddr.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 53a5f9c5b9..b0c8b19c7b 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -160,7 +160,11 @@ func (oas *ObservedAddrSet) gc() { filteredAddrs = append(filteredAddrs, a) } } - oas.addrs[local] = filteredAddrs + if len(filteredAddrs) > 0 { + oas.addrs[local] = filteredAddrs + } else { + delete(oas.addrs, local) + } } } From efdc140264ee2fa5deefbf572700a0dab1e07241 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 9 Apr 2019 20:35:36 +0300 Subject: [PATCH 7/7] raise activation channel capacity to 16 --- p2p/protocol/identify/obsaddr.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index b0c8b19c7b..b568ba5055 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -67,7 +67,7 @@ func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet { oas := &ObservedAddrSet{ addrs: make(map[string][]*ObservedAddr), ttl: pstore.OwnObservedAddrTTL, - wch: make(chan newObservation, 1), + wch: make(chan newObservation, 16), } go oas.worker(ctx) return oas