Skip to content

Commit

Permalink
feat: implement peering service
Browse files Browse the repository at this point in the history
MVP for #6097

This feature will repeatedly reconnect (with a randomized exponential backoff)
to peers in a set of "peered" peers.

In the future, this should be extended to:

1. Include a CLI for modifying this list at runtime.
2. Include additional options for peers we want to _protect_ but not connect to.
3. Allow configuring timeouts, backoff, etc.
4. Allow groups? Possibly through textile threads.
5. Allow for runtime-only peering rules.
6. Different reconnect policies.

But this MVP should be a significant step forward.
  • Loading branch information
Stebalien committed May 26, 2020
1 parent 413ab31 commit e43b7e3
Show file tree
Hide file tree
Showing 9 changed files with 456 additions and 1 deletion.
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/ipfs/go-ipfs/namesys"
ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/peering"
"github.com/ipfs/go-ipfs/repo"
)

Expand Down Expand Up @@ -83,6 +84,7 @@ type IpfsNode struct {

// Online
PeerHost p2phost.Host `optional:"true"` // the network host (server+client)
Peering peering.PeeringService `optional:"true"`
Filters *ma.Filters `optional:"true"`
Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper
Routing routing.Routing `optional:"true"` // the routing system. recommend ipfs-dht
Expand Down
2 changes: 2 additions & 0 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(OnlineExchange(shouldBitswapProvide)),
maybeProvide(Graphsync, cfg.Experimental.GraphsyncEnabled),
fx.Provide(Namesys(ipnsCacheSize)),
fx.Provide(Peering),
PeerWith(cfg.Peering.Peers...),

fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),

Expand Down
34 changes: 34 additions & 0 deletions core/node/peering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package node

import (
"context"

"github.com/ipfs/go-ipfs/peering"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx"
)

// Peering constructs the peering service and hooks it into fx's lifetime
// management system.
func Peering(lc fx.Lifecycle, host host.Host) *peering.PeeringService {
ps := peering.NewPeeringService(host)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
return ps.Start()
},
OnStop: func(context.Context) error {
return ps.Stop()
},
})
return ps
}

