-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DHT API #11
DHT API #11
Changes from 18 commits
5e720e5
b13ae19
b1258bc
14296fa
30110e2
4647341
cd177ac
900aa28
f5e74ac
533a8e7
ab07018
29e500c
812e5a1
f2d26a4
37ac1a1
eea9c45
55e194d
e1a93a8
bd604c4
d6a1cea
4d7e846
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package p2pd | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"math/rand" | ||
"time" | ||
|
||
inet "github.com/libp2p/go-libp2p-net" | ||
pstore "github.com/libp2p/go-libp2p-peerstore" | ||
ma "github.com/multiformats/go-multiaddr" | ||
) | ||
|
||
var BootstrapPeers = []string{ | ||
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", | ||
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", | ||
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb", | ||
"/dnsaddr/bootstrap.libp2p.io/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", | ||
"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io | ||
"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM", // pluto.i.ipfs.io | ||
"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu", // saturn.i.ipfs.io | ||
"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io | ||
"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io | ||
"/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM", // pluto.i.ipfs.io | ||
"/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu", // saturn.i.ipfs.io | ||
"/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io | ||
"/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io | ||
} | ||
|
||
const BootstrapConnections = 4 | ||
|
||
func bootstrapPeerInfo() ([]*pstore.PeerInfo, error) { | ||
pis := make([]*pstore.PeerInfo, len(BootstrapPeers)) | ||
for x, p := range BootstrapPeers { | ||
a, err := ma.NewMultiaddr(p) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
pi, err := pstore.InfoFromP2pAddr(a) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
pis[x] = pi | ||
} | ||
|
||
return pis, nil | ||
} | ||
|
||
func shufflePeerInfos(peers []*pstore.PeerInfo) { | ||
for i := range peers { | ||
j := rand.Intn(i + 1) | ||
peers[i], peers[j] = peers[j], peers[i] | ||
} | ||
} | ||
|
||
func (d *Daemon) Bootstrap() error { | ||
pis, err := bootstrapPeerInfo() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, pi := range pis { | ||
d.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.PermanentAddrTTL) | ||
} | ||
|
||
count := d.connectBootstrapPeers(pis, BootstrapConnections) | ||
if count == 0 { | ||
return errors.New("Failed to connect to bootstrap peers") | ||
} | ||
|
||
go d.keepBootstrapConnections(pis) | ||
|
||
if d.dht != nil { | ||
return d.dht.Bootstrap(d.ctx) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (d *Daemon) connectBootstrapPeers(pis []*pstore.PeerInfo, toconnect int) int { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe validate that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nah, if it is less we'll just connect to all of them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yeah, the loop is a range loop over peers 👍 |
||
count := 0 | ||
|
||
shufflePeerInfos(pis) | ||
|
||
ctx, cancel := context.WithTimeout(d.ctx, 60*time.Second) | ||
defer cancel() | ||
|
||
for _, pi := range pis { | ||
if d.host.Network().Connectedness(pi.ID) == inet.Connected { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this check is redundant, because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's not -- we want to skip already connected peers otherwise we will connect to fewer peers than intended. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, right, my bad! |
||
continue | ||
} | ||
err := d.host.Connect(ctx, *pi) | ||
if err != nil { | ||
log.Debugf("Error connecting to bootstrap peer %s: %s", pi.ID, err.Error()) | ||
} else { | ||
count++ | ||
toconnect-- | ||
} | ||
if toconnect == 0 { | ||
break | ||
} | ||
} | ||
|
||
return count | ||
|
||
} | ||
|
||
func (d *Daemon) keepBootstrapConnections(pis []*pstore.PeerInfo) { | ||
ticker := time.NewTicker(15 * time.Minute) | ||
for { | ||
<-ticker.C | ||
|
||
conns := d.host.Network().Conns() | ||
if len(conns) >= BootstrapConnections { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit confusing to me, as it makes me think this goroutine is going to "pin/keep connections to bootstrap peers". But in practice we don't care if the active connections are to bootstrap peers or not. If this is intended as a low watermark check for DHT, we should also verify that the returned conns support DHT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's a low watermark check mostly for the dht -- but you may want to keep bootstrap connections (say for relay, observed address identification, etc) without it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. perhaps we should earmark this as something that could/should be made into a connection manager? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. either that or move it into the dht itself, as it seems useful There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could make a connection manager, but for now it's well isolated in its corner of the code :) |
||
continue | ||
} | ||
|
||
toconnect := BootstrapConnections - len(conns) | ||
d.connectBootstrapPeers(pis, toconnect) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,6 @@ package p2pd | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"io" | ||
"net" | ||
"time" | ||
|
@@ -79,8 +78,44 @@ func (d *Daemon) handleConn(c net.Conn) { | |
return | ||
} | ||
|
||
case pb.Request_DHT: | ||
res, ch, cancel := d.doDHT(&req) | ||
err := w.WriteMsg(res) | ||
if err != nil { | ||
log.Debugf("Error writing response: %s", err.Error()) | ||
if ch != nil { | ||
cancel() | ||
} | ||
return | ||
} | ||
|
||
if ch != nil { | ||
for res := range ch { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want to block on this response? is there any sane way it could be async? i suppose that would require per client connection locking? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No easy way to make this async without reinventing multiplexing. |
||
err = w.WriteMsg(res) | ||
if err != nil { | ||
log.Debugf("Error writing response: %s", err.Error()) | ||
cancel() | ||
return | ||
} | ||
} | ||
|
||
err = w.WriteMsg(dhtResponseEnd()) | ||
if err != nil { | ||
log.Debugf("Error writing response: %s", err.Error()) | ||
return | ||
} | ||
} | ||
|
||
case pb.Request_LIST_PEERS: | ||
res := d.doListPeers(&req) | ||
err := w.WriteMsg(res) | ||
if err != nil { | ||
log.Debugf("Error writing response: %s", err.Error()) | ||
return | ||
} | ||
|
||
default: | ||
log.Debugf("Unexpected request type: %s", req.Type) | ||
log.Debugf("Unexpected request type: %d", *req.Type) | ||
return | ||
} | ||
} | ||
|
@@ -104,7 +139,7 @@ func (d *Daemon) doConnect(req *pb.Request) *pb.Response { | |
defer cancel() | ||
|
||
if req.Connect == nil { | ||
return errorResponse(errors.New("Malformed request; missing parameters")) | ||
return errorResponseString("Malformed request; missing parameters") | ||
} | ||
|
||
pid, err := peer.IDFromBytes(req.Connect.Peer) | ||
|
@@ -141,7 +176,7 @@ func (d *Daemon) doStreamOpen(req *pb.Request) (*pb.Response, inet.Stream) { | |
defer cancel() | ||
|
||
if req.StreamOpen == nil { | ||
return errorResponse(errors.New("Malformed request; missing parameters")), nil | ||
return errorResponseString("Malformed request; missing parameters"), nil | ||
} | ||
|
||
pid, err := peer.IDFromBytes(req.StreamOpen.Peer) | ||
|
@@ -169,7 +204,7 @@ func (d *Daemon) doStreamOpen(req *pb.Request) (*pb.Response, inet.Stream) { | |
|
||
func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response { | ||
if req.StreamHandler == nil { | ||
return errorResponse(errors.New("Malformed request; missing parameters")) | ||
return errorResponseString("Malformed request; missing parameters") | ||
} | ||
|
||
d.mx.Lock() | ||
|
@@ -189,6 +224,21 @@ func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response { | |
return okResponse() | ||
} | ||
|
||
func (d *Daemon) doListPeers(req *pb.Request) *pb.Response { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
conns := d.host.Network().Conns() | ||
peers := make([]*pb.PeerInfo, len(conns)) | ||
for x, conn := range conns { | ||
peers[x] = &pb.PeerInfo{ | ||
Id: []byte(conn.RemotePeer()), | ||
Addrs: [][]byte{conn.RemoteMultiaddr().Bytes()}, | ||
} | ||
} | ||
|
||
res := okResponse() | ||
res.Peers = peers | ||
return res | ||
} | ||
|
||
func okResponse() *pb.Response { | ||
return &pb.Response{ | ||
Type: pb.Response_OK.Enum(), | ||
|
@@ -203,6 +253,13 @@ func errorResponse(err error) *pb.Response { | |
} | ||
} | ||
|
||
func errorResponseString(err string) *pb.Response { | ||
return &pb.Response{ | ||
Type: pb.Response_ERROR.Enum(), | ||
Error: &pb.ErrorResponse{Msg: &err}, | ||
} | ||
} | ||
|
||
func makeStreamInfo(s inet.Stream) *pb.StreamInfo { | ||
proto := string(s.Protocol()) | ||
return &pb.StreamInfo{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the application wanted a DHT separate from ours? I'm sure that'll be the case of Ethereum. The Ethereum Foundation runs their own bootstrap nodes per network:
https://github.com/ethereum/go-ethereum/blob/461291882edce0ac4a28f64c4e8725b7f57cbeae/params/bootnodes.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a related note, what if we had more than one app behind this daemon, and each wanted a different DHT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we should be able to support different bootstrap peers.
I think what we want is a configuration file of sorts for the daemon profile, and there we can list bootstrap peers there.
But I left that for a subsequent pr (we can open follow up issue).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I'd like to see an application "opening a session" with a daemon, and providing any initial seed configuration as part of that handshake.
I think of the daemon as a service provided by the environment/OS for applications to use, versus a sidecar for applications. As such, multi-tenancy and app-specific configuration should be part of the protocol.
For now, it's good that it's on our radar. An issue would be great, and perhaps a note in the specs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we can borrow the
Options
style configuration fromgo-libp2p
and have a default bootstrap configuration with our peersThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's not a bad idea at all.
Upon further thought I'll add a
-bootstrapPeers
flag so that we can pass a comma separated list of peers in the command line, so that we can punt on the configuration.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a command-line flag in bd604c4