Skip to content

Commit

Permalink
Merge pull request #6 from libp2p/feat/search-value
Browse files Browse the repository at this point in the history
Implement SearchValue
  • Loading branch information
magik6k authored Sep 24, 2018
2 parents 61a4193 + 1721b52 commit b441a90
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 4 deletions.
10 changes: 10 additions & 0 deletions composed.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func (cr *Compose) GetValue(ctx context.Context, key string, opts ...ropts.Optio
return cr.ValueStore.GetValue(ctx, key, opts...)
}

// SearchValue searches for the value corresponding to given Key.
func (cr *Compose) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
if cr.ValueStore == nil {
out := make(chan []byte)
close(out)
return out, nil
}
return cr.ValueStore.SearchValue(ctx, key, opts...)
}

// Provide adds the given cid to the content routing system. If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
Expand Down
19 changes: 19 additions & 0 deletions dummy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ func (d *dummyValueStore) GetValue(ctx context.Context, key string, opts ...ropt
return nil, routing.ErrNotFound
}

func (d *dummyValueStore) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
out := make(chan []byte)
if strings.HasPrefix(key, "/error/") {
return nil, errors.New(key[len("/error/"):])
}

go func() {
defer close(out)
v, err := d.GetValue(ctx, key, opts...)
if err == nil {
select {
case out <- v:
case <-ctx.Done():
}
}
}()
return out, nil
}

type dummyProvider map[string][]peer.ID

func (d dummyProvider) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan pstore.PeerInfo {
Expand Down
13 changes: 12 additions & 1 deletion limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,25 @@ func (lvs *LimitedValueStore) KeySupported(key string) bool {
return false
}

// GetValue returns ErrNotSupported
// GetValue returns routing.ErrNotFound if key isn't supported
func (lvs *LimitedValueStore) GetValue(ctx context.Context, key string, opts ...ropts.Option) ([]byte, error) {
if !lvs.KeySupported(key) {
return nil, routing.ErrNotFound
}
return lvs.ValueStore.GetValue(ctx, key, opts...)
}

// SearchValue returns empty channel if key isn't supported or calls SearchValue
// on the underlying ValueStore
func (lvs *LimitedValueStore) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
if !lvs.KeySupported(key) {
out := make(chan []byte)
close(out)
return out, nil
}
return lvs.ValueStore.SearchValue(ctx, key, opts...)
}

func (lvs *LimitedValueStore) Bootstrap(ctx context.Context) error {
if bs, ok := lvs.ValueStore.(Bootstrap); ok {
return bs.Bootstrap(ctx)
Expand Down
5 changes: 5 additions & 0 deletions null.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ func (nr Null) GetValue(context.Context, string, ...ropts.Option) ([]byte, error
return nil, routing.ErrNotFound
}

// SearchValue always returns ErrNotFound
func (nr Null) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
return nil, routing.ErrNotFound
}

// Provide always returns ErrNotSupported
func (nr Null) Provide(context.Context, cid.Cid, bool) error {
return routing.ErrNotSupported
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
"version": "2.0.0"
},
{
"hash": "QmdKS5YtmuSWKuLLgbHG176mS3VX3AKiyVmaaiAfvgcuch",
"hash": "QmaJ6QDUh7JKPPToaUZ4EtUsPBdrbSAG6zMTkzfEvZgz2j",
"name": "go-libp2p-routing",
"version": "2.5.0"
"version": "2.6.0"
},
{
"author": "hashicorp",
Expand Down
58 changes: 57 additions & 1 deletion parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,55 @@ func (r Parallel) put(do func(routing.IpfsRouting) error) error {
}
}

func (r Parallel) search(ctx context.Context, do func(routing.IpfsRouting) (<-chan []byte, error)) (<-chan []byte, error) {
switch len(r) {
case 0:
return nil, routing.ErrNotFound
case 1:
return do(r[0])
}

out := make(chan []byte)
var errs []error
var wg sync.WaitGroup

for _, ri := range r {
vchan, err := do(ri)
switch err {
case nil:
case routing.ErrNotFound, routing.ErrNotSupported:
continue
default:
errs = append(errs, err)
}
wg.Add(1)

go func() {
defer wg.Done()
for {
select {
case v := <-vchan:
//TODO: run validator.Select here
select {
case out <- v:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}

go func() {
wg.Wait()
close(out)
}()

return out, nil
}

func (r Parallel) get(ctx context.Context, do func(routing.IpfsRouting) (interface{}, error)) (interface{}, error) {
switch len(r) {
case 0:
Expand Down Expand Up @@ -180,7 +229,7 @@ func (r Parallel) get(ctx context.Context, do func(routing.IpfsRouting) (interfa
}

var errs []error
for _ = range r {
for range r {
select {
case res := <-results:
switch res.err {
Expand Down Expand Up @@ -230,6 +279,13 @@ func (r Parallel) GetValue(ctx context.Context, key string, opts ...ropts.Option
return val, err
}

func (r Parallel) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
resCh, err := r.forKey(key).search(ctx, func(ri routing.IpfsRouting) (<-chan []byte, error) {
return ri.SearchValue(ctx, key, opts...)
})
return resCh, err
}

func (r Parallel) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
vInt, err := r.
forKey(routing.KeyForPublicKey(p)).
Expand Down
4 changes: 4 additions & 0 deletions tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (r Tiered) GetValue(ctx context.Context, key string, opts ...ropts.Option)
return val, err
}

func (r Tiered) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
return Parallel(r).SearchValue(ctx, key, opts...)
}

func (r Tiered) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
vInt, err := r.get(ctx, func(ri routing.IpfsRouting) (interface{}, error) {
return routing.GetPublicKey(ri, ctx, p)
Expand Down

0 comments on commit b441a90

Please sign in to comment.