Skip to content

Commit

Permalink
Merge pull request #585 from libp2p/fix/obsaddr
Browse files Browse the repository at this point in the history
Improve observed address handling
  • Loading branch information
vyzo authored Apr 9, 2019
2 parents 2a48294 + efdc140 commit 7ae0fda
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 46 deletions.
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
h.ids = opts.IdentifyService
} else {
// we can't set this as a default above because it depends on the *BasicHost.
h.ids = identify.NewIDService(h)
h.ids = identify.NewIDService(ctx, h)
}

if uint64(opts.NegotiationTimeout) != 0 {
Expand Down
9 changes: 5 additions & 4 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ type IDService struct {

// our own observed addresses.
// TODO: instead of expiring, remove these when we disconnect
observedAddrs ObservedAddrSet
observedAddrs *ObservedAddrSet
}

// NewIDService constructs a new *IDService and activates it by
// attaching its stream handler to the given host.Host.
func NewIDService(h host.Host) *IDService {
func NewIDService(ctx context.Context, h host.Host) *IDService {
s := &IDService{
Host: h,
currid: make(map[inet.Conn]chan struct{}),
Host: h,
currid: make(map[inet.Conn]chan struct{}),
observedAddrs: NewObservedAddrSet(ctx),
}
h.SetStreamHandler(ID, s.requestHandler)
h.SetStreamHandler(IDPush, s.pushHandler)
Expand Down
4 changes: 2 additions & 2 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func subtestIDService(t *testing.T) {
h1p := h1.ID()
h2p := h2.ID()

ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
ids1 := identify.NewIDService(ctx, h1)
ids2 := identify.NewIDService(ctx, h2)

testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing
Expand Down
116 changes: 79 additions & 37 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package identify

import (
"context"
"sync"
"time"

Expand All @@ -11,6 +12,8 @@ import (

const ActivationThresh = 4

var GCInterval = 10 * time.Minute

type observation struct {
seenTime time.Time
connDirection net.Direction
Expand Down Expand Up @@ -42,23 +45,40 @@ func (oa *ObservedAddr) activated(ttl time.Duration) bool {
return len(oa.SeenBy) >= ActivationThresh
}

type newObservation struct {
observed, local, observer ma.Multiaddr
direction net.Direction
}

// ObservedAddrSet keeps track of a set of ObservedAddrs
// the zero-value is ready to be used.
type ObservedAddrSet struct {
sync.Mutex // guards whole datastruct.
sync.RWMutex // guards whole datastruct.

// local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
ttl time.Duration

// this is the worker channel
wch chan newObservation
}

func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet {
oas := &ObservedAddrSet{
addrs: make(map[string][]*ObservedAddr),
ttl: pstore.OwnObservedAddrTTL,
wch: make(chan newObservation, 16),
}
go oas.worker(ctx)
return oas
}

// AddrsFor return all activated observed addresses associated with the given
// (resolved) listen address.
func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
oas.Lock()
defer oas.Unlock()
oas.RLock()
defer oas.RUnlock()

// for zero-value.
if len(oas.addrs) == 0 {
return nil
}
Expand All @@ -70,53 +90,85 @@ func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
}

now := time.Now()
filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
for _, a := range observedAddrs {
// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
if a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
if len(filteredAddrs) > 0 {
oas.addrs[key] = filteredAddrs
} else {
delete(oas.addrs, key)
}

return addrs
}

// Addrs return all activated observed addresses
func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
oas.Lock()
defer oas.Unlock()
oas.RLock()
defer oas.RUnlock()

// for zero-value.
if len(oas.addrs) == 0 {
return nil
}

now := time.Now()
for _, observedAddrs := range oas.addrs {
for _, a := range observedAddrs {
if now.Sub(a.LastSeen) <= oas.ttl && a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
}
return addrs
}

func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
direction net.Direction) {
select {
case oas.wch <- newObservation{observed: observed, local: local, observer: observer, direction: direction}:
default:
log.Debugf("dropping address observation of %s; buffer full", observed)
}
}

func (oas *ObservedAddrSet) worker(ctx context.Context) {
ticker := time.NewTicker(GCInterval)
defer ticker.Stop()

for {
select {
case obs := <-oas.wch:
oas.doAdd(obs.observed, obs.local, obs.observer, obs.direction)

case <-ticker.C:
oas.gc()

case <-ctx.Done():
return
}
}
}

func (oas *ObservedAddrSet) gc() {
oas.Lock()
defer oas.Unlock()

now := time.Now()
for local, observedAddrs := range oas.addrs {
// TODO we can do this without allocating by compacting the array in place
filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
for _, a := range observedAddrs {
// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
if a.activated(oas.ttl) {
addrs = append(addrs, a.Addr)
}
}
}
oas.addrs[local] = filteredAddrs
if len(filteredAddrs) > 0 {
oas.addrs[local] = filteredAddrs
} else {
delete(oas.addrs, local)
}
}
return addrs
}

func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
func (oas *ObservedAddrSet) doAdd(observed, local, observer ma.Multiaddr,
direction net.Direction) {

now := time.Now()
Expand All @@ -130,12 +182,6 @@ func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
oas.Lock()
defer oas.Unlock()

// for zero-value.
if oas.addrs == nil {
oas.addrs = make(map[string][]*ObservedAddr)
oas.ttl = pstore.OwnObservedAddrTTL
}

observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
Expand Down Expand Up @@ -178,11 +224,7 @@ func (oas *ObservedAddrSet) SetTTL(ttl time.Duration) {
}

func (oas *ObservedAddrSet) TTL() time.Duration {
oas.Lock()
defer oas.Unlock()
// for zero-value.
if oas.addrs == nil {
oas.ttl = pstore.OwnObservedAddrTTL
}
oas.RLock()
defer oas.RUnlock()
return oas.ttl
}
12 changes: 10 additions & 2 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package identify

import (
"context"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -52,7 +53,9 @@ func TestObsAddrSet(t *testing.T) {
b4 := m("/ip4/1.2.3.9/tcp/1237")
b5 := m("/ip4/1.2.3.10/tcp/1237")

oas := &ObservedAddrSet{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oas := NewObservedAddrSet(ctx)

if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should be empty")
Expand All @@ -63,6 +66,7 @@ func TestObsAddrSet(t *testing.T) {
dummyDirection := net.DirOutbound

oas.Add(observed, dummyLocal, observer, dummyDirection)
time.Sleep(1 * time.Millisecond) // let the worker run
}

add(oas, a1, a4)
Expand Down Expand Up @@ -131,13 +135,17 @@ func TestAddAddrsProfile(b *testing.T) {
}
return m
}
oas := &ObservedAddrSet{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oas := NewObservedAddrSet(ctx)

add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) {
dummyLocal := m("/ip4/127.0.0.1/tcp/10086")
dummyDirection := net.DirOutbound

oas.Add(observed, dummyLocal, observer, dummyDirection)
time.Sleep(1 * time.Millisecond) // let the worker run
}

a1 := m("/ip4/1.2.3.4/tcp/1231")
Expand Down

0 comments on commit 7ae0fda

Please sign in to comment.