Skip to content

Commit

Permalink
Merge pull request #5404 from ipfs/feat/namestream
Browse files Browse the repository at this point in the history
ipfs name resolve --stream
  • Loading branch information
Stebalien authored Oct 18, 2018
2 parents 084c7d1 + 6207222 commit 7de7928
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 178 deletions.
26 changes: 24 additions & 2 deletions core/commands/name/ipns.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
nocacheOptionName = "nocache"
dhtRecordCountOptionName = "dht-record-count"
dhtTimeoutOptionName = "dht-timeout"
streamOptionName = "stream"
)

var IpnsCmd = &cmds.Command{
Expand Down Expand Up @@ -77,6 +78,7 @@ Resolve the value of a dnslink:
cmdkit.BoolOption(nocacheOptionName, "n", "Do not use cached entries."),
cmdkit.UintOption(dhtRecordCountOptionName, "dhtrc", "Number of records to request for DHT resolution."),
cmdkit.StringOption(dhtTimeoutOptionName, "dhtt", "Max time to collect values during DHT resolution eg \"30s\". Pass 0 for no timeout."),
cmdkit.BoolOption(streamOptionName, "s", "Stream entries as they are found."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env)
Expand All @@ -101,6 +103,7 @@ Resolve the value of a dnslink:
recursive, _ := req.Options[recursiveOptionName].(bool)
rc, rcok := req.Options[dhtRecordCountOptionName].(int)
dhtt, dhttok := req.Options[dhtTimeoutOptionName].(string)
stream, _ := req.Options[streamOptionName].(bool)

opts := []options.NameResolveOption{
options.Name.Local(local),
Expand Down Expand Up @@ -128,12 +131,31 @@ Resolve the value of a dnslink:
name = "/ipns/" + name
}

output, err := api.Name().Resolve(req.Context, name, opts...)
if !stream {
output, err := api.Name().Resolve(req.Context, name, opts...)
if err != nil {
return err
}

return cmds.EmitOnce(res, &ResolvedPath{path.FromString(output.String())})
}

output, err := api.Name().Search(req.Context, name, opts...)
if err != nil {
return err
}

return cmds.EmitOnce(res, &ResolvedPath{path.FromString(output.String())})
for v := range output {
if v.Err != nil {
return err
}
if err := res.Emit(&ResolvedPath{path.FromString(v.Path.String())}); err != nil {
return err
}

}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Expand Down
3 changes: 2 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,8 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
n.RecordValidator,
)
n.Routing = rhelpers.Tiered{
// Always check pubsub first.
Routers: []routing.IpfsRouting{
// Always check pubsub first.
&rhelpers.Compose{
ValueStore: &rhelpers.LimitedValueStore{
ValueStore: n.PSRouter,
Expand All @@ -533,6 +533,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
},
n.Routing,
},
Validator: n.RecordValidator,
}
}

Expand Down
15 changes: 15 additions & 0 deletions core/coreapi/interface/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package iface

import (
"context"
"errors"

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
)

var ErrResolveFailed = errors.New("could not resolve name")

// IpnsEntry specifies the interface to IpnsEntries
type IpnsEntry interface {
// Name returns IpnsEntry name
Expand All @@ -14,6 +17,11 @@ type IpnsEntry interface {
Value() Path
}

type IpnsResult struct {
Path
Err error
}

// NameAPI specifies the interface to IPNS.
//
// IPNS is a PKI namespace, where names are the hashes of public keys, and the
Expand All @@ -28,4 +36,11 @@ type NameAPI interface {

// Resolve attempts to resolve the newest version of the specified name
Resolve(ctx context.Context, name string, opts ...options.NameResolveOption) (Path, error)

// Search is a version of Resolve which outputs paths as they are discovered,
// reducing the time to first entry
//
// Note: by default, all paths read from the channel are considered unsafe,
// except the latest (last path in channel read buffer).
Search(ctx context.Context, name string, opts ...options.NameResolveOption) (<-chan IpnsResult, error)
}
40 changes: 34 additions & 6 deletions core/coreapi/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/keystore"
"github.com/ipfs/go-ipfs/namesys"
ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"

"gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
"gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/offline"
"gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"
)

type NameAPI CoreAPI
Expand Down Expand Up @@ -89,9 +89,7 @@ func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopt
}, nil
}

// Resolve attempts to resolve the newest version of the specified name and
// returns its path.
func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (coreiface.Path, error) {
func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan coreiface.IpnsResult, error) {
options, err := caopts.NameResolveOptions(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,12 +123,42 @@ func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.Nam
name = "/ipns/" + name
}

output, err := resolver.Resolve(ctx, name, options.ResolveOpts...)
out := make(chan coreiface.IpnsResult)
go func() {
defer close(out)
for res := range resolver.ResolveAsync(ctx, name, options.ResolveOpts...) {
p, _ := coreiface.ParsePath(res.Path.String())

select {
case out <- coreiface.IpnsResult{Path: p, Err: res.Err}:
case <-ctx.Done():
return
}
}
}()

return out, nil
}

