Skip to content

Commit

Permalink
Implement basic filestore 'no-copy' functionality
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Jan 24, 2017
1 parent 416f025 commit 6176525
Show file tree
Hide file tree
Showing 25 changed files with 758 additions and 32 deletions.
6 changes: 5 additions & 1 deletion blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
}

func (s *blockstore) DeleteBlock(k *cid.Cid) error {
return s.datastore.Delete(dshelp.CidToDsKey(k))
err := s.datastore.Delete(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
return ErrNotFound
}
return err
}

// AllKeysChan runs a query for keys from the blockstore.
Expand Down
6 changes: 3 additions & 3 deletions commands/files/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type File interface {
// they are not directories
io.ReadCloser

// FileName returns a filename path associated with this file
// FileName returns a filename associated with this file
FileName() string

// FullPath returns the full path in the os associated with this file
// FullPath returns the full path used when adding this file
FullPath() string

// IsDirectory returns true if the File is a directory (and therefore
Expand Down Expand Up @@ -57,6 +57,6 @@ type SizeFile interface {
}

type FileInfo interface {
FullPath() string
AbsPath() string
Stat() os.FileInfo
}
11 changes: 10 additions & 1 deletion commands/files/multipartfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (

applicationDirectory = "application/x-directory"
applicationSymlink = "application/symlink"
applicationFile = "application/octet-stream"

contentTypeHeader = "Content-Type"
)
Expand All @@ -34,7 +35,8 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
}

contentType := part.Header.Get(contentTypeHeader)
if contentType == applicationSymlink {
switch contentType {
case applicationSymlink:
out, err := ioutil.ReadAll(part)
if err != nil {
return nil, err
Expand All @@ -44,6 +46,13 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
Target: string(out),
name: f.FileName(),
}, nil
case applicationFile:
return &ReaderFile{
reader: part,
filename: f.FileName(),
abspath: part.Header.Get("abspath"),
fullpath: f.FullPath(),
}, nil
}

var err error
Expand Down
17 changes: 16 additions & 1 deletion commands/files/readerfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@ import (
"errors"
"io"
"os"
"path/filepath"
)

// ReaderFile is a implementation of File created from an `io.Reader`.
// ReaderFiles are never directories, and can be read from and closed.
type ReaderFile struct {
filename string
fullpath string
abspath string
reader io.ReadCloser
stat os.FileInfo
}

func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile {
return &ReaderFile{filename, path, reader, stat}
return &ReaderFile{filename, path, path, reader, stat}
}

func NewReaderPathFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) (*ReaderFile, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, err
}

return &ReaderFile{filename, path, abspath, reader, stat}, nil
}

