From 1feea8bc5fd898b9d5f944233ba5c3903224eaaf Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 11 Jan 2016 03:20:41 -0800 Subject: [PATCH 1/9] flushing and shallow list names License: MIT Signed-off-by: Jeromy --- core/commands/files/files.go | 55 ++++++++++++++++++++++------ core/core.go | 14 ++++++++ mfs/dir.go | 24 +++++++++++++ mfs/mfs_test.go | 50 ++++++++++++++++++++++++++ mfs/ops.go | 61 ++++++++++++++++++++++++++++++++ test/sharness/t0250-files-api.sh | 8 ++++- 6 files changed, 201 insertions(+), 11 deletions(-) diff --git a/core/commands/files/files.go b/core/commands/files/files.go index eb17c674eba..9099167d542 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -41,6 +41,7 @@ Files is an API for manipulating ipfs objects as if they were a unix filesystem. "mkdir": FilesMkdirCmd, "stat": FilesStatCmd, "rm": FilesRmCmd, + "flush": FilesFlushCmd, }, } @@ -257,17 +258,10 @@ Examples: switch fsn := fsn.(type) { case *mfs.Directory: if !long { - mdnd, err := fsn.GetNode() - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - var output []mfs.NodeListing - for _, lnk := range mdnd.Links { + for _, name := range fsn.ListNames() { output = append(output, mfs.NodeListing{ - Name: lnk.Name, - Hash: lnk.Hash.B58String(), + Name: name, }) } res.SetOutput(&FilesLsOutput{output}) @@ -514,7 +508,14 @@ Warning: } if flush { - defer fi.Close() + defer func() { + fi.Close() + err := mfs.FlushPath(nd.FilesRoot, path) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + }() } else { defer fi.Sync() } @@ -613,6 +614,40 @@ Examples: }, } +var FilesFlushCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "flush a given path's data to disk", + ShortDescription: ` +flush a given path to disk. This is only useful when other commands +are run with the '--flush=false'. +`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("path", false, false, "path to flush (default '/')"), + }, + Run: func(req cmds.Request, res cmds.Response) { + nd, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + // take the lock and defer the unlock + defer nd.Blockstore.PinLock()() + + path := "/" + if len(req.Arguments()) > 0 { + path = req.Arguments()[0] + } + + err = mfs.FlushPath(nd.FilesRoot, path) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + }, +} + var FilesRmCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Remove a file.", diff --git a/core/core.go b/core/core.go index 761a17389c2..a573679f3cd 100644 --- a/core/core.go +++ b/core/core.go @@ -474,6 +474,20 @@ func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) { func (n *IpfsNode) loadFilesRoot() error { dsk := ds.NewKey("/local/filesroot") pf := func(ctx context.Context, k key.Key) error { + ds := n.Repo.Datastore() + if old, err := ds.Get(dsk); err == nil { + _ = n.Pinning.Unpin(n.Context(), key.Key(old.([]byte)), true) + } + nnd, err := n.DAG.Get(n.Context(), k) + if err != nil { + return err + } + + err = n.Pinning.Pin(n.Context(), nnd, true) + if err != nil { + return err + } + return n.Repo.Datastore().Put(dsk, []byte(k)) } diff --git a/mfs/dir.go b/mfs/dir.go index 28d9f730684..f4147dd2a11 100644 --- a/mfs/dir.go +++ b/mfs/dir.go @@ -175,6 +175,30 @@ type NodeListing struct { Hash string } +func (d *Directory) ListNames() []string { + d.Lock() + defer d.Unlock() + + names := make(map[string]struct{}) + for n, _ := range d.childDirs { + names[n] = struct{}{} + } + for n, _ := range d.files { + names[n] = struct{}{} + } + + for _, l := range d.node.Links { + names[l.Name] = struct{}{} + } + + var out []string + for n, _ := range names { + out = append(out, n) + } + + return out +} + func (d *Directory) List() ([]NodeListing, error) { d.lock.Lock() defer d.lock.Unlock() diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index 917845f5afa..161a8945aaf 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -675,3 +675,53 @@ func TestMfsStress(t *testing.T) { } } } + +func TestFlushing(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, rt := setupRoot(ctx, t) + + dir := rt.GetValue().(*Directory) + c := mkdirP(t, dir, "a/b/c") + d := mkdirP(t, dir, "a/b/d") + e := mkdirP(t, dir, "a/b/e") + + data := []byte("this is a test\n") + nd1 := &dag.Node{Data: ft.FilePBData(data, uint64(len(data)))} + + if err := c.AddChild("TEST", nd1); err != nil { + t.Fatal(err) + } + if err := d.AddChild("TEST", nd1); err != nil { + t.Fatal(err) + } + if err := e.AddChild("TEST", nd1); err != nil { + t.Fatal(err) + } + + if err := FlushPath(rt, "/a/b/c/TEST"); err != nil { + t.Fatal(err) + } + + if err := FlushPath(rt, "/a/b/d/TEST"); err != nil { + t.Fatal(err) + } + + if err := FlushPath(rt, "/a/b/e/TEST"); err != nil { + t.Fatal(err) + } + + rnd, err := dir.GetNode() + if err != nil { + t.Fatal(err) + } + + rnk, err := rnd.Key() + if err != nil { + t.Fatal(err) + } + + if rnk.B58String() != "QmWcvrHUFk7LQRrA4WqKjqy7ZyRGFLVagtgNxbEodTEzQ4" { + t.Fatal("dag looks wrong") + } +} diff --git a/mfs/ops.go b/mfs/ops.go index 75c5d6a844b..6edc9dd76a3 100644 --- a/mfs/ops.go +++ b/mfs/ops.go @@ -194,3 +194,64 @@ func DirLookup(d *Directory, pth string) (FSNode, error) { } return cur, nil } + +func FlushPath(r *Root, pth string) error { + parts := path.SplitList(strings.Trim(pth, "/")) + + d, ok := r.GetValue().(*Directory) + if !ok { + return errors.New("mfs root somehow didnt point to a directory") + } + + nd, err := flushPathRec(d, parts) + if err != nil { + return err + } + + k, err := nd.Key() + if err != nil { + return err + } + + r.repub.Update(k) + return nil +} + +func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { + if len(parts) == 0 { + return d.GetNode() + } + + d.Lock() + defer d.Unlock() + + next, err := d.childUnsync(parts[0]) + if err != nil { + log.Errorf("childnode: %q %q", parts[0], err) + return nil, err + } + + switch next := next.(type) { + case *Directory: + nd, err := flushPathRec(next, parts[1:]) + if err != nil { + return nil, err + } + + newnode, err := d.node.UpdateNodeLink(parts[0], nd) + if err != nil { + return nil, err + } + + d.node = newnode + return newnode, nil + case *File: + if len(parts) > 1 { + return nil, fmt.Errorf("%s is a file, not a directory", parts[0]) + } + + return next.GetNode() + default: + return nil, fmt.Errorf("unrecognized FSNode type: %#v", next) + } +} diff --git a/test/sharness/t0250-files-api.sh b/test/sharness/t0250-files-api.sh index 59dc2c76ef7..43fcb66c8be 100755 --- a/test/sharness/t0250-files-api.sh +++ b/test/sharness/t0250-files-api.sh @@ -331,10 +331,16 @@ test_files_api() { ' test_expect_success "root hash looks good" ' - echo "QmcwKfTMCT7AaeiD92hWjnZn9b6eh9NxnhfSzN5x2vnDpt" > root_hash_exp && + export EXP_ROOT_HASH="QmcwKfTMCT7AaeiD92hWjnZn9b6eh9NxnhfSzN5x2vnDpt" && + echo $EXP_ROOT_HASH > root_hash_exp && test_cmp root_hash_exp root_hash ' + test_expect_success "root hash is pinned" ' + ipfs pin ls + return 1 + ' + # test mv test_expect_success "can mv dir" ' ipfs files mv /cats/this/is /cats/ From d2e0d73bd52de0f6a6925b4e195a7f85d4098fe1 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 11 Jan 2016 05:37:34 -0800 Subject: [PATCH 2/9] flush pinning improvements License: MIT Signed-off-by: Jeromy --- core/core.go | 7 +++++-- mfs/ops.go | 22 +++++++++++++++++++++- mfs/system.go | 19 +++++++++++++++---- test/sharness/t0250-files-api.sh | 10 +++++++--- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/core/core.go b/core/core.go index a573679f3cd..c76943dc2d7 100644 --- a/core/core.go +++ b/core/core.go @@ -483,8 +483,11 @@ func (n *IpfsNode) loadFilesRoot() error { return err } - err = n.Pinning.Pin(n.Context(), nnd, true) - if err != nil { + if err := n.Pinning.Pin(n.Context(), nnd, true); err != nil { + return err + } + + if err := n.Pinning.Flush(); err != nil { return err } diff --git a/mfs/ops.go b/mfs/ops.go index 6edc9dd76a3..3348d45227d 100644 --- a/mfs/ops.go +++ b/mfs/ops.go @@ -197,6 +197,9 @@ func DirLookup(d *Directory, pth string) (FSNode, error) { func FlushPath(r *Root, pth string) error { parts := path.SplitList(strings.Trim(pth, "/")) + if len(parts) == 1 && parts[0] == "" { + parts = nil + } d, ok := r.GetValue().(*Directory) if !ok { @@ -214,12 +217,24 @@ func FlushPath(r *Root, pth string) error { } r.repub.Update(k) + r.repub.WaitPub() + return nil } func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { if len(parts) == 0 { - return d.GetNode() + nd, err := d.GetNode() + if err != nil { + return nil, err + } + + _, err = d.dserv.Add(nd) + if err != nil { + return nil, err + } + + return nd, nil } d.Lock() @@ -243,6 +258,11 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { return nil, err } + _, err = d.dserv.Add(newnode) + if err != nil { + return nil, err + } + d.node = newnode return newnode, nil case *File: diff --git a/mfs/system.go b/mfs/system.go index c059bf5ce5a..b5fe3876808 100644 --- a/mfs/system.go +++ b/mfs/system.go @@ -165,7 +165,7 @@ type Republisher struct { TimeoutShort time.Duration Publish chan struct{} pubfunc PubFunc - pubnowch chan struct{} + pubnowch chan chan struct{} ctx context.Context cancel func() @@ -190,7 +190,7 @@ func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration TimeoutLong: tlong, Publish: make(chan struct{}, 1), pubfunc: pf, - pubnowch: make(chan struct{}), + pubnowch: make(chan chan struct{}), ctx: ctx, cancel: cancel, } @@ -204,11 +204,17 @@ func (p *Republisher) setVal(k key.Key) { func (p *Republisher) pubNow() { select { - case p.pubnowch <- struct{}{}: + case p.pubnowch <- nil: default: } } +func (p *Republisher) WaitPub() { + wait := make(chan struct{}) + p.pubnowch <- wait + <-wait +} + func (p *Republisher) Close() error { err := p.publish(p.ctx) p.cancel() @@ -235,6 +241,8 @@ func (np *Republisher) Run() { longer := time.After(np.TimeoutLong) wait: + var pubnowresp chan struct{} + select { case <-np.ctx.Done(): return @@ -243,10 +251,13 @@ func (np *Republisher) Run() { goto wait case <-quick: case <-longer: - case <-np.pubnowch: + case pubnowresp = <-np.pubnowch: } err := np.publish(np.ctx) + if pubnowresp != nil { + pubnowresp <- struct{}{} + } if err != nil { log.Error("republishRoot error: %s", err) } diff --git a/test/sharness/t0250-files-api.sh b/test/sharness/t0250-files-api.sh index 43fcb66c8be..8732cda5939 100755 --- a/test/sharness/t0250-files-api.sh +++ b/test/sharness/t0250-files-api.sh @@ -336,9 +336,13 @@ test_files_api() { test_cmp root_hash_exp root_hash ' - test_expect_success "root hash is pinned" ' - ipfs pin ls - return 1 + test_expect_success "flush root succeeds" ' + ipfs files flush / + ' + + test_expect_success "root hash is pinned after flush" ' + ipfs pin ls > pins && + grep $EXP_ROOT_HASH pins || (cat pins && exit 1) ' # test mv From 5fcd9a1f8e0fb3a3c24eef9f123b2b4dcc2585d4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 11 Jan 2016 06:04:04 -0800 Subject: [PATCH 3/9] use correct context in pubfunc pinning License: MIT Signed-off-by: Jeromy --- core/core.go | 8 ++++---- mfs/ops.go | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/core.go b/core/core.go index c76943dc2d7..5b42545e86a 100644 --- a/core/core.go +++ b/core/core.go @@ -475,15 +475,15 @@ func (n *IpfsNode) loadFilesRoot() error { dsk := ds.NewKey("/local/filesroot") pf := func(ctx context.Context, k key.Key) error { ds := n.Repo.Datastore() - if old, err := ds.Get(dsk); err == nil { - _ = n.Pinning.Unpin(n.Context(), key.Key(old.([]byte)), true) + if old, err := ds.Get(dsk); err == nil && old != nil { + _ = n.Pinning.Unpin(ctx, key.Key(old.([]byte)), true) } - nnd, err := n.DAG.Get(n.Context(), k) + nnd, err := n.DAG.Get(ctx, k) if err != nil { return err } - if err := n.Pinning.Pin(n.Context(), nnd, true); err != nil { + if err := n.Pinning.Pin(ctx, nnd, true); err != nil { return err } diff --git a/mfs/ops.go b/mfs/ops.go index 3348d45227d..f12dfa7430a 100644 --- a/mfs/ops.go +++ b/mfs/ops.go @@ -162,6 +162,7 @@ func Mkdir(r *Root, pth string, mkparents bool, flush bool) error { func Lookup(r *Root, path string) (FSNode, error) { dir, ok := r.GetValue().(*Directory) if !ok { + log.Error("root not a dir: %#v", r.GetValue()) return nil, errors.New("root was not a directory") } From b4a47e35db6c2908588eb80a465d84716cef61de Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 11 Jan 2016 06:06:33 -0800 Subject: [PATCH 4/9] sort ListNames output License: MIT Signed-off-by: Jeromy --- mfs/dir.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mfs/dir.go b/mfs/dir.go index f4147dd2a11..c70555bb73d 100644 --- a/mfs/dir.go +++ b/mfs/dir.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path" + "sort" "sync" "time" @@ -195,6 +196,7 @@ func (d *Directory) ListNames() []string { for n, _ := range names { out = append(out, n) } + sort.Strings(out) return out } From b078915fb412f83ed01f5b44041a3d6b3db6a7dc Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 12 Jan 2016 06:06:10 -0800 Subject: [PATCH 5/9] add context to files reading License: MIT Signed-off-by: Jeromy --- core/commands/files/files.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/commands/files/files.go b/core/commands/files/files.go index 9099167d542..786728a729a 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -374,7 +374,7 @@ Examples: res.SetError(err, cmds.ErrNormal) return } - var r io.Reader = fi + var r io.Reader = &contextReaderWrapper{R: fi, ctx: req.Context()} count, found, err := req.Option("count").Int() if err != nil { res.SetError(err, cmds.ErrNormal) @@ -392,6 +392,19 @@ Examples: }, } +type contextReader interface { + CtxReadFull(context.Context, []byte) (int, error) +} + +type contextReaderWrapper struct { + R contextReader + ctx context.Context +} + +func (crw *contextReaderWrapper) Read(b []byte) (int, error) { + return crw.R.CtxReadFull(crw.ctx, b) +} + var FilesMvCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Move files.", From 829606a9ede28b348803dc196107e06fceeaeb31 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 15 Jan 2016 09:57:45 -0800 Subject: [PATCH 6/9] add note about flush flag License: MIT Signed-off-by: Jeromy --- core/commands/files/files.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/commands/files/files.go b/core/commands/files/files.go index 786728a729a..6d20de2d665 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -27,6 +27,14 @@ var FilesCmd = &cmds.Command{ Tagline: "Manipulate unixfs files.", ShortDescription: ` Files is an API for manipulating ipfs objects as if they were a unix filesystem. + +Note: +Most of the subcommands of 'ipfs files' accept the '--flush' flag. Use caution +when using this flag, It will improve performance for large numbers of file +operations, but it does so at the cost of consistency guarantees. If the daemon +is unexpectedly killed before running 'ipfs files flush' on the files in question, +then data may be lost. This also applies to running 'ipfs repo gc' concurrently +with '--flush=false' operations. `, }, Options: []cmds.Option{ From 5474e15e93e0103b887b09c0c4c96b2c99c5862e Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 15 Jan 2016 15:14:29 -0800 Subject: [PATCH 7/9] blockstore locks return unlocker object now License: MIT Signed-off-by: Jeromy --- blocks/blockstore/blockstore.go | 25 ++++++-- blocks/blockstore/write_cache.go | 4 +- core/commands/files/files.go | 32 ++++++----- core/commands/pin.go | 3 +- core/core.go | 17 ------ core/coreunix/add.go | 22 +++---- mfs/dir.go | 36 ++++++------ mfs/file.go | 14 ++--- mfs/mfs_test.go | 83 ++++++++++++++++++++++----- mfs/ops.go | 38 ++++++------ mfs/system.go | 8 +-- pin/gc/gc.go | 4 +- pin/pin.go | 4 +- test/sharness/t0250-files-api.sh | 5 -- test/sharness/t0251-files-flushing.sh | 53 +++++++++++++++++ unixfs/mod/dagmodifier.go | 6 -- 16 files changed, 228 insertions(+), 126 deletions(-) create mode 100755 test/sharness/t0251-files-flushing.sh diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 8221ec4a5de..51979e6534d 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -43,13 +43,13 @@ type GCBlockstore interface { // GCLock locks the blockstore for garbage collection. No operations // that expect to finish with a pin should ocurr simultaneously. // Reading during GC is safe, and requires no lock. - GCLock() func() + GCLock() Unlocker // PinLock locks the blockstore for sequences of puts expected to finish // with a pin (before GC). Multiple put->pin sequences can write through // at the same time, but no GC should not happen simulatenously. // Reading during Pinning is safe, and requires no lock. - PinLock() func() + PinLock() Unlocker // GcRequested returns true if GCLock has been called and is waiting to // take the lock @@ -198,16 +198,29 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { return output, nil } -func (bs *blockstore) GCLock() func() { +type Unlocker interface { + Unlock() +} + +type unlocker struct { + unlock func() +} + +func (u *unlocker) Unlock() { + u.unlock() + u.unlock = nil // ensure its not called twice +} + +func (bs *blockstore) GCLock() Unlocker { atomic.AddInt32(&bs.gcreq, 1) bs.lk.Lock() atomic.AddInt32(&bs.gcreq, -1) - return bs.lk.Unlock + return &unlocker{bs.lk.Unlock} } -func (bs *blockstore) PinLock() func() { +func (bs *blockstore) PinLock() Unlocker { bs.lk.RLock() - return bs.lk.RUnlock + return &unlocker{bs.lk.RUnlock} } func (bs *blockstore) GCRequested() bool { diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go index 90109e8a2bd..2567a721603 100644 --- a/blocks/blockstore/write_cache.go +++ b/blocks/blockstore/write_cache.go @@ -59,11 +59,11 @@ func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { return w.blockstore.AllKeysChan(ctx) } -func (w *writecache) GCLock() func() { +func (w *writecache) GCLock() Unlocker { return w.blockstore.(GCBlockstore).GCLock() } -func (w *writecache) PinLock() func() { +func (w *writecache) PinLock() Unlocker { return w.blockstore.(GCBlockstore).PinLock() } diff --git a/core/commands/files/files.go b/core/commands/files/files.go index 6d20de2d665..d31fa76688a 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -109,8 +109,7 @@ func statNode(ds dag.DAGService, fsn mfs.FSNode) (*Object, error) { return nil, err } - // add to dagserv to ensure its available - k, err := ds.Add(nd) + k, err := nd.Key() if err != nil { return nil, err } @@ -159,6 +158,11 @@ var FilesCpCmd = &cmds.Command{ return } + flush, found, _ := req.Option("flush").Bool() + if !found { + flush = true + } + src, err := checkPath(req.Arguments()[0]) if err != nil { res.SetError(err, cmds.ErrNormal) @@ -181,6 +185,14 @@ var FilesCpCmd = &cmds.Command{ res.SetError(err, cmds.ErrNormal) return } + + if flush { + err := mfs.FlushPath(node.FilesRoot, dst) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + } }, } @@ -501,8 +513,8 @@ Warning: create, _, _ := req.Option("create").Bool() trunc, _, _ := req.Option("truncate").Bool() - flush, set, _ := req.Option("flush").Bool() - if !set { + flush, fset, _ := req.Option("flush").Bool() + if !fset { flush = true } @@ -529,14 +541,7 @@ Warning: } if flush { - defer func() { - fi.Close() - err := mfs.FlushPath(nd.FilesRoot, path) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - }() + defer fi.Close() } else { defer fi.Sync() } @@ -653,9 +658,6 @@ are run with the '--flush=false'. return } - // take the lock and defer the unlock - defer nd.Blockstore.PinLock()() - path := "/" if len(req.Arguments()) > 0 { path = req.Arguments()[0] diff --git a/core/commands/pin.go b/core/commands/pin.go index 55262e3715d..cc3333328fc 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -54,8 +54,7 @@ on disk. return } - unlock := n.Blockstore.PinLock() - defer unlock() + defer n.Blockstore.PinLock().Unlock() // set recursive flag recursive, found, err := req.Option("recursive").Bool() diff --git a/core/core.go b/core/core.go index 5b42545e86a..761a17389c2 100644 --- a/core/core.go +++ b/core/core.go @@ -474,23 +474,6 @@ func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) { func (n *IpfsNode) loadFilesRoot() error { dsk := ds.NewKey("/local/filesroot") pf := func(ctx context.Context, k key.Key) error { - ds := n.Repo.Datastore() - if old, err := ds.Get(dsk); err == nil && old != nil { - _ = n.Pinning.Unpin(ctx, key.Key(old.([]byte)), true) - } - nnd, err := n.DAG.Get(ctx, k) - if err != nil { - return err - } - - if err := n.Pinning.Pin(ctx, nnd, true); err != nil { - return err - } - - if err := n.Pinning.Flush(); err != nil { - return err - } - return n.Repo.Datastore().Put(dsk, []byte(k)) } diff --git a/core/coreunix/add.go b/core/coreunix/add.go index a7f6951162c..c8a61c306ab 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/go-ipfs/pin" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" + bs "github.com/ipfs/go-ipfs/blocks/blockstore" "github.com/ipfs/go-ipfs/commands/files" core "github.com/ipfs/go-ipfs/core" dag "github.com/ipfs/go-ipfs/merkledag" @@ -100,7 +101,7 @@ type Adder struct { Chunker string root *dag.Node mr *mfs.Root - unlock func() + unlocker bs.Unlocker tempRoot key.Key } @@ -225,8 +226,7 @@ func (adder *Adder) outputDirs(path string, nd *dag.Node) error { // Add builds a merkledag from the a reader, pinning all objects to the local // datastore. Returns a key representing the root node. func Add(n *core.IpfsNode, r io.Reader) (string, error) { - unlock := n.Blockstore.PinLock() - defer unlock() + defer n.Blockstore.PinLock().Unlock() fileAdder, err := NewAdder(n.Context(), n, nil) if err != nil { @@ -247,8 +247,7 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) { // AddR recursively adds files in |path|. func AddR(n *core.IpfsNode, root string) (key string, err error) { - unlock := n.Blockstore.PinLock() - defer unlock() + n.Blockstore.PinLock().Unlock() stat, err := os.Lstat(root) if err != nil { @@ -296,8 +295,7 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.No } fileAdder.Wrap = true - unlock := n.Blockstore.PinLock() - defer unlock() + defer n.Blockstore.PinLock().Unlock() err = fileAdder.addFile(file) if err != nil { @@ -347,8 +345,10 @@ func (adder *Adder) addNode(node *dag.Node, path string) error { // Add the given file while respecting the adder. func (adder *Adder) AddFile(file files.File) error { - adder.unlock = adder.node.Blockstore.PinLock() - defer adder.unlock() + adder.unlocker = adder.node.Blockstore.PinLock() + defer func() { + adder.unlocker.Unlock() + }() return adder.addFile(file) } @@ -434,8 +434,8 @@ func (adder *Adder) maybePauseForGC() error { return err } - adder.unlock() - adder.unlock = adder.node.Blockstore.PinLock() + adder.unlocker.Unlock() + adder.unlocker = adder.node.Blockstore.PinLock() } return nil } diff --git a/mfs/dir.go b/mfs/dir.go index c70555bb73d..d437b28d74a 100644 --- a/mfs/dir.go +++ b/mfs/dir.go @@ -51,17 +51,20 @@ func NewDirectory(ctx context.Context, name string, node *dag.Node, parent child // closeChild updates the child by the given name to the dag node 'nd' // and changes its own dag node -func (d *Directory) closeChild(name string, nd *dag.Node) error { - mynd, err := d.closeChildUpdate(name, nd) +func (d *Directory) closeChild(name string, nd *dag.Node, sync bool) error { + mynd, err := d.closeChildUpdate(name, nd, sync) if err != nil { return err } - return d.parent.closeChild(d.name, mynd) + if sync { + return d.parent.closeChild(d.name, mynd, true) + } + return nil } // closeChildUpdate is the portion of closeChild that needs to be locked around -func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, error) { +func (d *Directory) closeChildUpdate(name string, nd *dag.Node, sync bool) (*dag.Node, error) { d.lock.Lock() defer d.lock.Unlock() @@ -70,7 +73,10 @@ func (d *Directory) closeChildUpdate(name string, nd *dag.Node) (*dag.Node, erro return nil, err } - return d.flushCurrentNode() + if sync { + return d.flushCurrentNode() + } + return nil, nil } func (d *Directory) flushCurrentNode() (*dag.Node, error) { @@ -295,12 +301,15 @@ func (d *Directory) Unlink(name string) error { } func (d *Directory) Flush() error { + d.lock.Lock() nd, err := d.flushCurrentNode() if err != nil { + d.lock.Unlock() return err } + d.lock.Unlock() - return d.parent.closeChild(d.name, nd) + return d.parent.closeChild(d.name, nd, true) } // AddChild adds the node 'nd' under this directory giving it the name 'name' @@ -335,11 +344,6 @@ func (d *Directory) sync() error { return err } - _, err = d.dserv.Add(nd) - if err != nil { - return err - } - err = d.updateChild(name, nd) if err != nil { return err @@ -352,11 +356,6 @@ func (d *Directory) sync() error { return err } - _, err = d.dserv.Add(nd) - if err != nil { - return err - } - err = d.updateChild(name, nd) if err != nil { return err @@ -385,6 +384,11 @@ func (d *Directory) GetNode() (*dag.Node, error) { return nil, err } + _, err = d.dserv.Add(d.node) + if err != nil { + return nil, err + } + return d.node.Copy(), nil } diff --git a/mfs/file.go b/mfs/file.go index da473714065..99dfff0df1e 100644 --- a/mfs/file.go +++ b/mfs/file.go @@ -73,7 +73,7 @@ func (fi *File) Close() error { // explicitly stay locked for flushUp call, // it will manage the lock for us - return fi.flushUp() + return fi.flushUp(true) } fi.Unlock() @@ -82,7 +82,7 @@ func (fi *File) Close() error { // flushUp syncs the file and adds it to the dagservice // it *must* be called with the File's lock taken -func (fi *File) flushUp() error { +func (fi *File) flushUp(fullsync bool) error { nd, err := fi.mod.GetNode() if err != nil { fi.Unlock() @@ -95,20 +95,18 @@ func (fi *File) flushUp() error { return err } - //name := fi.name - //parent := fi.parent + name := fi.name + parent := fi.parent // explicit unlock *only* before closeChild call fi.Unlock() - return nil - //return parent.closeChild(name, nd) + return parent.closeChild(name, nd, fullsync) } // Sync flushes the changes in the file to disk func (fi *File) Sync() error { fi.Lock() - defer fi.Unlock() - return fi.mod.Sync() + return fi.flushUp(false) } // Seek implements io.Seeker diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index 161a8945aaf..38237c2183b 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -576,15 +576,15 @@ func actorRemoveFile(d *Directory) error { return d.Unlink(re.Name) } -func actorReadFile(d *Directory) error { +func randomFile(d *Directory) (*File, error) { d, err := randomWalk(d, rand.Intn(6)) if err != nil { - return err + return nil, err } ents, err := d.List() if err != nil { - return err + return nil, err } var files []string @@ -595,18 +595,61 @@ func actorReadFile(d *Directory) error { } if len(files) == 0 { - return nil + return nil, nil } fname := files[rand.Intn(len(files))] fsn, err := d.Child(fname) if err != nil { - return err + return nil, err } fi, ok := fsn.(*File) if !ok { - return errors.New("file wasnt a file, race?") + return nil, errors.New("file wasnt a file, race?") + } + + return fi, nil +} + +func actorWriteFile(d *Directory) error { + fi, err := randomFile(d) + if err != nil { + return err + } + if fi == nil { + return nil + } + + size := rand.Intn(1024) + buf := make([]byte, size) + randbo.New().Read(buf) + + s, err := fi.Size() + if err != nil { + return err + } + + offset := rand.Int63n(s) + + n, err := fi.WriteAt(buf, offset) + if err != nil { + return err + } + if n != size { + return fmt.Errorf("didnt write enough") + } + + return fi.Close() +} + +func actorReadFile(d *Directory) error { + fi, err := randomFile(d) + if err != nil { + return err + } + if fi == nil { + return nil } _, err = fi.Size() @@ -637,12 +680,7 @@ func testActor(rt *Root, iterations int, errs chan error) { return } case 3: - continue - // randomly deleting things - // doesnt really give us any sort of useful test results. - // you will never have this in a real environment where - // you expect anything productive to happen... - if err := actorRemoveFile(d); err != nil { + if err := actorWriteFile(d); err != nil { errs <- err return } @@ -698,6 +736,9 @@ func TestFlushing(t *testing.T) { if err := e.AddChild("TEST", nd1); err != nil { t.Fatal(err) } + if err := dir.AddChild("FILE", nd1); err != nil { + t.Fatal(err) + } if err := FlushPath(rt, "/a/b/c/TEST"); err != nil { t.Fatal(err) @@ -711,17 +752,31 @@ func TestFlushing(t *testing.T) { t.Fatal(err) } + if err := FlushPath(rt, "/FILE"); err != nil { + t.Fatal(err) + } + rnd, err := dir.GetNode() if err != nil { t.Fatal(err) } + fsnode, err := ft.FSNodeFromBytes(rnd.Data) + if err != nil { + t.Fatal(err) + } + + if fsnode.Type != ft.TDirectory { + t.Fatal("root wasnt a directory") + } + rnk, err := rnd.Key() if err != nil { t.Fatal(err) } - if rnk.B58String() != "QmWcvrHUFk7LQRrA4WqKjqy7ZyRGFLVagtgNxbEodTEzQ4" { - t.Fatal("dag looks wrong") + exp := "QmWMVyhTuyxUrXX3ynz171jq76yY3PktfY9Bxiph7b9ikr" + if rnk.B58String() != exp { + t.Fatalf("dag looks wrong, expected %s, but got %s", exp, rnk.B58String()) } } diff --git a/mfs/ops.go b/mfs/ops.go index f12dfa7430a..9bc59994ce7 100644 --- a/mfs/ops.go +++ b/mfs/ops.go @@ -230,11 +230,6 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { return nil, err } - _, err = d.dserv.Add(nd) - if err != nil { - return nil, err - } - return nd, nil } @@ -247,6 +242,7 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { return nil, err } + var ndagnode *dag.Node switch next := next.(type) { case *Directory: nd, err := flushPathRec(next, parts[1:]) @@ -254,25 +250,33 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { return nil, err } - newnode, err := d.node.UpdateNodeLink(parts[0], nd) - if err != nil { - return nil, err - } - - _, err = d.dserv.Add(newnode) - if err != nil { - return nil, err - } + ndagnode = nd - d.node = newnode - return newnode, nil case *File: if len(parts) > 1 { return nil, fmt.Errorf("%s is a file, not a directory", parts[0]) } - return next.GetNode() + child, err := next.GetNode() + if err != nil { + return nil, err + } + + ndagnode = child default: return nil, fmt.Errorf("unrecognized FSNode type: %#v", next) } + + newnode, err := d.node.UpdateNodeLink(parts[0], ndagnode) + if err != nil { + return nil, err + } + + _, err = d.dserv.Add(newnode) + if err != nil { + return nil, err + } + + d.node = newnode + return newnode, nil } diff --git a/mfs/system.go b/mfs/system.go index b5fe3876808..454577c457c 100644 --- a/mfs/system.go +++ b/mfs/system.go @@ -29,7 +29,7 @@ var log = logging.Logger("mfs") var ErrIsDirectory = errors.New("error: is a directory") type childCloser interface { - closeChild(string, *dag.Node) error + closeChild(string, *dag.Node, bool) error } type NodeType int @@ -115,7 +115,7 @@ func (kr *Root) Flush() error { return err } - k, err := kr.dserv.Add(nd) + k, err := nd.Key() if err != nil { return err } @@ -128,7 +128,7 @@ func (kr *Root) Flush() error { // closeChild implements the childCloser interface, and signals to the publisher that // there are changes ready to be published -func (kr *Root) closeChild(name string, nd *dag.Node) error { +func (kr *Root) closeChild(name string, nd *dag.Node, sync bool) error { k, err := kr.dserv.Add(nd) if err != nil { return err @@ -146,7 +146,7 @@ func (kr *Root) Close() error { return err } - k, err := kr.dserv.Add(nd) + k, err := nd.Key() if err != nil { return err } diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 695da62ec88..0aad6c03ffc 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -23,7 +23,7 @@ var log = logging.Logger("gc") // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) { - unlock := bs.GCLock() + unlocker := bs.GCLock() bsrv := bserv.New(bs, offline.Exchange(bs)) ds := dag.NewDAGService(bsrv) @@ -41,7 +41,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key. output := make(chan key.Key) go func() { defer close(output) - defer unlock() + defer unlocker.Unlock() for { select { case k, ok := <-keychan: diff --git a/pin/pin.go b/pin/pin.go index fb6269d3ebe..a7f62417f64 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -123,6 +123,8 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error { return nil } +var ErrNotPinned = fmt.Errorf("not pinned") + // Unpin a given key func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error { p.lock.Lock() @@ -132,7 +134,7 @@ func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error { return err } if !pinned { - return fmt.Errorf("%s is not pinned", k) + return ErrNotPinned } switch reason { case "recursive": diff --git a/test/sharness/t0250-files-api.sh b/test/sharness/t0250-files-api.sh index 8732cda5939..4544da473a5 100755 --- a/test/sharness/t0250-files-api.sh +++ b/test/sharness/t0250-files-api.sh @@ -340,11 +340,6 @@ test_files_api() { ipfs files flush / ' - test_expect_success "root hash is pinned after flush" ' - ipfs pin ls > pins && - grep $EXP_ROOT_HASH pins || (cat pins && exit 1) - ' - # test mv test_expect_success "can mv dir" ' ipfs files mv /cats/this/is /cats/ diff --git a/test/sharness/t0251-files-flushing.sh b/test/sharness/t0251-files-flushing.sh new file mode 100755 index 00000000000..629a93016a4 --- /dev/null +++ b/test/sharness/t0251-files-flushing.sh @@ -0,0 +1,53 @@ +#!/bin/sh +# +# Copyright (c) 2016 Jeromy Johnson +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="test the unix files api flushing" + +. lib/test-lib.sh + +test_init_ipfs + +verify_path_exists() { + # simply running ls on a file should be a good 'check' + ipfs files ls $1 +} + +verify_dir_contents() { + dir=$1 + shift + rm -f expected + touch expected + for e in $@ + do + echo $e >> expected + done + + test_expect_success "can list dir" ' + ipfs files ls $dir > output + ' + + test_expect_success "dir entries look good" ' + test_sort_cmp output expected + ' +} + +test_launch_ipfs_daemon + +test_expect_success "can copy a file in" ' + HASH=$(echo "foo" | ipfs add -q) && + ipfs files cp /ipfs/$HASH /file +' + +test_kill_ipfs_daemon +test_launch_ipfs_daemon + +test_expect_success "file is still there" ' + verify_path_exists /file +' + +test_kill_ipfs_daemon + +test_done diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index e9dbe40a012..5306399f61f 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -169,12 +169,6 @@ func (dm *DagModifier) Sync() error { // Number of bytes we're going to write buflen := dm.wrBuf.Len() - // Grab key for unpinning after mod operation - _, err := dm.curNode.Key() - if err != nil { - return err - } - // overwrite existing dag nodes thisk, done, err := dm.modifyDag(dm.curNode, dm.writeStart, dm.wrBuf) if err != nil { From 4b7e3282f12e3cc1189c91abf75cc36ba8068c6d Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 3 Feb 2016 20:45:12 -0800 Subject: [PATCH 8/9] introduce concept of filedescriptors to mfs, adjust fuse code to use them License: MIT Signed-off-by: Jeromy --- core/commands/files/files.go | 33 +++-- fuse/ipns/ipns_test.go | 20 ++- fuse/ipns/ipns_unix.go | 92 ++++++++++--- mfs/dir.go | 12 +- mfs/fd.go | 151 +++++++++++++++++++++ mfs/file.go | 174 +++++++++--------------- mfs/mfs_test.go | 255 ++++++++++++++++++++++++++++++++--- mfs/ops.go | 78 +---------- mfs/system.go | 11 +- unixfs/mod/dagmodifier.go | 2 +- 10 files changed, 575 insertions(+), 253 deletions(-) create mode 100644 mfs/fd.go diff --git a/core/commands/files/files.go b/core/commands/files/files.go index d31fa76688a..e7978d72c46 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -368,6 +368,14 @@ Examples: return } + rfd, err := fi.Open(mfs.OpenReadOnly, false) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + defer rfd.Close() + offset, _, err := req.Option("offset").Int() if err != nil { res.SetError(err, cmds.ErrNormal) @@ -378,7 +386,7 @@ Examples: return } - filen, err := fi.Size() + filen, err := rfd.Size() if err != nil { res.SetError(err, cmds.ErrNormal) return @@ -389,12 +397,13 @@ Examples: return } - _, err = fi.Seek(int64(offset), os.SEEK_SET) + _, err = rfd.Seek(int64(offset), os.SEEK_SET) if err != nil { res.SetError(err, cmds.ErrNormal) return } - var r io.Reader = &contextReaderWrapper{R: fi, ctx: req.Context()} + + var r io.Reader = &contextReaderWrapper{R: rfd, ctx: req.Context()} count, found, err := req.Option("count").Int() if err != nil { res.SetError(err, cmds.ErrNormal) @@ -405,7 +414,7 @@ Examples: res.SetError(fmt.Errorf("cannot specify negative 'count'"), cmds.ErrNormal) return } - r = io.LimitReader(fi, int64(count)) + r = io.LimitReader(r, int64(count)) } res.SetOutput(r) @@ -540,14 +549,16 @@ Warning: return } - if flush { - defer fi.Close() - } else { - defer fi.Sync() + wfd, err := fi.Open(mfs.OpenWriteOnly, flush) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return } + defer wfd.Close() + if trunc { - if err := fi.Truncate(0); err != nil { + if err := wfd.Truncate(0); err != nil { res.SetError(err, cmds.ErrNormal) return } @@ -563,7 +574,7 @@ Warning: return } - _, err = fi.Seek(int64(offset), os.SEEK_SET) + _, err = wfd.Seek(int64(offset), os.SEEK_SET) if err != nil { log.Error("seekfail: ", err) res.SetError(err, cmds.ErrNormal) @@ -581,7 +592,7 @@ Warning: r = io.LimitReader(r, int64(count)) } - n, err := io.Copy(fi, input) + n, err := io.Copy(wfd, input) if err != nil { res.SetError(err, cmds.ErrNormal) return diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index 321376d7568..15b02eea997 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -88,7 +88,7 @@ func checkExists(t *testing.T, path string) { } } -func closeMount(mnt *fstest.Mount) { +func closeMount(mnt *mountWrap) { if err := recover(); err != nil { log.Error("Recovered panic") log.Error(err) @@ -96,7 +96,18 @@ func closeMount(mnt *fstest.Mount) { mnt.Close() } -func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.Mount) { +type mountWrap struct { + *fstest.Mount + Fs *FileSystem +} + +func (m *mountWrap) Close() error { + m.Fs.Destroy() + m.Mount.Close() + return nil +} + +func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *mountWrap) { maybeSkipFuseTests(t) var err error @@ -129,7 +140,10 @@ func setupIpnsTest(t *testing.T, node *core.IpfsNode) (*core.IpfsNode, *fstest.M t.Fatal(err) } - return node, mnt + return node, &mountWrap{ + Mount: mnt, + Fs: fs, + } } func TestIpnsLocalLink(t *testing.T) { diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index c9ccfe27140..1130a4b2984 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -23,6 +23,14 @@ import ( ci "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/crypto" ) +func init() { + if os.Getenv("IPFS_FUSE_DEBUG") != "" { + fuse.Debug = func(msg interface{}) { + fmt.Println(msg) + } + } +} + var log = logging.Logger("fuse/ipns") // FileSystem is the readwrite IPNS Fuse Filesystem. @@ -102,7 +110,7 @@ func loadRoot(ctx context.Context, rt *keyRoot, ipfs *core.IpfsNode, name string case *mfs.Directory: return &Directory{dir: val}, nil case *mfs.File: - return &File{fi: val}, nil + return &FileNode{fi: val}, nil default: return nil, errors.New("unrecognized type") } @@ -177,7 +185,7 @@ func (s *Root) Lookup(ctx context.Context, name string) (fs.Node, error) { switch nd := nd.(type) { case *Directory: return nd, nil - case *File: + case *FileNode: return nd, nil default: return nil, fuse.EIO @@ -248,15 +256,15 @@ func (r *Root) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) { // Directory is wrapper over an mfs directory to satisfy the fuse fs interface type Directory struct { dir *mfs.Directory +} - fs.NodeRef +type FileNode struct { + fi *mfs.File } // File is wrapper over an mfs file to satisfy the fuse fs interface type File struct { - fi *mfs.File - - fs.NodeRef + fi mfs.FileDescriptor } // Attr returns the attributes of a given node. @@ -269,7 +277,7 @@ func (d *Directory) Attr(ctx context.Context, a *fuse.Attr) error { } // Attr returns the attributes of a given node. -func (fi *File) Attr(ctx context.Context, a *fuse.Attr) error { +func (fi *FileNode) Attr(ctx context.Context, a *fuse.Attr) error { log.Debug("File Attr") size, err := fi.fi.Size() if err != nil { @@ -295,7 +303,7 @@ func (s *Directory) Lookup(ctx context.Context, name string) (fs.Node, error) { case *mfs.Directory: return &Directory{dir: child}, nil case *mfs.File: - return &File{fi: child}, nil + return &FileNode{fi: child}, nil default: // NB: if this happens, we do not want to continue, unpredictable behaviour // may occur. @@ -365,7 +373,7 @@ func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wr func (fi *File) Flush(ctx context.Context, req *fuse.FlushRequest) error { errs := make(chan error, 1) go func() { - errs <- fi.fi.Close() + errs <- fi.fi.Flush() }() select { case err := <-errs: @@ -393,7 +401,7 @@ func (fi *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus // Fsync flushes the content in the file to disk, but does not // update the dag tree internally -func (fi *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { +func (fi *FileNode) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { errs := make(chan error, 1) go func() { errs <- fi.fi.Sync() @@ -422,25 +430,49 @@ func (dir *Directory) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Nod return &Directory{dir: child}, nil } -func (fi *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { +func (fi *FileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { + var mfsflag int + switch { + case req.Flags.IsReadOnly(): + mfsflag = mfs.OpenReadOnly + case req.Flags.IsWriteOnly(): + mfsflag = mfs.OpenWriteOnly + case req.Flags.IsReadWrite(): + mfsflag = mfs.OpenReadWrite + default: + return nil, errors.New("unsupported flag type") + } + + fd, err := fi.fi.Open(mfsflag, true) + if err != nil { + return nil, err + } + if req.Flags&fuse.OpenTruncate != 0 { + if req.Flags.IsReadOnly() { + log.Error("tried to open a readonly file with truncate") + return nil, fuse.ENOTSUP + } log.Info("Need to truncate file!") - err := fi.fi.Truncate(0) + err := fd.Truncate(0) if err != nil { return nil, err } } else if req.Flags&fuse.OpenAppend != 0 { log.Info("Need to append to file!") + if req.Flags.IsReadOnly() { + log.Error("tried to open a readonly file with append") + return nil, fuse.ENOTSUP + } - // seek(0) essentially resets the file object, this is required for appends to work - // properly - _, err := fi.fi.Seek(0, os.SEEK_SET) + _, err := fd.Seek(0, os.SEEK_END) if err != nil { log.Error("seek reset failed: ", err) return nil, err } } - return fi, nil + + return &File{fi: fd}, nil } func (fi *File) Release(ctx context.Context, req *fuse.ReleaseRequest) error { @@ -465,8 +497,26 @@ func (dir *Directory) Create(ctx context.Context, req *fuse.CreateRequest, resp return nil, nil, errors.New("child creation failed") } - nodechild := &File{fi: fi} - return nodechild, nodechild, nil + nodechild := &FileNode{fi: fi} + + var openflag int + switch { + case req.Flags.IsReadOnly(): + openflag = mfs.OpenReadOnly + case req.Flags.IsWriteOnly(): + openflag = mfs.OpenWriteOnly + case req.Flags.IsReadWrite(): + openflag = mfs.OpenReadWrite + default: + return nil, nil, errors.New("unsupported open mode") + } + + fd, err := fi.Open(openflag, true) + if err != nil { + return nil, nil, err + } + + return nodechild, &File{fi: fd}, nil } func (dir *Directory) Remove(ctx context.Context, req *fuse.RemoveRequest) error { @@ -500,7 +550,7 @@ func (dir *Directory) Rename(ctx context.Context, req *fuse.RenameRequest, newDi if err != nil { return err } - case *File: + case *FileNode: log.Error("Cannot move node into a file!") return fuse.EPERM default: @@ -543,9 +593,13 @@ type ipnsFile interface { fs.HandleReader fs.HandleWriter fs.HandleReleaser +} + +type ipnsFileNode interface { fs.Node fs.NodeFsyncer fs.NodeOpener } +var _ ipnsFileNode = (*FileNode)(nil) var _ ipnsFile = (*File)(nil) diff --git a/mfs/dir.go b/mfs/dir.go index d437b28d74a..fc949621ac8 100644 --- a/mfs/dir.go +++ b/mfs/dir.go @@ -183,8 +183,8 @@ type NodeListing struct { } func (d *Directory) ListNames() []string { - d.Lock() - defer d.Unlock() + d.lock.Lock() + defer d.lock.Unlock() names := make(map[string]struct{}) for n, _ := range d.childDirs { @@ -391,11 +391,3 @@ func (d *Directory) GetNode() (*dag.Node, error) { return d.node.Copy(), nil } - -func (d *Directory) Lock() { - d.lock.Lock() -} - -func (d *Directory) Unlock() { - d.lock.Unlock() -} diff --git a/mfs/fd.go b/mfs/fd.go new file mode 100644 index 00000000000..2d3f2f3d0ed --- /dev/null +++ b/mfs/fd.go @@ -0,0 +1,151 @@ +package mfs + +import ( + "fmt" + "io" + + mod "github.com/ipfs/go-ipfs/unixfs/mod" + + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" +) + +type FileDescriptor interface { + io.Reader + CtxReadFull(context.Context, []byte) (int, error) + + io.Writer + io.WriterAt + + io.Closer + io.Seeker + + Truncate(int64) error + Size() (int64, error) + Sync() error + Flush() error +} + +type fileDescriptor struct { + inode *File + mod *mod.DagModifier + perms int + sync bool + hasChanges bool + + closed bool +} + +// Size returns the size of the file referred to by this descriptor +func (fi *fileDescriptor) Size() (int64, error) { + return fi.mod.Size() +} + +// Truncate truncates the file to size +func (fi *fileDescriptor) Truncate(size int64) error { + if fi.perms == OpenReadOnly { + return fmt.Errorf("cannot call truncate on readonly file descriptor") + } + fi.hasChanges = true + return fi.mod.Truncate(size) +} + +// Write writes the given data to the file at its current offset +func (fi *fileDescriptor) Write(b []byte) (int, error) { + if fi.perms == OpenReadOnly { + return 0, fmt.Errorf("cannot write on not writeable descriptor") + } + fi.hasChanges = true + return fi.mod.Write(b) +} + +// Read reads into the given buffer from the current offset +func (fi *fileDescriptor) Read(b []byte) (int, error) { + if fi.perms == OpenWriteOnly { + return 0, fmt.Errorf("cannot read on write-only descriptor") + } + return fi.mod.Read(b) +} + +// Read reads into the given buffer from the current offset +func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) { + if fi.perms == OpenWriteOnly { + return 0, fmt.Errorf("cannot read on write-only descriptor") + } + return fi.mod.CtxReadFull(ctx, b) +} + +// Close flushes, then propogates the modified dag node up the directory structure +// and signals a republish to occur +func (fi *fileDescriptor) Close() error { + defer func() { + switch fi.perms { + case OpenReadOnly: + fi.inode.desclock.RUnlock() + case OpenWriteOnly, OpenReadWrite: + fi.inode.desclock.Unlock() + } + }() + + if fi.closed { + panic("attempted to close file descriptor twice!") + } + + if fi.hasChanges { + err := fi.mod.Sync() + if err != nil { + return err + } + + fi.hasChanges = false + + // explicitly stay locked for flushUp call, + // it will manage the lock for us + return fi.flushUp(fi.sync) + } + + return nil +} + +func (fi *fileDescriptor) Sync() error { + return fi.flushUp(false) +} + +func (fi *fileDescriptor) Flush() error { + return fi.flushUp(true) +} + +// flushUp syncs the file and adds it to the dagservice +// it *must* be called with the File's lock taken +func (fi *fileDescriptor) flushUp(fullsync bool) error { + nd, err := fi.mod.GetNode() + if err != nil { + return err + } + + _, err = fi.inode.dserv.Add(nd) + if err != nil { + return err + } + + fi.inode.nodelk.Lock() + fi.inode.node = nd + name := fi.inode.name + parent := fi.inode.parent + fi.inode.nodelk.Unlock() + + return parent.closeChild(name, nd, fullsync) +} + +// Seek implements io.Seeker +func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) { + return fi.mod.Seek(offset, whence) +} + +// Write At writes the given bytes at the offset 'at' +func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) { + if fi.perms == OpenReadOnly { + return 0, fmt.Errorf("cannot write on not writeable descriptor") + } + fi.hasChanges = true + return fi.mod.WriteAt(b, at) +} diff --git a/mfs/file.go b/mfs/file.go index 99dfff0df1e..578da98f6dd 100644 --- a/mfs/file.go +++ b/mfs/file.go @@ -1,10 +1,12 @@ package mfs import ( + "fmt" "sync" chunk "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" + ft "github.com/ipfs/go-ipfs/unixfs" mod "github.com/ipfs/go-ipfs/unixfs/mod" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" @@ -13,150 +15,98 @@ import ( type File struct { parent childCloser - name string - hasChanges bool + name string - dserv dag.DAGService - mod *mod.DagModifier - lock sync.Mutex + desclock sync.RWMutex + + dserv dag.DAGService + node *dag.Node + nodelk sync.Mutex } // NewFile returns a NewFile object with the given parameters func NewFile(name string, node *dag.Node, parent childCloser, dserv dag.DAGService) (*File, error) { - dmod, err := mod.NewDagModifier(context.Background(), node, dserv, chunk.DefaultSplitter) - if err != nil { - return nil, err - } - return &File{ dserv: dserv, parent: parent, name: name, - mod: dmod, + node: node, }, nil } -// Write writes the given data to the file at its current offset -func (fi *File) Write(b []byte) (int, error) { - fi.Lock() - defer fi.Unlock() - fi.hasChanges = true - return fi.mod.Write(b) -} - -// Read reads into the given buffer from the current offset -func (fi *File) Read(b []byte) (int, error) { - fi.Lock() - defer fi.Unlock() - return fi.mod.Read(b) -} - -// Read reads into the given buffer from the current offset -func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) { - fi.Lock() - defer fi.Unlock() - return fi.mod.CtxReadFull(ctx, b) -} +const ( + OpenReadOnly = iota + OpenWriteOnly + OpenReadWrite +) -// Close flushes, then propogates the modified dag node up the directory structure -// and signals a republish to occur -func (fi *File) Close() error { - fi.Lock() - if fi.hasChanges { - err := fi.mod.Sync() - if err != nil { - fi.Unlock() - return err - } - - fi.hasChanges = false - - // explicitly stay locked for flushUp call, - // it will manage the lock for us - return fi.flushUp(true) +func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) { + fi.nodelk.Lock() + node := fi.node + fi.nodelk.Unlock() + + switch flags { + case OpenReadOnly: + fi.desclock.RLock() + case OpenWriteOnly, OpenReadWrite: + fi.desclock.Lock() + default: + // TODO: support other modes + return nil, fmt.Errorf("mode not supported") } - fi.Unlock() - return nil -} - -// flushUp syncs the file and adds it to the dagservice -// it *must* be called with the File's lock taken -func (fi *File) flushUp(fullsync bool) error { - nd, err := fi.mod.GetNode() + dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dserv, chunk.DefaultSplitter) if err != nil { - fi.Unlock() - return err + return nil, err } - _, err = fi.dserv.Add(nd) + return &fileDescriptor{ + inode: fi, + perms: flags, + sync: sync, + mod: dmod, + }, nil +} + +// Size returns the size of this file +func (fi *File) Size() (int64, error) { + fi.nodelk.Lock() + defer fi.nodelk.Unlock() + pbd, err := ft.FromBytes(fi.node.Data) if err != nil { - fi.Unlock() - return err + return 0, err } - name := fi.name - parent := fi.parent - - // explicit unlock *only* before closeChild call - fi.Unlock() - return parent.closeChild(name, nd, fullsync) + return int64(pbd.GetFilesize()), nil } -// Sync flushes the changes in the file to disk -func (fi *File) Sync() error { - fi.Lock() - return fi.flushUp(false) +// GetNode returns the dag node associated with this file +func (fi *File) GetNode() (*dag.Node, error) { + fi.nodelk.Lock() + defer fi.nodelk.Unlock() + return fi.node, nil } -// Seek implements io.Seeker -func (fi *File) Seek(offset int64, whence int) (int64, error) { - fi.Lock() - defer fi.Unlock() - return fi.mod.Seek(offset, whence) -} +func (fi *File) Flush() error { + // open the file in fullsync mode + fd, err := fi.Open(OpenWriteOnly, true) + if err != nil { + return err + } -// Write At writes the given bytes at the offset 'at' -func (fi *File) WriteAt(b []byte, at int64) (int, error) { - fi.Lock() - defer fi.Unlock() - fi.hasChanges = true - return fi.mod.WriteAt(b, at) -} + defer fd.Close() -// Size returns the size of this file -func (fi *File) Size() (int64, error) { - fi.Lock() - defer fi.Unlock() - return fi.mod.Size() + return fd.Flush() } -// GetNode returns the dag node associated with this file -func (fi *File) GetNode() (*dag.Node, error) { - fi.Lock() - defer fi.Unlock() - return fi.mod.GetNode() -} - -// Truncate truncates the file to size -func (fi *File) Truncate(size int64) error { - fi.Lock() - defer fi.Unlock() - fi.hasChanges = true - return fi.mod.Truncate(size) +func (fi *File) Sync() error { + // just being able to take the writelock means the descriptor is synced + fi.desclock.Lock() + fi.desclock.Unlock() + return nil } // Type returns the type FSNode this is func (fi *File) Type() NodeType { return TFile } - -// Lock the file -func (fi *File) Lock() { - fi.lock.Lock() -} - -// Unlock the file -func (fi *File) Unlock() { - fi.lock.Unlock() -} diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index 38237c2183b..b8ba320ce9e 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -9,7 +9,9 @@ import ( "math/rand" "os" "sort" + "sync" "testing" + "time" randbo "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/dustin/randbo" ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore" @@ -38,6 +40,10 @@ func getDagserv(t *testing.T) dag.DAGService { func getRandFile(t *testing.T, ds dag.DAGService, size int64) *dag.Node { r := io.LimitReader(u.NewTimeSeededRand(), size) + return fileNodeFromReader(t, ds, r) +} + +func fileNodeFromReader(t *testing.T, ds dag.DAGService, r io.Reader) *dag.Node { nd, err := importer.BuildDagFromReader(ds, chunk.DefaultSplitter(r)) if err != nil { t.Fatal(err) @@ -143,7 +149,12 @@ func assertFileAtPath(ds dag.DAGService, root *Directory, exp *dag.Node, pth str return fmt.Errorf("%s was not a file!", pth) } - out, err := ioutil.ReadAll(file) + rfd, err := file.Open(OpenReadOnly, false) + if err != nil { + return err + } + + out, err := ioutil.ReadAll(rfd) if err != nil { return err } @@ -374,6 +385,11 @@ func TestMfsFile(t *testing.T) { t.Fatal("some is seriously wrong here") } + wfd, err := fi.Open(OpenReadWrite, true) + if err != nil { + t.Fatal(err) + } + // assert size is as expected size, err := fi.Size() if size != int64(fisize) { @@ -382,7 +398,7 @@ func TestMfsFile(t *testing.T) { // write to beginning of file b := []byte("THIS IS A TEST") - n, err := fi.Write(b) + n, err := wfd.Write(b) if err != nil { t.Fatal(err) } @@ -392,19 +408,19 @@ func TestMfsFile(t *testing.T) { } // sync file - err = fi.Sync() + err = wfd.Sync() if err != nil { t.Fatal(err) } // make sure size hasnt changed - size, err = fi.Size() + size, err = wfd.Size() if size != int64(fisize) { t.Fatal("size isnt correct") } // seek back to beginning - ns, err := fi.Seek(0, os.SEEK_SET) + ns, err := wfd.Seek(0, os.SEEK_SET) if err != nil { t.Fatal(err) } @@ -415,7 +431,7 @@ func TestMfsFile(t *testing.T) { // read back bytes we wrote buf := make([]byte, len(b)) - n, err = fi.Read(buf) + n, err = wfd.Read(buf) if err != nil { t.Fatal(err) } @@ -429,12 +445,12 @@ func TestMfsFile(t *testing.T) { } // truncate file to ten bytes - err = fi.Truncate(10) + err = wfd.Truncate(10) if err != nil { t.Fatal(err) } - size, err = fi.Size() + size, err = wfd.Size() if err != nil { t.Fatal(err) } @@ -445,7 +461,7 @@ func TestMfsFile(t *testing.T) { // 'writeAt' to extend it data := []byte("this is a test foo foo foo") - nwa, err := fi.WriteAt(data, 5) + nwa, err := wfd.WriteAt(data, 5) if err != nil { t.Fatal(err) } @@ -455,7 +471,7 @@ func TestMfsFile(t *testing.T) { } // assert size once more - size, err = fi.Size() + size, err = wfd.Size() if err != nil { t.Fatal(err) } @@ -464,14 +480,14 @@ func TestMfsFile(t *testing.T) { t.Fatal("size was incorrect") } - // make sure we can get node. TODO: verify it later - _, err = fi.GetNode() + // close it out! + err = wfd.Close() if err != nil { t.Fatal(err) } - // close it out! - err = fi.Close() + // make sure we can get node. TODO: verify it later + _, err = fi.GetNode() if err != nil { t.Fatal(err) } @@ -529,13 +545,18 @@ func actorMakeFile(d *Directory) error { return err } + wfd, err := f.Open(OpenWriteOnly, true) + if err != nil { + return err + } + r := io.LimitReader(randbo.New(), int64(77*rand.Intn(123))) - _, err = io.Copy(f, r) + _, err = io.Copy(wfd, r) if err != nil { return err } - err = f.Close() + err = wfd.Close() if err != nil { return err } @@ -630,9 +651,14 @@ func actorWriteFile(d *Directory) error { return err } + wfd, err := fi.Open(OpenWriteOnly, true) + if err != nil { + return err + } + offset := rand.Int63n(s) - n, err := fi.WriteAt(buf, offset) + n, err := wfd.WriteAt(buf, offset) if err != nil { return err } @@ -640,7 +666,7 @@ func actorWriteFile(d *Directory) error { return fmt.Errorf("didnt write enough") } - return fi.Close() + return wfd.Close() } func actorReadFile(d *Directory) error { @@ -657,12 +683,17 @@ func actorReadFile(d *Directory) error { return err } - _, err = ioutil.ReadAll(fi) + rfd, err := fi.Open(OpenReadOnly, false) if err != nil { return err } - return fi.Close() + _, err = ioutil.ReadAll(rfd) + if err != nil { + return err + } + + return rfd.Close() } func testActor(rt *Root, iterations int, errs chan error) { @@ -780,3 +811,187 @@ func TestFlushing(t *testing.T) { t.Fatalf("dag looks wrong, expected %s, but got %s", exp, rnk.B58String()) } } + +func readFile(rt *Root, path string, offset int64, buf []byte) error { + n, err := Lookup(rt, path) + if err != nil { + return err + } + + fi, ok := n.(*File) + if !ok { + return fmt.Errorf("%s was not a file", path) + } + + fd, err := fi.Open(OpenReadOnly, false) + if err != nil { + return err + } + + _, err = fd.Seek(offset, os.SEEK_SET) + if err != nil { + return err + } + + nread, err := fd.Read(buf) + if err != nil { + return err + } + if nread != len(buf) { + return fmt.Errorf("didnt read enough!") + } + + return fd.Close() +} + +func TestConcurrentReads(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ds, rt := setupRoot(ctx, t) + + rootdir := rt.GetValue().(*Directory) + + path := "a/b/c" + d := mkdirP(t, rootdir, path) + + buf := make([]byte, 2048) + randbo.New().Read(buf) + + fi := fileNodeFromReader(t, ds, bytes.NewReader(buf)) + err := d.AddChild("afile", fi) + if err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + nloops := 100 + for i := 0; i < 10; i++ { + wg.Add(1) + go func(me int) { + defer wg.Done() + mybuf := make([]byte, len(buf)) + for j := 0; j < nloops; j++ { + offset := rand.Intn(len(buf)) + length := rand.Intn(len(buf) - offset) + + err := readFile(rt, "/a/b/c/afile", int64(offset), mybuf[:length]) + if err != nil { + t.Error("readfile failed: ", err) + return + } + + if !bytes.Equal(mybuf[:length], buf[offset:offset+length]) { + t.Error("incorrect read!") + } + } + }(i) + } + wg.Wait() +} + +func TestFileDescriptors(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ds, rt := setupRoot(ctx, t) + dir := rt.GetValue().(*Directory) + + nd := &dag.Node{Data: ft.FilePBData(nil, 0)} + fi, err := NewFile("test", nd, dir, ds) + if err != nil { + t.Fatal(err) + } + + // test read only + rfd1, err := fi.Open(OpenReadOnly, false) + if err != nil { + t.Fatal(err) + } + + err = rfd1.Truncate(0) + if err == nil { + t.Fatal("shouldnt be able to truncate readonly fd") + } + + _, err = rfd1.Write([]byte{}) + if err == nil { + t.Fatal("shouldnt be able to write to readonly fd") + } + + _, err = rfd1.Read([]byte{}) + if err != nil { + t.Fatalf("expected to be able to read from file: %s", err) + } + + done := make(chan struct{}) + go func() { + defer close(done) + // can open second readonly file descriptor + rfd2, err := fi.Open(OpenReadOnly, false) + if err != nil { + t.Error(err) + return + } + + rfd2.Close() + }() + + select { + case <-time.After(time.Second): + t.Fatal("open second file descriptor failed") + case <-done: + } + + if t.Failed() { + return + } + + // test not being able to open for write until reader are closed + done = make(chan struct{}) + go func() { + defer close(done) + wfd1, err := fi.Open(OpenWriteOnly, true) + if err != nil { + t.Error(err) + } + + wfd1.Close() + }() + + select { + case <-time.After(time.Millisecond * 200): + case <-done: + if t.Failed() { + return + } + + t.Fatal("shouldnt have been able to open file for writing") + } + + err = rfd1.Close() + if err != nil { + t.Fatal(err) + } + + select { + case <-time.After(time.Second): + t.Fatal("should have been able to open write fd after closing read fd") + case <-done: + } + + wfd, err := fi.Open(OpenWriteOnly, true) + if err != nil { + t.Fatal(err) + } + + _, err = wfd.Read([]byte{}) + if err == nil { + t.Fatal("shouldnt have been able to read from write only filedescriptor") + } + + _, err = wfd.Write([]byte{}) + if err != nil { + t.Fatal(err) + } +} diff --git a/mfs/ops.go b/mfs/ops.go index 9bc59994ce7..b02d64fd1f3 100644 --- a/mfs/ops.go +++ b/mfs/ops.go @@ -196,87 +196,17 @@ func DirLookup(d *Directory, pth string) (FSNode, error) { return cur, nil } -func FlushPath(r *Root, pth string) error { - parts := path.SplitList(strings.Trim(pth, "/")) - if len(parts) == 1 && parts[0] == "" { - parts = nil - } - - d, ok := r.GetValue().(*Directory) - if !ok { - return errors.New("mfs root somehow didnt point to a directory") - } - - nd, err := flushPathRec(d, parts) +func FlushPath(rt *Root, pth string) error { + nd, err := Lookup(rt, pth) if err != nil { return err } - k, err := nd.Key() + err = nd.Flush() if err != nil { return err } - r.repub.Update(k) - r.repub.WaitPub() - + rt.repub.WaitPub() return nil } - -func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { - if len(parts) == 0 { - nd, err := d.GetNode() - if err != nil { - return nil, err - } - - return nd, nil - } - - d.Lock() - defer d.Unlock() - - next, err := d.childUnsync(parts[0]) - if err != nil { - log.Errorf("childnode: %q %q", parts[0], err) - return nil, err - } - - var ndagnode *dag.Node - switch next := next.(type) { - case *Directory: - nd, err := flushPathRec(next, parts[1:]) - if err != nil { - return nil, err - } - - ndagnode = nd - - case *File: - if len(parts) > 1 { - return nil, fmt.Errorf("%s is a file, not a directory", parts[0]) - } - - child, err := next.GetNode() - if err != nil { - return nil, err - } - - ndagnode = child - default: - return nil, fmt.Errorf("unrecognized FSNode type: %#v", next) - } - - newnode, err := d.node.UpdateNodeLink(parts[0], ndagnode) - if err != nil { - return nil, err - } - - _, err = d.dserv.Add(newnode) - if err != nil { - return nil, err - } - - d.node = newnode - return newnode, nil -} diff --git a/mfs/system.go b/mfs/system.go index 454577c457c..2ccc6650c73 100644 --- a/mfs/system.go +++ b/mfs/system.go @@ -42,9 +42,8 @@ const ( // FSNode represents any node (directory, root, or file) in the mfs filesystem type FSNode interface { GetNode() (*dag.Node, error) + Flush() error Type() NodeType - Lock() - Unlock() } // Root represents the root of a filesystem tree @@ -210,6 +209,13 @@ func (p *Republisher) pubNow() { } func (p *Republisher) WaitPub() { + p.lk.Lock() + consistent := p.lastpub == p.val + p.lk.Unlock() + if consistent { + return + } + wait := make(chan struct{}) p.pubnowch <- wait <-wait @@ -273,7 +279,6 @@ func (np *Republisher) publish(ctx context.Context) error { topub := np.val np.lk.Unlock() - log.Info("Publishing Changes!") err := np.pubfunc(ctx, topub) if err != nil { return err diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index 5306399f61f..0f5793867f7 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -372,7 +372,7 @@ func (dm *DagModifier) Seek(offset int64, whence int) (int64, error) { case os.SEEK_SET: newoffset = uint64(offset) case os.SEEK_END: - return 0, ErrSeekEndNotImpl + newoffset = uint64(fisize) - uint64(offset) default: return 0, ErrUnrecognizedWhence } From c47e12ebb1562ad5c4ed846b32067c96b205fa29 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 4 Feb 2016 15:33:17 -0800 Subject: [PATCH 9/9] clear up comment about flush=false License: MIT Signed-off-by: Jeromy --- core/commands/files/files.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/commands/files/files.go b/core/commands/files/files.go index e7978d72c46..5841a8c5f81 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -29,12 +29,12 @@ var FilesCmd = &cmds.Command{ Files is an API for manipulating ipfs objects as if they were a unix filesystem. Note: -Most of the subcommands of 'ipfs files' accept the '--flush' flag. Use caution -when using this flag, It will improve performance for large numbers of file -operations, but it does so at the cost of consistency guarantees. If the daemon -is unexpectedly killed before running 'ipfs files flush' on the files in question, -then data may be lost. This also applies to running 'ipfs repo gc' concurrently -with '--flush=false' operations. +Most of the subcommands of 'ipfs files' accept the '--flush' flag. It defaults to +true. Use caution when setting this flag to false, It will improve performance +for large numbers of file operations, but it does so at the cost of consistency +guarantees. If the daemon is unexpectedly killed before running 'ipfs files flush' +on the files in question, then data may be lost. This also applies to running +'ipfs repo gc' concurrently with '--flush=false' operations. `, }, Options: []cmds.Option{