// Resolve attempts to resolve the newest version of the specified name and
// returns its path.
func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (coreiface.Path, error) {
results, err := api.Search(ctx, name, opts...)
if err != nil {
return nil, err
}

return coreiface.ParsePath(output.String())
err = coreiface.ErrResolveFailed
var p coreiface.Path

for res := range results {
p, err = res.Path, res.Err
if err != nil {
break
}
}

return p, err
}

func keylookup(n *core.IpfsNode, k string) (crypto.PrivKey, error) {
Expand Down
10 changes: 9 additions & 1 deletion core/corehttp/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type mockNamesys map[string]path.Path
func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.ResolveOpt) (value path.Path, err error) {
cfg := nsopts.DefaultResolveOpts()
for _, o := range opts {
o(cfg)
o(&cfg)
}
depth := cfg.Depth
if depth == nsopts.UnlimitedDepth {
Expand All @@ -57,6 +57,14 @@ func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.Re
return value, nil
}

func (m mockNamesys) ResolveAsync(ctx context.Context, name string, opts ...nsopts.ResolveOpt) <-chan namesys.Result {
out := make(chan namesys.Result, 1)
v, err := m.Resolve(ctx, name, opts...)
out <- namesys.Result{Path: v, Err: err}
close(out)
return out
}

func (m mockNamesys) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error {
return errors.New("not implemented for mockNamesys")
}
Expand Down
127 changes: 96 additions & 31 deletions namesys/base.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,121 @@
package namesys

import (
"context"
"strings"
"time"

context "context"

opts "github.com/ipfs/go-ipfs/namesys/opts"

path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"
)

type onceResult struct {
value path.Path
ttl time.Duration
err error
}

type resolver interface {
// resolveOnce looks up a name once (without recursion).
resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (value path.Path, ttl time.Duration, err error)
resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult
}

// resolve is a helper for implementing Resolver.ResolveN using resolveOnce.
func resolve(ctx context.Context, r resolver, name string, options *opts.ResolveOpts, prefixes ...string) (path.Path, error) {
depth := options.Depth
for {
p, _, err := r.resolveOnce(ctx, name, options)
func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts) (path.Path, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := ErrResolveFailed
var p path.Path

resCh := resolveAsync(ctx, r, name, options)

for res := range resCh {
p, err = res.Path, res.Err
if err != nil {
return "", err
break
}
log.Debugf("resolved %s to %s", name, p.String())
}

if strings.HasPrefix(p.String(), "/ipfs/") {
// we've bottomed out with an IPFS path
return p, nil
}
return p, err
}

if depth == 1 {
return p, ErrResolveRecursion
}
func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts) <-chan Result {
resCh := r.resolveOnceAsync(ctx, name, options)
depth := options.Depth
outCh := make(chan Result, 1)

matched := false
for _, prefix := range prefixes {
if strings.HasPrefix(p.String(), prefix) {
matched = true
if len(prefixes) == 1 {
name = strings.TrimPrefix(p.String(), prefix)
}
break
go func() {
defer close(outCh)
var subCh <-chan Result
var cancelSub context.CancelFunc
defer func() {
if cancelSub != nil {
cancelSub()
}
}
}()

if !matched {
return p, nil
}
for {
select {
case res, ok := <-resCh:
if !ok {
resCh = nil
break
}

if res.err != nil {
emitResult(ctx, outCh, Result{Err: res.err})
return
}
log.Debugf("resolved %s to %s", name, res.value.String())
if !strings.HasPrefix(res.value.String(), ipnsPrefix) {
emitResult(ctx, outCh, Result{Path: res.value})
break
}

if depth == 1 {
emitResult(ctx, outCh, Result{Path: res.value, Err: ErrResolveRecursion})
break
}

subopts := options
if subopts.Depth > 1 {
subopts.Depth--
}

if depth > 1 {
depth--
var subCtx context.Context
if cancelSub != nil {
// Cancel previous recursive resolve since it won't be used anyways
cancelSub()
}
subCtx, cancelSub = context.WithCancel(ctx)
_ = cancelSub

p := strings.TrimPrefix(res.value.String(), ipnsPrefix)
subCh = resolveAsync(subCtx, r, p, subopts)
case res, ok := <-subCh:
if !ok {
subCh = nil
break
}

// We don't bother returning here in case of context timeout as there is
// no good reason to do that, and we may still be able to emit a result
emitResult(ctx, outCh, res)
case <-ctx.Done():
return
}
if resCh == nil && subCh == nil {
return
}
}
}()
return outCh
}

func emitResult(ctx context.Context, outCh chan<- Result, r Result) {
select {
case outCh <- r:
case <-ctx.Done():
}
}
Loading

0 comments on commit 7de7928

Please sign in to comment.