Bootstrap empty RT and Optimize allocs when we discover new peers #631

Merged · 9 commits · May 14, 2020

Changes from 7 commits
66 changes: 60 additions & 6 deletions dht.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"

@@ -86,7 +87,8 @@ type IpfsDHT struct {

// DHT protocols we query with. We'll only add peers to our routing
// table if they speak these protocols.
protocols []protocol.ID
protocols []protocol.ID
protocolsStrs []string

// DHT protocols we can respond to.
serverProtocols []protocol.ID
@@ -108,6 +110,11 @@ type IpfsDHT struct {
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error

// A set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
bootstrapPeers []peer.AddrInfo

maxRecordAge time.Duration

// Allows disabling dht subsystems. These should _only_ be set on
@@ -254,6 +261,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
protocols: protocols,
protocolsStrs: protocol.ConvertToStrings(protocols),
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
@@ -262,7 +270,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}),
fixLowPeersChan: make(chan struct{}, 1),
}

// construct routing table
@@ -271,6 +279,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
dht.bootstrapPeers = cfg.bootstrapPeers

// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)
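
The protocolsStrs field caches the string form of the query protocols so the hot path does not re-convert (and re-allocate) them on every peer discovery; that is the "Optimize allocs" half of the title. A minimal sketch of the pattern, assuming a peerstore SupportsProtocols check as the hot path (the PR's actual call site may differ):

	package main

	import (
		"github.com/libp2p/go-libp2p-core/peer"
		"github.com/libp2p/go-libp2p-core/peerstore"
		"github.com/libp2p/go-libp2p-core/protocol"
	)

	// dhtSketch mirrors the two fields from the diff above: the protocol
	// IDs we query with, and their string form converted exactly once.
	type dhtSketch struct {
		peerstore     peerstore.Peerstore
		protocols     []protocol.ID
		protocolsStrs []string
	}

	func newDHTSketch(ps peerstore.Peerstore, protocols []protocol.ID) *dhtSketch {
		return &dhtSketch{
			peerstore:     ps,
			protocols:     protocols,
			protocolsStrs: protocol.ConvertToStrings(protocols), // convert once, not per peer
		}
	}

	// validRTPeer is a hypothetical hot-path check: reusing the cached
	// slice avoids allocating a fresh []string for every discovered peer.
	func (d *dhtSketch) validRTPeer(p peer.ID) (bool, error) {
		supported, err := d.peerstore.SupportsProtocols(p, d.protocolsStrs...)
		return len(supported) > 0, err
	}
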
@@ -323,22 +332,70 @@ func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
timer := time.NewTimer(periodicBootstrapInterval)
defer timer.Stop()

for {
select {
case <-dht.fixLowPeersChan:
case <-timer.C:
case <-proc.Closing():
return
}

if dht.routingTable.Size() > minRTRefreshThreshold {
continue
}

// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.Context(), p, false)
}

// TODO Active Bootstrapping
// We should first use non-bootstrap peers we knew of from previous
// snapshots of the Routing Table before we connect to the bootstrappers.
// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
if dht.routingTable.Size() == 0 {
if len(dht.bootstrapPeers) == 0 {
// No point in continuing, we have no peers!
continue
}

found := 0
for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
ai := dht.bootstrapPeers[i]
err := dht.Host().Connect(dht.Context(), ai)
if err == nil {
found++
} else {
logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
}

// Wait for two bootstrap peers, or try them all.
//
// Why two? In theory, one should be enough
// normally. However, if the network were to
// restart and everyone connected to just one
// bootstrapper, we'll end up with a mostly
// partitioned network.
//
// So we always bootstrap with two random peers.
if found == maxNBoostrappers {
break
}
}
}

// if we still don't have peers in our routing table (probably because Identify hasn't completed),
// there is no point in triggering a Refresh.
if dht.routingTable.Size() == 0 {
continue
}

if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
@@ -504,9 +561,6 @@ func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
dht.routingTable.RemovePeer(p)

// since we lost a peer from the RT, we should do this here
dht.fixRTIfNeeded()
Member:

Should we not do this? The idea was to fix our routing table if we drop below some number of peers.

Member:

I guess the periodic bootstrap will help.

Contributor Author:

@Stebalien Should have added a comment here. The dht.routingTable.RemovePeer call results in a call to the PeerRemoved callback, which in turn calls fixRTIfNeeded.
}

func (dht *IpfsDHT) fixRTIfNeeded() {
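
The thread above turns on the routing table's removal callback. A minimal sketch of that wiring, assuming kbucket's exported PeerRemoved hook and the size-1 buffered fixLowPeersChan introduced in this PR (the names match the diff; the surrounding plumbing is illustrative):

	package main

	import (
		"github.com/libp2p/go-libp2p-core/peer"
		kb "github.com/libp2p/go-libp2p-kbucket"
	)

	// Sketch: RemovePeer fires the routing table's PeerRemoved hook, which
	// nudges fixLowPeersRoutine through the buffered channel. That is why
	// an explicit fixRTIfNeeded call after RemovePeer is redundant.
	type rtSketch struct {
		routingTable    *kb.RoutingTable
		fixLowPeersChan chan struct{} // capacity 1, as in this PR
	}

	func (d *rtSketch) wireCallbacks() {
		d.routingTable.PeerRemoved = func(p peer.ID) {
			d.fixRTIfNeeded()
		}
	}

	func (d *rtSketch) fixRTIfNeeded() {
		select {
		case d.fixLowPeersChan <- struct{}{}: // wake fixLowPeersRoutine
		default: // a wake-up is already queued; repeated triggers coalesce
		}
	}
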
4 changes: 4 additions & 0 deletions dht_bootstrap.go
@@ -22,6 +22,10 @@ var minRTRefreshThreshold = 10

// timeout for pinging one peer
const peerPingTimeout = 10 * time.Second
const (
periodicBootstrapInterval = 2 * time.Minute
maxNBoostrappers = 2
)

func init() {
for _, s := range []string{
15 changes: 15 additions & 0 deletions dht_options.go
@@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record"
ma "github.com/multiformats/go-multiaddr"
)

// ModeOpt describes what mode the dht should operate in
@@ -60,6 +61,7 @@ type config struct {

// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
@@ -393,3 +395,16 @@ func V1CompatibleMode(enable bool) Option {
return nil
}
}

// BootstrapPeers configures the bootstrapping nodes that we will connect to in order
// to seed and refresh our Routing Table if it ever becomes empty.
func BootstrapPeers(addrs ...ma.Multiaddr) Option {
return func(c *config) error {
bootstrappers, err := peer.AddrInfosFromP2pAddrs(addrs...)
if err != nil {
return fmt.Errorf("failed to parse bootstrap peer addresses: %w", err)
}
c.bootstrapPeers = bootstrappers
return nil
}
}
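
A short usage sketch of the new option; the dnsaddr below is one of the public libp2p bootstrappers and is used purely as an illustration:

	package main

	import (
		"context"

		"github.com/libp2p/go-libp2p-core/host"
		dht "github.com/libp2p/go-libp2p-kad-dht"
		ma "github.com/multiformats/go-multiaddr"
	)

	// newDHT constructs a DHT whose fixLowPeersRoutine can fall back to
	// the given bootstrapper whenever the routing table empties out.
	func newDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
		addr, err := ma.NewMultiaddr(
			"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
		if err != nil {
			return nil, err
		}
		return dht.New(ctx, h, dht.BootstrapPeers(addr))
	}
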
104 changes: 104 additions & 0 deletions dht_test.go
@@ -1957,3 +1957,107 @@ func TestRoutingFilter(t *testing.T) {
case <-time.After(time.Millisecond * 200):
}
}

func TestBootStrapWhenRTIsEmpty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create three bootstrap peers, each of which is connected to 1 other peer.
nBootStraps := 3
bootstrappers := setupDHTS(t, ctx, nBootStraps)
defer func() {
for i := 0; i < nBootStraps; i++ {
bootstrappers[i].Close()
defer bootstrappers[i].host.Close()
}
}()

bootstrapcons := setupDHTS(t, ctx, nBootStraps)
defer func() {
for i := 0; i < nBootStraps; i++ {
bootstrapcons[i].Close()
defer bootstrapcons[i].host.Close()
}
}()
for i := 0; i < nBootStraps; i++ {
connect(t, ctx, bootstrappers[i], bootstrapcons[i])
}

// convert the bootstrap addresses to p2p addresses
bootstrapAddrs := make([]ma.Multiaddr, nBootStraps)
for i := 0; i < nBootStraps; i++ {
b, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: bootstrappers[i].self,
Addrs: bootstrappers[i].host.Addrs()})
require.NoError(t, err)
bootstrapAddrs[i] = b[0]
}

//----------------
// We will initialize a DHT with 1 bootstrapper, connect it to another DHT,
// then remove the latter from the Routing Table.
// This should add the bootstrap peer and the peer that the bootstrap peer is connected to
// to its Routing Table.
// AutoRefresh needs to be enabled for this.
dht1, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
testPrefix,
NamespacedValidator("v", blankValidator{}),
Mode(ModeServer),
BootstrapPeers(bootstrapAddrs[0]),
)
require.NoError(t, err)
dht2 := setupDHT(ctx, t, false)
defer func() {
dht1.host.Close()
dht2.host.Close()
dht1.Close()
dht2.Close()
}()
connect(t, ctx, dht1, dht2)
require.NoError(t, dht2.Close())
require.NoError(t, dht2.host.Close())
require.NoError(t, dht1.host.Network().ClosePeer(dht2.self))
dht1.routingTable.RemovePeer(dht2.self)
require.NotContains(t, dht1.routingTable.ListPeers(), dht2.self)

require.Eventually(t, func() bool {
return dht1.routingTable.Size() == 2 && dht1.routingTable.Find(bootstrappers[0].self) != "" &&
dht1.routingTable.Find(bootstrapcons[0].self) != ""
}, 5*time.Second, 500*time.Millisecond)

//----------------
// We will initialize a DHT with 2 bootstrappers, connect it to another DHT,
// then remove the DHT handler from the other DHT, which should make the first DHT's
// routing table empty.
// This should add the bootstrap peers and the peers that the bootstrap peers are connected to
// to its Routing Table.
// AutoRefresh needs to be enabled for this.
dht1, err = New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
testPrefix,
NamespacedValidator("v", blankValidator{}),
Mode(ModeServer),
BootstrapPeers(bootstrapAddrs[1], bootstrapAddrs[2]),
)
require.NoError(t, err)

dht2 = setupDHT(ctx, t, false)
connect(t, ctx, dht1, dht2)
defer func() {
dht1.host.Close()
dht2.host.Close()
dht1.Close()
dht2.Close()
}()
require.NoError(t, dht2.setMode(modeClient))

require.Eventually(t, func() bool {
rt := dht1.routingTable

return rt.Size() == 4 && rt.Find(bootstrappers[1].self) != "" &&
rt.Find(bootstrappers[2].self) != "" && rt.Find(bootstrapcons[1].self) != "" && rt.Find(bootstrapcons[2].self) != ""
}, 5*time.Second, 500*time.Millisecond)
}
8 changes: 4 additions & 4 deletions go.mod
@@ -17,9 +17,9 @@ require (
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.8.2
github.com/libp2p/go-libp2p-core v0.5.3
github.com/libp2p/go-libp2p-kbucket v0.4.1
github.com/libp2p/go-libp2p-peerstore v0.2.3
github.com/libp2p/go-libp2p-core v0.5.4-0.20200508062439-98b95a487749
github.com/libp2p/go-libp2p-kbucket v0.4.2-0.20200508082052-a64592b8ad85
github.com/libp2p/go-libp2p-peerstore v0.2.4-0.20200508064014-7a58f873f4df
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-libp2p-swarm v0.2.3
@@ -28,7 +28,7 @@ require (
github.com/libp2p/go-netroute v0.1.2
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-net v0.1.4
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/multiformats/go-multihash v0.0.13
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.5.1