From b54f2f4e496e62b937b77750bb7cf79cb668288a Mon Sep 17 00:00:00 2001
From: Kevin Atkinson
Date: Sun, 22 May 2016 00:48:02 -0400
Subject: [PATCH] Factor out filestore-specific code from DagService.

This also eliminates the need for AddOpts and the ExtraInfo interface.

License: MIT
Signed-off-by: Kevin Atkinson
---
 blocks/blocks.go                | 16 -----
 commands/files/adv_reader.go    | 43 ++-----------
 commands/files/readerfile.go    | 16 ++---
 core/commands/add.go            | 12 +---
 core/coreunix/add.go            | 27 +-------
 filestore/dataobj.go            | 13 ----
 filestore/datastore.go          | 25 +++----
 filestore/support/blockstore.go | 15 ++---
 filestore/support/misc.go       | 48 ++++++++++++++
 importer/chunk/splitting.go     |  4 +-
 importer/helpers/dagbuilder.go  | 47 ++++++--------
 importer/helpers/helpers.go     | 76 +++++++++++-----------
 importer/trickle/trickledag.go  |  4 +-
 merkledag/coding.go             | 23 -------
 merkledag/merkledag.go          | 111 +++++++++++++------------------
 merkledag/node.go               |  6 +-
 16 files changed, 186 insertions(+), 300 deletions(-)
 create mode 100644 filestore/support/misc.go

diff --git a/blocks/blocks.go b/blocks/blocks.go
index 994c855c33d..046b61c6e73 100644
--- a/blocks/blocks.go
+++ b/blocks/blocks.go
@@ -5,7 +5,6 @@ package blocks
 import (
 	"errors"
 	"fmt"
-	"time"
 
 	key "github.com/ipfs/go-ipfs/blocks/key"
 	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
@@ -26,21 +25,6 @@ type BasicBlock struct {
 	data []byte
 }
 
-type FilestoreBlock struct {
-	BasicBlock
-	*DataPtr
-	AddOpts interface{}
-}
-
-// This DataPtr has different AltData than the node DataPtr
-type DataPtr struct {
-	AltData []byte
-	FilePath string
-	Offset uint64
-	Size uint64
-	ModTime time.Time
-}
-
 // NewBlock creates a Block object from opaque data. It will hash the data.
 func NewBlock(data []byte) *BasicBlock {
 	return &BasicBlock{data: data, multihash: u.Hash(data)}
 }
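Note on the adv_reader.go change below: the ExtraInfo interface
(Offset/AbsPath/Clone) collapses into the concrete PosInfo struct, and
readers expose it through a single PosInfo() method. A minimal sketch of the
consumer side, assuming only the declarations in this patch (the in-memory
reader case is hypothetical, for illustration):

    package main

    import (
    	"fmt"
    	"strings"

    	"github.com/ipfs/go-ipfs/commands/files"
    )

    func main() {
    	// Wrapping a plain io.Reader yields an AdvReader whose PosInfo()
    	// is nil, since there is no backing file.
    	r := files.AdvReaderAdapter(strings.NewReader("hello"))
    	if pi := r.PosInfo(); pi == nil {
    		fmt.Println("no position info for in-memory readers")
    	}
    	// A *files.ReaderFile instead reports FullPath, the running Offset,
    	// and the os.FileInfo captured at construction (Stat may be nil).
    }
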
diff --git a/commands/files/adv_reader.go b/commands/files/adv_reader.go
index d1f658ed27d..69889980e75 100644
--- a/commands/files/adv_reader.go
+++ b/commands/files/adv_reader.go
@@ -1,53 +1,31 @@
 package files
 
 import (
-	"errors"
 	"io"
-	"time"
+	"os"
 )
 
 // An AdvReader is like a Reader but supports getting the current file
 // path and offset into the file when applicable.
 type AdvReader interface {
 	io.Reader
-	ExtraInfo() ExtraInfo
-	SetExtraInfo(inf ExtraInfo) error
-}
-
-type ExtraInfo interface {
-	Offset() int64
-	AbsPath() string
-	// Clone creates a copy with different offset
-	Clone(offset int64) ExtraInfo
+	PosInfo() *PosInfo
 }
 
 type PosInfo struct {
-	offset int64
-	absPath string
-}
-
-func (i PosInfo) Offset() int64 { return i.offset }
-
-func (i PosInfo) AbsPath() string { return i.absPath }
-
-func (i PosInfo) Clone(offset int64) ExtraInfo { return PosInfo{offset, i.absPath} }
-
-func NewPosInfo(offset int64, absPath string) PosInfo {
-	return PosInfo{offset, absPath}
+	Offset uint64
+	FullPath string
+	Stat os.FileInfo // can be nil
 }
 
 type advReaderAdapter struct {
 	io.Reader
 }
 
-func (advReaderAdapter) ExtraInfo() ExtraInfo {
+func (advReaderAdapter) PosInfo() *PosInfo {
 	return nil
 }
 
-func (advReaderAdapter) SetExtraInfo(_ ExtraInfo) error {
-	return errors.New("Reader does not support setting ExtraInfo.")
-}
-
 func AdvReaderAdapter(r io.Reader) AdvReader {
 	switch t := r.(type) {
 	case AdvReader:
@@ -57,12 +35,3 @@ func AdvReaderAdapter(r io.Reader) AdvReader {
 	}
 }
 
-type InfoForFilestore struct {
-	ExtraInfo
-	AddOpts interface{}
-	ModTime time.Time
-}
-
-func (i InfoForFilestore) Clone(offset int64) ExtraInfo {
-	return InfoForFilestore{i.ExtraInfo.Clone(offset), i.AddOpts, i.ModTime}
-}
diff --git a/commands/files/readerfile.go b/commands/files/readerfile.go
index 2508fe15655..b51821941b4 100644
--- a/commands/files/readerfile.go
+++ b/commands/files/readerfile.go
@@ -13,12 +13,11 @@ type ReaderFile struct {
 	fullpath string
 	reader io.ReadCloser
 	stat os.FileInfo
-	offset int64
-	baseInfo ExtraInfo
+	offset uint64
 }
 
 func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile {
-	return &ReaderFile{filename, path, reader, stat, 0, PosInfo{0, path}}
+	return &ReaderFile{filename, path, reader, stat, 0}
 }
 
 func (f *ReaderFile) IsDirectory() bool {
@@ -37,18 +36,13 @@ func (f *ReaderFile) FullPath() string {
 	return f.fullpath
 }
 
-func (f *ReaderFile) ExtraInfo() ExtraInfo {
-	return f.baseInfo.Clone(f.offset)
-}
-
-func (f *ReaderFile) SetExtraInfo(info ExtraInfo) error {
-	f.baseInfo = info
-	return nil
+func (f *ReaderFile) PosInfo() *PosInfo {
+	return &PosInfo{f.offset, f.fullpath, f.stat}
 }
 
 func (f *ReaderFile) Read(p []byte) (int, error) {
 	res, err := f.reader.Read(p)
-	f.offset += int64(res)
+	f.offset += uint64(res)
 	return res, err
 }
 
diff --git a/core/commands/add.go b/core/commands/add.go
index 55f33ef673e..7c9f0393260 100644
--- a/core/commands/add.go
+++ b/core/commands/add.go
@@ -8,7 +8,6 @@ import (
 
 	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
 	"github.com/ipfs/go-ipfs/core/coreunix"
-	"github.com/ipfs/go-ipfs/filestore"
 	"github.com/ipfs/go-ipfs/filestore/support"
 
 	bserv "github.com/ipfs/go-ipfs/blockservice"
@@ -167,7 +166,9 @@ You can now refer to the added file in a gateway, like so:
 	if nocopy || link {
 		ds.Blockstore = filestore_support.NewBlockstore(ds.Blockstore, n.Repo.Datastore())
 		blockService := bserv.New(ds.Blockstore, n.Exchange)
-		ds.DAG = dag.NewDAGService(blockService)
+		dagService := dag.NewDAGService(blockService)
+		dagService.NodeToBlock = filestore_support.NodeToBlock{}
+		ds.DAG = dagService
 	}
 
 	fileAdder, err := coreunix.NewAdder(ds, outChan)
@@ -183,13 +184,6 @@ You can now refer to the added file in a gateway, like so:
 	fileAdder.Pin = dopin
 	fileAdder.Silent = silent
 
-	if nocopy {
-		fileAdder.AddOpts = filestore.AddNoCopy
-	}
-	if link {
-		fileAdder.AddOpts = filestore.AddLink
-	}
-
 	addAllAndPin := func(f files.File) error {
 		// Iterate over each top-level file and add individually. Otherwise the
 		// single files.File f is treated as a directory, affecting hidden file
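Note: the dagService.NodeToBlock assignment above is what replaces the old
fileAdder.AddOpts plumbing; how a *dag.Node becomes a block now rides on the
DAG service itself. Reduced to its essentials (a sketch using the names
introduced in this patch, not a verbatim excerpt):

    package example

    import (
    	bserv "github.com/ipfs/go-ipfs/blockservice"
    	filestore_support "github.com/ipfs/go-ipfs/filestore/support"
    	dag "github.com/ipfs/go-ipfs/merkledag"
    )

    // Wiring used by `ipfs add` when --no-copy or --link is given: swap
    // the default node-to-block conversion for the filestore one, which
    // emits FilestoreBlocks carrying PosInfo instead of copied bytes.
    func newFilestoreDAG(blockService *bserv.BlockService) dag.DAGService {
    	dagService := dag.NewDAGService(blockService)
    	dagService.NodeToBlock = filestore_support.NodeToBlock{}
    	return dagService
    }
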
diff --git a/core/coreunix/add.go b/core/coreunix/add.go
index 279a16104da..f11b5c61e2e 100644
--- a/core/coreunix/add.go
+++ b/core/coreunix/add.go
@@ -2,7 +2,6 @@ package coreunix
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -104,28 +104,10 @@ type Adder struct {
 	mr *mfs.Root
 	unlocker bs.Unlocker
 	tempRoot key.Key
-	AddOpts interface{}
 }
 
 // Perform the actual add & pin locally, outputting results to reader
 func (adder Adder) add(reader files.AdvReader) (*dag.Node, error) {
-	if adder.AddOpts != nil {
-		info := reader.ExtraInfo()
-		if info == nil {
-			return nil, errors.New("Reader does not support ExtraInfo.")
-		}
-		// We need to get the ModTime before any part of the
-		// file is read to catch the case when the file is
-		// modified as we are reading it
-		fileInfo, err := os.Stat(info.AbsPath())
-		if err != nil {
-			return nil, err
-		}
-		err = reader.SetExtraInfo(files.InfoForFilestore{info, adder.AddOpts, fileInfo.ModTime()})
-		if err != nil {
-			return nil, err
-		}
-	}
 	chnk, err := chunk.FromString(reader, adder.Chunker)
 	if err != nil {
 		return nil, err
@@ -553,10 +535,6 @@ func (i *progressReader) Read(p []byte) (int, error) {
 	return n, err
 }
 
-func (i *progressReader) ExtraInfo() files.ExtraInfo {
-	return i.reader.ExtraInfo()
-}
-
-func (i *progressReader) SetExtraInfo(info files.ExtraInfo) error {
-	return i.reader.SetExtraInfo(info)
+func (i *progressReader) PosInfo() *files.PosInfo {
+	return i.reader.PosInfo()
 }
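Note: the next two files remove the DataWOpts indirection. With the add-mode
constants gone, the filestore datastore accepts the *DataObj directly, so a
caller stores a reference to on-disk bytes rather than the bytes themselves.
A hypothetical direct use, with made-up field values (the real caller is the
blockstore wrapper further down):

    package example

    import (
    	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore"
    	"github.com/ipfs/go-ipfs/filestore"
    )

    // putReference stores a pointer to a region of a local file under key.
    func putReference(fsds *filestore.Datastore, key ds.Key) error {
    	obj := &filestore.DataObj{
    		FilePath: "/path/to/file", // illustrative values only
    		Offset:   0,
    		Size:     262144,
    	}
    	return fsds.Put(key, obj)
    }
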
diff --git a/filestore/dataobj.go b/filestore/dataobj.go
index 486bd689566..620e450bc4e 100644
--- a/filestore/dataobj.go
+++ b/filestore/dataobj.go
@@ -7,19 +7,6 @@ import (
 	"time"
 )
 
-// A hack to get around the fact that the Datastore interface does not
-// accept options
-type DataWOpts struct {
-	DataObj interface{}
-	AddOpts interface{}
-}
-
-// Constants to indicate how the data should be added.
-const (
-	AddNoCopy = 1
-	AddLink = 2
-)
-
 const (
 	// If NoBlockData is true the Data is missing the Block data
 	// as that is provided by the underlying file
diff --git a/filestore/datastore.go b/filestore/datastore.go
index cb87a955485..ed16e74bb15 100644
--- a/filestore/datastore.go
+++ b/filestore/datastore.go
@@ -36,20 +36,7 @@ func New(d ds.Datastore, fileStorePath string, verify int) (*Datastore, error) {
 }
 
 func (d *Datastore) Put(key ds.Key, value interface{}) (err error) {
-	val, ok := value.(*DataWOpts)
-	if !ok {
-		panic(ds.ErrInvalidType)
-	}
-
-	addType, ok := val.AddOpts.(int)
-	if !ok {
-		panic(ds.ErrInvalidType)
-	}
-	if addType != AddNoCopy {
-		return errors.New("Only \"no-copy\" mode supported for now.")
-	}
-
-	dataObj, ok := val.DataObj.(*DataObj)
+	dataObj, ok := value.(*DataObj)
 	if !ok {
 		panic(ds.ErrInvalidType)
 	}
@@ -87,6 +74,7 @@ func (d *Datastore) put(key ds.Key, dataObj *DataObj) (err error) {
 	if err != nil {
 		return err
 	}
+	log.Debugf("adding block %s\n", b58.Encode(key.Bytes()[1:]))
 	return d.ds.Put(key, data)
 }
 
@@ -167,6 +155,9 @@ func (d *Datastore) GetData(key ds.Key, val *DataObj, verify int, update bool) (
 		}
 		modtime = FromTime(fileInfo.ModTime())
 	}
+	if err != nil {
+		log.Debugf("invalid block %s: %s\n", b58.Encode(key.Bytes()[1:]), err.Error())
+	}
 	invalid := val.Invalid() || err != nil
 	if err == nil && (verify == VerifyAlways || (verify == VerifyIfChanged && modtime != val.ModTime)) {
 		log.Debugf("verifying block %s\n", b58.Encode(key.Bytes()[1:]))
@@ -182,7 +173,11 @@ func (d *Datastore) GetData(key ds.Key, val *DataObj, verify int, update bool) (
 		_ = d.put(key, &newVal)
 	}
 	if invalid {
-		log.Debugf("invalid block %s\n", b58.Encode(key.Bytes()[1:]))
+		if err != nil {
+			log.Debugf("invalid block %s: %s\n", b58.Encode(key.Bytes()[1:]), err.Error())
+		} else {
+			log.Debugf("invalid block %s\n", b58.Encode(key.Bytes()[1:]))
+		}
 		return nil, InvalidBlock{}
 	} else {
 		return data, nil
diff --git a/filestore/support/blockstore.go b/filestore/support/blockstore.go
index 7a293f37318..69b8ec79049 100644
--- a/filestore/support/blockstore.go
+++ b/filestore/support/blockstore.go
@@ -1,5 +1,3 @@
-// package blockstore implements a thin wrapper over a datastore, giving a
-// clean interface for Getting and Putting block objects.
package filestore_support import ( @@ -22,7 +20,6 @@ func NewBlockstore(b bs.GCBlockstore, d ds.Batching) bs.GCBlockstore { func (bs *blockstore) Put(block blocks.Block) error { k := block.Key().DsKey() - println("putting...") data := bs.prepareBlock(k, block) if data == nil { @@ -32,7 +29,6 @@ func (bs *blockstore) Put(block blocks.Block) error { } func (bs *blockstore) PutMany(blocks []blocks.Block) error { - println("put many...") t, err := bs.datastore.Batch() if err != nil { return err @@ -52,7 +48,8 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error { } func (bs *blockstore) prepareBlock(k ds.Key, block blocks.Block) interface{} { - if fsBlock, ok := block.(*blocks.FilestoreBlock); !ok { + if fsBlock, ok := block.(*FilestoreBlock); !ok { + //println("Non DataObj") // Has is cheaper than Put, so see if we already have it exists, err := bs.datastore.Has(k) if err == nil && exists { @@ -60,12 +57,12 @@ func (bs *blockstore) prepareBlock(k ds.Key, block blocks.Block) interface{} { } return block.Data() } else { - println("DataObj") + //println("DataObj") d := &fs.DataObj{ - FilePath: fsBlock.FilePath, + FilePath: fsBlock.FullPath, Offset: fsBlock.Offset, Size: fsBlock.Size, - ModTime: fs.FromTime(fsBlock.ModTime), + ModTime: fs.FromTime(fsBlock.Stat.ModTime()), } if fsBlock.AltData == nil { d.Flags |= fs.WholeFile | fs.FileRoot @@ -74,7 +71,7 @@ func (bs *blockstore) prepareBlock(k ds.Key, block blocks.Block) interface{} { d.Flags |= fs.NoBlockData d.Data = fsBlock.AltData } - return &fs.DataWOpts{d, fsBlock.AddOpts} + return d } } diff --git a/filestore/support/misc.go b/filestore/support/misc.go new file mode 100644 index 00000000000..f12d7bb17ed --- /dev/null +++ b/filestore/support/misc.go @@ -0,0 +1,48 @@ +package filestore_support + +import ( + //ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore" + "github.com/ipfs/go-ipfs/blocks" + "github.com/ipfs/go-ipfs/commands/files" + "github.com/ipfs/go-ipfs/merkledag" +) + +type FilestoreBlock struct { + blocks.BasicBlock + AltData []byte + files.PosInfo + Size uint64 +} + +type NodeToBlock struct{} + +func (NodeToBlock) CreateBlock(nd *merkledag.Node) (blocks.Block, error) { + //println("filestore create block") + b0, err := merkledag.CreateBasicBlock(nd) + if err != nil { + return nil, err + } + if nd.DataPtr == nil { + return b0, nil + } + + b := &FilestoreBlock{ + BasicBlock: *b0, + PosInfo: nd.DataPtr.PosInfo, + Size: nd.DataPtr.Size} + + if nd.DataPtr.AltData == nil { + return b, nil + } + d, err := nd.MarshalNoData() + if err != nil { + return nil, err + } + b.AltData = d + return b, nil +} + +func (NodeToBlock) NeedAltData() bool { + return true +} + diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go index 8b39f39cfcc..ddd7eebb798 100644 --- a/importer/chunk/splitting.go +++ b/importer/chunk/splitting.go @@ -13,7 +13,7 @@ var log = logging.Logger("chunk") var DefaultBlockSize int64 = 1024 * 256 type Bytes struct { - PosInfo files.ExtraInfo + PosInfo *files.PosInfo Data []byte } @@ -68,7 +68,7 @@ func NewSizeSplitter(r io.Reader, size int64) Splitter { } func (ss *sizeSplitterv2) NextBytes() (Bytes, error) { - posInfo := ss.r.ExtraInfo() + posInfo := ss.r.PosInfo() if ss.err != nil { return Bytes{posInfo, nil}, ss.err } diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index a3f5adcbc1c..1fa964ed290 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -1,7 +1,6 @@ package helpers import ( - 
"github.com/ipfs/go-ipfs/commands/files" "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" ) @@ -12,20 +11,12 @@ type DagBuilderHelper struct { dserv dag.DAGService spl chunk.Splitter recvdErr error - nextData []byte // the next item to return. - posInfo files.ExtraInfo + nextData chunk.Bytes // the next item to return. maxlinks int + needAltData bool batch *dag.Batch } -func (db *DagBuilderHelper) addOpts() interface{} { - if inf, ok := db.posInfo.(files.InfoForFilestore); ok { - return inf.AddOpts - } else { - return nil - } -} - type DagBuilderParams struct { // Maximum number of links per intermediate node Maxlinks int @@ -41,6 +32,7 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { dserv: dbp.Dagserv, spl: spl, maxlinks: dbp.Maxlinks, + needAltData: dbp.Dagserv.NeedAltData(), batch: dbp.Dagserv.Batch(), } } @@ -50,14 +42,12 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { // it will do nothing. func (db *DagBuilderHelper) prepareNext() { // if we already have data waiting to be consumed, we're ready - if db.nextData != nil { + if db.nextData.Data != nil { return } // TODO: handle err (which wasn't handled either when the splitter was channeled) - nextData, _ := db.spl.NextBytes() - db.nextData = nextData.Data - db.posInfo = nextData.PosInfo + db.nextData, _ = db.spl.NextBytes() } // Done returns whether or not we're done consuming the incoming data. @@ -65,16 +55,16 @@ func (db *DagBuilderHelper) Done() bool { // ensure we have an accurate perspective on data // as `done` this may be called before `next`. db.prepareNext() // idempotent - return db.nextData == nil + return db.nextData.Data == nil } // Next returns the next chunk of data to be inserted into the dag // if it returns nil, that signifies that the stream is at an end, and // that the current building operation should finish -func (db *DagBuilderHelper) Next() []byte { +func (db *DagBuilderHelper) Next() chunk.Bytes { db.prepareNext() // idempotent d := db.nextData - db.nextData = nil // signal we've consumed it + db.nextData.Data = nil // signal we've consumed it return d } @@ -106,36 +96,35 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error { func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error { data := db.Next() - if data == nil { // we're done! + if data.Data == nil { // we're done! 
return nil } - if len(data) > BlockSizeLimit { + if len(data.Data) > BlockSizeLimit { return ErrSizeLimitExceeded } node.SetData(data) - if posInfo, ok := db.posInfo.(files.InfoForFilestore); ok { - node.SetDataPtr(posInfo.AbsPath(), posInfo.Offset(), posInfo.ModTime) - } return nil } func (db *DagBuilderHelper) SetAsRoot(node *UnixfsNode) { - if posInfo, ok := db.posInfo.(files.InfoForFilestore); ok { - node.SetDataPtr(posInfo.AbsPath(), 0, posInfo.ModTime) - node.SetAsRoot() - } + node.SetAsRoot(db.nextData.PosInfo) +// if posInfo, ok := db.posInfo.(files.InfoForFilestore); ok { +// node.SetDataPtr(posInfo.AbsPath(), 0, posInfo.ModTime) +// node.SetAsRoot() +// } } func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) { - dn, err := node.GetDagNode() + //println("dag builder add") + dn, err := node.GetDagNode(db.needAltData) if err != nil { return nil, err } - _, err = db.dserv.AddWOpts(dn, db.addOpts()) + _, err = db.dserv.Add(dn) if err != nil { return nil, err } diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 0c0d2525996..976bfd480de 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -2,8 +2,8 @@ package helpers import ( "fmt" - "time" + "github.com/ipfs/go-ipfs/commands/files" chunk "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" @@ -40,10 +40,8 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded") type UnixfsNode struct { node *dag.Node ufmt *ft.FSNode - filePath string - offset int64 + posInfo *files.PosInfo fileRoot bool - modTime time.Time } // NewUnixfsNode creates a new Unixfs node to represent a file @@ -94,7 +92,7 @@ func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds dag.DAGService) (*U func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { n.ufmt.AddBlockSize(child.ufmt.FileSize()) - childnode, err := child.GetDagNode() + childnode, err := child.GetDagNode(db.needAltData) if err != nil { return err } @@ -106,7 +104,7 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { return err } - _, err = db.batch.AddWOpts(childnode, db.addOpts()) + _, err = db.batch.Add(childnode) if err != nil { return err } @@ -120,52 +118,50 @@ func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) { n.node.Links = append(n.node.Links[:index], n.node.Links[index+1:]...) 
 }
 
-func (n *UnixfsNode) SetData(data []byte) {
-	n.ufmt.Data = data
+func (n *UnixfsNode) SetData(data chunk.Bytes) {
+	n.ufmt.Data = data.Data
+	n.posInfo = data.PosInfo
 }
 
-func (n *UnixfsNode) SetDataPtr(filePath string, offset int64, modTime time.Time) {
-	//fmt.Println("SetDataPtr: ", filePath, offset)
-	//debug.PrintStack()
-	n.filePath = filePath
-	n.offset = offset
-	n.modTime = modTime
-}
-func (n *UnixfsNode) SetAsRoot() {
+
+
+func (n *UnixfsNode) SetAsRoot(posInfo *files.PosInfo) {
+	if n.posInfo == nil {
+		n.posInfo = posInfo
+	}
 	n.fileRoot = true
 }
 
 // getDagNode fills out the proper formatting for the unixfs node
 // inside of a DAG node and returns the dag node
-func (n *UnixfsNode) GetDagNode() (*dag.Node, error) {
+func (n *UnixfsNode) GetDagNode(needAltData bool) (*dag.Node, error) {
 	//fmt.Println("GetDagNode")
 	data, err := n.ufmt.GetBytes()
 	if err != nil {
 		return nil, err
 	}
 	n.node.Data = data
-	if n.filePath != "" {
-		if n.ufmt.NumChildren() == 0 && (n.ufmt.Type == ft.TFile || n.ufmt.Type == ft.TRaw) {
-			//fmt.Println("We have a block.")
-			// We have a block
-			d, _ := n.ufmt.GetBytesNoData()
-			n.node.DataPtr = &dag.DataPtr{
-				AltData: d,
-				FilePath: n.filePath,
-				Offset: uint64(n.offset),
-				Size: uint64(len(n.ufmt.Data)),
-				ModTime: n.modTime}
-		} else if n.ufmt.Type == ft.TFile && n.fileRoot {
-			//fmt.Println("We have a root.")
-			// We have a root
-			n.node.DataPtr = &dag.DataPtr{
-				AltData: nil,
-				FilePath: n.filePath,
-				Offset: 0,
-				Size: n.ufmt.FileSize(),
-				ModTime: n.modTime}
-		} else {
-			// We have something else, nothing to do
-		}
+	if needAltData {
+		n.node.DataPtr = n.getAltData()
 	}
 	return n.node, nil
 }
+
+func (n *UnixfsNode) getAltData() *dag.DataPtr {
+	if n.ufmt.NumChildren() == 0 && (n.ufmt.Type == ft.TFile || n.ufmt.Type == ft.TRaw) {
+		//fmt.Println("We have a block.")
+		// We have a block
+		d, _ := n.ufmt.GetBytesNoData()
+		return &dag.DataPtr{
+			AltData: d,
+			PosInfo: *n.posInfo,
+			Size: uint64(len(n.ufmt.Data))}
+	} else if n.ufmt.Type == ft.TFile && n.fileRoot {
+		//fmt.Println("We have a root.")
+		// We have a root
+		return &dag.DataPtr{
+			AltData: nil,
+			PosInfo: files.PosInfo{0, n.posInfo.FullPath, n.posInfo.Stat},
+			Size: n.ufmt.FileSize()}
+	} else {
+		return nil
+	}
+}
diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go
index 7bfc42b53d2..3598d1838fd 100644
--- a/importer/trickle/trickledag.go
+++ b/importer/trickle/trickledag.go
@@ -91,7 +91,7 @@ func TrickleAppend(ctx context.Context, base *dag.Node, db *h.DagBuilderHelper)
 	}
 
 	if db.Done() {
-		return ufsn.GetDagNode()
+		return ufsn.GetDagNode(false)
 	}
 
 	// If continuing, our depth has increased by one
@@ -124,7 +124,7 @@ func TrickleAppend(ctx context.Context, base *dag.Node, db *h.DagBuilderHelper)
 		}
 	}
 
-	return ufsn.GetDagNode()
+	return ufsn.GetDagNode(false)
 }
 
 // appendFillLastChild will take in an incomplete trickledag node (uncomplete meaning, not full) and
diff --git a/merkledag/coding.go b/merkledag/coding.go
index a6be84ee6cd..2899a35913e 100644
--- a/merkledag/coding.go
+++ b/merkledag/coding.go
@@ -6,7 +6,6 @@ import (
 
 	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
 
-	blocks "github.com/ipfs/go-ipfs/blocks"
 	pb "github.com/ipfs/go-ipfs/merkledag/pb"
 	u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
 )
@@ -102,28 +101,6 @@ func (n *Node) EncodeProtobuf(force bool) ([]byte, error) {
 	return n.encoded, nil
 }
 
-// Converts the node DataPtr to a block DataPtr, must be called after
-// EncodeProtobuf
-func (n *Node)
EncodeDataPtr() (*blocks.DataPtr, error) { - if n.DataPtr == nil { - return nil, nil - } - bl := &blocks.DataPtr{ - FilePath: n.DataPtr.FilePath, - Offset: n.DataPtr.Offset, - Size: n.DataPtr.Size, - ModTime: n.DataPtr.ModTime} - if n.DataPtr.AltData == nil { - return bl, nil - } - d, err := n.MarshalNoData() - if err != nil { - return nil, err - } - bl.AltData = d - return bl, nil -} - // Decoded decodes raw data and returns a new Node instance. func DecodeProtobuf(encoded []byte) (*Node, error) { n := new(Node) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 91d00dc0634..ff87127d427 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -18,7 +18,6 @@ var ErrNotFound = fmt.Errorf("merkledag: not found") // DAGService is an IPFS Merkle DAG service. type DAGService interface { Add(*Node) (key.Key, error) - AddWOpts(*Node, interface{}) (key.Key, error) Get(context.Context, key.Key) (*Node, error) Remove(*Node) error @@ -27,70 +26,78 @@ type DAGService interface { GetMany(context.Context, []key.Key) <-chan *NodeOption Batch() *Batch -} -func NewDAGService(bs *bserv.BlockService) DAGService { - return &dagService{bs} + NeedAltData() bool } + // dagService is an IPFS Merkle DAG service. // - the root is virtual (like a forest) // - stores nodes' data in a BlockService // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high -type dagService struct { - Blocks *bserv.BlockService +type DefaultDagService struct { + Blocks *bserv.BlockService + NodeToBlock NodeToBlock } -// Add adds a node to the dagService, storing the block in the BlockService -func (n *dagService) Add(nd *Node) (key.Key, error) { - return n.AddWOpts(nd, nil) +func (n *DefaultDagService) NeedAltData() bool { + return n.NodeToBlock.NeedAltData() } -// Add a node that has data possible stored locally to the dagService, -// storing the block in the BlockService -func (n *dagService) AddWOpts(nd *Node, addOpts interface{}) (key.Key, error) { - if n == nil { // FIXME remove this assertion. protect with constructor invariant - return "", fmt.Errorf("dagService is nil") - } +type NodeToBlock interface { + CreateBlock(nd *Node) (blocks.Block, error) + NeedAltData() bool +} + +type nodeToBlock struct{} +func (nodeToBlock) CreateBlock(nd *Node) (blocks.Block, error) { + return CreateBasicBlock(nd) +} + +func CreateBasicBlock(nd *Node) (*blocks.BasicBlock, error) { d, err := nd.EncodeProtobuf(false) if err != nil { - return "", err + return nil, err } mh, err := nd.Multihash() if err != nil { - return "", err + return nil, err } - b0, err := blocks.NewBlockWithHash(d, mh) - if err != nil { - return "", err - } + return blocks.NewBlockWithHash(d, mh) +} - var dataPtr *blocks.DataPtr - if addOpts != nil { - dataPtr, err = nd.EncodeDataPtr() - if err != nil { - return "", err - } +func (nodeToBlock) NeedAltData() bool { + return false +} + +func NewDAGService(bs *bserv.BlockService) *DefaultDagService { + return &DefaultDagService{bs, nodeToBlock{}} +} + +// Add adds a node to the dagService, storing the block in the BlockService +func (n *DefaultDagService) Add(nd *Node) (key.Key, error) { + if n == nil { // FIXME remove this assertion. 
protect with constructor invariant + return "", fmt.Errorf("dagService is nil") } - var b blocks.Block = b0 - if dataPtr != nil { - b = &blocks.FilestoreBlock{*b0, dataPtr, addOpts} + b, err := n.NodeToBlock.CreateBlock(nd) + if err != nil { + return "", err } return n.Blocks.AddBlock(b) } -func (n *dagService) Batch() *Batch { +func (n *DefaultDagService) Batch() *Batch { return &Batch{ds: n, MaxSize: 8 * 1024 * 1024} } // Get retrieves a node from the dagService, fetching the block in the BlockService -func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) { +func (n *DefaultDagService) Get(ctx context.Context, k key.Key) (*Node, error) { if n == nil { return nil, fmt.Errorf("dagService is nil") } @@ -112,7 +119,7 @@ func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) { return res, nil } -func (n *dagService) Remove(nd *Node) error { +func (n *DefaultDagService) Remove(nd *Node) error { k, err := nd.Key() if err != nil { return err @@ -142,7 +149,7 @@ type NodeOption struct { Err error } -func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption { +func (ds *DefaultDagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption { out := make(chan *NodeOption, len(keys)) blocks := ds.Blocks.GetBlocks(ctx, keys) var count int @@ -337,51 +344,25 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) { } type Batch struct { - ds *dagService + ds *DefaultDagService blocks []blocks.Block size int MaxSize int } -//func (t *Batch) Add(nd *Node) (key.Key, error) { -// return t.AddWOpts(nd, nil) -//} - -func (t *Batch) AddWOpts(nd *Node, addOpts interface{}) (key.Key, error) { - d, err := nd.EncodeProtobuf(false) +func (t *Batch) Add(nd *Node) (key.Key, error) { + b, err := t.ds.NodeToBlock.CreateBlock(nd) if err != nil { return "", err } - mh, err := nd.Multihash() - if err != nil { - return "", err - } - - b0, _ := blocks.NewBlockWithHash(d, mh) - - var dataPtr *blocks.DataPtr - if addOpts != nil { - dataPtr, err = nd.EncodeDataPtr() - if err != nil { - return "", err - } - } - - var b blocks.Block = b0 - if dataPtr != nil { - b = &blocks.FilestoreBlock{*b0, dataPtr, addOpts} - } - - k := key.Key(mh) - t.blocks = append(t.blocks, b) t.size += len(b.Data()) if t.size > t.MaxSize { - return k, t.Commit() + return b.Key(), t.Commit() } - return k, nil + return b.Key(), nil } func (t *Batch) Commit() error { diff --git a/merkledag/node.go b/merkledag/node.go index 8ef83afbd28..d187cccb0e8 100644 --- a/merkledag/node.go +++ b/merkledag/node.go @@ -2,12 +2,12 @@ package merkledag import ( "fmt" - "time" "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" key "github.com/ipfs/go-ipfs/blocks/key" mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash" + "github.com/ipfs/go-ipfs/commands/files" ) var ErrLinkNotFound = fmt.Errorf("no link by that name") @@ -28,10 +28,8 @@ type Node struct { type DataPtr struct { AltData []byte - FilePath string - Offset uint64 + files.PosInfo Size uint64 - ModTime time.Time } // NodeStat is a statistics object for a Node. Mostly sizes.
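
Note: the net effect of the patch is that merkledag's only remaining hook is
the NodeToBlock interface (CreateBlock plus NeedAltData). The filestore
supplies one implementation; nothing stops a third. A purely illustrative
sketch of another implementation (the logging decorator is not part of this
patch):

    package example

    import (
    	"log"

    	"github.com/ipfs/go-ipfs/blocks"
    	dag "github.com/ipfs/go-ipfs/merkledag"
    )

    // loggingNodeToBlock decorates the default conversion, showing that
    // the seam is not filestore-specific.
    type loggingNodeToBlock struct{}

    func (loggingNodeToBlock) CreateBlock(nd *dag.Node) (blocks.Block, error) {
    	b, err := dag.CreateBasicBlock(nd)
    	if err != nil {
    		return nil, err
    	}
    	log.Printf("created block %s", b.Key())
    	return b, nil
    }

    // No alternate (data-less) encoding is needed for plain blocks.
    func (loggingNodeToBlock) NeedAltData() bool { return false }

Wiring it in would be the same one-liner as the filestore case:
dagService.NodeToBlock = loggingNodeToBlock{} before any Add calls.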