Skip to content

Commit

Permalink
Merge pull request ipfs#148 from libp2p/feat/routing-refactor
Browse files Browse the repository at this point in the history
update for the routing refactor
  • Loading branch information
Stebalien authored Jun 2, 2018
2 parents f9f7b87 + 093ebc5 commit 654c41b
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 111 deletions.
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.0.4: QmSBxn1eLMdViZRDGW9rRHRYwtqq5bqUgipqTMPuTim616
4.1.0: Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd
66 changes: 39 additions & 27 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"sync"
"time"

opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
routing "github.com/libp2p/go-libp2p-routing"

proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
Expand All @@ -28,6 +28,7 @@ import (
protocol "github.com/libp2p/go-libp2p-protocol"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
routing "github.com/libp2p/go-libp2p-routing"
base32 "github.com/whyrusleeping/base32"
)

Expand All @@ -54,8 +55,7 @@ type IpfsDHT struct {

birth time.Time // When this peer started up

Validator record.Validator // record validator funcs
Selector record.Selector // record selection funcs
Validator record.Validator

ctx context.Context
proc goprocess.Process
Expand All @@ -66,23 +66,13 @@ type IpfsDHT struct {
plk sync.Mutex
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := NewDHTClient(ctx, h, dstore)

h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream)

return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := makeDHT(ctx, h, dstore)
// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
Expand All @@ -94,10 +84,35 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
})

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

dht.Validator["pk"] = record.PublicKeyValidator
dht.Selector["pk"] = record.PublicKeySelector
if !cfg.Client {
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream)
}
return dht, nil
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, opts.Datastore(dstore))
if err != nil {
panic(err)
}
return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true))
if err != nil {
panic(err)
}
return dht
}

Expand All @@ -122,9 +137,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,

Validator: make(record.Validator),
Selector: make(record.Selector),
}
}

Expand Down Expand Up @@ -176,7 +188,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
log.Debug("getValueOrPeers: got value")

// make sure record is valid.
err = dht.Validator.VerifyRecord(record)
err = dht.Validator.Validate(record.GetKey(), record.GetValue())
if err != nil {
log.Info("Received invalid record! (discarded)")
// return a sentinal to signify an invalid record was received
Expand Down Expand Up @@ -239,7 +251,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
return nil, err
}

