Skip to content

Commit

Permalink
Boostrap empty RT and Optimize allocs when we discover new peers (#631)
Browse files Browse the repository at this point in the history
* Bootstrap when RT is empty and optimize allocations.

Co-authored-by: Steven Allen <[email protected]>
  • Loading branch information
aarshkshah1992 and Stebalien authored May 14, 2020
1 parent 9be7169 commit 08ab423
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 15 deletions.
66 changes: 60 additions & 6 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -86,7 +87,8 @@ type IpfsDHT struct {

// DHT protocols we query with. We'll only add peers to our routing
// table if they speak these protocols.
protocols []protocol.ID
protocols []protocol.ID
protocolsStrs []string

// DHT protocols we can respond to.
serverProtocols []protocol.ID
Expand All @@ -108,6 +110,11 @@ type IpfsDHT struct {
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error

// A set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
bootstrapPeers []peer.AddrInfo

maxRecordAge time.Duration

// Allows disabling dht subsystems. These should _only_ be set on
Expand Down Expand Up @@ -254,6 +261,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
protocols: protocols,
protocolsStrs: protocol.ConvertToStrings(protocols),
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
Expand All @@ -262,7 +270,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}),
fixLowPeersChan: make(chan struct{}, 1),
}

// construct routing table
Expand All @@ -271,6 +279,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
dht.bootstrapPeers = cfg.bootstrapPeers

// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)
Expand Down Expand Up @@ -323,22 +332,70 @@ func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
timer := time.NewTimer(periodicBootstrapInterval)
defer timer.Stop()

for {
select {
case <-dht.fixLowPeersChan:
case <-timer.C:
case <-proc.Closing():
return
}

if dht.routingTable.Size() > minRTRefreshThreshold {
continue
}

// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.Context(), p, false)
}

// TODO Active Bootstrapping
// We should first use non-bootstrap peers we knew of from previous
// snapshots of the Routing Table before we connect to the bootstrappers.
// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
if dht.routingTable.Size() == 0 {
if len(dht.bootstrapPeers) == 0 {
// No point in continuing, we have no peers!
continue
}

found := 0
for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
ai := dht.bootstrapPeers[i]
err := dht.Host().Connect(dht.Context(), ai)
if err == nil {
found++
} else {
logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
}

// Wait for two bootstrap peers, or try them all.
//
// Why two? In theory, one should be enough
// normally. However, if the network were to
// restart and everyone connected to just one
// bootstrapper, we'll end up with a mostly
// partitioned network.
//
// So we always bootstrap with two random peers.
if found == maxNBoostrappers {
break
}
}
}

// if we still don't have peers in our routing table(probably because Identify hasn't completed),
// there is no point in triggering a Refresh.
if dht.routingTable.Size() == 0 {
continue
}

if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
Expand Down Expand Up @@ -504,9 +561,6 @@ func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to anymore till it starts supporting the DHT protocol again.
dht.routingTable.RemovePeer(p)

// since we lost a peer from the RT, we should do this here
dht.fixRTIfNeeded()
}

func (dht *IpfsDHT) fixRTIfNeeded() {
Expand Down
4 changes: 4 additions & 0 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ var minRTRefreshThreshold = 10

// timeout for pinging one peer
const peerPingTimeout = 10 * time.Second
const (
periodicBootstrapInterval = 2 * time.Minute
maxNBoostrappers = 2
)

func init() {
for _, s := range []string{
Expand Down
15 changes: 15 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record"
ma "github.com/multiformats/go-multiaddr"
)

// ModeOpt describes what mode the dht should operate in
Expand Down Expand Up @@ -60,6 +61,7 @@ type config struct {

// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
Expand Down Expand Up @@ -393,3 +395,16 @@ func V1CompatibleMode(enable bool) Option {
return nil
}
}

// BootstrapPeers configures the bootstrapping nodes that we will connect to to seed
// and refresh our Routing Table if it becomes empty.
func BootstrapPeers(addrs ...ma.Multiaddr) Option {
return func(c *config) error {
bootstrappers, err := peer.AddrInfosFromP2pAddrs(addrs...)
if err != nil {
return fmt.Errorf("failed to parse bootstrap peer addresses: %w", err)
}
c.bootstrapPeers = bootstrappers
return nil
}
}
104 changes: 104 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1966,3 +1966,107 @@ func TestRoutingFilter(t *testing.T) {
case <-time.After(time.Millisecond * 200):
}
}

func TestBootStrapWhenRTIsEmpty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create three boostrap peers each of which is connected to 1 other peer.
nBootStraps := 3
bootstrappers := setupDHTS(t, ctx, nBootStraps)
defer func() {
for i := 0; i < nBootStraps; i++ {
bootstrappers[i].Close()
defer bootstrappers[i].host.Close()
}
}()

bootstrapcons := setupDHTS(t, ctx, nBootStraps)
defer func() {
for i := 0; i < nBootStraps; i++ {
bootstrapcons[i].Close()
defer bootstrapcons[i].host.Close()
}
}()
for i := 0; i < nBootStraps; i++ {
connect(t, ctx, bootstrappers[i], bootstrapcons[i])
}

// convert the bootstrap addresses to a p2p address
bootstrapAddrs := make([]ma.Multiaddr, nBootStraps)
for i := 0; i < nBootStraps; i++ {
b, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: bootstrappers[i].self,
Addrs: bootstrappers[i].host.Addrs()})
require.NoError(t, err)
bootstrapAddrs[i] = b[0]
}