// PeerWith configures the peering service to peer with the specified peers.
func PeerWith(peers ...peer.AddrInfo) fx.Option {
return fx.Invoke(func(ps *peering.PeeringService) {
for _, ai := range peers {
ps.AddPeer(ai)
}
})
}
23 changes: 23 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ documented in `ipfs config profile --help`.
- [`Pubsub`](#pubsub)
- [`Pubsub.Router`](#pubsubrouter)
- [`Pubsub.DisableSigning`](#pubsubdisablesigning)
- [`Peering`](#peering)
- [`Peering.Peers`](#peeringpeers)
- [`Reprovider`](#reprovider)
- [`Reprovider.Interval`](#reproviderinterval)
- [`Reprovider.Strategy`](#reproviderstrategy)
Expand All @@ -157,6 +159,7 @@ documented in `ipfs config profile --help`.
- [`Swarm.ConnMgr.HighWater`](#swarmconnmgrhighwater)
- [`Swarm.ConnMgr.GracePeriod`](#swarmconnmgrgraceperiod)


## `Addresses`

Contains information about various listener addresses to be used by this node.
Expand Down Expand Up @@ -703,6 +706,26 @@ intentionally re-using the real message's message ID.

Default: `false`

### `Peering`

Configures the peering subsystem. The peering subsystem configures go-ipfs to
connect to, remain connected to, and reconnect to a set of peers. Peers should
use this subsystem to create "sticky" links between frequently used peers for
improved reliability.

#### `Peering.Peers`

The set of peers with which to peer. Each entry is of the form:

```js
{
"ID": "QmSomePeerID", # The peers ID.
"Addrs": ["/ip4/1.2.3.4/tcp/1234"] # Known addresses for the peer. If none are specified, the DHT will be queried.
}
```

Additional fields may be added in the future.

## `Reprovider`

### `Reprovider.Interval`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-cmds v0.2.9
github.com/ipfs/go-ipfs-config v0.6.1
github.com/ipfs/go-ipfs-config v0.6.2-0.20200525210524-789a84dfdbcb
github.com/ipfs/go-ipfs-ds-help v0.1.1
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ github.com/ipfs/go-ipfs-cmds v0.2.9 h1:zQTENe9UJrtCb2bOtRoDGjtuo3rQjmuPdPnVlqoBV
github.com/ipfs/go-ipfs-cmds v0.2.9/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk=
github.com/ipfs/go-ipfs-config v0.6.1 h1:d1f0fEEpUQ9R+6c0VZMNy2P+wCl4K4DO4VHJBvgWwFw=
github.com/ipfs/go-ipfs-config v0.6.1/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE=
github.com/ipfs/go-ipfs-config v0.6.2-0.20200525210524-789a84dfdbcb h1:ABkTXEf534DQh3fF8KMfppgRH3RfHiGXf3b8DCfYWXI=
github.com/ipfs/go-ipfs-config v0.6.2-0.20200525210524-789a84dfdbcb/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
Expand Down
259 changes: 259 additions & 0 deletions peering/peering.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package peering

import (
"context"
"errors"
"math/rand"
"sync"
"time"

"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
)

// maxBackoff is the maximum time between reconnect attempts.
const (
maxBackoff = 10 * time.Minute
connmgrTag = "ipfs-peering"
// This needs to be sufficient to prevent two sides from simultaneously
// dialing.
initialDelay = 5 * time.Second
)

var logger = log.Logger("peering")

type state int

const (
stateInit state = iota
stateRunning
stateStopped
)

// peerHandler keeps track of all state related to a specific "peering" peer.
type peerHandler struct {
peer peer.ID
host host.Host
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
addrs []multiaddr.Multiaddr
timer *time.Timer

nextDelay time.Duration
}

func (ph *peerHandler) stop() {
ph.mu.Lock()
defer ph.mu.Unlock()

if ph.timer != nil {
ph.timer.Stop()
ph.timer = nil
}
}

func (ph *peerHandler) nextBackoff() time.Duration {
// calculate the timeout
if ph.nextDelay < maxBackoff {
ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay)))
}
return ph.nextDelay
}

func (ph *peerHandler) reconnect() {
// Try connecting

ph.mu.Lock()
addrs := append(([]multiaddr.Multiaddr)(nil), ph.addrs...)
ph.mu.Unlock()

logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs)

err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs})
if err != nil {
logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err)
// Ok, we failed. Extend the timeout.
ph.mu.Lock()
if ph.timer != nil {
// Only counts if the timer still exists. If not, a
// connection _was_ somehow established.
ph.timer.Reset(ph.nextBackoff())
}
// Otherwise, someone else has stopped us so we can assume that
// we're either connected or someone else will start us.
ph.mu.Unlock()
}

// Always call this. We could have connected since we processed the
// error.
ph.stopIfConnected()
}

func (ph *peerHandler) stopIfConnected() {
ph.mu.Lock()
defer ph.mu.Unlock()

if ph.timer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected {
logger.Debugw("successfully reconnected", "peer", ph.peer)
ph.timer.Stop()
ph.timer = nil
ph.nextDelay = initialDelay
}
}

// startIfDisconnected is the inverse of stopIfConnected.
func (ph *peerHandler) startIfDisconnected() {
ph.mu.Lock()
defer ph.mu.Unlock()

if ph.timer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected {
logger.Debugw("disconnected from peer", "peer", ph.peer)
// Always start with a short timeout so we can stagger things a bit.
ph.timer = time.AfterFunc(ph.nextBackoff(), ph.reconnect)
}
}

// PeeringService maintains connections to specified peers, reconnecting on
// disconnect with a back-off.
type PeeringService struct {
host host.Host

mu sync.RWMutex
peers map[peer.ID]*peerHandler

ctx context.Context
cancel context.CancelFunc
state state
}

// NewPeeringService constructs a new peering service. Peers can be added and
// removed immediately, but connections won't be formed until `Start` is called.
func NewPeeringService(host host.Host) *PeeringService {
ps := &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)}
ps.ctx, ps.cancel = context.WithCancel(context.Background())
return ps
}

// Start starts the peering service, connecting and maintaining connections to
// all registered peers. It returns an error if the service has already been
// stopped.
func (ps *PeeringService) Start() error {
ps.mu.Lock()
defer ps.mu.Unlock()

switch ps.state {
case stateInit:
logger.Infow("starting")
case stateRunning:
return nil
case stateStopped:
return errors.New("already stopped")
}
ps.host.Network().Notify((*netNotifee)(ps))
ps.state = stateRunning
for _, handler := range ps.peers {
go handler.startIfDisconnected()
}
return nil
}

// Stop stops the peering service.
func (ps *PeeringService) Stop() error {
ps.cancel()
ps.host.Network().StopNotify((*netNotifee)(ps))

ps.mu.Lock()
defer ps.mu.Unlock()

if ps.state == stateRunning {
logger.Infow("stopping")
for _, handler := range ps.peers {
handler.stop()
}
}
return nil
}

// AddPeer adds a peer to the peering service. This function may be safely
// called at any time: before the service is started, while running, or after it
// stops.
//
// Add peer may also be called multiple times for the same peer. The new
// addresses will replace the old.
func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
ps.mu.Lock()
defer ps.mu.Unlock()

if handler, ok := ps.peers[info.ID]; ok {
logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs)
handler.addrs = info.Addrs
} else {
logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
ps.host.ConnManager().Protect(info.ID, connmgrTag)

handler = &peerHandler{
host: ps.host,
peer: info.ID,
addrs: info.Addrs,
nextDelay: initialDelay,
}
handler.ctx, handler.cancel = context.WithCancel(ps.ctx)
ps.peers[info.ID] = handler
if ps.state == stateRunning {
go handler.startIfDisconnected()
}
}
}

// RemovePeer removes a peer from the peering service. This function may be
// safely called at any time: before the service is started, while running, or
// after it stops.
func (ps *PeeringService) RemovePeer(id peer.ID) {
ps.mu.Lock()
defer ps.mu.Unlock()

if handler, ok := ps.peers[id]; ok {
logger.Infow("peer removed", "peer", id)
ps.host.ConnManager().Unprotect(id, connmgrTag)

handler.stop()
handler.cancel()
delete(ps.peers, id)
}
}

type netNotifee PeeringService

func (nn *netNotifee) Connected(_ network.Network, c network.Conn) {
ps := (*PeeringService)(nn)

p := c.RemotePeer()
ps.mu.RLock()
defer ps.mu.RUnlock()

if handler, ok := ps.peers[p]; ok {
// use a goroutine to avoid blocking events.
go handler.stopIfConnected()
}
}
func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) {
ps := (*PeeringService)(nn)

p := c.RemotePeer()
ps.mu.RLock()
defer ps.mu.RUnlock()

if handler, ok := ps.peers[p]; ok {
// use a goroutine to avoid blocking events.
go handler.startIfDisconnected()
}
}
func (nn *netNotifee) OpenedStream(network.Network, network.Stream) {}
func (nn *netNotifee) ClosedStream(network.Network, network.Stream) {}
func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr) {}
func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {}
6 changes: 6 additions & 0 deletions peering/peering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package peering

import "testing"

func TestPeeringService(t *testing.T) {
}
Loading

0 comments on commit e43b7e3

Please sign in to comment.