Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] MFS improvements #4517

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/commands/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ Examples:
return
}

rfd, err := fi.Open(mfs.OpenReadOnly, false)
rfd, err := fi.Open(mfs.Flags{Read: true, Sync: false})
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down Expand Up @@ -678,7 +678,7 @@ stat' on the file or any of its ancestors.
fi.RawLeaves = rawLeaves
}

wfd, err := fi.Open(mfs.OpenWriteOnly, flush)
wfd, err := fi.Open(mfs.Flags{Write: true, Sync: flush})
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand Down
22 changes: 12 additions & 10 deletions fuse/ipns/ipns_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,19 +440,20 @@ func (dir *Directory) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Nod
}

func (fi *FileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
var mfsflag int
mfsflags := mfs.Flags{Sync: true}
switch {
case req.Flags.IsReadOnly():
mfsflag = mfs.OpenReadOnly
mfsflags.Read = true
case req.Flags.IsWriteOnly():
mfsflag = mfs.OpenWriteOnly
mfsflags.Write = true
case req.Flags.IsReadWrite():
mfsflag = mfs.OpenReadWrite
mfsflags.Write = true
mfsflags.Read = true
default:
return nil, errors.New("unsupported flag type")
}

fd, err := fi.fi.Open(mfsflag, true)
fd, err := fi.fi.Open(mfsflags)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -508,19 +509,20 @@ func (dir *Directory) Create(ctx context.Context, req *fuse.CreateRequest, resp

nodechild := &FileNode{fi: fi}

var openflag int
openflags := mfs.Flags{Sync: true}
switch {
case req.Flags.IsReadOnly():
openflag = mfs.OpenReadOnly
openflags.Read = true
case req.Flags.IsWriteOnly():
openflag = mfs.OpenWriteOnly
openflags.Write = true
case req.Flags.IsReadWrite():
openflag = mfs.OpenReadWrite
openflags.Read = true
openflags.Write = true
default:
return nil, nil, errors.New("unsupported open mode")
}

fd, err := fi.Open(openflag, true)
fd, err := fi.Open(openflags)
if err != nil {
return nil, nil, err
}
Expand Down
162 changes: 103 additions & 59 deletions mfs/fd.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package mfs

import (
"context"
"fmt"
"io"

mod "github.com/ipfs/go-ipfs/unixfs/mod"

context "context"
node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
)

type state uint8

const (
stateFlushed state = iota
stateSynced
stateDirty
stateClosed
)

type FileDescriptor interface {
Expand All @@ -26,13 +36,31 @@ type FileDescriptor interface {
}

type fileDescriptor struct {
inode *File
mod *mod.DagModifier
perms int
sync bool
hasChanges bool
inode *File
mod *mod.DagModifier
flags Flags

state state
}

func (fi *fileDescriptor) checkWrite() error {
if fi.state == stateClosed {
return ErrClosed
}
if !fi.flags.Write {
return fmt.Errorf("file is read-only")
}
return nil
}

closed bool
func (fi *fileDescriptor) checkRead() error {
if fi.state == stateClosed {
return ErrClosed
}
if !fi.flags.Read {
return fmt.Errorf("file is write-only")
}
return nil
}

// Size returns the size of the file referred to by this descriptor
Expand All @@ -42,110 +70,126 @@ func (fi *fileDescriptor) Size() (int64, error) {

// Truncate truncates the file to size
func (fi *fileDescriptor) Truncate(size int64) error {
if fi.perms == OpenReadOnly {
return fmt.Errorf("cannot call truncate on readonly file descriptor")
if err := fi.checkWrite(); err != nil {
return fmt.Errorf("truncate failed: %s", err)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.Truncate(size)
}

// Write writes the given data to the file at its current offset
func (fi *fileDescriptor) Write(b []byte) (int, error) {
if fi.perms == OpenReadOnly {
return 0, fmt.Errorf("cannot write on not writeable descriptor")
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write failed: %s", err)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.Write(b)
}

// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) Read(b []byte) (int, error) {
if fi.perms == OpenWriteOnly {
return 0, fmt.Errorf("cannot read on write-only descriptor")
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
}
return fi.mod.Read(b)
}

// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if fi.perms == OpenWriteOnly {
return 0, fmt.Errorf("cannot read on write-only descriptor")
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
}
return fi.mod.CtxReadFull(ctx, b)
}

// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *fileDescriptor) Close() error {
defer func() {
switch fi.perms {
case OpenReadOnly:
fi.inode.desclock.RUnlock()
case OpenWriteOnly, OpenReadWrite:
fi.inode.desclock.Unlock()
}
}()

if fi.closed {
panic("attempted to close file descriptor twice!")
if fi.state == stateClosed {
return ErrClosed
}

if fi.hasChanges {
err := fi.mod.Sync()
if err != nil {
return err
}

fi.hasChanges = false

// explicitly stay locked for flushUp call,
// it will manage the lock for us
return fi.flushUp(fi.sync)
if fi.flags.Write {
defer fi.inode.desclock.Unlock()
} else if fi.flags.Read {
defer fi.inode.desclock.RUnlock()
}

return nil
err := fi.flushUp(fi.flags.Sync)
fi.state = stateClosed
return err
}

func (fi *fileDescriptor) Sync() error {
if fi.state == stateClosed {
return ErrClosed
}
return fi.flushUp(false)
}

func (fi *fileDescriptor) Flush() error {
if fi.state == stateClosed {
return ErrClosed
}
return fi.flushUp(true)
}

// flushUp syncs the file and adds it to the dagservice
// it *must* be called with the File's lock taken
func (fi *fileDescriptor) flushUp(fullsync bool) error {
nd, err := fi.mod.GetNode()
if err != nil {
return err
}
var nd node.Node
switch fi.state {
case stateDirty:
// calls mod.Sync internally.
var err error
nd, err = fi.mod.GetNode()
if err != nil {
return err
}

_, err = fi.inode.dserv.Add(nd)
if err != nil {
return err
}
_, err = fi.inode.dserv.Add(nd)
if err != nil {
return err
}

fi.inode.nodelk.Lock()
fi.inode.node = nd
name := fi.inode.name
parent := fi.inode.parent
fi.inode.nodelk.Unlock()
fi.inode.nodelk.Lock()
fi.inode.node = nd
fi.inode.nodelk.Unlock()
fi.state = stateSynced
fallthrough
case stateSynced:
if !fullsync {
return nil
}
if nd == nil {
fi.inode.nodelk.RLock()
nd = fi.inode.node
fi.inode.nodelk.RUnlock()
}

return parent.closeChild(name, nd, fullsync)
if err := fi.inode.parent.closeChild(fi.inode.name, nd, fullsync); err != nil {
return err
}
fi.state = stateFlushed
return nil
case stateFlushed:
return nil
default:
panic("invalid state")
}
}

// Seek implements io.Seeker
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
if fi.state == stateClosed {
return 0, fmt.Errorf("seek failed: %s", ErrClosed)
}
return fi.mod.Seek(offset, whence)
}

// Write At writes the given bytes at the offset 'at'
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
if fi.perms == OpenReadOnly {
return 0, fmt.Errorf("cannot write on not writeable descriptor")
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write-at failed: %s", err)
}
fi.hasChanges = true
fi.state = stateDirty
return fi.mod.WriteAt(b, at)
}
Loading