Skip to content

Commit

Permalink
Dag import functionality only ( no progress )
Browse files Browse the repository at this point in the history
This still works over "loosely defined" .car files
Please refer to the sharness tests for extra info

We can tighten this up if the sentiment is "Postel was wrong"
  • Loading branch information
ribasushi committed Apr 8, 2020
1 parent 20a89b3 commit a9c8a23
Show file tree
Hide file tree
Showing 9 changed files with 465 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ LICENSE text eol=auto
*.png binary
*.tar binary
*.gz binary
*.xz binary
*.car binary

# Binary assets
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ vendor
.tarball
go-ipfs-source.tar.gz
docs/examples/go-ipfs-as-a-library/example-folder/Qm*
/test/sharness/t0054-dag-car-import-export-data/*.car
1 change: 1 addition & 0 deletions core/commands/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func TestCommands(t *testing.T) {
"/dag/get",
"/dag/export",
"/dag/put",
"/dag/import",
"/dag/resolve",
"/dht",
"/dht/findpeer",
Expand Down
278 changes: 277 additions & 1 deletion core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import (

"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/coredag"
mdag "github.com/ipfs/go-merkledag"
iface "github.com/ipfs/interface-go-ipfs-core"

cid "github.com/ipfs/go-cid"
cidenc "github.com/ipfs/go-cidutil/cidenc"
cmds "github.com/ipfs/go-ipfs-cmds"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"
ipfspath "github.com/ipfs/go-path"
"github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
mh "github.com/multiformats/go-multihash"

Expand All @@ -32,6 +34,8 @@ import (

const (
progressOptionName = "progress"
silentOptionName = "silent"
pinRootsOptionName = "pin-roots"
)

var DagCmd = &cmds.Command{
Expand All @@ -48,6 +52,7 @@ to deprecate and replace the existing 'ipfs object' command moving forward.
"put": DagPutCmd,
"get": DagGetCmd,
"resolve": DagResolveCmd,
"import": DagImportCmd,
"export": DagExportCmd,
},
}
Expand All @@ -63,6 +68,16 @@ type ResolveOutput struct {
RemPath string
}

// CarImportOutput is the output type of the 'dag import' commands
type CarImportOutput struct {
Root RootMeta
}
type RootMeta struct {
Cid cid.Cid
PresentInImport bool
PinErrorMsg string
}

var DagPutCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Add a dag node to ipfs.",
Expand Down Expand Up @@ -258,6 +273,267 @@ var DagResolveCmd = &cmds.Command{
Type: ResolveOutput{},
}

type importResult struct {
roots map[cid.Cid]bool
err error
}

var DagImportCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Import the contents of .car files",
ShortDescription: `
'ipfs dag import' imports all blocks present in supplied .car
( Content Address aRchive ) files, recursively pinning any roots
specified in the CAR file headers, unless --pin-roots is set to false.
Note:
This command will import all blocks in the CAR file, not just those
reachable from the specified roots. However, these other blocks will
not be pinned and may be garbage collected later.
The pinning of the roots happens after all car files are processed,
permitting import of DAGs spanning multiple files.
Pinning takes place in offline-mode exclusively, one root at a time.
If the combination of blocks from the imported CAR files and what is
currently present in the blockstore does not represent a complete DAG,
pinning of that individual root will fail.
Maximum supported CAR version: 1
`,
},
Arguments: []cmds.Argument{
cmds.FileArg("path", true, true, "The path of a .car file.").EnableStdin(),
},
Options: []cmds.Option{
cmds.BoolOption(silentOptionName, "No output."),
cmds.BoolOption(pinRootsOptionName, "Pin optional roots listed in the .car headers after importing.").WithDefault(true),
},
Type: CarImportOutput{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

node, err := cmdenv.GetNode(env)
if err != nil {
return err
}

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}

// on import ensure we do not reach out to the network for any reason
// if a pin based on what is imported + what is in the blockstore
// isn't possible: tough luck
api, err = api.WithOptions(options.Api.Offline(true))
if err != nil {
return err
}

// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
// size of the streamed-in cars nothing will disappear on us before we had
// a chance to roots that may show up at the very end
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
//
unlocker := node.Blockstore.PinLock()
defer unlocker.Unlock()

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)

retCh := make(chan importResult, 1)
go importWorker(req, res, api, retCh)

done := <-retCh
if done.err != nil {
return done.err
}

// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well formed
//
// The boolean value indicates whether we have encountered the root within the car file's
roots := done.roots

// opportunistic pinning: try whatever sticks
if doPinRoots {

var failedPins int
for c, seen := range roots {

// We need to re-retrieve a block, convert it to ipld, and feed it
// to the Pinning interface, sigh...
//
// If we didn't have the problem of inability to take multiple pinlocks,
// we could use the Api directly like so (though internally it does the same):
//
// // not ideal, but the pinning api takes only paths :(
// rp := path.NewResolvedPath(
// ipfspath.FromCid(c),
// c,
// c,
// "",
// )
//
// if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {

ret := RootMeta{Cid: c, PresentInImport: seen}

if block, err := node.Blockstore.Get(c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := ipld.Decode(block); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Pin(req.Context, nd, true); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Flush(req.Context); err != nil {
ret.PinErrorMsg = err.Error()
}

if ret.PinErrorMsg != "" {
failedPins++
}

if err := res.Emit(&CarImportOutput{Root: ret}); err != nil {
return err
}
}

if failedPins > 0 {
return fmt.Errorf(
"unable to pin all roots: %d out of %d failed",
failedPins,
len(roots),
)
}
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, event *CarImportOutput) error {

silent, _ := req.Options[silentOptionName].(bool)
if silent {
return nil
}

enc, err := cmdenv.GetLowLevelCidEncoder(req)
if err != nil {
return err
}

if event.Root.PinErrorMsg != "" {
event.Root.PinErrorMsg = fmt.Sprintf("FAILED: %s", event.Root.PinErrorMsg)
} else {
event.Root.PinErrorMsg = "success"
}

if !event.Root.PresentInImport {
event.Root.PinErrorMsg += " (root specified in .car header without available data)"
}

_, err = fmt.Fprintf(
w,
"Pinned root\t%s\t%s\n",
enc.Encode(event.Root.Cid),
event.Root.PinErrorMsg,
)
return err
}),
},
}

func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) {

// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())

roots := make(map[cid.Cid]bool)

it := req.Files.Entries()
for it.Next() {

file := files.FileFromEntry(it)
if file == nil {
ret <- importResult{err: errors.New("expected a file handle")}
return
}

// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential erorrs writing on closed fifos
// this won't/can't help with not running out of handles
err := func() error {
defer file.Close()

car, err := gocar.NewCarReader(file)
if err != nil {
return err
}

// Be explicit here, until the spec is finished
if car.Header.Version != 1 {
return errors.New("only car files version 1 supported at present")
}

for _, c := range car.Header.Roots {
if _, exists := roots[c]; !exists {
roots[c] = false
}
}

for {
block, err := car.Next()
if err != nil && err != io.EOF {
return err
} else if block == nil {
break
}

// the double-decode is suboptimal, but we need it for batching
nd, err := ipld.Decode(block)
if err != nil {
return err
}

if err := batch.Add(req.Context, nd); err != nil {
return err
}

// encountered something known to be a root, for the first time
if seen, exists := roots[nd.Cid()]; exists && !seen {
roots[nd.Cid()] = true
}
}

return nil
}()

if err != nil {
ret <- importResult{err: err}
return
}
}

if err := it.Err(); err != nil {
ret <- importResult{err: err}
return
}

if err := batch.Commit(); err != nil {
ret <- importResult{err: err}
return
}

ret <- importResult{roots: roots}
}

var DagExportCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Streams the selected DAG as a .car stream on stdout.",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-cbor v0.0.4
github.com/ipfs/go-ipld-format v0.0.2
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-ipld-git v0.0.3
github.com/ipfs/go-ipns v0.0.2
github.com/ipfs/go-log v1.0.3
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBA
github.com/alangpierce/go-forceexport v0.0.0-20160317203124-8f1d6941cd75 h1:3ILjVyslFbc4jl1w5TWuvvslFD/nDfR2H8tVaMVLrEY=
github.com/alangpierce/go-forceexport v0.0.0-20160317203124-8f1d6941cd75/go.mod h1:uAXEEpARkRhCZfEvy/y0Jcc888f9tHCc1W7/UeEtreE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -192,12 +190,15 @@ github.com/ipfs/go-blockservice v0.0.3/go.mod h1:/NNihwTi6V2Yr6g8wBI+BSwPuURpBRM
github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbRhbvNSdgc/7So=
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
github.com/ipfs/go-blockservice v0.1.1/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
github.com/ipfs/go-blockservice v0.1.2 h1:fqFeeu1EG0lGVrqUo+BVJv7LZV31I4ZsyNthCOMAJRc=
github.com/ipfs/go-blockservice v0.1.2/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
github.com/ipfs/go-blockservice v0.1.3 h1:9XgsPMwwWJSC9uVr2pMDsW2qFTBSkxpGMhmna8mIjPM=
github.com/ipfs/go-blockservice v0.1.3/go.mod h1:OTZhFpkgY48kNzbgyvcexW9cHrpjBYIjSR0KoDOFOLU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.4 h1:UlfXKrZx1DjZoBhQHmNHLC1fK1dUJDN20Y28A7s+gJ8=
github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M=
github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU=
github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog=
Expand Down Expand Up @@ -302,6 +303,8 @@ github.com/ipfs/go-ipld-cbor v0.0.4/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9
github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dCDnkOJhcZkms=
github.com/ipfs/go-ipld-format v0.0.2 h1:OVAGlyYT6JPZ0pEfGntFPS40lfrDmaDbQwNHEY2G9Zs=
github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k=
github.com/ipfs/go-ipld-format v0.2.0 h1:xGlJKkArkmBvowr+GMCX0FEZtkro71K1AwiKnL37mwA=
github.com/ipfs/go-ipld-format v0.2.0/go.mod h1:3l3C1uKoadTPbeNfrDi+xMInYKlx2Cvg1BuydPSdzQs=
github.com/ipfs/go-ipld-git v0.0.3 h1:/YjkjCyo5KYRpW+suby8Xh9Cm/iH9dAgGV6qyZ1dGus=
github.com/ipfs/go-ipld-git v0.0.3/go.mod h1:RuvMXa9qtJpDbqngyICCU/d+cmLFXxLsbIclmD0Lcr0=
github.com/ipfs/go-ipns v0.0.2 h1:oq4ErrV4hNQ2Eim257RTYRgfOSV/s8BDaf9iIl4NwFs=
Expand Down Expand Up @@ -556,6 +559,7 @@ github.com/libp2p/go-libp2p-peerstore v0.2.0 h1:XcgJhI8WyUOCbHyRLNEX5542YNj8hnLS
github.com/libp2p/go-libp2p-peerstore v0.2.0/go.mod h1:N2l3eVIeAitSg3Pi2ipSrJYnqhVnMNQZo9nkSCuAbnQ=
github.com/libp2p/go-libp2p-peerstore v0.2.1 h1:u+gOfsKgu73ZkGWhvckRm03z9C+iS9TrLqpANweELGs=
github.com/libp2p/go-libp2p-peerstore v0.2.1/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA=
github.com/libp2p/go-libp2p-peerstore v0.2.2 h1:iqc/m03jHn5doXN3+kS6JKvqQRHEltiXljQB85iVHWE=
github.com/libp2p/go-libp2p-peerstore v0.2.2/go.mod h1:NQxhNjWxf1d4w6PihR8btWIRjwRLBr4TYKfNgrUkOPA=
github.com/libp2p/go-libp2p-peerstore v0.2.3 h1:MofRq2l3c15vQpEygTetV+zRRrncz+ktiXW7H2EKoEQ=
github.com/libp2p/go-libp2p-peerstore v0.2.3/go.mod h1:K8ljLdFn590GMttg/luh4caB/3g0vKuY01psze0upRw=
Expand Down
Loading

0 comments on commit a9c8a23

Please sign in to comment.