From 4b4769f7459d4f7104ed6ae38c21d23778977d55 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Sun, 16 Oct 2016 21:06:28 -0400 Subject: [PATCH 1/2] Add PosInfo() method to blocks and merkledag nodes. The PosInfo() method contains information on the original file the block/node was created from. It contains enough information to recreate a leaf node's data from the file, such as the filename and the offset. The PosInfo is set when building the DAG in the Balanced builder. License: MIT Signed-off-by: Kevin Atkinson --- blocks/blocks.go | 19 ++++++++++++--- commands/files/file.go | 11 +++++++++ core/coreunix/add.go | 12 +++++++++- importer/balanced/builder.go | 12 +++++++--- importer/chunk/rabin.go | 10 ++++++-- importer/chunk/splitting.go | 5 ++++ importer/helpers/dagbuilder.go | 43 ++++++++++++++++++++++++++++------ importer/helpers/helpers.go | 17 ++++++++++++-- merkledag/node.go | 12 ++++++++++ 9 files changed, 123 insertions(+), 18 deletions(-) diff --git a/blocks/blocks.go b/blocks/blocks.go index d4a385f2961..b55047290bf 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" + "github.com/ipfs/go-ipfs/commands/files" + cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid" mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" @@ -13,18 +15,21 @@ var ErrWrongHash = errors.New("data did not match given hash!") +// Block is a singular block of data in ipfs + type Block interface { Multihash() mh.Multihash RawData() []byte + PosInfo() *files.PosInfo Cid() *cid.Cid String() string Loggable() map[string]interface{} } -// Block is a singular block of data in ipfs type BasicBlock struct { - cid *cid.Cid - data []byte + cid *cid.Cid + data []byte + posInfo *files.PosInfo } // NewBlock creates a Block object from opaque data. It will hash the data. 
@@ -59,6 +64,14 @@ func (b *BasicBlock) Cid() *cid.Cid { return b.cid } +func (b *BasicBlock) PosInfo() *files.PosInfo { + return b.posInfo +} + +func (b *BasicBlock) SetPosInfo(posInfo *files.PosInfo) { + b.posInfo = posInfo +} + func (b *BasicBlock) String() string { return fmt.Sprintf("[Block %s]", b.Cid()) } diff --git a/commands/files/file.go b/commands/files/file.go index c2185153c78..e6e16ea9afb 100644 --- a/commands/files/file.go +++ b/commands/files/file.go @@ -55,3 +55,14 @@ type SizeFile interface { Size() (int64, error) } + +type FileInfo interface { + FullPath() string + Stat() os.FileInfo +} + +type PosInfo struct { + Offset uint64 + FullPath string + Stat os.FileInfo // can be nil +} diff --git a/core/coreunix/add.go b/core/coreunix/add.go index a2ee0b7b663..372dd14a94f 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -392,7 +392,12 @@ func (adder *Adder) addFile(file files.File) error { // progress updates to the client (over the output channel) var reader io.Reader = file if adder.Progress { - reader = &progressReader{file: file, out: adder.Out} + rdr := &progressReader{file: file, out: adder.Out} + if fi, ok := file.(files.FileInfo); ok { + reader = &progressReader2{rdr, fi} + } else { + reader = rdr + } } dagnode, err := adder.add(reader) @@ -515,3 +520,8 @@ func (i *progressReader) Read(p []byte) (int, error) { return n, err } + +type progressReader2 struct { + *progressReader + files.FileInfo +} diff --git a/importer/balanced/builder.go b/importer/balanced/builder.go index 3e448e3b9e2..55bb244db16 100644 --- a/importer/balanced/builder.go +++ b/importer/balanced/builder.go @@ -8,10 +8,12 @@ import ( ) func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) { + var offset uint64 = 0 var root *h.UnixfsNode for level := 0; !db.Done(); level++ { nroot := h.NewUnixfsNode() + db.SetPosInfo(nroot, 0) // add our old root as a child of the new root. if root != nil { // nil if it's the first node. 
@@ -21,11 +23,13 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) { } // fill it up. - if err := fillNodeRec(db, nroot, level); err != nil { + if err := fillNodeRec(db, nroot, level, offset); err != nil { return nil, err } + offset = nroot.FileSize() root = nroot + } if root == nil { root = h.NewUnixfsNode() @@ -49,7 +53,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) { // it returns the total dataSize of the node, and a potential error // // warning: **children** pinned indirectly, but input node IS NOT pinned. -func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error { +func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error { if depth < 0 { return errors.New("attempt to fillNode at depth < 0") } @@ -62,14 +66,16 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error { // while we have room AND we're not done for node.NumChildren() < db.Maxlinks() && !db.Done() { child := h.NewUnixfsNode() + db.SetPosInfo(child, offset) - if err := fillNodeRec(db, child, depth-1); err != nil { + if err := fillNodeRec(db, child, depth-1, offset); err != nil { return err } if err := node.AddChild(child, db); err != nil { return err } + offset += child.FileSize() } return nil diff --git a/importer/chunk/rabin.go b/importer/chunk/rabin.go index ce9b5fc5679..d2d71460d34 100644 --- a/importer/chunk/rabin.go +++ b/importer/chunk/rabin.go @@ -10,7 +10,8 @@ import ( var IpfsRabinPoly = chunker.Pol(17437180132763653) type Rabin struct { - r *chunker.Chunker + r *chunker.Chunker + reader io.Reader } func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin { @@ -25,7 +26,8 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin { ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max) return &Rabin{ - r: ch, + r: ch, + reader: r, } } @@ -37,3 +39,7 @@ func (r *Rabin) NextBytes() ([]byte, error) { return ch.Data, nil } + +func (r *Rabin) Reader() io.Reader { + return 
r.reader +} diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go index f3256c4587c..6fd55e22da8 100644 --- a/importer/chunk/splitting.go +++ b/importer/chunk/splitting.go @@ -12,6 +12,7 @@ var log = logging.Logger("chunk") var DefaultBlockSize int64 = 1024 * 256 type Splitter interface { + Reader() io.Reader NextBytes() ([]byte, error) } @@ -77,3 +78,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { return buf[:n], nil } + +func (ss *sizeSplitterv2) Reader() io.Reader { + return ss.r +} diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index 4f2875a4c22..fd567d60212 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -1,8 +1,11 @@ package helpers import ( + "github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" + "io" + "os" ) // DagBuilderHelper wraps together a bunch of objects needed to @@ -14,6 +17,8 @@ type DagBuilderHelper struct { nextData []byte // the next item to return. maxlinks int batch *dag.Batch + fullPath string + stat os.FileInfo } type DagBuilderParams struct { @@ -27,12 +32,17 @@ type DagBuilderParams struct { // Generate a new DagBuilderHelper from the given params, which data source comes // from chunks object func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { - return &DagBuilderHelper{ + db := &DagBuilderHelper{ dserv: dbp.Dagserv, spl: spl, maxlinks: dbp.Maxlinks, batch: dbp.Dagserv.Batch(), } + if fi, ok := spl.Reader().(files.FileInfo); ok { + db.fullPath = fi.FullPath() + db.stat = fi.Stat() + } + return db } // prepareNext consumes the next item from the splitter and puts it @@ -40,12 +50,14 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { // it will do nothing. 
func (db *DagBuilderHelper) prepareNext() { // if we already have data waiting to be consumed, we're ready - if db.nextData != nil { + if db.nextData != nil || db.recvdErr != nil { return } - // TODO: handle err (which wasn't handled either when the splitter was channeled) - db.nextData, _ = db.spl.NextBytes() + db.nextData, db.recvdErr = db.spl.NextBytes() + if db.recvdErr == io.EOF { + db.recvdErr = nil + } } // Done returns whether or not we're done consuming the incoming data. @@ -53,17 +65,24 @@ func (db *DagBuilderHelper) Done() bool { // ensure we have an accurate perspective on data // as `done` this may be called before `next`. db.prepareNext() // idempotent + if db.recvdErr != nil { + return false + } return db.nextData == nil } // Next returns the next chunk of data to be inserted into the dag // if it returns nil, that signifies that the stream is at an end, and // that the current building operation should finish -func (db *DagBuilderHelper) Next() []byte { +func (db *DagBuilderHelper) Next() ([]byte, error) { db.prepareNext() // idempotent d := db.nextData db.nextData = nil // signal we've consumed it - return d + if db.recvdErr != nil { + return nil, db.recvdErr + } else { + return d, nil + } } // GetDagServ returns the dagservice object this Helper is using @@ -93,7 +112,10 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error { } func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error { - data := db.Next() + data, err := db.Next() + if err != nil { + return err + } if data == nil { // we're done! 
return nil } @@ -103,9 +125,16 @@ func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error { } node.SetData(data) + return nil } +func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) { + if db.stat != nil { + node.SetPosInfo(offset, db.fullPath, db.stat) + } +} + func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) { dn, err := node.GetDagNode() if err != nil { diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index bfde68214a6..50f0d492a82 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -2,8 +2,11 @@ package helpers import ( "fmt" + "os" "context" + + files "github.com/ipfs/go-ipfs/commands/files" chunk "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" @@ -37,8 +40,9 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded") // UnixfsNode is a struct created to aid in the generation // of unixfs DAG trees type UnixfsNode struct { - node *dag.Node - ufmt *ft.FSNode + node *dag.Node + ufmt *ft.FSNode + posInfo *files.PosInfo } // NewUnixfsNode creates a new Unixfs node to represent a file @@ -115,10 +119,18 @@ func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) { n.node.Links = append(n.node.Links[:index], n.node.Links[index+1:]...) 
} +func (n *UnixfsNode) FileSize() uint64 { + return n.ufmt.FileSize() +} + func (n *UnixfsNode) SetData(data []byte) { n.ufmt.Data = data } +func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) { + n.posInfo = &files.PosInfo{offset, fullPath, stat} +} + // getDagNode fills out the proper formatting for the unixfs node // inside of a DAG node and returns the dag node func (n *UnixfsNode) GetDagNode() (*dag.Node, error) { @@ -127,5 +139,6 @@ func (n *UnixfsNode) GetDagNode() (*dag.Node, error) { return nil, err } n.node.SetData(data) + n.node.SetPosInfo(n.posInfo) return n.node, nil } diff --git a/merkledag/node.go b/merkledag/node.go index 91b9be64114..4f6cdc6f5e1 100644 --- a/merkledag/node.go +++ b/merkledag/node.go @@ -5,6 +5,8 @@ import ( "context" + "github.com/ipfs/go-ipfs/commands/files" + cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid" mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash" key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" @@ -22,6 +24,8 @@ type Node struct { encoded []byte cached *cid.Cid + + posInfo *files.PosInfo } // NodeStat is a statistics object for a Node. Mostly sizes. @@ -188,6 +192,10 @@ func (n *Node) RawData() []byte { return out } +func (n *Node) PosInfo() *files.PosInfo { + return n.posInfo +} + func (n *Node) Data() []byte { return n.data } @@ -198,6 +206,10 @@ func (n *Node) SetData(d []byte) { n.data = d } +func (n *Node) SetPosInfo(pi *files.PosInfo) { + n.posInfo = pi +} + // UpdateNodeLink return a copy of the node with the link name set to point to // that. If a link of the same name existed, it is removed. func (n *Node) UpdateNodeLink(name string, that *Node) (*Node, error) { From 39e494b1a412e821939811df7151b3cc2900e3f7 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Sun, 16 Oct 2016 21:14:26 -0400 Subject: [PATCH 2/2] Add test that PosInfo() is getting set. 
License: MIT Signed-off-by: Kevin Atkinson --- core/coreunix/add_test.go | 101 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index abe82f0d51e..f1354efcc90 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -4,9 +4,14 @@ import ( "bytes" "io" "io/ioutil" + "math/rand" + "os" "testing" "time" + "github.com/ipfs/go-ipfs/blocks" + "github.com/ipfs/go-ipfs/blocks/blockstore" + "github.com/ipfs/go-ipfs/blockservice" "github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/core" dag "github.com/ipfs/go-ipfs/merkledag" @@ -162,3 +167,99 @@ func TestAddGCLive(t *testing.T) { t.Fatal(err) } } + +func TestAddWPosInfo(t *testing.T) { + r := &repo.Mock{ + C: config.Config{ + Identity: config.Identity{ + PeerID: "Qmfoo", // required by offline node + }, + }, + D: testutil.ThreadSafeCloserMapDatastore(), + } + node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r}) + if err != nil { + t.Fatal(err) + } + + bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t} + bserv := blockservice.New(bs, node.Exchange) + dserv := dag.NewDAGService(bserv) + adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv) + if err != nil { + t.Fatal(err) + } + adder.Out = make(chan interface{}) + adder.Progress = true + + data := make([]byte, 5*1024*1024) + rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error + fileData := ioutil.NopCloser(bytes.NewBuffer(data)) + fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()} + file := files.NewReaderFile("foo.txt", "/tmp/foo.txt", fileData, &fileInfo) + + go func() { + defer close(adder.Out) + err = adder.AddFile(file) + if err != nil { + t.Fatal(err) + } + }() + for _ = range adder.Out {} + + if bs.countAtOffsetZero != 2 { + t.Fatal("expected 2 blocks with an offset at zero (one root, and one leaf), got %d", bs.countAtOffsetZero) + } + if 
bs.countAtOffsetNonZero != 19 { + // note: the exact number will depend on the size and the sharding algo. used + t.Fatal("expected 19 blocks with an offset > 0, got %d", bs.countAtOffsetNonZero) + } +} + +type testBlockstore struct { + blockstore.GCBlockstore + expectedPath string + t *testing.T + countAtOffsetZero int + countAtOffsetNonZero int +} + +func (bs *testBlockstore) Put(block blocks.Block) error { + bs.CheckForPosInfo(block) + return bs.GCBlockstore.Put(block) +} + +func (bs *testBlockstore) PutMany(blocks []blocks.Block) error { + for _, blk := range blocks { + bs.CheckForPosInfo(blk) + } + return bs.GCBlockstore.PutMany(blocks) +} + +func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) error { + posInfo := block.PosInfo() + if posInfo != nil { + if posInfo.FullPath != bs.expectedPath { + bs.t.Fatal("PosInfo does not have the expected path") + } + if posInfo.Offset == 0 { + bs.countAtOffsetZero += 1 + } else { + bs.countAtOffsetNonZero += 1 + } + } + return nil +} + +type dummyFileInfo struct { + name string + size int64 + modTime time.Time +} + +func (fi *dummyFileInfo) Name() string { return fi.name } +func (fi *dummyFileInfo) Size() int64 { return fi.size } +func (fi *dummyFileInfo) Mode() os.FileMode { return 0 } +func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime } +func (fi *dummyFileInfo) IsDir() bool { return false } +func (fi *dummyFileInfo) Sys() interface{} { return nil }