Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DHT API #11

Merged
merged 21 commits into from
Oct 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package p2pd

import (
"context"
"errors"
"math/rand"
"time"

inet "github.com/libp2p/go-libp2p-net"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)

var BootstrapPeers = []string{
Copy link
Member

@raulk raulk Oct 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the application wanted a DHT separate from ours? I'm sure that'll be the case of Ethereum. The Ethereum Foundation runs their own bootstrap nodes per network:

https://github.com/ethereum/go-ethereum/blob/461291882edce0ac4a28f64c4e8725b7f57cbeae/params/bootnodes.go

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a related note, what if we had more than one app behind this daemon, and each wanted a different DHT?

Copy link
Collaborator Author

@vyzo vyzo Oct 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we should be able to support different bootstrap peers.
I think what we want is a configuration file of sorts for the daemon profile, and there we can list bootstrap peers there.
But I left that for a subsequent pr (we can open follow up issue).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I'd like to see an application "opening a session" with a daemon, and providing any initial seed configuration as part of that handshake.

I think of the daemon as a service provided by the environment/OS for applications to use, versus a sidecar for applications. As such, multi-tenancy and app-specific configuration should be part of the protocol.

For now, it's good that it's on our radar. An issue would be great, and perhaps a note in the specs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we can borrow the Options style configuration from go-libp2p and have a default bootstrap configuration with our peers

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's not a bad idea at all.
Upon further thought I'll add a -bootstrapPeers flag so that we can pass a comma separated list of peers in the command line, so that we can punt on the configuration.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a command-line flag in bd604c4

"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM", // pluto.i.ipfs.io
"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu", // saturn.i.ipfs.io
"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
"/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM", // pluto.i.ipfs.io
"/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu", // saturn.i.ipfs.io
"/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
"/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
}

const BootstrapConnections = 4

func bootstrapPeerInfo() ([]*pstore.PeerInfo, error) {
pis := make([]*pstore.PeerInfo, len(BootstrapPeers))
for x, p := range BootstrapPeers {
a, err := ma.NewMultiaddr(p)
if err != nil {
return nil, err
}

pi, err := pstore.InfoFromP2pAddr(a)
if err != nil {
return nil, err
}

pis[x] = pi
}

return pis, nil
}

func shufflePeerInfos(peers []*pstore.PeerInfo) {
for i := range peers {
j := rand.Intn(i + 1)
peers[i], peers[j] = peers[j], peers[i]
}
}

func (d *Daemon) Bootstrap() error {
pis, err := bootstrapPeerInfo()
if err != nil {
return err
}

for _, pi := range pis {
d.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.PermanentAddrTTL)
}

count := d.connectBootstrapPeers(pis, BootstrapConnections)
if count == 0 {
return errors.New("Failed to connect to bootstrap peers")
}

go d.keepBootstrapConnections(pis)

if d.dht != nil {
return d.dht.Bootstrap(d.ctx)
}

return nil
}

func (d *Daemon) connectBootstrapPeers(pis []*pstore.PeerInfo, toconnect int) int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe validate that len(pis) >= toconnect? Just a sanity check.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, if it is less we'll just connect to all of them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, the loop is a range loop over peers 👍

count := 0

shufflePeerInfos(pis)

ctx, cancel := context.WithTimeout(d.ctx, 60*time.Second)
defer cancel()

for _, pi := range pis {
if d.host.Network().Connectedness(pi.ID) == inet.Connected {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this check is redundant, because Host#Connect() already performs it as part of its contract.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not -- we want to skip already connected peers otherwise we will connect to fewer peers than intended.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right, my bad!

continue
}
err := d.host.Connect(ctx, *pi)
if err != nil {
log.Debugf("Error connecting to bootstrap peer %s: %s", pi.ID, err.Error())
} else {
count++
toconnect--
}
if toconnect == 0 {
break
}
}

return count

}

func (d *Daemon) keepBootstrapConnections(pis []*pstore.PeerInfo) {
ticker := time.NewTicker(15 * time.Minute)
for {
<-ticker.C

conns := d.host.Network().Conns()
if len(conns) >= BootstrapConnections {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit confusing to me, as it makes me think this goroutine is going to "pin/keep connections to bootstrap peers". But in practice we don't care if the active connections are to bootstrap peers or not.

If this is intended as a low watermark check for DHT, we should also verify that the returned conns support DHT?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a low watermark check mostly for the dht -- but you may want to keep bootstrap connections (say for relay, observed address identification, etc) without it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should earmark this as something that could/should be made into a connection manager?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either that or move it into the dht itself, as it seems useful

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make a connection manager, but for now it's well isolated in its corner of the code :)

continue
}

toconnect := BootstrapConnections - len(conns)
d.connectBootstrapPeers(pis, toconnect)
}
}
67 changes: 62 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2pd