//----------------
// We will initialize a DHT with 1 bootstrapper, connect it to another DHT,
// then remove the latter from the Routing Table
// This should add the bootstrap peer and the peer that the bootstrap peer is conencted to
// to it's Routing Table.
// AutoRefresh needs to be enabled for this.
dht1, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
testPrefix,
NamespacedValidator("v", blankValidator{}),
Mode(ModeServer),
BootstrapPeers(bootstrapAddrs[0]),
)
require.NoError(t, err)
dht2 := setupDHT(ctx, t, false)
defer func() {
dht1.host.Close()
dht2.host.Close()
dht1.Close()
dht2.Close()
}()
connect(t, ctx, dht1, dht2)
require.NoError(t, dht2.Close())
require.NoError(t, dht2.host.Close())
require.NoError(t, dht1.host.Network().ClosePeer(dht2.self))
dht1.routingTable.RemovePeer(dht2.self)
require.NotContains(t, dht2.self, dht1.routingTable.ListPeers())

require.Eventually(t, func() bool {
return dht1.routingTable.Size() == 2 && dht1.routingTable.Find(bootstrappers[0].self) != "" &&
dht1.routingTable.Find(bootstrapcons[0].self) != ""
}, 5*time.Second, 500*time.Millisecond)

//----------------
// We will initialize a DHT with 2 bootstrappers, connect it to another DHT,
// then remove the DHT handler from the other DHT which should make the first DHT's
// routing table empty.
// This should add the bootstrap peers and the peer thats the bootstrap peers are connected to
// to it's Routing Table.
// AutoRefresh needs to be enabled for this.
dht1, err = New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
testPrefix,
NamespacedValidator("v", blankValidator{}),
Mode(ModeServer),
BootstrapPeers(bootstrapAddrs[1], bootstrapAddrs[2]),
)
require.NoError(t, err)

dht2 = setupDHT(ctx, t, false)
connect(t, ctx, dht1, dht2)
defer func() {
dht1.host.Close()
dht2.host.Close()
dht1.Close()
dht2.Close()
}()
connect(t, ctx, dht1, dht2)
require.NoError(t, dht2.setMode(modeClient))

require.Eventually(t, func() bool {
rt := dht1.routingTable

return rt.Size() == 4 && rt.Find(bootstrappers[1].self) != "" &&
rt.Find(bootstrappers[2].self) != "" && rt.Find(bootstrapcons[1].self) != "" && rt.Find(bootstrapcons[2].self) != ""
}, 5*time.Second, 500*time.Millisecond)
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ require (
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.8.2
github.com/libp2p/go-libp2p-core v0.5.3
github.com/libp2p/go-libp2p-kbucket v0.4.1
github.com/libp2p/go-libp2p-peerstore v0.2.3
github.com/libp2p/go-libp2p-core v0.5.4
github.com/libp2p/go-libp2p-kbucket v0.4.2
github.com/libp2p/go-libp2p-peerstore v0.2.4
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-libp2p-swarm v0.2.3
Expand All @@ -28,7 +28,7 @@ require (
github.com/libp2p/go-netroute v0.1.2
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-net v0.1.4
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/multiformats/go-multihash v0.0.13
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.5.1
Expand Down
Loading

0 comments on commit 08ab423

Please sign in to comment.