feat(bootstrap): save connected peers as backup bootstrap peers #8856

Merged: 4 commits, May 25, 2023
7 changes: 4 additions & 3 deletions config/internal.go
@@ -2,9 +2,10 @@ package config

type Internal struct {
// All marked as omitempty since we are expecting to make changes to all subcomponents of Internal
Bitswap *InternalBitswap `json:",omitempty"`
UnixFSShardingSizeThreshold *OptionalString `json:",omitempty"`
Libp2pForceReachability *OptionalString `json:",omitempty"`
Bitswap *InternalBitswap `json:",omitempty"`
UnixFSShardingSizeThreshold *OptionalString `json:",omitempty"`
Libp2pForceReachability *OptionalString `json:",omitempty"`
BackupBootstrapInterval *OptionalDuration `json:",omitempty"`
}

type InternalBitswap struct {
229 changes: 180 additions & 49 deletions core/bootstrap/bootstrap.go
@@ -3,16 +3,16 @@ package bootstrap
import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
"github.com/jbenet/goprocess/periodic"
goprocessctx "github.com/jbenet/goprocess/context"
periodicproc "github.com/jbenet/goprocess/periodic"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@@ -50,13 +50,26 @@ type BootstrapConfig struct {
// for the bootstrap process to use. This makes it possible for clients
// to control the peers the process uses at any moment.
BootstrapPeers func() []peer.AddrInfo

// BackupBootstrapInterval governs the periodic interval at which the node will
// attempt to save connected nodes to use as temporary bootstrap peers.
BackupBootstrapInterval time.Duration

// MaxBackupBootstrapSize controls the maximum number of peers we're saving
// as backup bootstrap peers.
MaxBackupBootstrapSize int

SaveBackupBootstrapPeers func(context.Context, []peer.AddrInfo)
LoadBackupBootstrapPeers func(context.Context) []peer.AddrInfo
}
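
For context, a minimal sketch of Save/Load callbacks a client could plug in here, persisting peers to a JSON file. The package name, file path, and helpers are assumptions for illustration; kubo wires these hooks to its own repo storage. The sketch relies on `peer.AddrInfo` providing JSON (un)marshaling:

```go
package example

import (
	"context"
	"encoding/json"
	"os"

	"github.com/libp2p/go-libp2p/core/peer"
)

const backupPeersPath = "backup-bootstrap-peers.json" // hypothetical location

// saveBackupPeers matches the SaveBackupBootstrapPeers signature.
func saveBackupPeers(_ context.Context, peers []peer.AddrInfo) {
	data, err := json.Marshal(peers) // peer.AddrInfo marshals itself to JSON
	if err != nil {
		return
	}
	_ = os.WriteFile(backupPeersPath, data, 0o600)
}

// loadBackupPeers matches the LoadBackupBootstrapPeers signature.
func loadBackupPeers(_ context.Context) []peer.AddrInfo {
	data, err := os.ReadFile(backupPeersPath)
	if err != nil {
		return nil // no backup saved yet; only the original list is available
	}
	var peers []peer.AddrInfo
	if err := json.Unmarshal(data, &peers); err != nil {
		return nil
	}
	return peers
}
```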

// DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
var DefaultBootstrapConfig = BootstrapConfig{
MinPeerThreshold: 4,
Period: 30 * time.Second,
ConnectionTimeout: (30 * time.Second) / 3, // Period / 3
MinPeerThreshold: 4,
Period: 30 * time.Second,
ConnectionTimeout: (30 * time.Second) / 3, // Period / 3
BackupBootstrapInterval: 1 * time.Hour,
MaxBackupBootstrapSize: 20,
Comment on lines +68 to +72
@lidel (Member), May 25, 2023:
ℹ️ We can expose these later under Internal.* if needed (in future PRs).

For now, only Internal.BackupBootstrapInterval is exposed for use in the test/cli/backup_bootstrap_test.go test. Not documenting it, as we may change the name/location later.

}
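
Per the comment above, only the interval is surfaced for the test/cli/backup_bootstrap_test.go test. As a sketch of how that knob appears in the kubo config file, assuming the usual OptionalDuration encoding (Go-style duration strings); the name/location may still change:

```json
{
  "Internal": {
    "BackupBootstrapInterval": "20s"
  }
}
```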

func BootstrapConfigWithPeers(pis []peer.AddrInfo) BootstrapConfig {
Expand Down Expand Up @@ -90,6 +103,9 @@ func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConf
log.Debugf("%s bootstrap error: %s", id, err)
}

// Exit the first call (triggered independently by `proc.Go`, not `Tick`)
// only after being done with the *single* Routing.Bootstrap call. Following
// periodic calls (`Tick`) will not block on this.
<-doneWithRound
}

@@ -108,9 +124,100 @@ func Bootstrap(id peer.ID, host host.Host, rt routing.Routing, cfg BootstrapConf

doneWithRound <- struct{}{}
close(doneWithRound) // it no longer blocks periodic

startSavePeersAsTemporaryBootstrapProc(cfg, host, proc)

return proc, nil
}
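
The doneWithRound handshake above is worth a closer look: the send releases the first periodic worker once the single Routing.Bootstrap call finishes, and the close makes every later receive return immediately. A minimal runnable sketch of the same pattern, with all names assumed:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	doneWithRound := make(chan struct{})

	// Each worker run ends by waiting on doneWithRound, mirroring the
	// periodic bootstrap function above.
	worker := func(round int) {
		fmt.Println("worker round", round)
		<-doneWithRound // blocks only until the first-round signal
	}

	go worker(1) // first call, kicked off immediately (like proc.Go above)

	time.Sleep(100 * time.Millisecond) // stand-in for the one-shot Routing.Bootstrap

	doneWithRound <- struct{}{} // releases the first worker
	close(doneWithRound)        // later receives no longer block

	worker(2) // subsequent ticks pass straight through
}
```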

// Aside from the main bootstrap process, we also run a secondary one that saves
// connected peers as a backup measure in case we can't connect to the official
// bootstrap ones. These peers will serve as *temporary* bootstrap nodes.
func startSavePeersAsTemporaryBootstrapProc(cfg BootstrapConfig, host host.Host, bootstrapProc goprocess.Process) {
savePeersFn := func(worker goprocess.Process) {
ctx := goprocessctx.OnClosingContext(worker)

if err := saveConnectedPeersAsTemporaryBootstrap(ctx, host, cfg); err != nil {
log.Debugf("saveConnectedPeersAsTemporaryBootstrap error: %s", err)
}
}
savePeersProc := periodicproc.Tick(cfg.BackupBootstrapInterval, savePeersFn)

// When the main bootstrap process ends, also terminate the 'save connected
// peers' one. Coupling the two seems the easiest way to handle this backup
// process without additional complexity.
go func() {
<-bootstrapProc.Closing()
savePeersProc.Close()
}()

// Run the first round now (after the first bootstrap process has finished)
// as the BackupBootstrapInterval can be much longer than the bootstrap period.
savePeersProc.Go(savePeersFn)
}
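
The shutdown coupling above is the usual goprocess idiom. Stripped of the bootstrap specifics, a sketch using the same libraries imported in this file (the work function and interval are placeholders):

```go
package example

import (
	"time"

	"github.com/jbenet/goprocess"
	periodicproc "github.com/jbenet/goprocess/periodic"
)

func couple() {
	parent := goprocess.WithParent(goprocess.Background())
	child := periodicproc.Tick(time.Minute, func(proc goprocess.Process) {
		// periodic work goes here
	})
	go func() {
		<-parent.Closing() // parent has begun shutting down
		child.Close()      // take the child process down with it
	}()
}
```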

func saveConnectedPeersAsTemporaryBootstrap(ctx context.Context, host host.Host, cfg BootstrapConfig) error {
// Randomize the list of connected peers; we don't prioritize anyone.
connectedPeers := randomizeList(host.Network().Peers())

bootstrapPeers := cfg.BootstrapPeers()
backupPeers := make([]peer.AddrInfo, 0, cfg.MaxBackupBootstrapSize)

// Choose peers to save and filter out the ones that are already bootstrap nodes.
for _, p := range connectedPeers {
found := false
for _, bootstrapPeer := range bootstrapPeers {
if p == bootstrapPeer.ID {
found = true
break
}
}
if !found {
backupPeers = append(backupPeers, peer.AddrInfo{
ID: p,
Addrs: host.Network().Peerstore().Addrs(p),
})
}

if len(backupPeers) >= cfg.MaxBackupBootstrapSize {
break
}
}

// If we didn't reach the target number, use previously stored connected peers.
if len(backupPeers) < cfg.MaxBackupBootstrapSize {
oldSavedPeers := cfg.LoadBackupBootstrapPeers(ctx)
log.Debugf("missing %d peers to reach backup bootstrap target of %d, trying from previous list of %d saved peers",
cfg.MaxBackupBootstrapSize-len(backupPeers), cfg.MaxBackupBootstrapSize, len(oldSavedPeers))

// Add some of the old saved peers. Ensure we don't duplicate them.
for _, p := range oldSavedPeers {
found := false
for _, sp := range backupPeers {
if p.ID == sp.ID {
found = true
break
}
}

if !found {
backupPeers = append(backupPeers, p)
}

if len(backupPeers) >= cfg.MaxBackupBootstrapSize {
break
}
}
}

cfg.SaveBackupBootstrapPeers(ctx, backupPeers)
log.Debugf("saved %d peers (of %d target) as bootstrap backup in the config", len(backupPeers), cfg.MaxBackupBootstrapSize)
return nil
}
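
The nested membership scans above are fine at these sizes, since both lists are small and the output is bounded by MaxBackupBootstrapSize, but the same filtering can be expressed with a set. A hypothetical helper, not part of the PR:

```go
package example

import "github.com/libp2p/go-libp2p/core/peer"

// dedupAgainst returns up to max entries from candidates whose IDs are not
// in exclude, preserving order and also skipping duplicates inside
// candidates. Illustration only.
func dedupAgainst(candidates, exclude []peer.AddrInfo, max int) []peer.AddrInfo {
	seen := make(map[peer.ID]struct{}, len(exclude))
	for _, p := range exclude {
		seen[p.ID] = struct{}{}
	}
	out := make([]peer.AddrInfo, 0, max)
	for _, c := range candidates {
		if _, ok := seen[c.ID]; ok {
			continue
		}
		seen[c.ID] = struct{}{}
		out = append(out, c)
		if len(out) >= max {
			break
		}
	}
	return out
}
```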

// Connect to as many peers as needed to reach BootstrapConfig.MinPeerThreshold.
// Peers can be original bootstrap ones or temporary ones (drawn from a list of
// previously connected peers that was persisted earlier).
func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) error {

ctx, cancel := context.WithTimeout(ctx, cfg.ConnectionTimeout)
@@ -127,35 +234,58 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er
id, len(connected), cfg.MinPeerThreshold)
return nil
}
numToDial := cfg.MinPeerThreshold - len(connected)
numToDial := cfg.MinPeerThreshold - len(connected) // numToDial > 0

// filter out bootstrap nodes we are already connected to
var notConnected []peer.AddrInfo
for _, p := range peers {
if host.Network().Connectedness(p.ID) != network.Connected {
notConnected = append(notConnected, p)
if len(peers) > 0 {
numToDial -= int(peersConnect(ctx, host, peers, numToDial, true))
if numToDial <= 0 {
return nil
}
}

// if connected to all bootstrap peer candidates, exit
if len(notConnected) < 1 {
log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial)
return ErrNotEnoughBootstrapPeers
log.Debugf("not enough bootstrap peers to fill the remaining target of %d connections, trying backup list", numToDial)

tempBootstrapPeers := cfg.LoadBackupBootstrapPeers(ctx)
if len(tempBootstrapPeers) > 0 {
numToDial -= int(peersConnect(ctx, host, tempBootstrapPeers, numToDial, false))
if numToDial <= 0 {
return nil
}
}

// connect to a random subset of bootstrap candidates
randSubset := randomSubsetOfPeers(notConnected, numToDial)
log.Debugf("tried both original bootstrap peers and temporary ones but still missing target of %d connections", numToDial)

log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset)
return bootstrapConnect(ctx, host, randSubset)
return ErrNotEnoughBootstrapPeers
}
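
To make the accounting concrete, a worked example of a single round, with all values assumed:

```go
// MinPeerThreshold = 4, already connected = 1   => numToDial = 3
// peersConnect(original bootstrap peers) adds 2 => numToDial = 1
// peersConnect(backup peers) adds 1             => numToDial = 0, round succeeds
// If numToDial is still > 0 after both passes, the round returns
// ErrNotEnoughBootstrapPeers and the periodic ticker retries after Period.
```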

func bootstrapConnect(ctx context.Context, ph host.Host, peers []peer.AddrInfo) error {
if len(peers) < 1 {
return ErrNotEnoughBootstrapPeers
}
// peersConnect attempts to make `needed` connections from the `availablePeers`
// list, marking peers as either permanent or temporary when adding them to the
// Peerstore, and returns the number of connections completed. We eagerly
// over-connect in parallel, so we may end up with more connections than needed.
// (We spawn as many goroutines, and attempt as many connections, as there are
// availablePeers, but that list comes from the restricted sets of original or
// temporary bootstrap nodes, which keeps it at a sane size.)
func peersConnect(ctx context.Context, ph host.Host, availablePeers []peer.AddrInfo, needed int, permanent bool) uint64 {
peers := randomizeList(availablePeers)

// Monitor the number of connections and stop if we reach the target.
var connected uint64
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
if int(atomic.LoadUint64(&connected)) >= needed {
cancel()
return
}
}
}
}()

errs := make(chan error, len(peers))
var wg sync.WaitGroup
for _, p := range peers {

@@ -164,45 +294,46 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []peer.AddrInfo)
// fail/abort due to an expiring context.
// Also, performed asynchronously for dial speed.

if int(atomic.LoadUint64(&connected)) >= needed {
cancel()
break
}

wg.Add(1)
go func(p peer.AddrInfo) {
defer wg.Done()

// Skip addresses belonging to a peer we're already connected to.
// (Not a guarantee but a best-effort policy.)
if ph.Network().Connectedness(p.ID) == network.Connected {
return
}
log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID)

ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
if err := ph.Connect(ctx, p); err != nil {
log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
if ctx.Err() != context.Canceled {
log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
}
return
}
if permanent {
// We're connecting to an original bootstrap peer, mark it as
// a permanent address (Connect will register it as TempAddrTTL).
ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
}

log.Infof("bootstrapped with %v", p.ID)
atomic.AddUint64(&connected, 1)
}(p)
}
wg.Wait()

// our failure condition is when no connection attempt succeeded.
// So drain the errs channel, counting the results.
close(errs)
count := 0
var err error
for err = range errs {
if err != nil {
count++
}
}
if count == len(peers) {
return fmt.Errorf("failed to bootstrap. %s", err)
}
return nil
return connected
}
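
The success-counting scheme above is worth isolating: dials run in parallel, an atomic counter tracks completions, and the shared context is cancelled once the target is reached, which is why the function can overshoot `needed`. A standalone runnable sketch, where the fake dial function and all names are assumptions:

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
)

// dial is a stand-in for host.Connect: it fails about half the time.
func dial(ctx context.Context, id int) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	if rand.Intn(2) == 0 {
		return fmt.Errorf("dial %d failed", id)
	}
	return nil
}

func main() {
	const needed = 3
	candidates := []int{1, 2, 3, 4, 5, 6, 7, 8}

	var connected uint64
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	for _, c := range candidates {
		// Stop spawning once the target is already met.
		if int(atomic.LoadUint64(&connected)) >= needed {
			cancel()
			break
		}
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := dial(ctx, id); err != nil {
				return
			}
			atomic.AddUint64(&connected, 1)
		}(c)
	}
	wg.Wait()
	// Parallel dials may push the count past `needed`: eager over-connecting.
	fmt.Printf("connected %d of %d candidates (target %d)\n",
		atomic.LoadUint64(&connected), len(candidates), needed)
}
```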

func randomSubsetOfPeers(in []peer.AddrInfo, max int) []peer.AddrInfo {
if max > len(in) {
max = len(in)
}

out := make([]peer.AddrInfo, max)
for i, val := range rand.Perm(len(in))[:max] {
func randomizeList[T any](in []T) []T {
out := make([]T, len(in))
for i, val := range rand.Perm(len(in)) {
out[i] = in[val]
}
return out
6 changes: 3 additions & 3 deletions core/bootstrap/bootstrap_test.go
@@ -7,9 +7,9 @@ import (
"github.com/libp2p/go-libp2p/core/test"
)

func TestSubsetWhenMaxIsGreaterThanLengthOfSlice(t *testing.T) {
func TestRandomizeAddressList(t *testing.T) {
var ps []peer.AddrInfo
sizeofSlice := 100
sizeofSlice := 10
for i := 0; i < sizeofSlice; i++ {
pid, err := test.RandPeerID()
if err != nil {
@@ -18,7 +18,7 @@ func TestSubsetWhenMaxIsGreaterThanLengthOfSlice(t *testing.T) {

ps = append(ps, peer.AddrInfo{ID: pid})
}
out := randomSubsetOfPeers(ps, 2*sizeofSlice)
out := randomizeList(ps)
if len(out) != len(ps) {
t.Fail()
}
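
The updated test only asserts length. A stricter, hypothetical follow-up check inside the same test could verify that the output is a true permutation of the input:

```go
// Hypothetical addition, continuing TestRandomizeAddressList: verify that
// randomizeList neither drops nor duplicates peers.
counts := make(map[peer.ID]int, len(ps))
for _, p := range ps {
	counts[p.ID]++
}
for _, p := range out {
	counts[p.ID]--
}
for _, n := range counts {
	if n != 0 {
		t.Fatal("randomizeList changed the set of peers")
	}
}
```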