Add flush command to ipfs files #2196

Merged
merged 9 commits on Feb 9, 2016
25 changes: 19 additions & 6 deletions blocks/blockstore/blockstore.go
@@ -43,13 +43,13 @@ type GCBlockstore interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should occur simultaneously.
// Reading during GC is safe, and requires no lock.
GCLock() func()
GCLock() Unlocker

// PinLock locks the blockstore for sequences of puts expected to finish
// with a pin (before GC). Multiple put->pin sequences can write through
// at the same time, but no GC should happen simultaneously.
// Reading during Pinning is safe, and requires no lock.
PinLock() func()
PinLock() Unlocker

// GcRequested returns true if GCLock has been called and is waiting to
// take the lock
@@ -198,16 +198,29 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return output, nil
}

func (bs *blockstore) GCLock() func() {
type Unlocker interface {
Unlock()
}

type unlocker struct {
unlock func()
}

func (u *unlocker) Unlock() {
u.unlock()
u.unlock = nil // ensure it's not called twice
}

func (bs *blockstore) GCLock() Unlocker {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return bs.lk.Unlock
return &unlocker{bs.lk.Unlock}
}

func (bs *blockstore) PinLock() func() {
func (bs *blockstore) PinLock() Unlocker {
bs.lk.RLock()
return bs.lk.RUnlock
return &unlocker{bs.lk.RUnlock}
}

func (bs *blockstore) GCRequested() bool {
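Returning an `Unlocker` instead of a bare `func()` lets call sites fold acquire and release into a single deferred expression, and the nil-out inside `unlocker.Unlock` guards against releasing the same lock twice. A minimal sketch of the calling pattern under the new interface (the helper functions and their names are illustrative; only the `GCBlockstore` methods come from this diff):

```go
package example

import (
	blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
)

// pinBlocks holds the pin lock for a whole put->pin sequence so that GC
// cannot run in between; with PinLock returning an Unlocker, acquire and
// release collapse into one deferred call.
func pinBlocks(bs blockstore.GCBlockstore, put func() error) error {
	defer bs.PinLock().Unlock()
	return put()
}

// runGC takes the exclusive GC lock; concurrent put->pin sequences block
// until Unlock is called.
func runGC(bs blockstore.GCBlockstore, gc func() error) error {
	unlocker := bs.GCLock()
	defer unlocker.Unlock()
	return gc()
}
```

This is the same shape used by core/commands/pin.go and core/coreunix/add.go further down in this diff.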
4 changes: 2 additions & 2 deletions blocks/blockstore/write_cache.go
@@ -59,11 +59,11 @@ func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return w.blockstore.AllKeysChan(ctx)
}

func (w *writecache) GCLock() func() {
func (w *writecache) GCLock() Unlocker {
return w.blockstore.(GCBlockstore).GCLock()
}

func (w *writecache) PinLock() func() {
func (w *writecache) PinLock() Unlocker {
return w.blockstore.(GCBlockstore).PinLock()
}

117 changes: 93 additions & 24 deletions core/commands/files/files.go
@@ -27,6 +27,14 @@ var FilesCmd = &cmds.Command{
Tagline: "Manipulate unixfs files.",
ShortDescription: `
Files is an API for manipulating ipfs objects as if they were a unix filesystem.

Note:
Most of the subcommands of 'ipfs files' accept the '--flush' flag. It defaults to
true. Use caution when setting this flag to false: it will improve performance
for large numbers of file operations, but it does so at the cost of consistency
guarantees. If the daemon is unexpectedly killed before running 'ipfs files flush'
on the files in question, then data may be lost. This also applies to running
'ipfs repo gc' concurrently with '--flush=false' operations.
`,
},
Options: []cmds.Option{
@@ -41,6 +49,7 @@ Files is an API for manipulating ipfs objects as if they were a unix filesystem.
"mkdir": FilesMkdirCmd,
"stat": FilesStatCmd,
"rm": FilesRmCmd,
"flush": FilesFlushCmd,
},
}
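The note above is the contract every handler in this file follows: read the '--flush' option, treat an unset value as true, and flush the touched path before returning. A condensed sketch of that contract as a single helper (the helper name and signature are hypothetical; the option name, its default, and `mfs.FlushPath` are the pieces taken from this PR):

```go
package example

import (
	cmds "github.com/ipfs/go-ipfs/commands"
	core "github.com/ipfs/go-ipfs/core"
	mfs "github.com/ipfs/go-ipfs/mfs"
)

// flushIfRequested flushes path unless the request carries '--flush=false'.
// A missing option counts as true, trading throughput for consistency.
func flushIfRequested(req cmds.Request, nd *core.IpfsNode, path string) error {
	flush, found, _ := req.Option("flush").Bool()
	if !found {
		flush = true
	}
	if !flush {
		return nil
	}
	return mfs.FlushPath(nd.FilesRoot, path)
}
```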

@@ -100,8 +109,7 @@ func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) {
return nil, err
}

// add to dagserv to ensure its available
k, err := ds.Add(nd)
k, err := nd.Key()
if err != nil {
return nil, err
}
@@ -150,6 +158,11 @@ var FilesCpCmd = &cmds.Command{
return
}

flush, found, _ := req.Option("flush").Bool()
if !found {
flush = true
}

src, err := checkPath(req.Arguments()[0])
if err != nil {
res.SetError(err, cmds.ErrNormal)
@@ -172,6 +185,14 @@
res.SetError(err, cmds.ErrNormal)
return
}

if flush {
err := mfs.FlushPath(node.FilesRoot, dst)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
},
}

@@ -257,17 +278,10 @@ Examples:
switch fsn := fsn.(type) {
case *mfs.Directory:
if !long {
mdnd, err := fsn.GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

var output []mfs.NodeListing
for _, lnk := range mdnd.Links {
for _, name := range fsn.ListNames() {
output = append(output, mfs.NodeListing{
Name: lnk.Name,
Hash: lnk.Hash.B58String(),
Name: name,
})
}
res.SetOutput(&FilesLsOutput{output})
@@ -354,6 +368,14 @@ Examples:
return
}

rfd, err := fi.Open(mfs.OpenReadOnly, false)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

defer rfd.Close()

offset, _, err := req.Option("offset").Int()
if err != nil {
res.SetError(err, cmds.ErrNormal)
@@ -364,7 +386,7 @@
return
}

filen, err := fi.Size()
filen, err := rfd.Size()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
@@ -375,12 +397,13 @@
return
}

_, err = fi.Seek(int64(offset), os.SEEK_SET)
_, err = rfd.Seek(int64(offset), os.SEEK_SET)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
var r io.Reader = fi

var r io.Reader = &contextReaderWrapper{R: rfd, ctx: req.Context()}
count, found, err := req.Option("count").Int()
if err != nil {
res.SetError(err, cmds.ErrNormal)
@@ -391,13 +414,26 @@
res.SetError(fmt.Errorf("cannot specify negative 'count'"), cmds.ErrNormal)
return
}
r = io.LimitReader(fi, int64(count))
r = io.LimitReader(r, int64(count))
}

res.SetOutput(r)
},
}

type contextReader interface {
CtxReadFull(context.Context, []byte) (int, error)
}

type contextReaderWrapper struct {
R contextReader
ctx context.Context
}

func (crw *contextReaderWrapper) Read(b []byte) (int, error) {
return crw.R.CtxReadFull(crw.ctx, b)
}
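The wrapper above is a small adapter: the read descriptor obtained from `fi.Open` exposes `CtxReadFull(ctx, buf)` rather than a plain `Read`, and wrapping it restores the `io.Reader` shape so `io.LimitReader` and the response streaming keep composing while every read still observes the request context. A toy illustration of the same adapter shape (the `cancellableReader` and `readFirstN` helpers are invented for this example; the vendored context import path is copied from core/coreunix/add.go below):

```go
package example

import (
	"io"
	"io/ioutil"

	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

// contextReader is the shape the wrapper expects: a read that can be
// abandoned once the request context is cancelled.
type contextReader interface {
	CtxReadFull(context.Context, []byte) (int, error)
}

// cancellableReader is a toy contextReader over an ordinary io.Reader.
type cancellableReader struct {
	r io.Reader
}

func (c *cancellableReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err // stop as soon as the request is cancelled
	}
	return c.r.Read(b)
}

// contextReaderWrapper adapts CtxReadFull back to io.Reader so the standard
// io plumbing (LimitReader, Copy, streaming the response) keeps working.
type contextReaderWrapper struct {
	R   contextReader
	ctx context.Context
}

func (crw *contextReaderWrapper) Read(b []byte) (int, error) {
	return crw.R.CtxReadFull(crw.ctx, b)
}

// readFirstN mirrors what FilesReadCmd does with '--count': wrap, limit, read.
func readFirstN(ctx context.Context, src io.Reader, n int64) ([]byte, error) {
	var r io.Reader = &contextReaderWrapper{R: &cancellableReader{r: src}, ctx: ctx}
	return ioutil.ReadAll(io.LimitReader(r, n))
}
```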

var FilesMvCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Move files.",
@@ -486,8 +522,8 @@ Warning:

create, _, _ := req.Option("create").Bool()
trunc, _, _ := req.Option("truncate").Bool()
flush, set, _ := req.Option("flush").Bool()
if !set {
flush, fset, _ := req.Option("flush").Bool()
if !fset {
flush = true
}

@@ -513,14 +549,16 @@
return
}

if flush {
defer fi.Close()
} else {
defer fi.Sync()
wfd, err := fi.Open(mfs.OpenWriteOnly, flush)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

defer wfd.Close()

if trunc {
if err := fi.Truncate(0); err != nil {
if err := wfd.Truncate(0); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
@@ -536,7 +574,7 @@
return
}

_, err = fi.Seek(int64(offset), os.SEEK_SET)
_, err = wfd.Seek(int64(offset), os.SEEK_SET)
if err != nil {
log.Error("seekfail: ", err)
res.SetError(err, cmds.ErrNormal)
@@ -554,7 +592,7 @@
r = io.LimitReader(r, int64(count))
}

n, err := io.Copy(fi, input)
n, err := io.Copy(wfd, input)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
@@ -613,6 +651,37 @@ Examples:
},
}

var FilesFlushCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "flush a given path's data to disk",
ShortDescription: `
flush a given path to disk. This is only useful when other commands
are run with the '--flush=false'.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("path", false, false, "path to flush (default '/')"),
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

path := "/"
if len(req.Arguments()) > 0 {
path = req.Arguments()[0]
}

err = mfs.FlushPath(nd.FilesRoot, path)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
},
}
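The command simply exposes `mfs.FlushPath` over the API. The workflow it exists for is a batch of '--flush=false' operations followed by one explicit flush; a sketch of the programmatic equivalent (the `lookupFile` callback and the helper name are hypothetical, and the `*mfs.File` type and `Open` signature are assumed from the write handler above):

```go
package example

import (
	"io"
	"strings"

	core "github.com/ipfs/go-ipfs/core"
	mfs "github.com/ipfs/go-ipfs/mfs"
)

// writeManyThenFlush writes each entry with per-write flushing disabled (the
// second argument to Open), then persists everything under "/" in a single
// FlushPath call, which is what 'ipfs files flush' does for the daemon.
func writeManyThenFlush(nd *core.IpfsNode, files map[string]string, lookupFile func(path string) (*mfs.File, error)) error {
	for path, content := range files {
		fi, err := lookupFile(path)
		if err != nil {
			return err
		}
		wfd, err := fi.Open(mfs.OpenWriteOnly, false) // no flush on close
		if err != nil {
			return err
		}
		if _, err := io.Copy(wfd, strings.NewReader(content)); err != nil {
			wfd.Close()
			return err
		}
		if err := wfd.Close(); err != nil {
			return err
		}
	}
	return mfs.FlushPath(nd.FilesRoot, "/")
}
```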

var FilesRmCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Remove a file.",
3 changes: 1 addition & 2 deletions core/commands/pin.go
@@ -54,8 +54,7 @@ on disk.
return
}

unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()

// set recursive flag
recursive, found, err := req.Option("recursive").Bool()
22 changes: 11 additions & 11 deletions core/coreunix/add.go
@@ -20,6 +20,7 @@ import (
"github.com/ipfs/go-ipfs/pin"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"

bs "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/commands/files"
core "github.com/ipfs/go-ipfs/core"
dag "github.com/ipfs/go-ipfs/merkledag"
@@ -100,7 +101,7 @@ type Adder struct {
Chunker string
root *dag.Node
mr *mfs.Root
unlock func()
unlocker bs.Unlocker
tempRoot key.Key
}

@@ -225,8 +226,7 @@ func (adder *Adder) outputDirs(path string, nd *dag.Node) error {
// Add builds a merkledag from the a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()

fileAdder, err := NewAdder(n.Context(), n, nil)
if err != nil {
@@ -247,8 +247,7 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
unlock := n.Blockstore.PinLock()
defer unlock()
n.Blockstore.PinLock().Unlock()

stat, err := os.Lstat(root)
if err != nil {
@@ -296,8 +295,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No
}
fileAdder.Wrap = true

unlock := n.Blockstore.PinLock()
defer unlock()
defer n.Blockstore.PinLock().Unlock()

err = fileAdder.addFile(file)
if err != nil {
@@ -347,8 +345,10 @@ func (adder *Adder) addNode(node *dag.Node, path string) error {

// Add the given file while respecting the adder.
func (adder *Adder) AddFile(file files.File) error {
adder.unlock = adder.node.Blockstore.PinLock()
defer adder.unlock()
adder.unlocker = adder.node.Blockstore.PinLock()
defer func() {
adder.unlocker.Unlock()
}()

return adder.addFile(file)
}
@@ -434,8 +434,8 @@ func (adder *Adder) maybePauseForGC() error {
return err
}

adder.unlock()
adder.unlock = adder.node.Blockstore.PinLock()
adder.unlocker.Unlock()
adder.unlocker = adder.node.Blockstore.PinLock()
}
return nil
}
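maybePauseForGC now stores the Unlocker so it can hand the blockstore over to a waiting collector and then re-arm protection before adding more blocks. A standalone sketch of that handoff (the `pinRoot` callback stands in for the adder pinning its partial root, which the elided lines above appear to do; the function itself is illustrative):

```go
package example

import (
	bs "github.com/ipfs/go-ipfs/blocks/blockstore"
)

// pauseForGC hands the blockstore to a waiting garbage collector and then
// re-acquires protection. It returns the Unlocker the caller must keep
// (and eventually Unlock).
func pauseForGC(blocks bs.GCBlockstore, held bs.Unlocker, pinRoot func() error) (bs.Unlocker, error) {
	if !blocks.GCRequested() {
		// no collector is waiting; keep the current pin lock
		return held, nil
	}
	// pin what has been built so far, so the collector cannot sweep it
	if err := pinRoot(); err != nil {
		return held, err
	}
	held.Unlock()                // let the waiting GC take its exclusive lock
	return blocks.PinLock(), nil // re-arm the put->pin protection and resume
}
```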