err = dht.Validator.VerifyRecord(rec)
err = dht.Validator.Validate(rec.GetKey(), rec.GetValue())
if err != nil {
log.Debugf("local record verify failed: %s (discarded)", err)
return nil, err
Expand Down
88 changes: 41 additions & 47 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import (
"testing"
"time"

opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
u "github.com/ipfs/go-ipfs-util"
kb "github.com/libp2p/go-libp2p-kbucket"
netutil "github.com/libp2p/go-libp2p-netutil"
Expand All @@ -41,19 +40,47 @@ func init() {
}
}

func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
h := bhost.New(netutil.GenSwarmNetwork(t, ctx))
type blankValidator struct{}

func (blankValidator) Validate(_ string, _ []byte) error { return nil }
func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

dss := dssync.MutexWrap(ds.NewMapDatastore())
var d *IpfsDHT
if client {
d = NewDHTClient(ctx, h, dss)
} else {
d = NewDHT(ctx, h, dss)
type testValidator struct{}

func (testValidator) Select(_ string, bs [][]byte) (int, error) {
index := -1
for i, b := range bs {
if bytes.Compare(b, []byte("newer")) == 0 {
index = i
} else if bytes.Compare(b, []byte("valid")) == 0 {
if index == -1 {
index = i
}
}
}
if index == -1 {
return -1, errors.New("no rec found")
}
return index, nil
}
func (testValidator) Validate(_ string, b []byte) error {
if bytes.Compare(b, []byte("expired")) == 0 {
return errors.New("expired")
}
return nil
}

d.Validator["v"] = func(*record.ValidationRecord) error { return nil }
d.Selector["v"] = func(_ string, bs [][]byte) (int, error) { return 0, nil }
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
d, err := New(
ctx,
bhost.New(netutil.GenSwarmNetwork(t, ctx)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
)

if err != nil {
t.Fatal(err)
}
return d
}

Expand Down Expand Up @@ -148,14 +175,6 @@ func TestValueGetSet(t *testing.T) {
defer dhtA.host.Close()
defer dhtB.host.Close()

vf := func(*record.ValidationRecord) error { return nil }
nulsel := func(_ string, bs [][]byte) (int, error) { return 0, nil }

dhtA.Validator["v"] = vf
dhtB.Validator["v"] = vf
dhtA.Selector["v"] = nulsel
dhtB.Selector["v"] = nulsel

connect(t, ctx, dhtA, dhtB)

log.Debug("adding value on: ", dhtA.self)
Expand Down Expand Up @@ -203,33 +222,8 @@ func TestValueSetInvalid(t *testing.T) {
defer dhtA.host.Close()
defer dhtB.host.Close()

vf := func(r *record.ValidationRecord) error {
if bytes.Compare(r.Value, []byte("expired")) == 0 {
return errors.New("expired")
}
return nil
}
nulsel := func(k string, bs [][]byte) (int, error) {
index := -1
for i, b := range bs {
if bytes.Compare(b, []byte("newer")) == 0 {
index = i
} else if bytes.Compare(b, []byte("valid")) == 0 {
if index == -1 {
index = i
}
}
}
if index == -1 {
return -1, errors.New("no rec found")
}
return index, nil
}

dhtA.Validator["v"] = vf
dhtB.Validator["v"] = vf
dhtA.Selector["v"] = nulsel
dhtB.Selector["v"] = nulsel
dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}

connect(t, ctx, dhtA, dhtB)

Expand Down
26 changes: 16 additions & 10 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"time"

ggio "github.com/gogo/protobuf/io"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p-net"
Expand All @@ -31,8 +29,10 @@ func TestGetFailures(t *testing.T) {
}
hosts := mn.Hosts()

tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
Expand Down Expand Up @@ -148,8 +148,10 @@ func TestNotFound(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

for _, p := range hosts {
d.Update(ctx, p.ID())
Expand Down Expand Up @@ -225,8 +227,10 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()

tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

for i := 1; i < 5; i++ {
d.Update(ctx, hosts[i].ID())
Expand Down Expand Up @@ -292,8 +296,10 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

d.Update(ctx, hosts[1].ID())

Expand Down
15 changes: 10 additions & 5 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,26 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
eip.Done()
}()

dskey := convertToDsKey(pmes.GetKey())

rec := pmes.GetRecord()
if rec == nil {
log.Infof("Got nil record from: %s", p.Pretty())
return nil, errors.New("nil record")
}

if pmes.GetKey() != rec.GetKey() {
return nil, errors.New("put key doesn't match record key")
}

cleanRecord(rec)

// Make sure the record is valid (not expired, valid signature etc)
if err = dht.Validator.VerifyRecord(rec); err != nil {
if err = dht.Validator.Validate(rec.GetKey(), rec.GetValue()); err != nil {
log.Warningf("Bad dht record in PUT from: %s. %s", p.Pretty(), err)
return nil, err
}

dskey := convertToDsKey(rec.GetKey())

// Make sure the new record is "better" than the record we have locally.
// This prevents a record with for example a lower sequence number from
// overwriting a record with a higher sequence number.
Expand All @@ -188,7 +193,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess

if existing != nil {
recs := [][]byte{rec.GetValue(), existing.GetValue()}
i, err := dht.Selector.BestRecord(pmes.GetKey(), recs)
i, err := dht.Validator.Select(rec.GetKey(), recs)
if err != nil {
log.Warningf("Bad dht record in PUT from %s: %s", p.Pretty(), err)
return nil, err
Expand Down Expand Up @@ -237,7 +242,7 @@ func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error)
return nil, nil
}

err = dht.Validator.VerifyRecord(rec)
err = dht.Validator.Validate(rec.GetKey(), rec.GetValue())
if err != nil {
// Invalid record in datastore, probably expired but don't return an error,
// we'll just overwrite it
Expand Down
Loading

0 comments on commit 654c41b

Please sign in to comment.