Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract DHT message sender from the DHT #659

Merged
merged 15 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 17 additions & 91 deletions dht.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package dht

import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/rand"
Expand All @@ -17,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
Expand All @@ -33,7 +32,6 @@ import (
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/multiformats/go-base32"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"go.opencensus.io/tag"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -97,8 +95,8 @@ type IpfsDHT struct {
ctx context.Context
proc goprocess.Process

strmap map[peer.ID]*messageSender
smlk sync.Mutex
protoMessenger *pb.ProtocolMessenger
messageMgr *messageManager

plk sync.Mutex

Expand Down Expand Up @@ -190,6 +188,15 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
dht.disableFixLowPeers = cfg.disableFixLowPeers

dht.Validator = cfg.validator
dht.messageMgr = &messageManager{
host: h,
strmap: make(map[peer.ID]*messageSender),
protocols: dht.protocols,
}
dht.protoMessenger, err = pb.NewProtocolMessenger(dht.messageMgr, pb.WithValidator(dht.Validator))
if err != nil {
return nil, err
}

dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing

Expand Down Expand Up @@ -276,7 +283,6 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
selfKey: kb.ConvertPeerID(h.ID()),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
protocols: protocols,
protocolsStrs: protocol.ConvertToStrings(protocols),
Expand Down Expand Up @@ -530,80 +536,19 @@ func (dht *IpfsDHT) persistRTPeersInPeerStore() {
}
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
logger.Debugw("failed to put value to peer", "to", p, "key", loggableRecordKeyBytes(rec.Key), "error", err)
return err
}

if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
logger.Infow("value not put correctly", "put-message", pmes, "get-message", rpmes)
return errors.New("value not put correctly")
}

return nil
}

var errInvalidRecord = errors.New("received invalid record")

// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
return nil, nil, err
}

// Perhaps we were given closer peers
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())

if rec := pmes.GetRecord(); rec != nil {
// Success! We were given the value
logger.Debug("got value")

// make sure record is valid.
err = dht.Validator.Validate(string(rec.GetKey()), rec.GetValue())
if err != nil {
logger.Debug("received invalid record (discarded)")
// return a sentinal to signify an invalid record was received
err = errInvalidRecord
rec = new(recpb.Record)
}
return rec, peers, err
}

if len(peers) > 0 {
return nil, peers, nil
}

return nil, nil, routing.ErrNotFound
}

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
return dht.sendRequest(ctx, p, pmes)
}

// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", loggableRecordKeyString(key))
logger.Debugw("finding value in datastore", "key", internal.LoggableRecordKeyString(key))

rec, err := dht.getRecordFromDatastore(mkDsKey(key))
if err != nil {
logger.Warnw("get local failed", "key", loggableRecordKeyString(key), "error", err)
logger.Warnw("get local failed", "key", internal.LoggableRecordKeyString(key), "error", err)
return nil, err
}

// Double check the key. Can't hurt.
if rec != nil && string(rec.GetKey()) != key {
logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", loggableRecordKeyString(key), "got", rec.GetKey())
logger.Errorw("BUG: found a DHT record that didn't match it's key", "expected", internal.LoggableRecordKeyString(key), "got", rec.GetKey())
return nil, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.. seems like it's an error not a nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably... I mean realistically it wouldn't even be crazy to panic here since if this occurs then there was a programming error somewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this shouldn't happen, returning nil, nil when there is a validation error does fulfill the function contract just like it does in dht.getRecordFromDatastore. I've copied the comment explaining that from dht.getRecordFromDatastore to here.


}
Expand All @@ -614,7 +559,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", loggableRecordKeyString(key))
logger.Warnw("failed to put marshal record for local put", "error", err, "key", internal.LoggableRecordKeyString(key))
return err
}

Expand Down Expand Up @@ -719,17 +664,6 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
}
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
return dht.sendRequest(ctx, p, pmes)
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
return dht.sendRequest(ctx, p, pmes)
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
Expand Down Expand Up @@ -870,15 +804,7 @@ func (dht *IpfsDHT) Host() host.Host {

// Ping sends a ping message to the passed peer and waits for a response.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
req := pb.NewMessage(pb.Message_PING, nil, 0)
resp, err := dht.sendRequest(ctx, p, req)
if err != nil {
return fmt.Errorf("sending request: %w", err)
}
if resp.Type != pb.Message_PING {
return fmt.Errorf("got unexpected response type: %v", resp.Type)
}
return nil
return dht.protoMessenger.Ping(ctx, p)
}

// newContextWithLocalTags returns a new context.Context with the InstanceID and
Expand Down
Loading