From 7ff9f09b07a493fdcadaba6f6954519514e04fa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 28 Aug 2018 00:44:51 +0200 Subject: [PATCH 01/12] namesys: Implement async methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/name/ipns.go | 2 + namesys/base.go | 95 ++++++++++++++++++++++---- namesys/dns.go | 57 +++++++++++++++- namesys/interface.go | 11 +++ namesys/namesys.go | 88 +++++++++++++++++++++++- namesys/opts/opts.go | 8 +-- namesys/proquint.go | 16 ++++- namesys/routing.go | 135 ++++++++++++++++++++++++++++++++++++- 8 files changed, 390 insertions(+), 22 deletions(-) diff --git a/core/commands/name/ipns.go b/core/commands/name/ipns.go index ff78948e7bb..d1260831503 100644 --- a/core/commands/name/ipns.go +++ b/core/commands/name/ipns.go @@ -30,6 +30,7 @@ const ( nocacheOptionName = "nocache" dhtRecordCountOptionName = "dht-record-count" dhtTimeoutOptionName = "dht-timeout" + streamOptionName = "stream" ) var IpnsCmd = &cmds.Command{ @@ -78,6 +79,7 @@ Resolve the value of a dnslink: cmdkit.BoolOption(nocacheOptionName, "n", "Do not use cached entries."), cmdkit.UintOption(dhtRecordCountOptionName, "dhtrc", "Number of records to request for DHT resolution."), cmdkit.StringOption(dhtTimeoutOptionName, "dhtt", "Max time to collect values during DHT resolution eg \"30s\". Pass 0 for no timeout."), + cmdkit.BoolOption(streamOptionName, "s", "Stream entries as they are found."), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { n, err := cmdenv.GetNode(env) diff --git a/namesys/base.go b/namesys/base.go index afdc0a468d8..6e5dc70e079 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -10,13 +10,21 @@ import ( path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path" ) +type onceResult struct { + value path.Path + ttl time.Duration + err error +} + type resolver interface { // resolveOnce looks up a name once (without recursion). - resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (value path.Path, ttl time.Duration, err error) + resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (value path.Path, ttl time.Duration, err error) + + resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult } // resolve is a helper for implementing Resolver.ResolveN using resolveOnce. -func resolve(ctx context.Context, r resolver, name string, options *opts.ResolveOpts, prefixes ...string) (path.Path, error) { +func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) (path.Path, error) { depth := options.Depth for { p, _, err := r.resolveOnce(ctx, name, options) @@ -34,23 +42,82 @@ func resolve(ctx context.Context, r resolver, name string, options *opts.Resolve return p, ErrResolveRecursion } - matched := false - for _, prefix := range prefixes { - if strings.HasPrefix(p.String(), prefix) { - matched = true - if len(prefixes) == 1 { - name = strings.TrimPrefix(p.String(), prefix) - } - break - } - } - - if !matched { + if !strings.HasPrefix(p.String(), prefix) { return p, nil } + name = strings.TrimPrefix(p.String(), prefix) if depth > 1 { depth-- } } } + +//TODO: +// - better error handling +func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { + resCh := r.resolveOnceAsync(ctx, name, options) + depth := options.Depth + outCh := make(chan Result) + + go func() { + defer close(outCh) + var subCh <-chan Result + var cancelSub context.CancelFunc + + for { + select { + case res, ok := <-resCh: + if res.err != nil { + outCh <- Result{err: res.err} + return + } + if !ok { + resCh = nil + continue + } + log.Debugf("resolved %s to %s", name, res.value.String()) + if strings.HasPrefix(res.value.String(), "/ipfs/") { + outCh <- Result{err: res.err} + continue + } + p := strings.TrimPrefix(res.value.String(), prefix) + + if depth == 1 { + outCh <- Result{err: ErrResolveRecursion} + continue + } + + subopts := options + if subopts.Depth > 1 { + subopts.Depth-- + } + + var subCtx context.Context + if subCh != nil { + // Cancel previous recursive resolve since it won't be used anyways + cancelSub() + } + subCtx, cancelSub = context.WithCancel(ctx) + + subCh = resolveAsyncDo(subCtx, r, p, subopts, prefix) + case res, ok := <-subCh: + if res.err != nil { + outCh <- Result{err: res.err} + return + } + if !ok { + subCh = nil + continue + } + outCh <- res + case <-ctx.Done(): + } + } + }() + return outCh +} + +func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { + return resolveAsyncDo(ctx, r, name, options, prefix) +} diff --git a/namesys/dns.go b/namesys/dns.go index 5ca7323a960..a8e0b0fc2b0 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -39,7 +39,7 @@ type lookupRes struct { // resolveOnce implements resolver. // TXT records for a given domain name should contain a b58 // encoded multihash. -func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { segments := strings.SplitN(name, "/", 2) domain := segments[0] @@ -84,6 +84,61 @@ func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options *opt return p, 0, err } +func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) + segments := strings.SplitN(name, "/", 2) + domain := segments[0] + + if !isd.IsDomain(domain) { + out <- onceResult{err: errors.New("not a valid domain name")} + close(out) + return out + } + log.Debugf("DNSResolver resolving %s", domain) + + rootChan := make(chan lookupRes, 1) + go workDomain(r, domain, rootChan) + + subChan := make(chan lookupRes, 1) + go workDomain(r, "_dnslink."+domain, subChan) + + go func() { + defer close(out) + for { + select { + case subRes, ok := <-subChan: + if !ok { + subChan = nil + } + if subRes.error == nil { + select { + case out <- onceResult{value: subRes.path}: + case <-ctx.Done(): + } + return + } + case rootRes, ok := <-rootChan: + if !ok { + subChan = nil + } + if rootRes.error == nil { + select { + case out <- onceResult{value: rootRes.path}: + case <-ctx.Done(): + } + } + case <-ctx.Done(): + return + } + if subChan == nil && rootChan == nil { + return + } + } + }() + + return out +} + func workDomain(r *DNSResolver, name string, res chan lookupRes) { txt, err := r.lookupTXT(name) diff --git a/namesys/interface.go b/namesys/interface.go index fcc956cb142..aac3f324a5d 100644 --- a/namesys/interface.go +++ b/namesys/interface.go @@ -63,6 +63,12 @@ type NameSystem interface { Publisher } +// Result is the return type for Resolver.ResolveAsync. +type Result struct { + path path.Path + err error +} + // Resolver is an object capable of resolving names. type Resolver interface { @@ -81,6 +87,11 @@ type Resolver interface { // users will be fine with this default limit, but if you need to // adjust the limit you can specify it as an option. Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (value path.Path, err error) + + // ResolveAsync performs recursive name lookup, like Resolve, but it returns + // entries as they are discovered in the DHT. Each returned result is guaranteed + // to be "better" (which usually means newer) than the previous one. + ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result } // Publisher is an object capable of publishing particular names. diff --git a/namesys/namesys.go b/namesys/namesys.go index 054a51bbd4b..0842486ea1d 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -64,8 +64,25 @@ func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.Resolv return resolve(ctx, ns, name, opts.ProcessOpts(options), "/ipns/") } +func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + res := make(chan Result, 1) + if strings.HasPrefix(name, "/ipfs/") { + p, err := path.ParsePath(name) + res <- Result{p, err} + return res + } + + if !strings.HasPrefix(name, "/") { + p, err := path.ParsePath("/ipfs/" + name) + res <- Result{p, err} + return res + } + + return resolveAsync(ctx, ns, name, opts.ProcessOpts(options), "/ipns/") +} + // resolveOnce implements resolver. -func (ns *mpns) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (ns *mpns) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { if !strings.HasPrefix(name, "/ipns/") { name = "/ipns/" + name } @@ -107,6 +124,75 @@ func (ns *mpns) resolveOnce(ctx context.Context, name string, options *opts.Reso return p, 0, err } +func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) + + if !strings.HasPrefix(name, "/ipns/") { + name = "/ipns/" + name + } + segments := strings.SplitN(name, "/", 4) + if len(segments) < 3 || segments[0] != "" { + log.Debugf("invalid name syntax for %s", name) + out <- onceResult{err: ErrResolveFailed} + close(out) + return out + } + + key := segments[2] + + if p, ok := ns.cacheGet(key); ok { + out <- onceResult{value: p} + close(out) + return out + } + + // Resolver selection: + // 1. if it is a multihash resolve through "ipns". + // 2. if it is a domain name, resolve through "dns" + // 3. otherwise resolve through the "proquint" resolver + + var res resolver + if _, err := mh.FromB58String(key); err == nil { + res = ns.ipnsResolver + } else if isd.IsDomain(key) { + res = ns.dnsResolver + } else { + res = ns.proquintResolver + } + + resCh := res.resolveOnceAsync(ctx, key, options) + var best onceResult + go func() { + defer close(out) + for { + select { + case res, ok := <-resCh: + if !ok { + if best != (onceResult{}) { + ns.cacheSet(key, best.value, best.ttl) + } + return + } + if res.err == nil { + best = res + } + p := res.value + + // Attach rest of the path + if len(segments) > 3 { + p, _ = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) + } + + out <- onceResult{value: p, err: res.err} + case <-ctx.Done(): + return + } + } + }() + + return out +} + // Publish implements Publisher func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error { return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordTTL)) diff --git a/namesys/opts/opts.go b/namesys/opts/opts.go index 6690cf77942..ee2bd5ac2a4 100644 --- a/namesys/opts/opts.go +++ b/namesys/opts/opts.go @@ -31,8 +31,8 @@ type ResolveOpts struct { // DefaultResolveOpts returns the default options for resolving // an IPNS path -func DefaultResolveOpts() *ResolveOpts { - return &ResolveOpts{ +func DefaultResolveOpts() ResolveOpts { + return ResolveOpts{ Depth: DefaultDepthLimit, DhtRecordCount: 16, DhtTimeout: time.Minute, @@ -65,10 +65,10 @@ func DhtTimeout(timeout time.Duration) ResolveOpt { } // ProcessOpts converts an array of ResolveOpt into a ResolveOpts object -func ProcessOpts(opts []ResolveOpt) *ResolveOpts { +func ProcessOpts(opts []ResolveOpt) ResolveOpts { rsopts := DefaultResolveOpts() for _, option := range opts { - option(rsopts) + option(&rsopts) } return rsopts } diff --git a/namesys/proquint.go b/namesys/proquint.go index 4ba505dc88a..279c361fdd1 100644 --- a/namesys/proquint.go +++ b/namesys/proquint.go @@ -19,7 +19,7 @@ func (r *ProquintResolver) Resolve(ctx context.Context, name string, options ... } // resolveOnce implements resolver. Decodes the proquint string. -func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { ok, err := proquint.IsProquint(name) if err != nil || !ok { return "", 0, errors.New("not a valid proquint string") @@ -27,3 +27,17 @@ func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options // Return a 0 TTL as caching this result is pointless. return path.FromString(string(proquint.Decode(name))), 0, nil } + +func (r *ProquintResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) + defer close(out) + + ok, err := proquint.IsProquint(name) + if err != nil || !ok { + out <- onceResult{err: errors.New("not a valid proquint string")} + return out + } + // Return a 0 TTL as caching this result is pointless. + out <- onceResult{value: path.FromString(string(proquint.Decode(name)))} + return out +} diff --git a/namesys/routing.go b/namesys/routing.go index dc53868ad7f..d633c2d8dc4 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -42,9 +42,13 @@ func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } +func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") +} + // resolveOnce implements resolver. Uses the IPFS routing system to // resolve SFS-like names. -func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { log.Debugf("RoutingResolver resolving %s", name) if options.DhtTimeout != 0 { @@ -126,3 +130,132 @@ func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options *op return p, ttl, nil } + +func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) + log.Debugf("RoutingResolver resolving %s", name) + if options.DhtTimeout != 0 { + // Resolution must complete within the timeout + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout) + defer cancel() + } + + name = strings.TrimPrefix(name, "/ipns/") + hash, err := mh.FromB58String(name) + if err != nil { + // name should be a multihash. if it isn't, error out here. + log.Debugf("RoutingResolver: bad input hash: [%s]\n", name) + out <- onceResult{err: err} + close(out) + return out + } + + pid, err := peer.IDFromBytes(hash) + if err != nil { + log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err) + out <- onceResult{err: err} + close(out) + return out + } + + // Name should be the hash of a public key retrievable from ipfs. + // We retrieve the public key here to make certain that it's in the peer + // store before calling GetValue() on the DHT - the DHT will call the + // ipns validator, which in turn will get the public key from the peer + // store to verify the record signature + _, err = routing.GetPublicKey(r.routing, ctx, pid) + if err != nil { + log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err) + out <- onceResult{err: err} + close(out) + return out + } + + // Use the routing system to get the name. + // Note that the DHT will call the ipns validator when retrieving + // the value, which in turn verifies the ipns record signature + ipnsKey := ipns.RecordKey(pid) + + vals, err := r.routing.(*dht.IpfsDHT).SearchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) + if err != nil { + log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) + out <- onceResult{err: err} + close(out) + return out + } + + go func() { + defer close(out) + for { + select { + case val, ok := <-vals: + if !ok { + return + } + + entry := new(pb.IpnsEntry) + err = proto.Unmarshal(val, entry) + if err != nil { + log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err) + select { + case out <- onceResult{err: err}: + case <-ctx.Done(): + } + return + } + + var p path.Path + // check for old style record: + if valh, err := mh.Cast(entry.GetValue()); err == nil { + // Its an old style multihash record + log.Debugf("encountered CIDv0 ipns entry: %s", valh) + p = path.FromCid(cid.NewCidV0(valh)) + } else { + // Not a multihash, probably a new style record + p, err = path.ParsePath(string(entry.GetValue())) + if err != nil { + select { + case out <- onceResult{err: err}: + case <-ctx.Done(): + } + return + } + } + + ttl := DefaultResolverCacheTTL + if entry.Ttl != nil { + ttl = time.Duration(*entry.Ttl) + } + switch eol, err := ipns.GetEOL(entry); err { + case ipns.ErrUnrecognizedValidity: + // No EOL. + case nil: + ttEol := eol.Sub(time.Now()) + if ttEol < 0 { + // It *was* valid when we first resolved it. + ttl = 0 + } else if ttEol < ttl { + ttl = ttEol + } + default: + log.Errorf("encountered error when parsing EOL: %s", err) + select { + case out <- onceResult{err: err}: + case <-ctx.Done(): + } + return + } + + select { + case out <- onceResult{value: p, ttl: ttl}: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } + } + }() + + return out +} From 6adb15f4fb3c2bc64207972339f4f0a32b1cf834 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 28 Aug 2018 01:06:12 +0200 Subject: [PATCH 02/12] namesys: async: go vet fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/corehttp/gateway_test.go | 10 +++++++++- namesys/base.go | 24 ++++++++++++++---------- namesys/dns.go | 8 ++++++-- namesys/interface.go | 4 ++-- namesys/namesys_test.go | 6 +++++- 5 files changed, 36 insertions(+), 16 deletions(-) diff --git a/core/corehttp/gateway_test.go b/core/corehttp/gateway_test.go index 83e2ea5e8dd..f93c8844c50 100644 --- a/core/corehttp/gateway_test.go +++ b/core/corehttp/gateway_test.go @@ -35,7 +35,7 @@ type mockNamesys map[string]path.Path func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.ResolveOpt) (value path.Path, err error) { cfg := nsopts.DefaultResolveOpts() for _, o := range opts { - o(cfg) + o(&cfg) } depth := cfg.Depth if depth == nsopts.UnlimitedDepth { @@ -57,6 +57,14 @@ func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.Re return value, nil } +func (m mockNamesys) ResolveAsync(ctx context.Context, name string, opts ...nsopts.ResolveOpt) <-chan namesys.Result { + out := make(chan namesys.Result, 1) + v, err := m.Resolve(ctx, name, opts...) + out <- namesys.Result{Path: v, Err: err} + close(out) + return nil +} + func (m mockNamesys) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error { return errors.New("not implemented for mockNamesys") } diff --git a/namesys/base.go b/namesys/base.go index 6e5dc70e079..b3610cc0d59 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -68,23 +68,24 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R for { select { case res, ok := <-resCh: - if res.err != nil { - outCh <- Result{err: res.err} - return - } if !ok { resCh = nil continue } + + if res.err != nil { + outCh <- Result{Err: res.err} + return + } log.Debugf("resolved %s to %s", name, res.value.String()) if strings.HasPrefix(res.value.String(), "/ipfs/") { - outCh <- Result{err: res.err} + outCh <- Result{Err: res.err} continue } p := strings.TrimPrefix(res.value.String(), prefix) if depth == 1 { - outCh <- Result{err: ErrResolveRecursion} + outCh <- Result{Err: ErrResolveRecursion} continue } @@ -99,17 +100,20 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R cancelSub() } subCtx, cancelSub = context.WithCancel(ctx) + defer cancelSub() subCh = resolveAsyncDo(subCtx, r, p, subopts, prefix) case res, ok := <-subCh: - if res.err != nil { - outCh <- Result{err: res.err} - return - } if !ok { subCh = nil continue } + + if res.Err != nil { + outCh <- Result{Err: res.Err} + return + } + outCh <- res case <-ctx.Done(): } diff --git a/namesys/dns.go b/namesys/dns.go index a8e0b0fc2b0..d92ce8fa7b9 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -31,6 +31,10 @@ func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts. return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } +func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") +} + type lookupRes struct { path path.Path error error @@ -112,8 +116,8 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options } if subRes.error == nil { select { - case out <- onceResult{value: subRes.path}: - case <-ctx.Done(): + case out <- onceResult{value: subRes.path}: + case <-ctx.Done(): } return } diff --git a/namesys/interface.go b/namesys/interface.go index aac3f324a5d..dd195cc9ab4 100644 --- a/namesys/interface.go +++ b/namesys/interface.go @@ -65,8 +65,8 @@ type NameSystem interface { // Result is the return type for Resolver.ResolveAsync. type Result struct { - path path.Path - err error + Path path.Path + Err error } // Resolver is an object capable of resolving names. diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index c2a530c26c1..23a1852f84b 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -38,11 +38,15 @@ func testResolution(t *testing.T, resolver Resolver, name string, depth uint, ex } } -func (r *mockResolver) resolveOnce(ctx context.Context, name string, opts *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *mockResolver) resolveOnce(ctx context.Context, name string, opts opts.ResolveOpts) (path.Path, time.Duration, error) { p, err := path.ParsePath(r.entries[name]) return p, 0, err } +func (r *mockResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + panic("stub") +} + func mockResolverOne() *mockResolver { return &mockResolver{ entries: map[string]string{ From 804634d59b249a00768390e479bec558035ef5f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 28 Aug 2018 15:37:12 +0200 Subject: [PATCH 03/12] namesys: switch to async code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- namesys/base.go | 66 +++++-------- namesys/dns.go | 67 ++++--------- namesys/ipns_resolver_validation_test.go | 11 +-- namesys/namesys.go | 42 -------- namesys/namesys_test.go | 14 ++- namesys/proquint.go | 13 +-- namesys/routing.go | 117 ++++------------------- 7 files changed, 74 insertions(+), 256 deletions(-) diff --git a/namesys/base.go b/namesys/base.go index b3610cc0d59..c248327995e 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -17,45 +17,33 @@ type onceResult struct { } type resolver interface { - // resolveOnce looks up a name once (without recursion). - resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (value path.Path, ttl time.Duration, err error) - resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult } // resolve is a helper for implementing Resolver.ResolveN using resolveOnce. func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) (path.Path, error) { - depth := options.Depth - for { - p, _, err := r.resolveOnce(ctx, name, options) - if err != nil { - return "", err - } - log.Debugf("resolved %s to %s", name, p.String()) + ctx, cancel := context.WithCancel(ctx) + defer cancel() - if strings.HasPrefix(p.String(), "/ipfs/") { - // we've bottomed out with an IPFS path - return p, nil - } + err := ErrResolveFailed + var p path.Path - if depth == 1 { - return p, ErrResolveRecursion - } + resCh := resolveAsync(ctx, r, name, options, prefix) - if !strings.HasPrefix(p.String(), prefix) { - return p, nil - } - name = strings.TrimPrefix(p.String(), prefix) - - if depth > 1 { - depth-- + for res := range resCh { + p, err = res.Path, res.Err + if err != nil { + break } } + + return p, err } //TODO: // - better error handling -func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { +// - select on writes +func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { resCh := r.resolveOnceAsync(ctx, name, options) depth := options.Depth outCh := make(chan Result) @@ -70,7 +58,7 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R case res, ok := <-resCh: if !ok { resCh = nil - continue + break } if res.err != nil { @@ -79,14 +67,13 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R } log.Debugf("resolved %s to %s", name, res.value.String()) if strings.HasPrefix(res.value.String(), "/ipfs/") { - outCh <- Result{Err: res.err} - continue + outCh <- Result{Path: res.value} + break } - p := strings.TrimPrefix(res.value.String(), prefix) if depth == 1 { - outCh <- Result{Err: ErrResolveRecursion} - continue + outCh <- Result{Path: res.value, Err: ErrResolveRecursion} + break } subopts := options @@ -102,26 +89,21 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R subCtx, cancelSub = context.WithCancel(ctx) defer cancelSub() - subCh = resolveAsyncDo(subCtx, r, p, subopts, prefix) + p := strings.TrimPrefix(res.value.String(), prefix) + subCh = resolveAsync(subCtx, r, p, subopts, prefix) case res, ok := <-subCh: if !ok { subCh = nil - continue - } - - if res.Err != nil { - outCh <- Result{Err: res.Err} - return + break } outCh <- res case <-ctx.Done(): } + if resCh == nil && subCh == nil { + return + } } }() return outCh } - -func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { - return resolveAsyncDo(ctx, r, name, options, prefix) -} diff --git a/namesys/dns.go b/namesys/dns.go index d92ce8fa7b9..f90880de9ad 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -5,7 +5,6 @@ import ( "errors" "net" "strings" - "time" opts "github.com/ipfs/go-ipfs/namesys/opts" isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain" @@ -31,6 +30,7 @@ func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts. return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } +// ResolveAsync implements Resolver. func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } @@ -43,51 +43,6 @@ type lookupRes struct { // resolveOnce implements resolver. // TXT records for a given domain name should contain a b58 // encoded multihash. -func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { - segments := strings.SplitN(name, "/", 2) - domain := segments[0] - - if !isd.IsDomain(domain) { - return "", 0, errors.New("not a valid domain name") - } - log.Debugf("DNSResolver resolving %s", domain) - - rootChan := make(chan lookupRes, 1) - go workDomain(r, domain, rootChan) - - subChan := make(chan lookupRes, 1) - go workDomain(r, "_dnslink."+domain, subChan) - - var subRes lookupRes - select { - case subRes = <-subChan: - case <-ctx.Done(): - return "", 0, ctx.Err() - } - - var p path.Path - if subRes.error == nil { - p = subRes.path - } else { - var rootRes lookupRes - select { - case rootRes = <-rootChan: - case <-ctx.Done(): - return "", 0, ctx.Err() - } - if rootRes.error == nil { - p = rootRes.path - } else { - return "", 0, ErrResolveFailed - } - } - var err error - if len(segments) > 1 { - p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1]) - } - return p, 0, err -} - func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) segments := strings.SplitN(name, "/", 2) @@ -106,6 +61,13 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options subChan := make(chan lookupRes, 1) go workDomain(r, "_dnslink."+domain, subChan) + appendPath := func(p path.Path) (path.Path, error) { + if len(segments) > 1 { + return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1]) + } + return p, nil + } + go func() { defer close(out) for { @@ -113,21 +75,25 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options case subRes, ok := <-subChan: if !ok { subChan = nil + break } if subRes.error == nil { + p, err := appendPath(subRes.path) select { - case out <- onceResult{value: subRes.path}: + case out <- onceResult{value: p, err: err}: case <-ctx.Done(): } return } case rootRes, ok := <-rootChan: if !ok { - subChan = nil + rootChan = nil + break } if rootRes.error == nil { + p, err := appendPath(rootRes.path) select { - case out <- onceResult{value: rootRes.path}: + case out <- onceResult{value: p, err: err}: case <-ctx.Done(): } } @@ -144,8 +110,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options } func workDomain(r *DNSResolver, name string, res chan lookupRes) { - txt, err := r.lookupTXT(name) + defer close(res) + txt, err := r.lookupTXT(name) if err != nil { // Error is != nil res <- lookupRes{"", err} diff --git a/namesys/ipns_resolver_validation_test.go b/namesys/ipns_resolver_validation_test.go index cc99bf1d7b6..36e5fdc67aa 100644 --- a/namesys/ipns_resolver_validation_test.go +++ b/namesys/ipns_resolver_validation_test.go @@ -57,14 +57,13 @@ func TestResolverValidation(t *testing.T) { } // Resolve entry - resp, _, err := resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts()) + resp, err := resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err != nil { t.Fatal(err) } if resp != path.Path(p) { t.Fatalf("Mismatch between published path %s and resolved path %s", p, resp) } - // Create expired entry expiredEntry, err := ipns.Create(priv, p, 1, ts.Add(-1*time.Hour)) if err != nil { @@ -78,7 +77,7 @@ func TestResolverValidation(t *testing.T) { } // Record should fail validation because entry is expired - _, _, err = resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err == nil { t.Fatal("ValidateIpnsRecord should have returned error") } @@ -100,7 +99,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key defined by // ipns path doesn't match record signature - _, _, err = resolver.resolveOnce(ctx, id2.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id2.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err == nil { t.Fatal("ValidateIpnsRecord should have failed signature verification") } @@ -118,7 +117,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key is not available // in peer store or on network - _, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err == nil { t.Fatal("ValidateIpnsRecord should have failed because public key was not found") } @@ -133,7 +132,7 @@ func TestResolverValidation(t *testing.T) { // public key is available in the peer store by looking it up in // the DHT, which causes the DHT to fetch it and cache it in the // peer store - _, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err != nil { t.Fatal(err) } diff --git a/namesys/namesys.go b/namesys/namesys.go index 0842486ea1d..1099997b23e 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -82,48 +82,6 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R } // resolveOnce implements resolver. -func (ns *mpns) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { - if !strings.HasPrefix(name, "/ipns/") { - name = "/ipns/" + name - } - segments := strings.SplitN(name, "/", 4) - if len(segments) < 3 || segments[0] != "" { - log.Debugf("invalid name syntax for %s", name) - return "", 0, ErrResolveFailed - } - - key := segments[2] - - p, ok := ns.cacheGet(key) - var err error - if !ok { - // Resolver selection: - // 1. if it is a multihash resolve through "ipns". - // 2. if it is a domain name, resolve through "dns" - // 3. otherwise resolve through the "proquint" resolver - var res resolver - if _, err := mh.FromB58String(key); err == nil { - res = ns.ipnsResolver - } else if isd.IsDomain(key) { - res = ns.dnsResolver - } else { - res = ns.proquintResolver - } - - var ttl time.Duration - p, ttl, err = res.resolveOnce(ctx, key, options) - if err != nil { - return "", 0, ErrResolveFailed - } - ns.cacheSet(key, p, ttl) - } - - if len(segments) > 3 { - p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) - } - return p, 0, err -} - func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index 23a1852f84b..09d5cf81e44 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -4,12 +4,11 @@ import ( "context" "fmt" "testing" - "time" opts "github.com/ipfs/go-ipfs/namesys/opts" + "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path" - ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" offroute "gx/ipfs/QmScZySgru9jaoDa12sSfvh21sWbqF5eXkieTmJzAHJXkQ/go-ipfs-routing/offline" ds "gx/ipfs/QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV/go-datastore" @@ -38,13 +37,12 @@ func testResolution(t *testing.T, resolver Resolver, name string, depth uint, ex } } -func (r *mockResolver) resolveOnce(ctx context.Context, name string, opts opts.ResolveOpts) (path.Path, time.Duration, error) { - p, err := path.ParsePath(r.entries[name]) - return p, 0, err -} - func (r *mockResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { - panic("stub") + p, err := path.ParsePath(r.entries[name]) + out := make(chan onceResult, 1) + out <- onceResult{value: p, err: err} + close(out) + return out } func mockResolverOne() *mockResolver { diff --git a/namesys/proquint.go b/namesys/proquint.go index 279c361fdd1..ad09fd48c27 100644 --- a/namesys/proquint.go +++ b/namesys/proquint.go @@ -1,10 +1,8 @@ package namesys import ( + "context" "errors" - "time" - - context "context" opts "github.com/ipfs/go-ipfs/namesys/opts" proquint "gx/ipfs/QmYnf27kzqR2cxt6LFZdrAFJuQd6785fTkBvMuEj9EeRxM/proquint" @@ -19,15 +17,6 @@ func (r *ProquintResolver) Resolve(ctx context.Context, name string, options ... } // resolveOnce implements resolver. Decodes the proquint string. -func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { - ok, err := proquint.IsProquint(name) - if err != nil || !ok { - return "", 0, errors.New("not a valid proquint string") - } - // Return a 0 TTL as caching this result is pointless. - return path.FromString(string(proquint.Decode(name))), 0, nil -} - func (r *ProquintResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) defer close(out) diff --git a/namesys/routing.go b/namesys/routing.go index d633c2d8dc4..c591d5662ea 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -11,6 +11,7 @@ import ( cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" routing "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing" + ropts "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing/options" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" dht "gx/ipfs/QmZVakpN44VAUxs9eXAuUGLFYTCGmSyqSy6hyEKfMv68ME/go-libp2p-kad-dht" ipns "gx/ipfs/QmZrmn2BPZbSviQAWeyY2iXkCukmJHv9n7zrLgWU5KgbTb/go-ipns" @@ -42,120 +43,30 @@ func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } +// ResolveAsync implements Resolver. func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } // resolveOnce implements resolver. Uses the IPFS routing system to // resolve SFS-like names. -func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { - log.Debugf("RoutingResolver resolving %s", name) - - if options.DhtTimeout != 0 { - // Resolution must complete within the timeout - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout) - defer cancel() - } - - name = strings.TrimPrefix(name, "/ipns/") - pid, err := peer.IDB58Decode(name) - if err != nil { - // name should be a multihash. if it isn't, error out here. - log.Debugf("RoutingResolver: IPNS address not a valid peer ID: [%s]\n", name) - return "", 0, err - } - - // Name should be the hash of a public key retrievable from ipfs. - // We retrieve the public key here to make certain that it's in the peer - // store before calling GetValue() on the DHT - the DHT will call the - // ipns validator, which in turn will get the public key from the peer - // store to verify the record signature - _, err = routing.GetPublicKey(r.routing, ctx, pid) - if err != nil { - log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err) - return "", 0, err - } - - // Use the routing system to get the name. - // Note that the DHT will call the ipns validator when retrieving - // the value, which in turn verifies the ipns record signature - ipnsKey := ipns.RecordKey(pid) - val, err := r.routing.GetValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) - if err != nil { - log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) - return "", 0, err - } - - entry := new(pb.IpnsEntry) - err = proto.Unmarshal(val, entry) - if err != nil { - log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err) - return "", 0, err - } - - var p path.Path - // check for old style record: - if valh, err := mh.Cast(entry.GetValue()); err == nil { - // Its an old style multihash record - log.Debugf("encountered CIDv0 ipns entry: %s", valh) - p = path.FromCid(cid.NewCidV0(valh)) - } else { - // Not a multihash, probably a new record - p, err = path.ParsePath(string(entry.GetValue())) - if err != nil { - return "", 0, err - } - } - - ttl := DefaultResolverCacheTTL - if entry.Ttl != nil { - ttl = time.Duration(*entry.Ttl) - } - switch eol, err := ipns.GetEOL(entry); err { - case ipns.ErrUnrecognizedValidity: - // No EOL. - case nil: - ttEol := eol.Sub(time.Now()) - if ttEol < 0 { - // It *was* valid when we first resolved it. - ttl = 0 - } else if ttEol < ttl { - ttl = ttEol - } - default: - log.Errorf("encountered error when parsing EOL: %s", err) - return "", 0, err - } - - return p, ttl, nil -} - func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) log.Debugf("RoutingResolver resolving %s", name) + cancel := func() {} + if options.DhtTimeout != 0 { // Resolution must complete within the timeout - var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout) - defer cancel() } name = strings.TrimPrefix(name, "/ipns/") - hash, err := mh.FromB58String(name) - if err != nil { - // name should be a multihash. if it isn't, error out here. - log.Debugf("RoutingResolver: bad input hash: [%s]\n", name) - out <- onceResult{err: err} - close(out) - return out - } - - pid, err := peer.IDFromBytes(hash) + pid, err := peer.IDB58Decode(name) if err != nil { log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err) out <- onceResult{err: err} close(out) + cancel() return out } @@ -169,6 +80,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err) out <- onceResult{err: err} close(out) + cancel() return out } @@ -177,15 +89,17 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option // the value, which in turn verifies the ipns record signature ipnsKey := ipns.RecordKey(pid) - vals, err := r.routing.(*dht.IpfsDHT).SearchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) + vals, err := r.searchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) if err != nil { log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) out <- onceResult{err: err} close(out) + cancel() return out } go func() { + defer cancel() defer close(out) for { select { @@ -259,3 +173,14 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option return out } + +func (r *IpnsResolver) searchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) { + if ir, ok := r.routing.(*dht.IpfsDHT); ok { + return ir.SearchValue(ctx, key, opts...) + } + out := make(chan []byte, 1) + val, err := r.routing.GetValue(ctx, key, opts...) + out <- val + close(out) + return out, err +} From f69cf07444b91db5238813e43e1562b9916cde1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Aug 2018 01:03:56 +0200 Subject: [PATCH 04/12] ipfs name resolve --stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/name/ipns.go | 28 +++++++++++++++++++++------- namesys/namesys_test.go | 4 ++-- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/core/commands/name/ipns.go b/core/commands/name/ipns.go index d1260831503..8df592a037a 100644 --- a/core/commands/name/ipns.go +++ b/core/commands/name/ipns.go @@ -127,7 +127,7 @@ Resolve the value of a dnslink: recursive, _ := req.Options[recursiveOptionName].(bool) rc, rcok := req.Options[dhtRecordCountOptionName].(int) dhtt, dhttok := req.Options[dhtTimeoutOptionName].(string) - + stream, _ := req.Options[streamOptionName].(bool) var ropts []nsopts.ResolveOpt if !recursive { ropts = append(ropts, nsopts.Depth(1)) @@ -150,13 +150,27 @@ Resolve the value of a dnslink: name = "/ipns/" + name } - output, err := resolver.Resolve(req.Context, name, ropts...) - if err != nil { - return err - } - // TODO: better errors (in the case of not finding the name, we get "failed to find any peer in table") - return cmds.EmitOnce(res, &ResolvedPath{output}) + + if !stream { + output, err := resolver.Resolve(req.Context, name, ropts...) + if err != nil { + return err + } + + return cmds.EmitOnce(res, &ResolvedPath{output}) + } else { + output := resolver.ResolveAsync(req.Context, name, ropts...) + for v := range output { + if v.Err != nil { + return err + } + if err := res.Emit(&ResolvedPath{v.Path}); err != nil { + return err + } + } + } + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index 09d5cf81e44..301c98a5f61 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -7,14 +7,14 @@ import ( opts "github.com/ipfs/go-ipfs/namesys/opts" - "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" - path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path" ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" offroute "gx/ipfs/QmScZySgru9jaoDa12sSfvh21sWbqF5eXkieTmJzAHJXkQ/go-ipfs-routing/offline" + "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" ds "gx/ipfs/QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV/go-datastore" dssync "gx/ipfs/QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV/go-datastore/sync" ipns "gx/ipfs/QmZrmn2BPZbSviQAWeyY2iXkCukmJHv9n7zrLgWU5KgbTb/go-ipns" peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path" pstoremem "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore/pstoremem" ) From 8e4ce426a49be469f359d8d0bbde20a7e5b7bffa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 24 Sep 2018 16:12:47 +0200 Subject: [PATCH 05/12] namesys: use routing.SearchValue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/commands/name/ipns.go | 19 ++++++++++--------- namesys/routing.go | 14 +------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/core/commands/name/ipns.go b/core/commands/name/ipns.go index 8df592a037a..9ee2e8c5cff 100644 --- a/core/commands/name/ipns.go +++ b/core/commands/name/ipns.go @@ -159,16 +159,17 @@ Resolve the value of a dnslink: } return cmds.EmitOnce(res, &ResolvedPath{output}) - } else { - output := resolver.ResolveAsync(req.Context, name, ropts...) - for v := range output { - if v.Err != nil { - return err - } - if err := res.Emit(&ResolvedPath{v.Path}); err != nil { - return err - } + } + + output := resolver.ResolveAsync(req.Context, name, ropts...) + for v := range output { + if v.Err != nil { + return err } + if err := res.Emit(&ResolvedPath{v.Path}); err != nil { + return err + } + } return nil }, diff --git a/namesys/routing.go b/namesys/routing.go index c591d5662ea..6ac9f081a45 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -11,7 +11,6 @@ import ( cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" routing "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing" - ropts "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing/options" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" dht "gx/ipfs/QmZVakpN44VAUxs9eXAuUGLFYTCGmSyqSy6hyEKfMv68ME/go-libp2p-kad-dht" ipns "gx/ipfs/QmZrmn2BPZbSviQAWeyY2iXkCukmJHv9n7zrLgWU5KgbTb/go-ipns" @@ -89,7 +88,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option // the value, which in turn verifies the ipns record signature ipnsKey := ipns.RecordKey(pid) - vals, err := r.searchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) + vals, err := r.routing.SearchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) if err != nil { log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) out <- onceResult{err: err} @@ -173,14 +172,3 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option return out } - -func (r *IpnsResolver) searchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) { - if ir, ok := r.routing.(*dht.IpfsDHT); ok { - return ir.SearchValue(ctx, key, opts...) - } - out := make(chan []byte, 1) - val, err := r.routing.GetValue(ctx, key, opts...) - out <- val - close(out) - return out, err -} From ea352f62d0f63f38e6a1f1cb8467bb5dbf3c43b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 26 Sep 2018 14:01:55 +0200 Subject: [PATCH 06/12] update routing-helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/core.go | 19 +++++++++++-------- package.json | 8 ++++---- test/sharness/t0183-namesys-pubsub.sh | 4 ++-- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/core/core.go b/core/core.go index dc58a48338a..7cbaf705c59 100644 --- a/core/core.go +++ b/core/core.go @@ -33,6 +33,7 @@ import ( metrics "gx/ipfs/QmNn6gcjBXpg8kccr9zEV7UVBpqAw8FZEiQ6DksvzyTQ5K/go-libp2p-metrics" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" quic "gx/ipfs/QmPWMfcxNC7txnUvT21xdivgRJfJ9e7YuC5nCn6JEALFQv/go-libp2p-quic-transport" + psrouter "gx/ipfs/QmPbLCBNGvyQhs3xK64cbobq7sN1Hdn6Ud9pkhsZME1sqT/go-libp2p-pubsub-router" floodsub "gx/ipfs/QmPbYVFhKxamZAN9MyrQMDeoGYa6zkQkhAPguwFfzxPM1J/go-libp2p-floodsub" u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util" ic "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" @@ -68,12 +69,11 @@ import ( mfs "gx/ipfs/QmahrY1adY4wvtYEtoGjpZ2GUohTyukrkMkwUR9ytRjTG2/go-mfs" peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" merkledag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag" + rhelpers "gx/ipfs/QmcRdSdYkL17qhuQAibF5D14XdQ1iunvaVnXGjDiQTdRm9/go-libp2p-routing-helpers" bserv "gx/ipfs/QmcRecCZWM2NZfCQrCe97Ch3Givv8KKEP82tGUDntzdLFe/go-blockservice" "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path/resolver" yamux "gx/ipfs/QmcsgrV3nCAKjiHKZhKVXWc4oY3WBECJCqahXEMpHeMrev/go-smux-yamux" - psrouter "gx/ipfs/Qmd547Rr4cZUEG5ETGHHNgx6xHBY4ee7hB6NAEBw2UWnea/go-libp2p-pubsub-router" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" - rhelpers "gx/ipfs/QmdELF3gxpecwBauFtZBib2fcnEEAbXiTVA1NSWZjf95Tp/go-libp2p-routing-helpers" bstore "gx/ipfs/QmdriVJgKx4JADRgh3cYPXqXmsa1A45SvFki1nDWHhQNtC/go-ipfs-blockstore" p2phost "gx/ipfs/QmeA5hsqgLryvkeyqeQdvGDqurLkYi3XEPLZP3pzuBJXh2/go-libp2p-host" pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" @@ -523,14 +523,17 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost n.RecordValidator, ) n.Routing = rhelpers.Tiered{ - // Always check pubsub first. - &rhelpers.Compose{ - ValueStore: &rhelpers.LimitedValueStore{ - ValueStore: n.PSRouter, - Namespaces: []string{"ipns"}, + Routers: []routing.IpfsRouting{ + // Always check pubsub first. + &rhelpers.Compose{ + ValueStore: &rhelpers.LimitedValueStore{ + ValueStore: n.PSRouter, + Namespaces: []string{"ipns"}, + }, }, + n.Routing, }, - n.Routing, + Validator: n.RecordValidator, } } diff --git a/package.json b/package.json index e5d03e70e4a..cafdcd7ead5 100644 --- a/package.json +++ b/package.json @@ -464,15 +464,15 @@ }, { "author": "stebalien", - "hash": "Qmd547Rr4cZUEG5ETGHHNgx6xHBY4ee7hB6NAEBw2UWnea", + "hash": "QmPbLCBNGvyQhs3xK64cbobq7sN1Hdn6Ud9pkhsZME1sqT", "name": "go-libp2p-pubsub-router", - "version": "0.4.3" + "version": "0.4.5" }, { "author": "Stebalien", - "hash": "QmdELF3gxpecwBauFtZBib2fcnEEAbXiTVA1NSWZjf95Tp", + "hash": "QmcRdSdYkL17qhuQAibF5D14XdQ1iunvaVnXGjDiQTdRm9", "name": "go-libp2p-routing-helpers", - "version": "0.3.2" + "version": "0.3.4" }, { "author": "fsnotify", diff --git a/test/sharness/t0183-namesys-pubsub.sh b/test/sharness/t0183-namesys-pubsub.sh index eb3c27921f2..467c9c1e9a0 100755 --- a/test/sharness/t0183-namesys-pubsub.sh +++ b/test/sharness/t0183-namesys-pubsub.sh @@ -28,8 +28,8 @@ test_expect_success 'check namesys pubsub state' ' # These commands are *expected* to fail. We haven't published anything yet. test_expect_success 'subscribe nodes to the publisher topic' ' - ipfsi 1 name resolve /ipns/$PEERID_0; - ipfsi 2 name resolve /ipns/$PEERID_0; + ipfsi 1 name resolve /ipns/$PEERID_0 --timeout=1s; + ipfsi 2 name resolve /ipns/$PEERID_0 --timeout=1s; true ' From 7dbeb27e5b1e18f126aaaf3bd2cd8eba81ec675f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Oct 2018 11:41:00 +0200 Subject: [PATCH 07/12] namesys: review fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/interface/name.go | 1 - core/coreapi/name.go | 3 +-- core/corehttp/gateway_test.go | 2 +- namesys/base.go | 16 +++++++++------- namesys/dns.go | 1 + namesys/namesys.go | 18 +++++++++++++++--- 6 files changed, 27 insertions(+), 14 deletions(-) diff --git a/core/coreapi/interface/name.go b/core/coreapi/interface/name.go index 14127ac27b3..782f6835150 100644 --- a/core/coreapi/interface/name.go +++ b/core/coreapi/interface/name.go @@ -2,7 +2,6 @@ package iface import ( "context" - "errors" options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" diff --git a/core/coreapi/name.go b/core/coreapi/name.go index 51afff66f59..b5bf3fbd633 100644 --- a/core/coreapi/name.go +++ b/core/coreapi/name.go @@ -7,8 +7,6 @@ import ( "strings" "time" - ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" - "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" @@ -18,6 +16,7 @@ import ( "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" "gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/offline" "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ) type NameAPI CoreAPI diff --git a/core/corehttp/gateway_test.go b/core/corehttp/gateway_test.go index 701f16f07a1..e7e34af559b 100644 --- a/core/corehttp/gateway_test.go +++ b/core/corehttp/gateway_test.go @@ -62,7 +62,7 @@ func (m mockNamesys) ResolveAsync(ctx context.Context, name string, opts ...nsop v, err := m.Resolve(ctx, name, opts...) out <- namesys.Result{Path: v, Err: err} close(out) - return nil + return out } func (m mockNamesys) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error { diff --git a/namesys/base.go b/namesys/base.go index 56cfd03a242..508847ac3bc 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -1,12 +1,12 @@ package namesys import ( + "context" "strings" "time" - context "context" - opts "github.com/ipfs/go-ipfs/namesys/opts" + path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ) @@ -40,13 +40,10 @@ func resolve(ctx context.Context, r resolver, name string, options opts.ResolveO return p, err } -//TODO: -// - better error handling -// - select on writes func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { resCh := r.resolveOnceAsync(ctx, name, options) depth := options.Depth - outCh := make(chan Result) + outCh := make(chan Result, 1) go func() { defer close(outCh) @@ -97,8 +94,13 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res break } - outCh <- res + select { + case outCh <- res: + case <-ctx.Done(): + return + } case <-ctx.Done(): + return } if resCh == nil && subCh == nil { return diff --git a/namesys/dns.go b/namesys/dns.go index 81eef07da9c..f4d37e654fa 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -7,6 +7,7 @@ import ( "strings" opts "github.com/ipfs/go-ipfs/namesys/opts" + isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ) diff --git a/namesys/namesys.go b/namesys/namesys.go index 2e71b30031d..83cd7dbccf8 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -5,9 +5,10 @@ import ( "strings" "time" - opts "github.com/ipfs/go-ipfs/namesys/opts" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + opts "github.com/ipfs/go-ipfs/namesys/opts" + routing "gx/ipfs/QmPmFeQ5oY5G6M7aBWggi5phxEPXwsQntE1DFcUzETULdp/go-libp2p-routing" mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" @@ -138,10 +139,21 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts. // Attach rest of the path if len(segments) > 3 { - p, _ = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) + p, err := path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) + if err != nil { + select { + case out <- onceResult{value: p, err: err}: + case <-ctx.Done(): + } + return + } } - out <- onceResult{value: p, err: res.err} + select { + case out <- onceResult{value: p, ttl: res.ttl, err: res.err}: + case <-ctx.Done(): + return + } case <-ctx.Done(): return } From e335fd32f26951d84df57813b7a91bc23b09062e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Oct 2018 16:35:31 +0200 Subject: [PATCH 08/12] namesys: drop prefix args MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/coreapi/interface/name.go | 4 ++-- namesys/base.go | 10 +++++----- namesys/dns.go | 4 ++-- namesys/ipns_resolver_validation_test.go | 13 +++++++------ namesys/namesys.go | 4 ++-- namesys/proquint.go | 5 +++-- namesys/routing.go | 7 ++++--- 7 files changed, 25 insertions(+), 22 deletions(-) diff --git a/core/coreapi/interface/name.go b/core/coreapi/interface/name.go index 782f6835150..a02bc078748 100644 --- a/core/coreapi/interface/name.go +++ b/core/coreapi/interface/name.go @@ -40,7 +40,7 @@ type NameAPI interface { // Search is a version of Resolve which outputs paths as they are discovered, // reducing the time to first entry // - // Note that by default only the last path returned before the channel closes - // can be considered 'safe'. + // Note: by default, all paths read from the channel are considered unsafe, + // except the latest (last path in channel read buffer). Search(ctx context.Context, name string, opts ...options.NameResolveOption) (<-chan IpnsResult, error) } diff --git a/namesys/base.go b/namesys/base.go index 508847ac3bc..1906bdf5d9f 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -21,14 +21,14 @@ type resolver interface { } // resolve is a helper for implementing Resolver.ResolveN using resolveOnce. -func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) (path.Path, error) { +func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts) (path.Path, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() err := ErrResolveFailed var p path.Path - resCh := resolveAsync(ctx, r, name, options, prefix) + resCh := resolveAsync(ctx, r, name, options) for res := range resCh { p, err = res.Path, res.Err @@ -40,7 +40,7 @@ func resolve(ctx context.Context, r resolver, name string, options opts.ResolveO return p, err } -func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { +func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts) <-chan Result { resCh := r.resolveOnceAsync(ctx, name, options) depth := options.Depth outCh := make(chan Result, 1) @@ -86,8 +86,8 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res subCtx, cancelSub = context.WithCancel(ctx) defer cancelSub() - p := strings.TrimPrefix(res.value.String(), prefix) - subCh = resolveAsync(subCtx, r, p, subopts, prefix) + p := strings.TrimPrefix(res.value.String(), ipnsPrefix) + subCh = resolveAsync(subCtx, r, p, subopts) case res, ok := <-subCh: if !ok { subCh = nil diff --git a/namesys/dns.go b/namesys/dns.go index f4d37e654fa..d3f9e09561f 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -28,12 +28,12 @@ func NewDNSResolver() *DNSResolver { // Resolve implements Resolver. func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { - return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, r, name, opts.ProcessOpts(options)) } // ResolveAsync implements Resolver. func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { - return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolveAsync(ctx, r, name, opts.ProcessOpts(options)) } type lookupRes struct { diff --git a/namesys/ipns_resolver_validation_test.go b/namesys/ipns_resolver_validation_test.go index e12841421a6..5bcef4e1474 100644 --- a/namesys/ipns_resolver_validation_test.go +++ b/namesys/ipns_resolver_validation_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" - opts "github.com/ipfs/go-ipfs/namesys/opts" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + opts "github.com/ipfs/go-ipfs/namesys/opts" + testutil "gx/ipfs/QmNfQbgBfARAtrYsBguChX6VJ5nbjeoYy1KdC36aaYWqG8/go-testutil" u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util" routing "gx/ipfs/QmPmFeQ5oY5G6M7aBWggi5phxEPXwsQntE1DFcUzETULdp/go-libp2p-routing" @@ -57,7 +58,7 @@ func TestResolverValidation(t *testing.T) { } // Resolve entry - resp, err := resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/") + resp, err := resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts()) if err != nil { t.Fatal(err) } @@ -77,7 +78,7 @@ func TestResolverValidation(t *testing.T) { } // Record should fail validation because entry is expired - _, err = resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/") + _, err = resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts()) if err == nil { t.Fatal("ValidateIpnsRecord should have returned error") } @@ -99,7 +100,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key defined by // ipns path doesn't match record signature - _, err = resolve(ctx, resolver, id2.Pretty(), opts.DefaultResolveOpts(), "/ipns/") + _, err = resolve(ctx, resolver, id2.Pretty(), opts.DefaultResolveOpts()) if err == nil { t.Fatal("ValidateIpnsRecord should have failed signature verification") } @@ -117,7 +118,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key is not available // in peer store or on network - _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/") + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts()) if err == nil { t.Fatal("ValidateIpnsRecord should have failed because public key was not found") } @@ -132,7 +133,7 @@ func TestResolverValidation(t *testing.T) { // public key is available in the peer store by looking it up in // the DHT, which causes the DHT to fetch it and cache it in the // peer store - _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/") + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts()) if err != nil { t.Fatal(err) } diff --git a/namesys/namesys.go b/namesys/namesys.go index 83cd7dbccf8..50d302079bf 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -62,7 +62,7 @@ func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.Resolv return path.ParsePath("/ipfs/" + name) } - return resolve(ctx, ns, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, ns, name, opts.ProcessOpts(options)) } func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { @@ -79,7 +79,7 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R return res } - return resolveAsync(ctx, ns, name, opts.ProcessOpts(options), "/ipns/") + return resolveAsync(ctx, ns, name, opts.ProcessOpts(options)) } // resolveOnce implements resolver. diff --git a/namesys/proquint.go b/namesys/proquint.go index 0caaf9497cb..89457931370 100644 --- a/namesys/proquint.go +++ b/namesys/proquint.go @@ -4,16 +4,17 @@ import ( "context" "errors" - opts "github.com/ipfs/go-ipfs/namesys/opts" proquint "gx/ipfs/QmYnf27kzqR2cxt6LFZdrAFJuQd6785fTkBvMuEj9EeRxM/proquint" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + + opts "github.com/ipfs/go-ipfs/namesys/opts" ) type ProquintResolver struct{} // Resolve implements Resolver. func (r *ProquintResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { - return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, r, name, opts.ProcessOpts(options)) } // resolveOnce implements resolver. Decodes the proquint string. diff --git a/namesys/routing.go b/namesys/routing.go index ef7e376e62f..25daafff4ef 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -5,9 +5,10 @@ import ( "strings" "time" - opts "github.com/ipfs/go-ipfs/namesys/opts" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + opts "github.com/ipfs/go-ipfs/namesys/opts" + cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" routing "gx/ipfs/QmPmFeQ5oY5G6M7aBWggi5phxEPXwsQntE1DFcUzETULdp/go-libp2p-routing" mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" @@ -39,12 +40,12 @@ func NewIpnsResolver(route routing.ValueStore) *IpnsResolver { // Resolve implements Resolver. func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { - return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, r, name, opts.ProcessOpts(options)) } // ResolveAsync implements Resolver. func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { - return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolveAsync(ctx, r, name, opts.ProcessOpts(options)) } // resolveOnce implements resolver. Uses the IPFS routing system to From 462c8026befd1bf576facb97887d2f030aeb59fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Oct 2018 16:37:15 +0200 Subject: [PATCH 09/12] namesys: allow non /ipfs paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- namesys/base.go | 2 +- test/sharness/t0100-name.sh | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/namesys/base.go b/namesys/base.go index 1906bdf5d9f..a523a10bf0b 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -63,7 +63,7 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res return } log.Debugf("resolved %s to %s", name, res.value.String()) - if strings.HasPrefix(res.value.String(), "/ipfs/") { + if !strings.HasPrefix(res.value.String(), ipnsPrefix) { outCh <- Result{Path: res.value} break } diff --git a/test/sharness/t0100-name.sh b/test/sharness/t0100-name.sh index 019a5f94776..7a23fd1968d 100755 --- a/test/sharness/t0100-name.sh +++ b/test/sharness/t0100-name.sh @@ -114,6 +114,27 @@ test_expect_success "publish an explicit node ID as key name looks good" ' test_cmp expected_node_id_publish actual_node_id_publish ' +# test IPNS + IPLD +test_expect_success "'ipfs dag put' succeeds" ' + HELLO_HASH="$(echo "\"hello world\"" | ipfs dag put)" && + OBJECT_HASH="$(echo "{\"thing\": {\"/\": \"${HELLO_HASH}\" }}" | ipfs dag put)" +' +test_expect_success "'ipfs name publish --allow-offline /ipld/...' succeeds" ' + PEERID=`ipfs id --format=""` && + test_check_peerid "${PEERID}" && + ipfs name publish --allow-offline "/ipld/$OBJECT_HASH/thing" >publish_out +' +test_expect_success "publish a path looks good" ' + echo "Published to ${PEERID}: /ipld/$OBJECT_HASH/thing" >expected3 && + test_cmp expected3 publish_out +' +test_expect_success "'ipfs name resolve' succeeds" ' + ipfs name resolve "$PEERID" >output +' +test_expect_success "resolve output looks good" ' + printf "/ipld/%s/thing\n" "$OBJECT_HASH" >expected4 && + test_cmp expected4 output +' # test publishing nothing From 734615ac98d0ccadbb33d7cf8ba3ebdb438cfb1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Oct 2018 16:53:45 +0200 Subject: [PATCH 10/12] namesys: avoid defer in loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- namesys/base.go | 15 +++++++++++++-- namesys/namesys.go | 4 ++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/namesys/base.go b/namesys/base.go index a523a10bf0b..064286ab45c 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -60,6 +60,9 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res if res.err != nil { outCh <- Result{Err: res.err} + if cancelSub != nil { + cancelSub() + } return } log.Debugf("resolved %s to %s", name, res.value.String()) @@ -79,12 +82,11 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res } var subCtx context.Context - if subCh != nil { + if cancelSub != nil { // Cancel previous recursive resolve since it won't be used anyways cancelSub() } subCtx, cancelSub = context.WithCancel(ctx) - defer cancelSub() p := strings.TrimPrefix(res.value.String(), ipnsPrefix) subCh = resolveAsync(subCtx, r, p, subopts) @@ -97,12 +99,21 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res select { case outCh <- res: case <-ctx.Done(): + if cancelSub != nil { + cancelSub() + } return } case <-ctx.Done(): + if cancelSub != nil { + cancelSub() + } return } if resCh == nil && subCh == nil { + if cancelSub != nil { + cancelSub() + } return } } diff --git a/namesys/namesys.go b/namesys/namesys.go index 50d302079bf..674146e575f 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -86,8 +86,8 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) - if !strings.HasPrefix(name, "/ipns/") { - name = "/ipns/" + name + if !strings.HasPrefix(name, ipnsPrefix) { + name = ipnsPrefix + name } segments := strings.SplitN(name, "/", 4) if len(segments) < 3 || segments[0] != "" { From 7fcf56e8e532997f7b93f51587b74ebf7e37c5b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 16 Oct 2018 17:45:13 +0200 Subject: [PATCH 11/12] namesys: select on output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- namesys/base.go | 37 +++++++++++++++++-------------------- namesys/dns.go | 10 ++-------- namesys/namesys.go | 19 +++++++++---------- namesys/routing.go | 20 ++++---------------- 4 files changed, 32 insertions(+), 54 deletions(-) diff --git a/namesys/base.go b/namesys/base.go index 064286ab45c..28bc87dad75 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -49,6 +49,11 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res defer close(outCh) var subCh <-chan Result var cancelSub context.CancelFunc + defer func() { + if cancelSub != nil { + cancelSub() + } + }() for { select { @@ -59,20 +64,17 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res } if res.err != nil { - outCh <- Result{Err: res.err} - if cancelSub != nil { - cancelSub() - } + emitResult(ctx, outCh, Result{Err: res.err}) return } log.Debugf("resolved %s to %s", name, res.value.String()) if !strings.HasPrefix(res.value.String(), ipnsPrefix) { - outCh <- Result{Path: res.value} + emitResult(ctx, outCh, Result{Path: res.value}) break } if depth == 1 { - outCh <- Result{Path: res.value, Err: ErrResolveRecursion} + emitResult(ctx, outCh, Result{Path: res.value, Err: ErrResolveRecursion}) break } @@ -87,6 +89,7 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res cancelSub() } subCtx, cancelSub = context.WithCancel(ctx) + _ = cancelSub p := strings.TrimPrefix(res.value.String(), ipnsPrefix) subCh = resolveAsync(subCtx, r, p, subopts) @@ -96,27 +99,21 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res break } - select { - case outCh <- res: - case <-ctx.Done(): - if cancelSub != nil { - cancelSub() - } - return - } + emitResult(ctx, outCh, res) case <-ctx.Done(): - if cancelSub != nil { - cancelSub() - } return } if resCh == nil && subCh == nil { - if cancelSub != nil { - cancelSub() - } return } } }() return outCh } + +func emitResult(ctx context.Context, outCh chan<- Result, r Result) { + select { + case outCh <- r: + case <-ctx.Done(): + } +} diff --git a/namesys/dns.go b/namesys/dns.go index d3f9e09561f..bd62b7d220f 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -80,10 +80,7 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options } if subRes.error == nil { p, err := appendPath(subRes.path) - select { - case out <- onceResult{value: p, err: err}: - case <-ctx.Done(): - } + emitOnceResult(ctx, out, onceResult{value: p, err: err}) return } case rootRes, ok := <-rootChan: @@ -93,10 +90,7 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options } if rootRes.error == nil { p, err := appendPath(rootRes.path) - select { - case out <- onceResult{value: p, err: err}: - case <-ctx.Done(): - } + emitOnceResult(ctx, out, onceResult{value: p, err: err}) } case <-ctx.Done(): return diff --git a/namesys/namesys.go b/namesys/namesys.go index 674146e575f..aa37a93fe9f 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -141,19 +141,11 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts. if len(segments) > 3 { p, err := path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) if err != nil { - select { - case out <- onceResult{value: p, err: err}: - case <-ctx.Done(): - } - return + emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: err}) } } - select { - case out <- onceResult{value: p, ttl: res.ttl, err: res.err}: - case <-ctx.Done(): - return - } + emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: res.err}) case <-ctx.Done(): return } @@ -163,6 +155,13 @@ func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts. return out } +func emitOnceResult(ctx context.Context, outCh chan<- onceResult, r onceResult) { + select { + case outCh <- r: + case <-ctx.Done(): + } +} + // Publish implements Publisher func (ns *mpns) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error { return ns.PublishWithEOL(ctx, name, value, time.Now().Add(DefaultRecordTTL)) diff --git a/namesys/routing.go b/namesys/routing.go index 25daafff4ef..76aa8603471 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -112,10 +112,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option err = proto.Unmarshal(val, entry) if err != nil { log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err) - select { - case out <- onceResult{err: err}: - case <-ctx.Done(): - } + emitOnceResult(ctx, out, onceResult{err: err}) return } @@ -129,10 +126,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option // Not a multihash, probably a new style record p, err = path.ParsePath(string(entry.GetValue())) if err != nil { - select { - case out <- onceResult{err: err}: - case <-ctx.Done(): - } + emitOnceResult(ctx, out, onceResult{err: err}) return } } @@ -154,17 +148,11 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option } default: log.Errorf("encountered error when parsing EOL: %s", err) - select { - case out <- onceResult{err: err}: - case <-ctx.Done(): - } + emitOnceResult(ctx, out, onceResult{err: err}) return } - select { - case out <- onceResult{value: p, ttl: ttl}: - case <-ctx.Done(): - } + emitOnceResult(ctx, out, onceResult{value: p, ttl: ttl}) case <-ctx.Done(): return } From 6207222211db8bfd8f83e05508acce17397d6808 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 18 Oct 2018 15:02:05 +0200 Subject: [PATCH 12/12] namesys: doc on emitResult MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- namesys/base.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/namesys/base.go b/namesys/base.go index 28bc87dad75..f90e8add164 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -99,6 +99,8 @@ func resolveAsync(ctx context.Context, r resolver, name string, options opts.Res break } + // We don't bother returning here in case of context timeout as there is + // no good reason to do that, and we may still be able to emit a result emitResult(ctx, outCh, res) case <-ctx.Done(): return