Commit

add global config switch for sharding
License: MIT
Signed-off-by: Jeromy <[email protected]>
whyrusleeping committed Mar 23, 2017
1 parent e876434 commit c4c6653
Showing 11 changed files with 124 additions and 39 deletions.
4 changes: 4 additions & 0 deletions core/builder.go
@@ -18,6 +18,7 @@ import (
     pin "github.com/ipfs/go-ipfs/pin"
     repo "github.com/ipfs/go-ipfs/repo"
     cfg "github.com/ipfs/go-ipfs/repo/config"
+    uio "github.com/ipfs/go-ipfs/unixfs/io"
 
     ci "gx/ipfs/QmPGxZ1DP2w45WcogpW1h43BvseXbfke9N91qotpoQcUeS/go-libp2p-crypto"
     ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
@@ -175,6 +176,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
         return err
     }
 
+    // TEMP: setting global sharding switch here
+    uio.UseHAMTSharding = conf.Experimental.ShardingEnabled
+
     opts.HasBloomFilterSize = conf.Datastore.BloomFilterSize
     if !cfg.Permament {
         opts.HasBloomFilterSize = 0
17 changes: 15 additions & 2 deletions core/commands/ls.go
@@ -105,11 +105,24 @@ The JSON output contains type information.
 
     output := make([]LsObject, len(req.Arguments()))
     for i, dagnode := range dagnodes {
+        dir, err := uio.NewDirectoryFromNode(nd.DAG, dagnode)
+        if err != nil {
+            res.SetError(err, cmds.ErrNormal)
+            return
+        }
+
+        links, err := dir.Links()
+        if err != nil {
+            res.SetError(err, cmds.ErrNormal)
+            return
+        }
+
         output[i] = LsObject{
             Hash:  paths[i],
-            Links: make([]LsLink, len(dagnode.Links())),
+            Links: make([]LsLink, len(links)),
         }
-        for j, link := range dagnode.Links() {
+
+        for j, link := range links {
             t := unixfspb.Data_DataType(-1)
 
             linkNode, err := link.GetNode(req.Context(), dserv)
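
A note on the ls change above: listing now goes through the unixfs Directory abstraction instead of reading links straight off the raw dag node, so basic and HAMT-sharded directories are enumerated the same way. A minimal sketch of that pattern in Go, assuming a DAGService and an already-resolved node are in hand (the helper name and surrounding setup are illustrative, not part of this commit; uio and mdag are the import aliases used elsewhere in the repo):

// listUnixfsDir is a hypothetical helper sketching the pattern used by ls.
// uio is "github.com/ipfs/go-ipfs/unixfs/io"; mdag is the merkledag package.
func listUnixfsDir(dserv mdag.DAGService, nd node.Node) ([]string, error) {
    dir, err := uio.NewDirectoryFromNode(dserv, nd)
    if err != nil {
        return nil, err // nd is not a unixfs directory
    }

    links, err := dir.Links() // enumerates basic dirs and HAMT shards alike
    if err != nil {
        return nil, err
    }

    names := make([]string, 0, len(links))
    for _, lnk := range links {
        names = append(names, lnk.Name)
    }
    return names, nil
}
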
9 changes: 6 additions & 3 deletions core/coreunix/add.go
@@ -190,15 +190,18 @@ func (adder *Adder) PinRoot() error {
 func (adder *Adder) Finalize() (node.Node, error) {
     root := adder.mr.GetValue()
 
-    // cant just call adder.RootNode() here as we need the name for printing
-    rootNode, err := root.GetNode()
+    err := root.Flush()
     if err != nil {
         return nil, err
     }
 
     var name string
     if !adder.Wrap {
-        name = rootNode.Links()[0].Name
+        children, err := root.(*mfs.Directory).ListNames()
+        if err != nil {
+            return nil, err
+        }
+        name = children[0]
 
         dir, ok := adder.mr.GetValue().(*mfs.Directory)
         if !ok {
9 changes: 3 additions & 6 deletions mfs/dir.go
@@ -59,7 +59,7 @@ func NewDirectory(ctx context.Context, name string, node node.Node, parent child
 
 // closeChild updates the child by the given name to the dag node 'nd'
 // and changes its own dag node
-func (d *Directory) closeChild(name string, nd *dag.ProtoNode, sync bool) error {
+func (d *Directory) closeChild(name string, nd node.Node, sync bool) error {
     mynd, err := d.closeChildUpdate(name, nd, sync)
     if err != nil {
         return err
@@ -72,7 +72,7 @@ func (d *Directory) closeChild(name string, nd *dag.ProtoNode, sync bool) error
 }
 
 // closeChildUpdate is the portion of closeChild that needs to be locked around
-func (d *Directory) closeChildUpdate(name string, nd *dag.ProtoNode, sync bool) (*dag.ProtoNode, error) {
+func (d *Directory) closeChildUpdate(name string, nd node.Node, sync bool) (*dag.ProtoNode, error) {
     d.lock.Lock()
     defer d.lock.Unlock()
 
@@ -329,13 +329,10 @@ func (d *Directory) Unlink(name string) error {
 }
 
 func (d *Directory) Flush() error {
-    d.lock.Lock()
-    nd, err := d.flushCurrentNode()
+    nd, err := d.GetNode()
     if err != nil {
-        d.lock.Unlock()
         return err
     }
-    d.lock.Unlock()
 
     return d.parent.closeChild(d.name, nd, true)
 }
2 changes: 1 addition & 1 deletion mfs/mfs_test.go
@@ -752,7 +752,7 @@ func TestMfsHugeDir(t *testing.T) {
     defer cancel()
     _, rt := setupRoot(ctx, t)
 
-    for i := 0; i < 100000; i++ {
+    for i := 0; i < 10000; i++ {
         err := Mkdir(rt, fmt.Sprintf("/dir%d", i), false, false)
         if err != nil {
             t.Fatal(err)
9 changes: 5 additions & 4 deletions mfs/system.go
@@ -12,6 +12,7 @@ package mfs
 import (
     "context"
     "errors"
+    "fmt"
     "sync"
     "time"
 
@@ -30,7 +31,7 @@ var log = logging.Logger("mfs")
 var ErrIsDirectory = errors.New("error: is a directory")
 
 type childCloser interface {
-    closeChild(string, *dag.ProtoNode, bool) error
+    closeChild(string, node.Node, bool) error
 }
 
 type NodeType int
@@ -87,7 +88,7 @@ func NewRoot(parent context.Context, ds dag.DAGService, node *dag.ProtoNode, pf
     }
 
     switch pbn.GetType() {
-    case ft.TDirectory:
+    case ft.TDirectory, ft.THAMTShard:
         rval, err := NewDirectory(parent, node.String(), node, root, ds)
         if err != nil {
             return nil, err
@@ -101,7 +102,7 @@
         }
         root.val = fi
     default:
-        panic("unrecognized! (NYI)")
+        return nil, fmt.Errorf("unrecognized unixfs type: %s", pbn.GetType())
     }
     return root, nil
 }
@@ -124,7 +125,7 @@ func (kr *Root) Flush() error {
 
 // closeChild implements the childCloser interface, and signals to the publisher that
 // there are changes ready to be published
-func (kr *Root) closeChild(name string, nd *dag.ProtoNode, sync bool) error {
+func (kr *Root) closeChild(name string, nd node.Node, sync bool) error {
     c, err := kr.dserv.Add(nd)
     if err != nil {
         return err
1 change: 1 addition & 0 deletions repo/config/experiments.go
@@ -2,4 +2,5 @@ package config
 
 type Experiments struct {
     FilestoreEnabled bool
+    ShardingEnabled  bool
 }
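
This new Experiments.ShardingEnabled field is the persistent half of the switch: core/builder.go copies it into uio.UseHAMTSharding during node setup, and the sharness tests below flip it on disk with `ipfs config --json Experimental.ShardingEnabled true`. A hedged sketch of that wiring in isolation, with the repo loading and node construction elided and the helper name made up for illustration:

// applyExperiments stands in for the assignment setupNode now performs:
// copy the repo-level experiment flag into the package-level switch
// before any unixfs directories are built.
func applyExperiments(conf *config.Config) {
    // Global for now (the commit itself marks it TEMP); it affects every
    // directory created by unixfs/io from this point on.
    uio.UseHAMTSharding = conf.Experimental.ShardingEnabled
}
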
18 changes: 0 additions & 18 deletions test/sharness/t0040-add-and-cat.sh
@@ -207,20 +207,6 @@ test_add_named_pipe() {
 '
 }
 
-test_add_sharded_dir() {
-    mkdir testdata
-    for i in `seq 2000`
-    do
-        echo $i > testdata/file$i
-    done
-
-    test_expect_success "ipfs add on very large directory succeeds" '
-        ipfs add -r -q testdata | tail -n1 > sharddir_out &&
-        echo QmSCJD1KYLhVVHqBK3YyXuoEqHt7vggyJhzoFYbT8v1XYL > sharddir_exp &&
-        test_cmp sharddir_exp sharddir_out
-    '
-}
-
 test_add_pwd_is_symlink() {
     test_expect_success "ipfs add -r adds directory content when ./ is symlink" '
         mkdir hellodir &&
@@ -453,8 +439,6 @@ test_kill_ipfs_daemon
 
 test_add_cat_file
 
-test_add_sharded_dir
-
 test_add_cat_raw
 
 test_expect_success "ipfs add --only-hash succeeds" '
@@ -475,8 +459,6 @@ test_launch_ipfs_daemon --offline
 
 test_add_cat_file
 
-test_add_sharded_dir
-
 test_kill_ipfs_daemon
 
 test_done
20 changes: 17 additions & 3 deletions test/sharness/t0250-files-api.sh
@@ -51,9 +51,9 @@ test_sharding() {
         ipfs files mkdir /foo
     '
 
-    test_expect_success "can make 1100 files in a directory" '
+    test_expect_success "can make 100 files in a directory" '
         printf "" > list_exp_raw
-        for i in `seq 1100`
+        for i in `seq 100`
         do
             echo $i | ipfs files write --create /foo/file$i
             echo file$i >> list_exp_raw
@@ -71,6 +71,12 @@ test_sharding() {
         echo "65" > file_exp &&
         test_cmp file_out file_exp
     '
+
+    test_expect_success "output object was really sharded" '
+        ipfs files stat --hash /foo > expected_foo_hash &&
+        echo QmPkwLJTYZRGPJ8Lazr9qPdrLmswPtUjaDbEpmR9jEh1se > actual_foo_hash &&
+        test_cmp expected_foo_hash actual_foo_hash
+    '
 }
 
 test_files_api() {
@@ -508,7 +514,7 @@ test_files_api() {
 }
 
 # test offline and online
-#test_files_api
+test_files_api
 
 test_expect_success "clean up objects from previous test run" '
     ipfs repo gc
@@ -518,6 +524,14 @@ test_launch_ipfs_daemon
 
 ONLINE=1 # set online flag so tests can easily tell
 test_files_api
+test_kill_ipfs_daemon
+
+test_expect_success "enable sharding in config" '
+    ipfs config --json Experimental.ShardingEnabled true
+'
+
+test_launch_ipfs_daemon
+test_sharding
 test_kill_ipfs_daemon
 
 test_done
58 changes: 58 additions & 0 deletions test/sharness/t0260-sharding-flag.sh
@@ -0,0 +1,58 @@
+#!/bin/sh
+#
+# Copyright (c) 2014 Christian Couder
+# MIT Licensed; see the LICENSE file in this repository.
+#
+
+test_description="Test global enable sharding flag"
+
+. lib/test-lib.sh
+
+test_expect_success "set up test data" '
+    mkdir testdata
+    for i in `seq 2000`
+    do
+        echo $i > testdata/file$i
+    done
+'
+
+test_add_large_dir() {
+    exphash="$1"
+    test_expect_success "ipfs add on very large directory succeeds" '
+        ipfs add -r -q testdata | tail -n1 > sharddir_out &&
+        echo "$exphash" > sharddir_exp &&
+        test_cmp sharddir_exp sharddir_out
+    '
+}
+
+test_init_ipfs
+
+UNSHARDED="QmavrTrQG4VhoJmantURAYuw3bowq3E2WcvP36NRQDAC1N"
+test_add_large_dir "$UNSHARDED"
+
+test_launch_ipfs_daemon
+
+test_add_large_dir "$UNSHARDED"
+
+test_kill_ipfs_daemon
+
+test_expect_success "enable sharding" '
+    ipfs config --json Experimental.ShardingEnabled true
+'
+
+SHARDED="QmSCJD1KYLhVVHqBK3YyXuoEqHt7vggyJhzoFYbT8v1XYL"
+test_add_large_dir "$SHARDED"
+
+test_launch_ipfs_daemon
+
+test_add_large_dir "$SHARDED"
+
+test_kill_ipfs_daemon
+
+test_expect_success "sharded and unsharded output look the same" '
+    ipfs ls "$SHARDED" | sort > sharded_out &&
+    ipfs ls "$UNSHARDED" | sort > unsharded_out &&
+    test_cmp sharded_out unsharded_out
+'
+
+test_done
16 changes: 14 additions & 2 deletions unixfs/io/dirbuilder.go
@@ -17,6 +17,10 @@ import (
 // result in the node being restructured into a sharded object.
 var ShardSplitThreshold = 1000
 
+// UseHAMTSharding is a global flag that signifies whether or not to use the
+// HAMT sharding scheme for directory creation
+var UseHAMTSharding = false
+
 // DefaultShardWidth is the default value used for hamt sharding width.
 var DefaultShardWidth = 256
 
@@ -31,7 +35,15 @@ type Directory struct {
 func NewDirectory(dserv mdag.DAGService) *Directory {
     db := new(Directory)
     db.dserv = dserv
-    db.dirnode = format.EmptyDirNode()
+    if UseHAMTSharding {
+        s, err := hamt.NewHamtShard(dserv, DefaultShardWidth)
+        if err != nil {
+            panic(err) // will only panic if DefaultShardWidth is a bad value
+        }
+        db.shard = s
+    } else {
+        db.dirnode = format.EmptyDirNode()
+    }
     return db
 }
 
@@ -70,7 +82,7 @@ func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, erro
 // AddChild adds a (name, key)-pair to the root node.
 func (d *Directory) AddChild(ctx context.Context, name string, nd node.Node) error {
     if d.shard == nil {
-        if len(d.dirnode.Links()) < ShardSplitThreshold {
+        if !UseHAMTSharding {
             _ = d.dirnode.RemoveNodeLink(name)
             return d.dirnode.AddNodeLinkClean(name, nd)
         }
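
With the flag in place, the layout choice in NewDirectory and AddChild is driven entirely by uio.UseHAMTSharding rather than by the old ShardSplitThreshold link count. A rough usage sketch, assuming a DAGService is available; GetNode comes from the surrounding dirbuilder code rather than this hunk, so treat the snippet as illustrative only:

// buildDir is a hypothetical helper showing the effect of the global switch:
// with sharded=false the root is a basic unixfs directory node, with
// sharded=true it is a HAMT shard, even for a handful of entries.
func buildDir(ctx context.Context, dserv mdag.DAGService, sharded bool, children map[string]node.Node) (node.Node, error) {
    uio.UseHAMTSharding = sharded // package-level, so set it before NewDirectory

    dir := uio.NewDirectory(dserv)
    for name, child := range children {
        if err := dir.AddChild(ctx, name, child); err != nil {
            return nil, err
        }
    }
    return dir.GetNode() // the two layouts serialize to different root hashes
}
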
