Skip to content

Commit

Permalink
Store needed parts of IpfsNode in Adder.
Browse files Browse the repository at this point in the history
This will make it easier to set up a specialized data pipeline.

License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
  • Loading branch information
kevina committed Jun 1, 2016
1 parent dbabcf9 commit ef9c724
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 45 deletions.
4 changes: 3 additions & 1 deletion core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ You can now refer to the added file in a gateway, like so:
outChan := make(chan interface{}, 8)
res.SetOutput((<-chan interface{})(outChan))

fileAdder, err := coreunix.NewAdder(req.Context(), n, outChan)
fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

fileAdder.Out = outChan
fileAdder.Chunker = chunker
fileAdder.Progress = progress
fileAdder.Hidden = hidden
Expand Down
91 changes: 48 additions & 43 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coreunix

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -67,42 +68,46 @@ type AddedObject struct {
Bytes int64 `json:",omitempty"`
}

func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) (*Adder, error) {
mr, err := mfs.NewRoot(ctx, n.DAG, newDirNode(), nil)
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds dag.DAGService) (*Adder, error) {
mr, err := mfs.NewRoot(ctx, ds, newDirNode(), nil)
if err != nil {
return nil, err
}

return &Adder{
mr: mr,
ctx: ctx,
node: n,
out: out,
Progress: false,
Hidden: true,
Pin: true,
Trickle: false,
Wrap: false,
Chunker: "",
mr: mr,
ctx: ctx,
pinning: p,
blockstore: bs,
dagService: ds,
Progress: false,
Hidden: true,
Pin: true,
Trickle: false,
Wrap: false,
Chunker: "",
}, nil

}

// Internal structure for holding the switches passed to the `add` call
type Adder struct {
ctx context.Context
node *core.IpfsNode
out chan interface{}
Progress bool
Hidden bool
Pin bool
Trickle bool
Silent bool
Wrap bool
Chunker string
root *dag.Node
mr *mfs.Root
unlocker bs.Unlocker
tempRoot key.Key
ctx context.Context
pinning pin.Pinner
blockstore bstore.GCBlockstore
dagService dag.DAGService
Out chan interface{}
Progress bool
Hidden bool
Pin bool
Trickle bool
Silent bool
Wrap bool
Chunker string
root *dag.Node
mr *mfs.Root
unlocker bs.Unlocker
tempRoot key.Key
}

// Perform the actual add & pin locally, outputting results to reader
Expand All @@ -114,12 +119,12 @@ func (adder Adder) add(reader io.Reader) (*dag.Node, error) {

if adder.Trickle {
return importer.BuildTrickleDagFromReader(
adder.node.DAG,
adder.dagService,
chnk,
)
}
return importer.BuildDagFromReader(
adder.node.DAG,
adder.dagService,
chnk,
)
}
Expand All @@ -137,7 +142,7 @@ func (adder *Adder) RootNode() (*dag.Node, error) {

// if not wrapping, AND one root file, use that hash as root.
if !adder.Wrap && len(root.Links) == 1 {
root, err = root.Links[0].GetNode(adder.ctx, adder.node.DAG)
root, err = root.Links[0].GetNode(adder.ctx, adder.dagService)
if err != nil {
return nil, err
}
Expand All @@ -156,21 +161,21 @@ func (adder *Adder) PinRoot() error {
return nil
}

rnk, err := adder.node.DAG.Add(root)
rnk, err := adder.dagService.Add(root)
if err != nil {
return err
}

if adder.tempRoot != "" {
err := adder.node.Pinning.Unpin(adder.ctx, adder.tempRoot, true)
err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
if err != nil {
return err
}
adder.tempRoot = rnk
}

adder.node.Pinning.PinWithMode(rnk, pin.Recursive)
return adder.node.Pinning.Flush()
adder.pinning.PinWithMode(rnk, pin.Recursive)
return adder.pinning.Flush()
}

func (adder *Adder) Finalize() (*dag.Node, error) {
Expand Down Expand Up @@ -237,15 +242,15 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error {
}
}

return outputDagnode(adder.out, path, nd)
return outputDagnode(adder.Out, path, nd)
}

// Add builds a merkledag from the a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
defer n.Blockstore.PinLock().Unlock()

fileAdder, err := NewAdder(n.Context(), n, nil)
fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -277,7 +282,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
}
defer f.Close()

fileAdder, err := NewAdder(n.Context(), n, nil)
fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -306,7 +311,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
// the directory, and and error if any.
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) {
file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
fileAdder, err := NewAdder(n.Context(), n, nil)
fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -355,14 +360,14 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {
}

if !adder.Silent {
return outputDagnode(adder.out, path, node)
return outputDagnode(adder.Out, path, node)
}
return nil
}

// Add the given file while respecting the adder.
func (adder *Adder) AddFile(file files.File) error {
adder.unlocker = adder.node.Blockstore.PinLock()
adder.unlocker = adder.blockstore.PinLock()
defer func() {
adder.unlocker.Unlock()
}()
Expand All @@ -388,7 +393,7 @@ func (adder *Adder) addFile(file files.File) error {
}

dagnode := &dag.Node{Data: sdata}
_, err = adder.node.DAG.Add(dagnode)
_, err = adder.dagService.Add(dagnode)
if err != nil {
return err
}
Expand All @@ -401,7 +406,7 @@ func (adder *Adder) addFile(file files.File) error {
// progress updates to the client (over the output channel)
var reader io.Reader = file
if adder.Progress {
reader = &progressReader{file: file, out: adder.out}
reader = &progressReader{file: file, out: adder.Out}
}

dagnode, err := adder.add(reader)
Expand Down Expand Up @@ -445,14 +450,14 @@ func (adder *Adder) addDir(dir files.File) error {
}

func (adder *Adder) maybePauseForGC() error {
if adder.node.Blockstore.GCRequested() {
if adder.blockstore.GCRequested() {
err := adder.PinRoot()
if err != nil {
return err
}

adder.unlocker.Unlock()
adder.unlocker = adder.node.Blockstore.PinLock()
adder.unlocker = adder.blockstore.PinLock()
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func TestAddGCLive(t *testing.T) {

errs := make(chan error)
out := make(chan interface{})
adder, err := NewAdder(context.Background(), node, out)
adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
adder.Out = out
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit ef9c724

Please sign in to comment.