import (
"context"
"errors"
"io"
"net"
"time"
Expand Down Expand Up @@ -79,8 +78,44 @@ func (d *Daemon) handleConn(c net.Conn) {
return
}

case pb.Request_DHT:
res, ch, cancel := d.doDHT(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
if ch != nil {
cancel()
}
return
}

if ch != nil {
for res := range ch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to block on this response? is there any sane way it could be async? i suppose that would require per client connection locking?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No easy way to make this async without reinventing multiplexing.

err = w.WriteMsg(res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
cancel()
return
}
}

err = w.WriteMsg(dhtResponseEnd())
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
return
}
}

case pb.Request_LIST_PEERS:
res := d.doListPeers(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
return
}

default:
log.Debugf("Unexpected request type: %s", req.Type)
log.Debugf("Unexpected request type: %d", *req.Type)
return
}
}
Expand All @@ -104,7 +139,7 @@ func (d *Daemon) doConnect(req *pb.Request) *pb.Response {
defer cancel()

if req.Connect == nil {
return errorResponse(errors.New("Malformed request; missing parameters"))
return errorResponseString("Malformed request; missing parameters")
}

pid, err := peer.IDFromBytes(req.Connect.Peer)
Expand Down Expand Up @@ -141,7 +176,7 @@ func (d *Daemon) doStreamOpen(req *pb.Request) (*pb.Response, inet.Stream) {
defer cancel()

if req.StreamOpen == nil {
return errorResponse(errors.New("Malformed request; missing parameters")), nil
return errorResponseString("Malformed request; missing parameters"), nil
}

pid, err := peer.IDFromBytes(req.StreamOpen.Peer)
Expand Down Expand Up @@ -169,7 +204,7 @@ func (d *Daemon) doStreamOpen(req *pb.Request) (*pb.Response, inet.Stream) {

func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response {
if req.StreamHandler == nil {
return errorResponse(errors.New("Malformed request; missing parameters"))
return errorResponseString("Malformed request; missing parameters")
}

d.mx.Lock()
Expand All @@ -189,6 +224,21 @@ func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response {
return okResponse()
}

func (d *Daemon) doListPeers(req *pb.Request) *pb.Response {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

conns := d.host.Network().Conns()
peers := make([]*pb.PeerInfo, len(conns))
for x, conn := range conns {
peers[x] = &pb.PeerInfo{
Id: []byte(conn.RemotePeer()),
Addrs: [][]byte{conn.RemoteMultiaddr().Bytes()},
}
}

res := okResponse()
res.Peers = peers
return res
}

func okResponse() *pb.Response {
return &pb.Response{
Type: pb.Response_OK.Enum(),
Expand All @@ -203,6 +253,13 @@ func errorResponse(err error) *pb.Response {
}
}

func errorResponseString(err string) *pb.Response {
return &pb.Response{
Type: pb.Response_ERROR.Enum(),
Error: &pb.ErrorResponse{Msg: &err},
}
}

func makeStreamInfo(s inet.Stream) *pb.StreamInfo {
proto := string(s.Protocol())
return &pb.StreamInfo{
Expand Down
23 changes: 23 additions & 0 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
logging "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
peer "github.com/libp2p/go-libp2p-peer"
proto "github.com/libp2p/go-libp2p-protocol"
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -20,6 +23,8 @@ type Daemon struct {
host host.Host
listener net.Listener

dht *dht.IpfsDHT

mx sync.Mutex
// stream handlers: map of protocol.ID to unix socket path
handlers map[proto.ID]string
Expand Down Expand Up @@ -49,6 +54,24 @@ func NewDaemon(ctx context.Context, path string, opts ...libp2p.Option) (*Daemon
return d, nil
}

func (d *Daemon) EnableDHT(client bool) error {
var opts []dhtopts.Option

if client {
opts = append(opts, dhtopts.Client(true))
}

dht, err := dht.New(d.ctx, d.host, opts...)
if err != nil {
return err
}

d.dht = dht
d.host = rhost.Wrap(d.host, d.dht)

return nil
}

func (d *Daemon) ID() peer.ID {
return d.host.ID()
}
Expand Down
Loading