From 6e2b1667733c74ca25780d29829474a5cda15baa Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 10 Mar 2021 12:14:50 -0800 Subject: [PATCH] split core/commands/dag into individual files for different subcommands --- core/commands/dag/dag.go | 535 ++--------------------------------- core/commands/dag/export.go | 155 ++++++++++ core/commands/dag/get.go | 38 +++ core/commands/dag/import.go | 201 +++++++++++++ core/commands/dag/put.go | 80 ++++++ core/commands/dag/resolve.go | 25 ++ core/commands/dag/stat.go | 90 ++++++ 7 files changed, 606 insertions(+), 518 deletions(-) create mode 100644 core/commands/dag/export.go create mode 100644 core/commands/dag/get.go create mode 100644 core/commands/dag/import.go create mode 100644 core/commands/dag/put.go create mode 100644 core/commands/dag/resolve.go create mode 100644 core/commands/dag/stat.go diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index bee389673d8..005847039d7 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -1,37 +1,18 @@ package dagcmd import ( - "errors" "fmt" "io" - "math" - "os" - "strings" - "time" "github.com/ipfs/go-ipfs/core/commands/cmdenv" - "github.com/ipfs/go-ipfs/core/commands/e" - "github.com/ipfs/go-ipfs/core/coredag" - iface "github.com/ipfs/interface-go-ipfs-core" cid "github.com/ipfs/go-cid" cidenc "github.com/ipfs/go-cidutil/cidenc" cmds "github.com/ipfs/go-ipfs-cmds" - files "github.com/ipfs/go-ipfs-files" - ipld "github.com/ipfs/go-ipld-format" - mdag "github.com/ipfs/go-merkledag" - traverse "github.com/ipfs/go-merkledag/traverse" ipfspath "github.com/ipfs/go-path" - "github.com/ipfs/interface-go-ipfs-core/options" - path "github.com/ipfs/interface-go-ipfs-core/path" - mh "github.com/multiformats/go-multihash" - - gocar "github.com/ipld/go-car" //gipfree "github.com/ipld/go-ipld-prime/impl/free" //gipselector "github.com/ipld/go-ipld-prime/traversal/selector" //gipselectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder" - - "github.com/cheggaaa/pb" ) const ( @@ -40,6 +21,7 @@ const ( pinRootsOptionName = "pin-roots" ) +// DagCmd provides a subset of commands for interacting with ipld dag objects var DagCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Interact with ipld dag objects.", @@ -75,11 +57,14 @@ type ResolveOutput struct { type CarImportOutput struct { Root RootMeta } + +// RootMeta is the metadata for a root pinning response type RootMeta struct { Cid cid.Cid PinErrorMsg string } +// DagPutCmd is a command for adding a dag node var DagPutCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Add a dag node to ipfs.", @@ -97,71 +82,7 @@ into an object of the specified format. cmds.BoolOption("pin", "Pin this object when adding."), cmds.StringOption("hash", "Hash function to use").WithDefault(""), }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - api, err := cmdenv.GetApi(env, req) - if err != nil { - return err - } - - ienc, _ := req.Options["input-enc"].(string) - format, _ := req.Options["format"].(string) - hash, _ := req.Options["hash"].(string) - dopin, _ := req.Options["pin"].(bool) - - // mhType tells inputParser which hash should be used. MaxUint64 means 'use - // default hash' (sha256 for cbor, sha1 for git..) - mhType := uint64(math.MaxUint64) - - if hash != "" { - var ok bool - mhType, ok = mh.Names[hash] - if !ok { - return fmt.Errorf("%s in not a valid multihash name", hash) - } - } - - var adder ipld.NodeAdder = api.Dag() - if dopin { - adder = api.Dag().Pinning() - } - b := ipld.NewBatch(req.Context, adder) - - it := req.Files.Entries() - for it.Next() { - file := files.FileFromEntry(it) - if file == nil { - return fmt.Errorf("expected a regular file") - } - nds, err := coredag.ParseInputs(ienc, format, file, mhType, -1) - if err != nil { - return err - } - if len(nds) == 0 { - return fmt.Errorf("no node returned from ParseInputs") - } - - for _, nd := range nds { - err := b.Add(req.Context, nd) - if err != nil { - return err - } - } - - cid := nds[0].Cid() - if err := res.Emit(&OutputObject{Cid: cid}); err != nil { - return err - } - } - if it.Err() != nil { - return it.Err() - } - - if err := b.Commit(); err != nil { - return err - } - - return nil - }, + Run: dagPut, Type: OutputObject{}, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OutputObject) error { @@ -175,6 +96,7 @@ into an object of the specified format. }, } +// DagGetCmd is a command for getting a dag node from IPFS var DagGetCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Get a dag node from ipfs.", @@ -186,33 +108,7 @@ format. Arguments: []cmds.Argument{ cmds.StringArg("ref", true, false, "The object to get").EnableStdin(), }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - api, err := cmdenv.GetApi(env, req) - if err != nil { - return err - } - - rp, err := api.ResolvePath(req.Context, path.New(req.Arguments[0])) - if err != nil { - return err - } - - obj, err := api.Dag().Get(req.Context, rp.Cid()) - if err != nil { - return err - } - - var out interface{} = obj - if len(rp.Remainder()) > 0 { - rem := strings.Split(rp.Remainder(), "/") - final, _, err := obj.Resolve(rem) - if err != nil { - return err - } - out = final - } - return cmds.EmitOnce(res, &out) - }, + Run: dagGet, } // DagResolveCmd returns address of highest block within a path and a path remainder @@ -226,22 +122,7 @@ var DagResolveCmd = &cmds.Command{ Arguments: []cmds.Argument{ cmds.StringArg("ref", true, false, "The path to resolve").EnableStdin(), }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - api, err := cmdenv.GetApi(env, req) - if err != nil { - return err - } - - rp, err := api.ResolvePath(req.Context, path.New(req.Arguments[0])) - if err != nil { - return err - } - - return cmds.EmitOnce(res, &ResolveOutput{ - Cid: rp.Cid(), - RemPath: rp.Remainder(), - }) - }, + Run: dagResolve, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *ResolveOutput) error { var ( @@ -280,6 +161,7 @@ type importResult struct { err error } +// DagImportCmd is a command for importing a car to ipfs var DagImportCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Import the contents of .car files", @@ -312,107 +194,7 @@ Maximum supported CAR version: 1 cmds.BoolOption(pinRootsOptionName, "Pin optional roots listed in the .car headers after importing.").WithDefault(true), }, Type: CarImportOutput{}, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - - node, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - api, err := cmdenv.GetApi(env, req) - if err != nil { - return err - } - - // on import ensure we do not reach out to the network for any reason - // if a pin based on what is imported + what is in the blockstore - // isn't possible: tough luck - api, err = api.WithOptions(options.Api.Offline(true)) - if err != nil { - return err - } - - // grab a pinlock ( which doubles as a GC lock ) so that regardless of the - // size of the streamed-in cars nothing will disappear on us before we had - // a chance to roots that may show up at the very end - // This is especially important for use cases like dagger: - // ipfs dag import $( ... | ipfs-dagger --stdout=carfifos ) - // - unlocker := node.Blockstore.PinLock() - defer unlocker.Unlock() - - doPinRoots, _ := req.Options[pinRootsOptionName].(bool) - - retCh := make(chan importResult, 1) - go importWorker(req, res, api, retCh) - - done := <-retCh - if done.err != nil { - return done.err - } - - // It is not guaranteed that a root in a header is actually present in the same ( or any ) - // .car file. This is the case in version 1, and ideally in further versions too - // Accumulate any root CID seen in a header, and supplement its actual node if/when encountered - // We will attempt a pin *only* at the end in case all car files were well formed - // - // The boolean value indicates whether we have encountered the root within the car file's - roots := done.roots - - // opportunistic pinning: try whatever sticks - if doPinRoots { - - var failedPins int - for c := range roots { - - // We need to re-retrieve a block, convert it to ipld, and feed it - // to the Pinning interface, sigh... - // - // If we didn't have the problem of inability to take multiple pinlocks, - // we could use the api directly like so (though internally it does the same): - // - // // not ideal, but the pinning api takes only paths :( - // rp := path.NewResolvedPath( - // ipfspath.FromCid(c), - // c, - // c, - // "", - // ) - // - // if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil { - - ret := RootMeta{Cid: c} - - if block, err := node.Blockstore.Get(c); err != nil { - ret.PinErrorMsg = err.Error() - } else if nd, err := ipld.Decode(block); err != nil { - ret.PinErrorMsg = err.Error() - } else if err := node.Pinning.Pin(req.Context, nd, true); err != nil { - ret.PinErrorMsg = err.Error() - } else if err := node.Pinning.Flush(req.Context); err != nil { - ret.PinErrorMsg = err.Error() - } - - if ret.PinErrorMsg != "" { - failedPins++ - } - - if err := res.Emit(&CarImportOutput{Root: ret}); err != nil { - return err - } - } - - if failedPins > 0 { - return fmt.Errorf( - "unable to pin all roots: %d out of %d failed", - failedPins, - len(roots), - ) - } - } - - return nil - }, + Run: dagImport, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, event *CarImportOutput) error { @@ -443,88 +225,7 @@ Maximum supported CAR version: 1 }, } -func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) { - - // this is *not* a transaction - // it is simply a way to relieve pressure on the blockstore - // similar to pinner.Pin/pinner.Flush - batch := ipld.NewBatch(req.Context, api.Dag()) - - roots := make(map[cid.Cid]struct{}) - - it := req.Files.Entries() - for it.Next() { - - file := files.FileFromEntry(it) - if file == nil { - ret <- importResult{err: errors.New("expected a file handle")} - return - } - - // wrap a defer-closer-scope - // - // every single file in it() is already open before we start - // just close here sooner rather than later for neatness - // and to surface potential errors writing on closed fifos - // this won't/can't help with not running out of handles - err := func() error { - defer file.Close() - - car, err := gocar.NewCarReader(file) - if err != nil { - return err - } - - // Be explicit here, until the spec is finished - if car.Header.Version != 1 { - return errors.New("only car files version 1 supported at present") - } - - for _, c := range car.Header.Roots { - roots[c] = struct{}{} - } - - for { - block, err := car.Next() - if err != nil && err != io.EOF { - return err - } else if block == nil { - break - } - - // the double-decode is suboptimal, but we need it for batching - nd, err := ipld.Decode(block) - if err != nil { - return err - } - - if err := batch.Add(req.Context, nd); err != nil { - return err - } - } - - return nil - }() - - if err != nil { - ret <- importResult{err: err} - return - } - } - - if err := it.Err(); err != nil { - ret <- importResult{err: err} - return - } - - if err := batch.Commit(); err != nil { - ret <- importResult{err: err} - return - } - - ret <- importResult{roots: roots} -} - +// DagExportCmd is a command for exporting an ipfs dag to a car var DagExportCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Streams the selected DAG as a .car stream on stdout.", @@ -540,145 +241,13 @@ The output of blocks happens in strict DAG-traversal, first-seen, order. Options: []cmds.Option{ cmds.BoolOption(progressOptionName, "p", "Display progress on CLI. Defaults to true when STDERR is a TTY."), }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - - c, err := cid.Decode(req.Arguments[0]) - if err != nil { - return fmt.Errorf( - "unable to parse root specification (currently only bare CIDs are supported): %s", - err, - ) - } - - api, err := cmdenv.GetApi(env, req) - if err != nil { - return err - } - - // Code disabled until descent-issue in go-ipld-prime is fixed - // https://github.com/ribasushi/gip-muddle-up - // - // sb := gipselectorbuilder.NewSelectorSpecBuilder(gipfree.NodeBuilder()) - // car := gocar.NewSelectiveCar( - // req.Context, - // , - // []gocar.Dag{gocar.Dag{ - // Root: c, - // Selector: sb.ExploreRecursive( - // gipselector.RecursionLimitNone(), - // sb.ExploreAll(sb.ExploreRecursiveEdge()), - // ).Node(), - // }}, - // ) - // ... - // if err := car.Write(pipeW); err != nil {} - - pipeR, pipeW := io.Pipe() - - errCh := make(chan error, 2) // we only report the 1st error - go func() { - defer func() { - if err := pipeW.Close(); err != nil { - errCh <- fmt.Errorf("stream flush failed: %s", err) - } - close(errCh) - }() - - if err := gocar.WriteCar( - req.Context, - mdag.NewSession( - req.Context, - api.Dag(), - ), - []cid.Cid{c}, - pipeW, - ); err != nil { - errCh <- err - } - }() - - if err := res.Emit(pipeR); err != nil { - pipeR.Close() // ignore the error if any - return err - } - - err = <-errCh - - // minimal user friendliness - if err != nil && - err == ipld.ErrNotFound { - explicitOffline, _ := req.Options["offline"].(bool) - if explicitOffline { - err = fmt.Errorf("%s (currently offline, perhaps retry without the offline flag)", err) - } else { - node, envErr := cmdenv.GetNode(env) - if envErr == nil && !node.IsOnline { - err = fmt.Errorf("%s (currently offline, perhaps retry after attaching to the network)", err) - } - } - } - - return err - }, + Run: dagExport, PostRun: cmds.PostRunMap{ - cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error { - - var showProgress bool - val, specified := res.Request().Options[progressOptionName] - if !specified { - // default based on TTY availability - errStat, _ := os.Stderr.Stat() - if 0 != (errStat.Mode() & os.ModeCharDevice) { - showProgress = true - } - } else if val.(bool) { - showProgress = true - } - - // simple passthrough, no progress - if !showProgress { - return cmds.Copy(re, res) - } - - bar := pb.New64(0).SetUnits(pb.U_BYTES) - bar.Output = os.Stderr - bar.ShowSpeed = true - bar.ShowElapsedTime = true - bar.RefreshRate = 500 * time.Millisecond - bar.Start() - - var processedOneResponse bool - for { - v, err := res.Next() - if err == io.EOF { - - // We only write the final bar update on success - // On error it looks too weird - bar.Finish() - - return re.Close() - } else if err != nil { - return re.CloseWithError(err) - } else if processedOneResponse { - return re.CloseWithError(errors.New("unexpected multipart response during emit, please file a bugreport")) - } - - r, ok := v.(io.Reader) - if !ok { - // some sort of encoded response, this should not be happening - return errors.New("unexpected non-stream passed to PostRun: please file a bugreport") - } - - processedOneResponse = true - - if err := re.Emit(bar.NewProxyReader(r)); err != nil { - return err - } - } - }, + cmds.CLI: finishCLIExport, }, } +// DagStat is a dag stat command response type DagStat struct { Size uint64 NumBlocks int64 @@ -688,6 +257,7 @@ func (s *DagStat) String() string { return fmt.Sprintf("Size: %d, NumBlocks: %d", s.Size, s.NumBlocks) } +// DagStatCmd is a command for getting size information about an ipfs-stored dag var DagStatCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Gets stats for a DAG", @@ -704,81 +274,10 @@ Note: This command skips duplicate blocks in reporting both size and the number Options: []cmds.Option{ cmds.BoolOption(progressOptionName, "p", "Return progressive data while reading through the DAG").WithDefault(true), }, - Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - progressive := req.Options[progressOptionName].(bool) - - api, err := cmdenv.GetApi(env, req) - if err != nil { - return err - } - - rp, err := api.ResolvePath(req.Context, path.New(req.Arguments[0])) - if err != nil { - return err - } - - if len(rp.Remainder()) > 0 { - return fmt.Errorf("cannot return size for anything other than a DAG with a root CID") - } - - nodeGetter := mdag.NewSession(req.Context, api.Dag()) - obj, err := nodeGetter.Get(req.Context, rp.Cid()) - if err != nil { - return err - } - - dagstats := &DagStat{} - err = traverse.Traverse(obj, traverse.Options{ - DAG: nodeGetter, - Order: traverse.DFSPre, - Func: func(current traverse.State) error { - dagstats.Size += uint64(len(current.Node.RawData())) - dagstats.NumBlocks++ - - if progressive { - if err := res.Emit(dagstats); err != nil { - return err - } - } - return nil - }, - ErrFunc: nil, - SkipDuplicates: true, - }) - if err != nil { - return fmt.Errorf("error traversing DAG: %w", err) - } - - if !progressive { - if err := res.Emit(dagstats); err != nil { - return err - } - } - - return nil - }, + Run: dagStat, Type: DagStat{}, PostRun: cmds.PostRunMap{ - cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error { - var dagStats *DagStat - for { - v, err := res.Next() - if err != nil { - if err == io.EOF { - break - } - return err - } - - out, ok := v.(*DagStat) - if !ok { - return e.TypeErr(out, v) - } - dagStats = out - fmt.Fprintf(os.Stderr, "%v\r", out) - } - return re.Emit(dagStats) - }, + cmds.CLI: finishCLIStat, }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, event *DagStat) error { diff --git a/core/commands/dag/export.go b/core/commands/dag/export.go new file mode 100644 index 00000000000..bff9dc47929 --- /dev/null +++ b/core/commands/dag/export.go @@ -0,0 +1,155 @@ +package dagcmd + +import ( + "errors" + "fmt" + "io" + "os" + "time" + + "github.com/cheggaaa/pb" + cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + ipld "github.com/ipfs/go-ipld-format" + mdag "github.com/ipfs/go-merkledag" + + cmds "github.com/ipfs/go-ipfs-cmds" + gocar "github.com/ipld/go-car" +) + +func dagExport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + + c, err := cid.Decode(req.Arguments[0]) + if err != nil { + return fmt.Errorf( + "unable to parse root specification (currently only bare CIDs are supported): %s", + err, + ) + } + + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + // Code disabled until descent-issue in go-ipld-prime is fixed + // https://github.com/ribasushi/gip-muddle-up + // + // sb := gipselectorbuilder.NewSelectorSpecBuilder(gipfree.NodeBuilder()) + // car := gocar.NewSelectiveCar( + // req.Context, + // , + // []gocar.Dag{gocar.Dag{ + // Root: c, + // Selector: sb.ExploreRecursive( + // gipselector.RecursionLimitNone(), + // sb.ExploreAll(sb.ExploreRecursiveEdge()), + // ).Node(), + // }}, + // ) + // ... + // if err := car.Write(pipeW); err != nil {} + + pipeR, pipeW := io.Pipe() + + errCh := make(chan error, 2) // we only report the 1st error + go func() { + defer func() { + if err := pipeW.Close(); err != nil { + errCh <- fmt.Errorf("stream flush failed: %s", err) + } + close(errCh) + }() + + if err := gocar.WriteCar( + req.Context, + mdag.NewSession( + req.Context, + api.Dag(), + ), + []cid.Cid{c}, + pipeW, + ); err != nil { + errCh <- err + } + }() + + if err := res.Emit(pipeR); err != nil { + pipeR.Close() // ignore the error if any + return err + } + + err = <-errCh + + // minimal user friendliness + if err != nil && + err == ipld.ErrNotFound { + explicitOffline, _ := req.Options["offline"].(bool) + if explicitOffline { + err = fmt.Errorf("%s (currently offline, perhaps retry without the offline flag)", err) + } else { + node, envErr := cmdenv.GetNode(env) + if envErr == nil && !node.IsOnline { + err = fmt.Errorf("%s (currently offline, perhaps retry after attaching to the network)", err) + } + } + } + + return err +} + +func finishCLIExport(res cmds.Response, re cmds.ResponseEmitter) error { + + var showProgress bool + val, specified := res.Request().Options[progressOptionName] + if !specified { + // default based on TTY availability + errStat, _ := os.Stderr.Stat() + if 0 != (errStat.Mode() & os.ModeCharDevice) { + showProgress = true + } + } else if val.(bool) { + showProgress = true + } + + // simple passthrough, no progress + if !showProgress { + return cmds.Copy(re, res) + } + + bar := pb.New64(0).SetUnits(pb.U_BYTES) + bar.Output = os.Stderr + bar.ShowSpeed = true + bar.ShowElapsedTime = true + bar.RefreshRate = 500 * time.Millisecond + bar.Start() + + var processedOneResponse bool + for { + v, err := res.Next() + if err == io.EOF { + + // We only write the final bar update on success + // On error it looks too weird + bar.Finish() + + return re.Close() + } else if err != nil { + return re.CloseWithError(err) + } else if processedOneResponse { + return re.CloseWithError(errors.New("unexpected multipart response during emit, please file a bugreport")) + } + + r, ok := v.(io.Reader) + if !ok { + // some sort of encoded response, this should not be happening + return errors.New("unexpected non-stream passed to PostRun: please file a bugreport") + } + + processedOneResponse = true + + if err := re.Emit(bar.NewProxyReader(r)); err != nil { + return err + } + } +} diff --git a/core/commands/dag/get.go b/core/commands/dag/get.go new file mode 100644 index 00000000000..a5f92273ae3 --- /dev/null +++ b/core/commands/dag/get.go @@ -0,0 +1,38 @@ +package dagcmd + +import ( + "strings" + + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/interface-go-ipfs-core/path" + + cmds "github.com/ipfs/go-ipfs-cmds" +) + +func dagGet(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + rp, err := api.ResolvePath(req.Context, path.New(req.Arguments[0])) + if err != nil { + return err + } + + obj, err := api.Dag().Get(req.Context, rp.Cid()) + if err != nil { + return err + } + + var out interface{} = obj + if len(rp.Remainder()) > 0 { + rem := strings.Split(rp.Remainder(), "/") + final, _, err := obj.Resolve(rem) + if err != nil { + return err + } + out = final + } + return cmds.EmitOnce(res, &out) +} diff --git a/core/commands/dag/import.go b/core/commands/dag/import.go new file mode 100644 index 00000000000..b83af8b911d --- /dev/null +++ b/core/commands/dag/import.go @@ -0,0 +1,201 @@ +package dagcmd + +import ( + "errors" + "fmt" + "io" + + cid "github.com/ipfs/go-cid" + files "github.com/ipfs/go-ipfs-files" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + ipld "github.com/ipfs/go-ipld-format" + iface "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" + + cmds "github.com/ipfs/go-ipfs-cmds" + gocar "github.com/ipld/go-car" +) + +func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + + node, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + // on import ensure we do not reach out to the network for any reason + // if a pin based on what is imported + what is in the blockstore + // isn't possible: tough luck + api, err = api.WithOptions(options.Api.Offline(true)) + if err != nil { + return err + } + + // grab a pinlock ( which doubles as a GC lock ) so that regardless of the + // size of the streamed-in cars nothing will disappear on us before we had + // a chance to roots that may show up at the very end + // This is especially important for use cases like dagger: + // ipfs dag import $( ... | ipfs-dagger --stdout=carfifos ) + // + unlocker := node.Blockstore.PinLock() + defer unlocker.Unlock() + + doPinRoots, _ := req.Options[pinRootsOptionName].(bool) + + retCh := make(chan importResult, 1) + go importWorker(req, res, api, retCh) + + done := <-retCh + if done.err != nil { + return done.err + } + + // It is not guaranteed that a root in a header is actually present in the same ( or any ) + // .car file. This is the case in version 1, and ideally in further versions too + // Accumulate any root CID seen in a header, and supplement its actual node if/when encountered + // We will attempt a pin *only* at the end in case all car files were well formed + // + // The boolean value indicates whether we have encountered the root within the car file's + roots := done.roots + + // opportunistic pinning: try whatever sticks + if doPinRoots { + + var failedPins int + for c := range roots { + + // We need to re-retrieve a block, convert it to ipld, and feed it + // to the Pinning interface, sigh... + // + // If we didn't have the problem of inability to take multiple pinlocks, + // we could use the api directly like so (though internally it does the same): + // + // // not ideal, but the pinning api takes only paths :( + // rp := path.NewResolvedPath( + // ipfspath.FromCid(c), + // c, + // c, + // "", + // ) + // + // if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil { + + ret := RootMeta{Cid: c} + + if block, err := node.Blockstore.Get(c); err != nil { + ret.PinErrorMsg = err.Error() + } else if nd, err := ipld.Decode(block); err != nil { + ret.PinErrorMsg = err.Error() + } else if err := node.Pinning.Pin(req.Context, nd, true); err != nil { + ret.PinErrorMsg = err.Error() + } else if err := node.Pinning.Flush(req.Context); err != nil { + ret.PinErrorMsg = err.Error() + } + + if ret.PinErrorMsg != "" { + failedPins++ + } + + if err := res.Emit(&CarImportOutput{Root: ret}); err != nil { + return err + } + } + + if failedPins > 0 { + return fmt.Errorf( + "unable to pin all roots: %d out of %d failed", + failedPins, + len(roots), + ) + } + } + + return nil +} + +func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) { + + // this is *not* a transaction + // it is simply a way to relieve pressure on the blockstore + // similar to pinner.Pin/pinner.Flush + batch := ipld.NewBatch(req.Context, api.Dag()) + + roots := make(map[cid.Cid]struct{}) + + it := req.Files.Entries() + for it.Next() { + + file := files.FileFromEntry(it) + if file == nil { + ret <- importResult{err: errors.New("expected a file handle")} + return + } + + // wrap a defer-closer-scope + // + // every single file in it() is already open before we start + // just close here sooner rather than later for neatness + // and to surface potential errors writing on closed fifos + // this won't/can't help with not running out of handles + err := func() error { + defer file.Close() + + car, err := gocar.NewCarReader(file) + if err != nil { + return err + } + + // Be explicit here, until the spec is finished + if car.Header.Version != 1 { + return errors.New("only car files version 1 supported at present") + } + + for _, c := range car.Header.Roots { + roots[c] = struct{}{} + } + + for { + block, err := car.Next() + if err != nil && err != io.EOF { + return err + } else if block == nil { + break + } + + // the double-decode is suboptimal, but we need it for batching + nd, err := ipld.Decode(block) + if err != nil { + return err + } + + if err := batch.Add(req.Context, nd); err != nil { + return err + } + } + + return nil + }() + + if err != nil { + ret <- importResult{err: err} + return + } + } + + if err := it.Err(); err != nil { + ret <- importResult{err: err} + return + } + + if err := batch.Commit(); err != nil { + ret <- importResult{err: err} + return + } + + ret <- importResult{roots: roots} +} diff --git a/core/commands/dag/put.go b/core/commands/dag/put.go new file mode 100644 index 00000000000..7f6e744c872 --- /dev/null +++ b/core/commands/dag/put.go @@ -0,0 +1,80 @@ +package dagcmd + +import ( + "fmt" + "math" + + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/coredag" + + cmds "github.com/ipfs/go-ipfs-cmds" + files "github.com/ipfs/go-ipfs-files" + ipld "github.com/ipfs/go-ipld-format" + mh "github.com/multiformats/go-multihash" +) + +func dagPut(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + ienc, _ := req.Options["input-enc"].(string) + format, _ := req.Options["format"].(string) + hash, _ := req.Options["hash"].(string) + dopin, _ := req.Options["pin"].(bool) + + // mhType tells inputParser which hash should be used. MaxUint64 means 'use + // default hash' (sha256 for cbor, sha1 for git..) + mhType := uint64(math.MaxUint64) + + if hash != "" { + var ok bool + mhType, ok = mh.Names[hash] + if !ok { + return fmt.Errorf("%s in not a valid multihash name", hash) + } + } + + var adder ipld.NodeAdder = api.Dag() + if dopin { + adder = api.Dag().Pinning() + } + b := ipld.NewBatch(req.Context, adder) + + it := req.Files.Entries() + for it.Next() { + file := files.FileFromEntry(it) + if file == nil { + return fmt.Errorf("expected a regular file") + } + nds, err := coredag.ParseInputs(ienc, format, file, mhType, -1) + if err != nil { + return err + } + if len(nds) == 0 { + return fmt.Errorf("no node returned from ParseInputs") + } + + for _, nd := range nds { + err := b.Add(req.Context, nd) + if err != nil { + return err + } + } + + cid := nds[0].Cid() + if err := res.Emit(&OutputObject{Cid: cid}); err != nil { + return err + } + } + if it.Err() != nil { + return it.Err() + } + + if err := b.Commit(); err != nil { + return err + } + + return nil +} diff --git a/core/commands/dag/resolve.go b/core/commands/dag/resolve.go new file mode 100644 index 00000000000..836138368b0 --- /dev/null +++ b/core/commands/dag/resolve.go @@ -0,0 +1,25 @@ +package dagcmd + +import ( + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/interface-go-ipfs-core/path" + + cmds "github.com/ipfs/go-ipfs-cmds" +) + +func dagResolve(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + rp, err := api.ResolvePath(req.Context, path.New(req.Arguments[0])) + if err != nil { + return err + } + + return cmds.EmitOnce(res, &ResolveOutput{ + Cid: rp.Cid(), + RemPath: rp.Remainder(), + }) +} diff --git a/core/commands/dag/stat.go b/core/commands/dag/stat.go new file mode 100644 index 00000000000..3e3336f2322 --- /dev/null +++ b/core/commands/dag/stat.go @@ -0,0 +1,90 @@ +package dagcmd + +import ( + "fmt" + "io" + "os" + + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/core/commands/e" + "github.com/ipfs/go-merkledag/traverse" + "github.com/ipfs/interface-go-ipfs-core/path" + + cmds "github.com/ipfs/go-ipfs-cmds" + mdag "github.com/ipfs/go-merkledag" +) + +func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + progressive := req.Options[progressOptionName].(bool) + + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + rp, err := api.ResolvePath(req.Context, path.New(req.Arguments[0])) + if err != nil { + return err + } + + if len(rp.Remainder()) > 0 { + return fmt.Errorf("cannot return size for anything other than a DAG with a root CID") + } + + nodeGetter := mdag.NewSession(req.Context, api.Dag()) + obj, err := nodeGetter.Get(req.Context, rp.Cid()) + if err != nil { + return err + } + + dagstats := &DagStat{} + err = traverse.Traverse(obj, traverse.Options{ + DAG: nodeGetter, + Order: traverse.DFSPre, + Func: func(current traverse.State) error { + dagstats.Size += uint64(len(current.Node.RawData())) + dagstats.NumBlocks++ + + if progressive { + if err := res.Emit(dagstats); err != nil { + return err + } + } + return nil + }, + ErrFunc: nil, + SkipDuplicates: true, + }) + if err != nil { + return fmt.Errorf("error traversing DAG: %w", err) + } + + if !progressive { + if err := res.Emit(dagstats); err != nil { + return err + } + } + + return nil +} + +func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error { + var dagStats *DagStat + for { + v, err := res.Next() + if err != nil { + if err == io.EOF { + break + } + return err + } + + out, ok := v.(*DagStat) + if !ok { + return e.TypeErr(out, v) + } + dagStats = out + fmt.Fprintf(os.Stderr, "%v\r", out) + } + return re.Emit(dagStats) +}