Skip to content

Commit

Permalink
Implement SearchValue
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Aug 31, 2018
1 parent 73aab7f commit d73cd12
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 3 deletions.
8 changes: 8 additions & 0 deletions composed.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ func (cr *Compose) GetValue(ctx context.Context, key string, opts ...ropts.Optio
return cr.ValueStore.GetValue(ctx, key, opts...)
}

// SearchValue searches for the value corresponding to given Key.
func (cr *Compose) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
if cr.ValueStore == nil {
return nil, routing.ErrNotFound
}
return cr.ValueStore.SearchValue(ctx, key, opts...)
}

// Provide adds the given cid to the content routing system. If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
Expand Down
20 changes: 20 additions & 0 deletions dummy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@ func (d *dummyValueStore) GetValue(ctx context.Context, key string, opts ...ropt
return nil, routing.ErrNotFound
}

func (d *dummyValueStore) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
out := make(chan []byte)
if strings.HasPrefix(key, "/error/") {
return nil, errors.New(key[len("/error/"):])
}

go func() {
defer close(out)
v, err := d.GetValue(ctx, key, opts...)
if err == nil {
select {
case out <- v:
case <-ctx.Done():
}
}
}()
return out, nil
}


type dummyProvider map[string][]peer.ID

func (d dummyProvider) FindProvidersAsync(ctx context.Context, c *cid.Cid, count int) <-chan pstore.PeerInfo {
Expand Down
8 changes: 8 additions & 0 deletions limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func (lvs *LimitedValueStore) GetValue(ctx context.Context, key string, opts ...
return lvs.ValueStore.GetValue(ctx, key, opts...)
}

// GetValue returns ErrNotSupported
func (lvs *LimitedValueStore) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
if !lvs.KeySupported(key) {
return nil, routing.ErrNotFound
}
return lvs.ValueStore.SearchValue(ctx, key, opts...)
}

func (lvs *LimitedValueStore) Bootstrap(ctx context.Context) error {
if bs, ok := lvs.ValueStore.(Bootstrap); ok {
return bs.Bootstrap(ctx)
Expand Down
5 changes: 5 additions & 0 deletions null.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ func (nr Null) GetValue(context.Context, string, ...ropts.Option) ([]byte, error
return nil, routing.ErrNotFound
}

// SearchValue always returns ErrNotFound
func (nr Null) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
return nil, routing.ErrNotFound
}

// Provide always returns ErrNotSupported
func (nr Null) Provide(context.Context, *cid.Cid, bool) error {
return routing.ErrNotSupported
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
"version": "1.4.24"
},
{
"hash": "QmS4niovD1U6pRjUBXivr1zvvLBqiTKbERjFo994JU7oQS",
"hash": "QmNMiBu1AaojCGVqPp1pibwhHq4zTuvgrHXEi2dH2AgEuo",
"name": "go-libp2p-routing",
"version": "2.4.9"
"version": "2.5.0"
},
{
"author": "hashicorp",
Expand Down
52 changes: 51 additions & 1 deletion parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,49 @@ func (r Parallel) put(do func(routing.IpfsRouting) error) error {
}
}

func (r Parallel) search(ctx context.Context, do func(routing.IpfsRouting) (<-chan []byte, error)) (<-chan []byte, error) {
switch len(r) {
case 0:
return nil, routing.ErrNotFound
case 1:
return do(r[0])
}

rchan := make(chan []byte)
var errs []error
var wg sync.WaitGroup

for _, ri := range r {
vchan, err := do(ri)
switch err {
case nil:
case routing.ErrNotFound, routing.ErrNotSupported:
continue
default:
errs = append(errs, err)
}
wg.Add(1)

go func() {
defer wg.Done()
for {
select {
case rchan <- <- vchan:
case <-ctx.Done():
return
}
}
}()
}

go func() {
wg.Wait()
close(rchan)
}()

return rchan, nil
}

func (r Parallel) get(ctx context.Context, do func(routing.IpfsRouting) (interface{}, error)) (interface{}, error) {
switch len(r) {
case 0:
Expand Down Expand Up @@ -180,7 +223,7 @@ func (r Parallel) get(ctx context.Context, do func(routing.IpfsRouting) (interfa
}

var errs []error
for _ = range r {
for range r {
select {
case res := <-results:
switch res.err {
Expand Down Expand Up @@ -230,6 +273,13 @@ func (r Parallel) GetValue(ctx context.Context, key string, opts ...ropts.Option
return val, err
}

func (r Parallel) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
resCh, err := r.forKey(key).search(ctx, func(ri routing.IpfsRouting) (<-chan []byte, error) {
return ri.SearchValue(ctx, key, opts...)
})
return resCh, err
}

func (r Parallel) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
vInt, err := r.
forKey(routing.KeyForPublicKey(p)).
Expand Down
4 changes: 4 additions & 0 deletions tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (r Tiered) GetValue(ctx context.Context, key string, opts ...ropts.Option)
return val, err
}

func (r Tiered) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
return Parallel(r).SearchValue(ctx, key, opts...)
}

func (r Tiered) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
vInt, err := r.get(ctx, func(ri routing.IpfsRouting) (interface{}, error) {
return routing.GetPublicKey(ri, ctx, p)
Expand Down

0 comments on commit d73cd12

Please sign in to comment.