Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Routing Table Refresh manager #601

Merged
merged 13 commits into from
May 19, 2020
75 changes: 31 additions & 44 deletions rtrefresh/rt_refresh_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rtrefresh
import (
"context"
"fmt"
"sort"
"sync"
"time"

Expand All @@ -19,8 +18,7 @@ import (
var logger = logging.Logger("dht/RtRefreshManager")

const (
maxNoResultsAfterRefresh = 2
peerPingTimeout = 10 * time.Second
peerPingTimeout = 10 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a constant instead of a RefresherManager variable? Is it implicitly relying on other constants (e.g. we're using 2x the libp2p/go-tcp-transport's default 5 second timeout), or just a default that seems reasonable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aschmahmann Just the default seems reasonable. This is a simple dial timeout.

)

type triggerRefreshReq struct {
Expand Down Expand Up @@ -50,8 +48,7 @@ type RtRefreshManager struct {
refreshInterval time.Duration
successfulOutboundQueryGracePeriod time.Duration

triggerRefresh chan *triggerRefreshReq // channel to write refresh requests to.
noResultsForCpl map[uint]int // tracks how many times we didn't get any peer for a cpl
triggerRefresh chan *triggerRefreshReq // channel to write refresh requests to.
}

func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
Expand All @@ -77,8 +74,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool
refreshInterval: refreshInterval,
successfulOutboundQueryGracePeriod: successfulOutboundQueryGracePeriod,

triggerRefresh: make(chan *triggerRefreshReq),
noResultsForCpl: make(map[uint]int),
triggerRefresh: make(chan *triggerRefreshReq),
}, nil
}

Expand Down Expand Up @@ -202,60 +198,51 @@ func (r *RtRefreshManager) doRefresh(forceRefresh bool) error {
merr = multierror.Append(merr, err)
}

for c, lastRefreshedAt := range r.rt.GetTrackedCplsForRefresh() {
cpl := uint(c)

// skip cpls that we've stopped discovering new peers for
if r.noResultsForCpl[cpl] >= maxNoResultsAfterRefresh {
continue
}

isCplFull := r.rt.IsBucketFull(cpl)
peersBeforeRefresh := r.rt.GetPeersForCpl(cpl)
refreshCpls := r.rt.GetTrackedCplsForRefresh()

var err error
rfnc := func(c int) (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be worth making forceRefresh / refreshCpls paramters, and making this a method, rather than an anonymous function

Copy link
Contributor Author

@aarshkshah1992 aarshkshah1992 May 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@willscott

Given that this anon func is ONLY used here and you'd have to pass it multiple params, I'm not sure we need the new method/abstraction. It's just a simple 4 line func.

cpl := uint(c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
rfnc := func(c int) (err error) {
cpl := uint(c)
rfnc := func(cpl uint) (err error) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done.

if forceRefresh {
err = r.refreshCpl(cpl)
} else {
err = r.refreshCplIfEligible(cpl, lastRefreshedAt)
err = r.refreshCplIfEligible(cpl, refreshCpls[c])
}
return
}

if err != nil {
for c := range refreshCpls {
cpl := uint(c)
if err := rfnc(c); err != nil {
merr = multierror.Append(merr, err)
} else {
if !refreshDiscoverNewPeers(isCplFull, peersBeforeRefresh, r.rt.GetPeersForCpl(cpl)) {
r.noResultsForCpl[cpl] = r.noResultsForCpl[cpl] + 1
// If we see a gap at a Cpl in the Routing table, we ONLY refresh up until the maximum cpl we
// have in the Routing Table OR (2 * Cpl with the gap), whichever is smaller.
// This is to prevent refreshes for Cpls that have no peers in the network but happen to be before a very high max Cpl
// for which we do have peers in the network.
// The number of 2 * Cpl can be proved and a proof would have been written here if the programmer
// had paid more attention in the Math classes at university.
// So, please be patient and a doc explaining it will be published soon.
if r.rt.NPeersForCpl(cpl) == 0 {
lastCpl := min(2*c, len(refreshCpls)-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's an off by one error here. i.e. lastCpl := min(2*(c+1), len(refreshCpls)-1).

@petar @Stebalien can you double check me here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok: Substitute "len(refreshCpls)-1" for "lastCpl" in the "<" condition of the for loop. You get:
i < len(refreshCpls)-1+1
That's what you want.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aarshkshah1992 I talked this over with @petar and where we got was that there's still some formalization to be figured out about how we want to deal with gaps. However, using lastCpl := min(2*c, len(refreshCpls)-1) isn't necessarily great as is because if c=0 then we stop refreshing which seems bad (although it's highly unlikely in a large network).

Given that the pathological case where the first couple buckets are empty should be quite rare, it probably doesn't matter how many more buckets we refresh beyond 2*c (e.g. instead of the 2c in the min statement use 2c+1, 2(c+1), or min(2c, 5)). For now I'd just go with lastCpl := min(2*(c+1), len(refreshCpls)-1).

for i := c + 1; i < lastCpl+1; i++ {
if err := rfnc(i); err != nil {
merr = multierror.Append(merr, err)
}
}
return merr
}
}
}

return merr
}

// did we discover any new peers because of the refresh for this cpl ?
// Only if they have the exact same elements
// should we consider that we didn't discover any new peers.
// This is because peers can also randomly drop on and off the routing table.
func refreshDiscoverNewPeers(wasCplFull bool, peersBeforeRefresh []peer.ID, peersAfterRefresh []peer.ID) bool {
// we should always refresh buckets that were once full
if wasCplFull || (len(peersBeforeRefresh) != len(peersAfterRefresh)) {
return true
func min(a int, b int) int {
if a <= b {
return a
}

sort.Slice(peersBeforeRefresh, func(i, j int) bool {
return peersBeforeRefresh[i] < peersBeforeRefresh[j]
})

sort.Slice(peersAfterRefresh, func(i, j int) bool {
return peersAfterRefresh[i] < peersAfterRefresh[j]
})

for i := range peersBeforeRefresh {
if peersBeforeRefresh[i] != peersAfterRefresh[i] {
return true
}
}
return false
return b
}

func (r *RtRefreshManager) refreshCplIfEligible(cpl uint, lastRefreshedAt time.Time) error {
Expand Down
93 changes: 83 additions & 10 deletions rtrefresh/rt_refresh_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,94 @@
package rtrefresh

import (
"context"
"strconv"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"

kb "github.com/libp2p/go-libp2p-kbucket"
pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/stretchr/testify/require"
)

func TestRefreshDiscoverNewPeers(t *testing.T) {
// when cpl is full, we always get true
p1 := []peer.ID{"a"}
p2 := []peer.ID{"a"}
func TestSkipRefreshOnGapCpls(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
local := test.RandPeerIDFatal(t)

// adds a peer for a cpl
qFuncWithIgnore := func(rt *kb.RoutingTable, ignoreCpl uint) func(c context.Context, key string) error {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return func(c context.Context, key string) error {
if key == string(local) {
return nil
}

u, err := strconv.ParseInt(key, 10, 64)
require.NoError(t, err)

if uint(u) == ignoreCpl {
return nil
}

p, err := rt.GenRandPeerID(uint(u))
require.NoError(t, err)
b, err := rt.TryAddPeer(p, true)
require.True(t, b)
require.NoError(t, err)
return nil
}
}

kfnc := func(cpl uint) (string, error) {
return strconv.FormatInt(int64(cpl), 10), nil
}

// when 2*gapcpl < maxCpl
// gap is 2 and max is 10
rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour)
require.NoError(t, err)
r := &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local}
icpl := uint(2)
p, err := rt.GenRandPeerID(10)
require.NoError(t, err)
b, _ := rt.TryAddPeer(p, true)
require.True(t, b)
r.refreshQueryFnc = qFuncWithIgnore(rt, icpl)
require.NoError(t, r.doRefresh(true))

for i := uint(0); i < 2*icpl+1; i++ {
if i == icpl {
require.Equal(t, 0, rt.NPeersForCpl(i))
continue
}
require.Equal(t, 1, rt.NPeersForCpl(uint(i)))
}
for i := 2*icpl + 1; i < 10; i++ {
require.Equal(t, 0, rt.NPeersForCpl(i))
}

// when 2 * gapcpl > maxCpl
rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour)
require.NoError(t, err)
r = &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local}
icpl = uint(6)
p, err = rt.GenRandPeerID(10)
require.NoError(t, err)
b, _ = rt.TryAddPeer(p, true)
require.True(t, b)
r.refreshQueryFnc = qFuncWithIgnore(rt, icpl)
require.NoError(t, r.doRefresh(true))

for i := uint(0); i < 10; i++ {
if i == icpl {
require.Equal(t, 0, rt.NPeersForCpl(i))
continue
}

require.True(t, refreshDiscoverNewPeers(true, p1, p2))
require.False(t, refreshDiscoverNewPeers(false, p1, p1))
require.False(t, refreshDiscoverNewPeers(false, nil, nil))
require.True(t, refreshDiscoverNewPeers(false, p1, append(p1, peer.ID("x"))))
require.True(t, refreshDiscoverNewPeers(false, []peer.ID{"x"}, []peer.ID{"y"}))
require.Equal(t, 1, rt.NPeersForCpl(uint(i)))
}
require.Equal(t, 2, rt.NPeersForCpl(10))
}