Skip to content

Commit

Permalink
Merge pull request #728 from libp2p/fix/416
Browse files Browse the repository at this point in the history
feat: delete GetValues
  • Loading branch information
Stebalien authored Jul 22, 2021
2 parents 3cf54bb + 4626397 commit 65773b1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 79 deletions.
48 changes: 0 additions & 48 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,54 +470,6 @@ func TestSearchValue(t *testing.T) {
}
}

func TestGetValues(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)

defer dhtA.Close()
defer dhtB.Close()
defer dhtA.host.Close()
defer dhtB.host.Close()

connect(t, ctx, dhtA, dhtB)

ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

err := dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
if err != nil {
t.Error(err)
}

err = dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
if err != nil {
t.Error(err)
}

ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
defer cancel()
vals, err := dhtA.GetValues(ctxT, "/v/hello", 16)
if err != nil {
t.Fatal(err)
}

if len(vals) != 2 {
t.Fatalf("expected to get 2 values, got %d", len(vals))
}

sort.Slice(vals, func(i, j int) bool { return string(vals[i].Val) < string(vals[j].Val) })

if string(vals[0].Val) != "valid" {
t.Errorf("unexpected vals[0]: %s", string(vals[0].Val))
}
if string(vals[1].Val) != "valid" {
t.Errorf("unexpected vals[1]: %s", string(vals[1].Val))
}
}

func TestValueGetInvalid(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
41 changes: 10 additions & 31 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
return nil
}

// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
// recvdVal stores a value and the peer from which we got the value.
type recvdVal struct {
Val []byte
From peer.ID
}
Expand Down Expand Up @@ -182,11 +182,11 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
return out, nil
}

func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan recvdVal, stopCh chan struct{},
out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
numResponses := 0
return dht.processValues(ctx, key, valCh,
func(ctx context.Context, v RecvdVal, better bool) bool {
func(ctx context.Context, v recvdVal, better bool) bool {
numResponses++
if better {
select {
Expand All @@ -204,29 +204,8 @@ func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-c
})
}

// GetValues gets nvals values corresponding to the given key.
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
if !dht.enableValues {
return nil, routing.ErrNotSupported
}

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
valCh, _ := dht.getValues(queryCtx, key, nil)

out := make([]RecvdVal, 0, nvals)
for val := range valCh {
out = append(out, val)
if len(out) == nvals {
cancel()
}
}

return out, ctx.Err()
}

func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan recvdVal,
newVal func(ctx context.Context, v recvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
loop:
for {
if aborted {
Expand Down Expand Up @@ -290,15 +269,15 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
}
}

func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
valCh := make(chan RecvdVal, 1)
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan recvdVal, <-chan *lookupWithFollowupResult) {
valCh := make(chan recvdVal, 1)
lookupResCh := make(chan *lookupWithFollowupResult, 1)

logger.Debugw("finding value", "key", internal.LoggableRecordKeyString(key))

if rec, err := dht.getLocal(key); rec != nil && err == nil {
select {
case valCh <- RecvdVal{
case valCh <- recvdVal{
Val: rec.GetValue(),
From: dht.self,
}:
Expand Down Expand Up @@ -337,7 +316,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
// TODO: What should happen if the record is invalid?
// Pre-existing code counted it towards the quorum, but should it?
if rec != nil && rec.GetValue() != nil {
rv := RecvdVal{
rv := recvdVal{
Val: rec.GetValue(),
From: p,
}
Expand Down

0 comments on commit 65773b1

Please sign in to comment.