func (f *ReaderFile) IsDirectory() bool {
Expand All @@ -35,6 +46,10 @@ func (f *ReaderFile) FullPath() string {
return f.fullpath
}

func (f *ReaderFile) AbsPath() string {
return f.abspath
}

func (f *ReaderFile) Read(p []byte) (int, error) {
return f.reader.Read(p)
}
Expand Down
3 changes: 2 additions & 1 deletion commands/files/serialfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type serialFile struct {
}

func NewSerialFile(name, path string, hidden bool, stat os.FileInfo) (File, error) {

switch mode := stat.Mode(); {
case mode.IsRegular():
file, err := os.Open(path)
if err != nil {
return nil, err
}
return NewReaderFile(name, path, file, stat), nil
return NewReaderPathFile(name, path, file, stat)
case mode.IsDir():
// for directories, stat all of the contents first, so we know what files to
// open when NextFile() is called
Expand Down
3 changes: 3 additions & 0 deletions commands/http/multifilereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func (mfr *MultiFileReader) Read(buf []byte) (written int, err error) {
header.Set("Content-Disposition", fmt.Sprintf("file; filename=\"%s\"", filename))

header.Set("Content-Type", contentType)
if rf, ok := file.(*files.ReaderFile); ok {
header.Set("abspath", rf.AbsPath())
}

_, err := mfr.mpWriter.CreatePart(header)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
filestore "github.com/ipfs/go-ipfs/filestore"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
Expand Down Expand Up @@ -166,8 +167,8 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
TempErrFunc: isTooManyFDError,
}

var err error
bs := bstore.NewBlockstore(rds)

opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand All @@ -184,7 +185,14 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}

n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())
n.BaseBlocks = cbs
n.GCLocker = bstore.NewGCLocker()
n.Blockstore = bstore.NewGCBlockstore(cbs, n.GCLocker)

if conf.Experimental.FilestoreEnabled {
n.Filestore = filestore.NewFilestore(bs, n.Repo.FileManager())
n.Blockstore = bstore.NewGCBlockstore(n.Filestore, n.GCLocker)
}

rcfg, err := n.Repo.Config()
if err != nil {
Expand Down
47 changes: 33 additions & 14 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-ipfs/core/coreunix"
"gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb"

bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
blockservice "github.com/ipfs/go-ipfs/blockservice"
cmds "github.com/ipfs/go-ipfs/commands"
files "github.com/ipfs/go-ipfs/commands/files"
Expand All @@ -23,16 +24,18 @@ import (
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

const (
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
noCopyOptionName = "nocopy"
fstoreCacheOptionName = "fscache"
)

var AddCmd = &cmds.Command{
Expand Down Expand Up @@ -78,6 +81,8 @@ You can now refer to the added file in a gateway, like so:
cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."),
cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true),
cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"),
cmds.BoolOption(noCopyOptionName, "Add the file using filestore. (experimental)"),
cmds.BoolOption(fstoreCacheOptionName, "Check the filestore for pre-existing blocks. (experimental)"),
},
PreRun: func(req cmds.Request) error {
quiet, _, _ := req.Option(quietOptionName).Bool()
Expand Down Expand Up @@ -140,6 +145,13 @@ You can now refer to the added file in a gateway, like so:
chunker, _, _ := req.Option(chunkerOptionName).String()
dopin, _, _ := req.Option(pinOptionName).Bool()
rawblks, _, _ := req.Option(rawLeavesOptionName).Bool()
nocopy, _, _ := req.Option(noCopyOptionName).Bool()
fscache, _, _ := req.Option(fstoreCacheOptionName).Bool()

if nocopy && !rawblks {
res.SetError(fmt.Errorf("nocopy option requires '--raw-leaves' to be enabled as well"), cmds.ErrNormal)
return
}

if hash {
nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
Expand All @@ -154,14 +166,20 @@ You can now refer to the added file in a gateway, like so:
n = nilnode
}

dserv := n.DAG
addblockstore := n.Blockstore
if !fscache && !nocopy {
addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker)
}

exch := n.Exchange
local, _, _ := req.Option("local").Bool()
if local {
offlineexch := offline.Exchange(n.Blockstore)
bserv := blockservice.New(n.Blockstore, offlineexch)
dserv = dag.NewDAGService(bserv)
exch = offline.Exchange(addblockstore)
}

bserv := blockservice.New(addblockstore, exch)
dserv := dag.NewDAGService(bserv)

outChan := make(chan interface{}, 8)
res.SetOutput((<-chan interface{})(outChan))

Expand All @@ -180,6 +198,7 @@ You can now refer to the added file in a gateway, like so:
fileAdder.Pin = dopin
fileAdder.Silent = silent
fileAdder.RawLeaves = rawblks
fileAdder.NoCopy = nocopy

if hash {
md := dagtest.Mock()
Expand Down
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore"
mount "github.com/ipfs/go-ipfs/fuse/mount"
merkledag "github.com/ipfs/go-ipfs/merkledag"
mfs "github.com/ipfs/go-ipfs/mfs"
Expand Down Expand Up @@ -107,6 +108,9 @@ type IpfsNode struct {
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore // the filestore blockstore
BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Expand Down
2 changes: 2 additions & 0 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Adder struct {
RawLeaves bool
Silent bool
Wrap bool
NoCopy bool
Chunker string
root node.Node
mr *mfs.Root
Expand All @@ -124,6 +125,7 @@ func (adder Adder) add(reader io.Reader) (node.Node, error) {
Dagserv: adder.dagService,
RawLeaves: adder.RawLeaves,
Maxlinks: ihelper.DefaultLinksPerBlock,
NoCopy: adder.NoCopy,
}

if adder.Trickle {
Expand Down
Loading

0 comments on commit 6176525

Please sign in to comment.