diff --git a/core/core.go b/core/core.go
index 2cc67eb6f175..d422a1aa80c2 100644
--- a/core/core.go
+++ b/core/core.go
@@ -48,6 +48,7 @@ import (
 	"github.com/ipfs/go-ipfs/namesys"
 	ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher"
 	"github.com/ipfs/go-ipfs/p2p"
+	"github.com/ipfs/go-ipfs/peering"
 	"github.com/ipfs/go-ipfs/repo"
 )
 
@@ -83,6 +84,7 @@ type IpfsNode struct {
 
 	// Online
 	PeerHost     p2phost.Host            `optional:"true"` // the network host (server+client)
+	Peering      *peering.PeeringService `optional:"true"`
 	Filters      *ma.Filters             `optional:"true"`
 	Bootstrapper io.Closer               `optional:"true"` // the periodic bootstrapper
 	Routing      routing.Routing         `optional:"true"` // the routing system. recommend ipfs-dht
diff --git a/core/node/groups.go b/core/node/groups.go
index 9078f52b69fd..ad51473452e0 100644
--- a/core/node/groups.go
+++ b/core/node/groups.go
@@ -250,6 +250,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
 		fx.Provide(OnlineExchange(shouldBitswapProvide)),
 		maybeProvide(Graphsync, cfg.Experimental.GraphsyncEnabled),
 		fx.Provide(Namesys(ipnsCacheSize)),
+		fx.Provide(Peering),
+		PeerWith(cfg.Peering.Peers...),
 
 		fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),
 
diff --git a/core/node/peering.go b/core/node/peering.go
new file mode 100644
index 000000000000..b5e7caadc335
--- /dev/null
+++ b/core/node/peering.go
@@ -0,0 +1,34 @@
+package node
+
+import (
+	"context"
+
+	"github.com/ipfs/go-ipfs/peering"
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"go.uber.org/fx"
+)
+
+// Peering constructs the peering service and hooks it into fx's lifecycle
+// management system.
+func Peering(lc fx.Lifecycle, host host.Host) *peering.PeeringService {
+	ps := peering.NewPeeringService(host)
+	lc.Append(fx.Hook{
+		OnStart: func(context.Context) error {
+			return ps.Start()
+		},
+		OnStop: func(context.Context) error {
+			return ps.Stop()
+		},
+	})
+	return ps
+}
+
+// PeerWith configures the peering service to peer with the specified peers.
+func PeerWith(peers ...peer.AddrInfo) fx.Option {
+	return fx.Invoke(func(ps *peering.PeeringService) {
+		for _, ai := range peers {
+			ps.AddPeer(ai)
+		}
+	})
+}
diff --git a/docs/config.md b/docs/config.md
index f2211954cb89..dc6db166c491 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -139,6 +139,8 @@ documented in `ipfs config profile --help`.
 - [`Pubsub`](#pubsub)
   - [`Pubsub.Router`](#pubsubrouter)
   - [`Pubsub.DisableSigning`](#pubsubdisablesigning)
+- [`Peering`](#peering)
+  - [`Peering.Peers`](#peeringpeers)
 - [`Reprovider`](#reprovider)
   - [`Reprovider.Interval`](#reproviderinterval)
   - [`Reprovider.Strategy`](#reproviderstrategy)
@@ -157,6 +159,7 @@ documented in `ipfs config profile --help`.
   - [`Swarm.ConnMgr.HighWater`](#swarmconnmgrhighwater)
   - [`Swarm.ConnMgr.GracePeriod`](#swarmconnmgrgraceperiod)
 
+
 ## `Addresses`
 
 Contains information about various listener addresses to be used by this node.
@@ -703,6 +706,26 @@ intentionally re-using the real message's message ID.
 
 Default: `false`
 
+## `Peering`
+
+Configures the peering subsystem. The peering subsystem configures go-ipfs to
+connect to, remain connected to, and reconnect to a set of peers. Use this
+subsystem to create "sticky" links between frequently used peers, improving
+connection reliability.
+
+### `Peering.Peers`
+
+The set of peers with which to peer. Each entry is of the form:
+
+```js
+{
+  "ID": "QmSomePeerID", # The peer's ID.
+  "Addrs": ["/ip4/1.2.3.4/tcp/1234"] # Known addresses for the peer. If none are specified, the DHT will be queried.
+}
+```
+
+Additional fields may be added in the future.
+
 ## `Reprovider`
 
 ### `Reprovider.Interval`
diff --git a/go.mod b/go.mod
index eca31182162c..d6ba6a93ef51 100644
--- a/go.mod
+++ b/go.mod
@@ -32,7 +32,7 @@ require (
 	github.com/ipfs/go-ipfs-blockstore v0.1.4
 	github.com/ipfs/go-ipfs-chunker v0.0.5
 	github.com/ipfs/go-ipfs-cmds v0.2.9
-	github.com/ipfs/go-ipfs-config v0.6.1
+	github.com/ipfs/go-ipfs-config v0.6.2-0.20200525210524-789a84dfdbcb
 	github.com/ipfs/go-ipfs-ds-help v0.1.1
 	github.com/ipfs/go-ipfs-exchange-interface v0.0.1
 	github.com/ipfs/go-ipfs-exchange-offline v0.0.1
diff --git a/go.sum b/go.sum
index 41238c0571b8..36d44b083dc4 100644
--- a/go.sum
+++ b/go.sum
@@ -303,6 +303,8 @@ github.com/ipfs/go-ipfs-cmds v0.2.9 h1:zQTENe9UJrtCb2bOtRoDGjtuo3rQjmuPdPnVlqoBV
 github.com/ipfs/go-ipfs-cmds v0.2.9/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk=
 github.com/ipfs/go-ipfs-config v0.6.1 h1:d1f0fEEpUQ9R+6c0VZMNy2P+wCl4K4DO4VHJBvgWwFw=
 github.com/ipfs/go-ipfs-config v0.6.1/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE=
+github.com/ipfs/go-ipfs-config v0.6.2-0.20200525210524-789a84dfdbcb h1:ABkTXEf534DQh3fF8KMfppgRH3RfHiGXf3b8DCfYWXI=
+github.com/ipfs/go-ipfs-config v0.6.2-0.20200525210524-789a84dfdbcb/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE=
 github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
 github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
 github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
diff --git a/peering/peering.go b/peering/peering.go
new file mode 100644
index 000000000000..c543712c5127
--- /dev/null
+++ b/peering/peering.go
@@ -0,0 +1,259 @@
+package peering
+
+import (
+	"context"
+	"errors"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-log"
+	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/multiformats/go-multiaddr"
+)
+
+// maxBackoff is the maximum time between reconnect attempts.
+const (
+	maxBackoff = 10 * time.Minute
+	connmgrTag = "ipfs-peering"
+	// This needs to be sufficient to prevent two sides from simultaneously
+	// dialing.
+	initialDelay = 5 * time.Second
+)
+
+var logger = log.Logger("peering")
+
+type state int
+
+const (
+	stateInit state = iota
+	stateRunning
+	stateStopped
+)
+
+// peerHandler keeps track of all state related to a specific "peering" peer.
+type peerHandler struct {
+	peer   peer.ID
+	host   host.Host
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	mu    sync.Mutex
+	addrs []multiaddr.Multiaddr
+	timer *time.Timer
+
+	nextDelay time.Duration
+}
+
+func (ph *peerHandler) stop() {
+	ph.mu.Lock()
+	defer ph.mu.Unlock()
+
+	if ph.timer != nil {
+		ph.timer.Stop()
+		ph.timer = nil
+	}
+}
+
+func (ph *peerHandler) nextBackoff() time.Duration {
+	// calculate the timeout
+	if ph.nextDelay < maxBackoff {
+		ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay)))
+	}
+	return ph.nextDelay
+}
+
+func (ph *peerHandler) reconnect() {
+	// Try connecting
+
+	ph.mu.Lock()
+	addrs := append(([]multiaddr.Multiaddr)(nil), ph.addrs...)
+	ph.mu.Unlock()
+
+	logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs)
+
+	err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs})
+	if err != nil {
+		logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err)
+		// Ok, we failed. Extend the timeout.
+		ph.mu.Lock()
+		if ph.timer != nil {
+			// Only counts if the timer still exists. If not, a
+			// connection _was_ somehow established.
+			ph.timer.Reset(ph.nextBackoff())
+		}
+		// Otherwise, someone else has stopped us so we can assume that
+		// we're either connected or someone else will start us.
+		ph.mu.Unlock()
+	}
+
+	// Always call this. We could have connected since we processed the
+	// error.
+	ph.stopIfConnected()
+}
+
+func (ph *peerHandler) stopIfConnected() {
+	ph.mu.Lock()
+	defer ph.mu.Unlock()
+
+	if ph.timer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected {
+		logger.Debugw("successfully reconnected", "peer", ph.peer)
+		ph.timer.Stop()
+		ph.timer = nil
+		ph.nextDelay = initialDelay
+	}
+}
+
+// startIfDisconnected is the inverse of stopIfConnected.
+func (ph *peerHandler) startIfDisconnected() {
+	ph.mu.Lock()
+	defer ph.mu.Unlock()
+
+	if ph.timer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected {
+		logger.Debugw("disconnected from peer", "peer", ph.peer)
+		// Always start with a short timeout so we can stagger things a bit.
+		ph.timer = time.AfterFunc(ph.nextBackoff(), ph.reconnect)
+	}
+}
+
+// PeeringService maintains connections to specified peers, reconnecting on
+// disconnect with a back-off.
+type PeeringService struct {
+	host host.Host
+
+	mu    sync.RWMutex
+	peers map[peer.ID]*peerHandler
+
+	ctx    context.Context
+	cancel context.CancelFunc
+	state  state
+}
+
+// NewPeeringService constructs a new peering service. Peers can be added and
+// removed immediately, but connections won't be formed until `Start` is called.
+func NewPeeringService(host host.Host) *PeeringService {
+	ps := &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)}
+	ps.ctx, ps.cancel = context.WithCancel(context.Background())
+	return ps
+}
+
+// Start starts the peering service, connecting and maintaining connections to
+// all registered peers. It returns an error if the service has already been
+// stopped.
+func (ps *PeeringService) Start() error {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	switch ps.state {
+	case stateInit:
+		logger.Infow("starting")
+	case stateRunning:
+		return nil
+	case stateStopped:
+		return errors.New("already stopped")
+	}
+	ps.host.Network().Notify((*netNotifee)(ps))
+	ps.state = stateRunning
+	for _, handler := range ps.peers {
+		go handler.startIfDisconnected()
+	}
+	return nil
+}
+
+// Stop stops the peering service.
+func (ps *PeeringService) Stop() error {
+	ps.cancel()
+	ps.host.Network().StopNotify((*netNotifee)(ps))
+
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	if ps.state == stateRunning {
+		logger.Infow("stopping")
+		for _, handler := range ps.peers {
+			handler.stop()
+		}
+	}
+	return nil
+}
+
+// AddPeer adds a peer to the peering service. This function may be safely
+// called at any time: before the service is started, while running, or after it
+// stops.
+//
+// AddPeer may also be called multiple times for the same peer. The new
+// addresses will replace the old.
+func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	if handler, ok := ps.peers[info.ID]; ok {
+		logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs)
+		handler.addrs = info.Addrs
+	} else {
+		logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
+		ps.host.ConnManager().Protect(info.ID, connmgrTag)
+
+		handler = &peerHandler{
+			host:      ps.host,
+			peer:      info.ID,
+			addrs:     info.Addrs,
+			nextDelay: initialDelay,
+		}
+		handler.ctx, handler.cancel = context.WithCancel(ps.ctx)
+		ps.peers[info.ID] = handler
+		if ps.state == stateRunning {
+			go handler.startIfDisconnected()
+		}
+	}
+}
+
+// RemovePeer removes a peer from the peering service. This function may be
+// safely called at any time: before the service is started, while running, or
+// after it stops.
+func (ps *PeeringService) RemovePeer(id peer.ID) {
+	ps.mu.Lock()
+	defer ps.mu.Unlock()
+
+	if handler, ok := ps.peers[id]; ok {
+		logger.Infow("peer removed", "peer", id)
+		ps.host.ConnManager().Unprotect(id, connmgrTag)
+
+		handler.stop()
+		handler.cancel()
+		delete(ps.peers, id)
+	}
+}
+
+type netNotifee PeeringService
+
+func (nn *netNotifee) Connected(_ network.Network, c network.Conn) {
+	ps := (*PeeringService)(nn)
+
+	p := c.RemotePeer()
+	ps.mu.RLock()
+	defer ps.mu.RUnlock()
+
+	if handler, ok := ps.peers[p]; ok {
+		// use a goroutine to avoid blocking events.
+		go handler.stopIfConnected()
+	}
+}
+func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) {
+	ps := (*PeeringService)(nn)
+
+	p := c.RemotePeer()
+	ps.mu.RLock()
+	defer ps.mu.RUnlock()
+
+	if handler, ok := ps.peers[p]; ok {
+		// use a goroutine to avoid blocking events.
+		go handler.startIfDisconnected()
+	}
+}
+func (nn *netNotifee) OpenedStream(network.Network, network.Stream)     {}
+func (nn *netNotifee) ClosedStream(network.Network, network.Stream)     {}
+func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr)      {}
+func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {}
diff --git a/peering/peering_test.go b/peering/peering_test.go
new file mode 100644
index 000000000000..0be08dcdc806
--- /dev/null
+++ b/peering/peering_test.go
@@ -0,0 +1,6 @@
+package peering
+
+import "testing"
+
+func TestPeeringService(t *testing.T) {
+}
diff --git a/test/sharness/t0171-peering.sh b/test/sharness/t0171-peering.sh
new file mode 100755
index 000000000000..ed1cbb436c6e
--- /dev/null
+++ b/test/sharness/t0171-peering.sh
@@ -0,0 +1,127 @@
+#!/usr/bin/env bash
+
+test_description="Test peering service"
+
+. lib/test-lib.sh
+
+NUM_NODES=3
+
+test_expect_success 'init iptb' '
+  rm -rf .iptb/ &&
+  iptb testbed create -type localipfs -count $NUM_NODES -init
+'
+
+test_expect_success 'disabling routing' '
+  iptb run -- ipfs config Routing.Type none
+'
+
+for i in $(seq 0 2); do
+  ADDR="$(printf '["/ip4/127.0.0.1/tcp/%s"]' "$(( 3000 + ( RANDOM % 1000 ) ))")"
+  test_expect_success "configuring node $i to listen on $ADDR" '
+    ipfsi "$i" config --json Addresses.Swarm "$ADDR"
+  '
+done
+
+peer_id() {
+  ipfsi "$1" config Identity.PeerID
+}
+
+peer_addrs() {
+  ipfsi "$1" config Addresses.Swarm
+}
+
+peer() {
+  PEER1="$1" &&
+  PEER2="$2" &&
+  PEER_LIST="$(ipfsi "$PEER1" config Peering.Peers)" &&
+  { [[ "$PEER_LIST" == "null" ]] || PEER_LIST_INNER="${PEER_LIST:1:-1}"; } &&
+  ADDR_INFO="$(printf '[%s{"ID": "%s", "Addrs": %s}]' \
+    "${PEER_LIST_INNER:+${PEER_LIST_INNER},}" \
+    "$(peer_id "$PEER2")" \
+    "$(peer_addrs "$PEER2")")" &&
+  ipfsi "$PEER1" config --json Peering.Peers "${ADDR_INFO}"
+}
+
+# Peer:
+# - 0 <-> 1
+# - 1 -> 2
+test_expect_success 'configure peering' '
+  peer 0 1 &&
+  peer 1 0 &&
+  peer 1 2
+'
+
+list_peers() {
+  ipfsi "$1" swarm peers | sed 's|.*/p2p/\([^/]*\)$|\1|' | sort -u
+}
+
+check_peers() {
+  sleep 20 # give it some time to settle.
+  test_expect_success 'verifying peering for peer 0' '
+    list_peers 0 > peers_0_actual &&
+    peer_id 1 > peers_0_expected &&
+    test_cmp peers_0_expected peers_0_actual
+  '
+
+  test_expect_success 'verifying peering for peer 1' '
+    list_peers 1 > peers_1_actual &&
+    { peer_id 0 && peer_id 2 ; } | sort -u > peers_1_expected &&
+    test_cmp peers_1_expected peers_1_actual
+  '
+
+  test_expect_success 'verifying peering for peer 2' '
+    list_peers 2 > peers_2_actual &&
+    peer_id 1 > peers_2_expected &&
+    test_cmp peers_2_expected peers_2_actual
+  '
+}
+
+test_expect_success 'startup cluster' '
+  iptb start -wait &&
+  iptb run -- ipfs log level peering debug
+'
+
+check_peers
+
+disconnect() {
+  ipfsi "$1" swarm disconnect "/p2p/$(peer_id "$2")"
+}
+
+# Bidirectional peering shouldn't cause problems (e.g., simultaneous connect
+# issues).
+test_expect_success 'disconnecting 0->1' '
+  disconnect 0 1
+'
+
+check_peers
+
+# 1 should reconnect to 2 when 2 disconnects from 1.
+test_expect_success 'disconnecting 2->1' '
+  disconnect 2 1
+'
+
+check_peers
+
+# 2 isn't configured to peer with anyone. This test ensures that 1 will
+# re-peer with 2 when it comes back online.
+test_expect_success 'stopping 2' '
+  iptb stop 2
+'
+
+# Wait to disconnect
+sleep 30
+
+test_expect_success 'starting 2' '
+  iptb start 2
+'
+
+# Wait for backoff
+sleep 30
+
+check_peers
+
+test_expect_success "stop testbed" '
+  iptb stop
+'
+
+test_done
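
For reviewers who want to exercise the new `PeeringService` outside the daemon, here is a minimal sketch that drives the API from `peering/peering.go` directly rather than through the fx wiring in `core/node/peering.go`. The `peerWith` helper, its error wrapping, and the string arguments are illustrative only; `NewPeeringService`, `AddPeer`, `Start`, `RemovePeer`, and `Stop` are the functions added by this patch, and the libp2p host is assumed to be constructed elsewhere.

```go
package peeringexample

import (
	"fmt"

	"github.com/ipfs/go-ipfs/peering"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/multiformats/go-multiaddr"
)

// peerWith is a hypothetical helper: it keeps h "stickily" connected to the
// peer described by id and addr, roughly what one Peering.Peers config entry
// does for the go-ipfs daemon.
func peerWith(h host.Host, id, addr string) (*peering.PeeringService, error) {
	pid, err := peer.Decode(id)
	if err != nil {
		return nil, fmt.Errorf("invalid peer ID: %w", err)
	}
	ma, err := multiaddr.NewMultiaddr(addr)
	if err != nil {
		return nil, fmt.Errorf("invalid multiaddr: %w", err)
	}

	ps := peering.NewPeeringService(h)
	// Peers can be added before or after Start; once running, the service
	// reconnects on disconnect with a randomized, growing back-off.
	ps.AddPeer(peer.AddrInfo{ID: pid, Addrs: []multiaddr.Multiaddr{ma}})
	if err := ps.Start(); err != nil {
		return nil, err
	}
	return ps, nil
}
```

Tearing down mirrors the fx `OnStop` hook: `RemovePeer` stops maintaining (and unprotects) a single connection, while `Stop` shuts the whole service down.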