Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add support for multiple blockstores #3257

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
var log = logging.Logger("blockstore")

// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks")
const DefaultPrefix = "/blocks"

var blockPrefix = ds.NewKey(DefaultPrefix)

var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
Expand Down Expand Up @@ -71,20 +73,23 @@ type gcBlockstore struct {
}

// NewBlockstore returns a blockstore backed by d that uses DefaultPrefix
// as its key prefix. It is shorthand for
// NewBlockstoreWPrefix(d, DefaultPrefix).
func NewBlockstore(d ds.Batching) *blockstore {
return NewBlockstoreWPrefix(d, DefaultPrefix)
}

func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore {
var dsb ds.Batching
dd := dsns.Wrap(d, BlockPrefix)
prefixKey := ds.NewKey(prefix)
dd := dsns.Wrap(d, prefixKey)
dsb = dd
return &blockstore{
datastore: dsb,
prefix: prefixKey,
}
}

type blockstore struct {
datastore ds.Batching

lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
prefix ds.Key

rehash bool
}
Expand Down Expand Up @@ -130,11 +135,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
func (bs *blockstore) Put(block blocks.Block) error {
k := dshelp.CidToDsKey(block.Cid())

// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
}
// Note: The Has Check is now done by the MultiBlockstore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's probably okay to remove this and do it intentionally in a certain place (instead of at every layer). But this gets a bit weird... For example, if someone just wants to use the blockstore on its own, they don't get this nice optimization.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure about that? MultiBlockstore just calls Has method of Blockstore but it doesn't mean that Blockstore (on Datastore) will check the Has of datastore.

Copy link
Member

@Kubuxu Kubuxu Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UPDATE: ahh, you call an explicit Has.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So can I take it that this is resolved?


return bs.datastore.Put(k, block.RawData())
}

Expand All @@ -145,11 +147,6 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
}
for _, b := range blocks {
k := dshelp.CidToDsKey(b.Cid())
exists, err := bs.datastore.Has(k)
if err == nil && exists {
continue
}

err = t.Put(k, b.RawData())
if err != nil {
return err
Expand All @@ -175,7 +172,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
// datastore/namespace does *NOT* fix up Query.Prefix
q.Prefix = BlockPrefix.String()
q.Prefix = bs.prefix.String()
res, err := bs.datastore.Query(q)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestAllKeysRespectsContext(t *testing.T) {
default:
}

e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()}
resultChan <- dsq.Result{Entry: e} // let it go.
close(resultChan)
<-done // should be done now.
Expand All @@ -190,7 +190,7 @@ func TestValueTypeMismatch(t *testing.T) {
block := blocks.NewBlock([]byte("some data"))

datastore := ds.NewMapDatastore()
k := BlockPrefix.Child(dshelp.CidToDsKey(block.Cid()))
k := blockPrefix.Child(dshelp.CidToDsKey(block.Cid()))
datastore.Put(k, "data that isn't a block!")

blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
Expand Down
6 changes: 3 additions & 3 deletions blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) {
block := blocks.NewBlock([]byte("newBlock"))

cachedbs.PutMany([]blocks.Block{block})
if cacheFails != 2 {
t.Fatalf("expected two datastore hits: %d", cacheFails)
if cacheFails != 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified this was correct; the change was due to an implementation detail. Unfortunately, I can't remember why. If it is important enough I will spend an hour or two looking into it.

Copy link
Member

@whyrusleeping whyrusleeping Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It always weirds me out when i see tests changing. Maybe @Kubuxu is more familiar with this and can check it?

Copy link
Member

@Kubuxu Kubuxu Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was because PutMany on the default blockstore was accessing the datastore two times: once checking Has for a key and then calling Put.

This means that now we don't check the datastore with Has we just Put the value.

https://github.com/ipfs/go-ipfs/pull/3257/files#r80438802

t.Fatalf("expected datastore hits: %d", cacheFails)
}
cachedbs.Put(block)
if cacheFails != 3 {
if cacheFails != 2 {
t.Fatalf("expected datastore hit: %d", cacheFails)
}

Expand Down
158 changes: 158 additions & 0 deletions blocks/blockstore/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package blockstore

// A very simple multi-blockstore that is analogous to a unionfs: Put and
// DeleteBlock only go to the first blockstore; all others are
// considered read-only.

import (
//"errors"
"context"

blocks "github.com/ipfs/go-ipfs/blocks"
cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid"
dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query"
)

// LocateInfo is the per-mount result reported by MultiBlockstore.Locate.
type LocateInfo struct {
// Prefix is the prefix of the mount that was probed.
Prefix string
// Error is the error returned when fetching the block from that mount;
// nil means the mount was able to return the block.
Error error
}

// MultiBlockstore is a Blockstore (and GCLocker) composed of several
// sub-blockstores, each identified by a prefix string.
//
// The notes on the individual methods below describe the multiblockstore
// implementation in this file.
type MultiBlockstore interface {
Blockstore
GCLocker
// FirstMount returns the blockstore of the first mount, which is the
// one that receives Put, PutMany and DeleteBlock.
FirstMount() Blockstore
// Mounts returns the prefixes of all mounts, in mount order.
Mounts() []string
// Mount returns the blockstore mounted under prefix, or nil when no
// mount has that prefix.
Mount(prefix string) Blockstore
// Locate probes every mount for the given cid and returns one
// LocateInfo per mount, in mount order.
Locate(*cid.Cid) []LocateInfo
}

// Mount pairs a sub-blockstore with the prefix under which it is exposed
// by a multi-blockstore.
type Mount struct {
// Prefix is the name used to look the mount up (see the Mount method).
Prefix string
// Blocks is the blockstore that serves this mount.
Blocks Blockstore
}

// NewMultiBlockstore assembles a multiblockstore from the given mounts.
// The order of the arguments is kept: the first mount is the one that
// FirstMount returns and that DeleteBlock and PutMany operate on.
func NewMultiBlockstore(mounts ...Mount) *multiblockstore {
	mbs := new(multiblockstore)
	mbs.mounts = mounts
	return mbs
}

// multiblockstore is the implementation returned by NewMultiBlockstore.
type multiblockstore struct {
// mounts holds the sub-blockstores in lookup order; index 0 is the one
// used for DeleteBlock and PutMany.
mounts []Mount
// NOTE(review): gclocker is declared elsewhere; presumably it supplies
// the GCLocker methods required by MultiBlockstore — confirm.
gclocker
}

// FirstMount returns the blockstore of the first mount.
// It indexes mounts[0] directly, so it panics when there are no mounts.
func (bs *multiblockstore) FirstMount() Blockstore {
	first := bs.mounts[0]
	return first.Blocks
}

// Mounts lists the prefix of every mount, in mount order.
// The result is a fresh, non-nil slice even when there are no mounts.
func (bs *multiblockstore) Mounts() []string {
	prefixes := make([]string, len(bs.mounts))
	for i := range bs.mounts {
		prefixes[i] = bs.mounts[i].Prefix
	}
	return prefixes
}

// Mount looks up the blockstore mounted under prefix. When several mounts
// share the prefix the earliest one wins; nil is returned when none match.
func (bs *multiblockstore) Mount(prefix string) Blockstore {
	for i := range bs.mounts {
		if bs.mounts[i].Prefix != prefix {
			continue
		}
		return bs.mounts[i].Blocks
	}
	return nil
}

// DeleteBlock removes key from the first mount only; the other mounts are
// never modified. It indexes mounts[0] directly, so it panics when there
// are no mounts.
func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error {
	primary := bs.mounts[0].Blocks
	return primary.DeleteBlock(key)
}

// Has reports whether any mount holds the block for c. Mounts are probed
// in order and the search stops at the first one that answers true
// without an error. If no mount has the block, the first error seen along
// the way (if any) is returned together with false.
func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) {
	var firstErr error
	for i := range bs.mounts {
		found, err := bs.mounts[i].Blocks.Has(c)
		switch {
		case err != nil:
			// Remember only the earliest failure; keep probing the rest.
			if firstErr == nil {
				firstErr = err
			}
		case found:
			return true, nil
		}
	}
	return false, firstErr
}

func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) {
var firstErr error
for _, m := range bs.mounts {
blk, err := m.Blocks.Get(c)
if err == nil {
return blk, nil
}
if firstErr == nil || firstErr == ErrNotFound {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would replace it with firstErr == nil && err != ErrNotFound

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I do that, then firstErr could end up being nil even if the block was not found. The idea is to return ErrNotFound only if that is the case for all the blockstores. If not, then return the more serious error.

firstErr = err
}
}
return nil, firstErr
}

func (bs *multiblockstore) Locate(c *cid.Cid) []LocateInfo {
res := make([]LocateInfo, 0, len(bs.mounts))
for _, m := range bs.mounts {
_, err := m.Blocks.Get(c)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should use Has

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with that. Not 100% happy with it as filestore Has doesn't really guarantee the block is available (and changing it so it does will make the Has() call really expensive), but it's not a major problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I partly take the Has() comment back. The filestore Has() needs fixing or the Put below needs fixing.

res = append(res, LocateInfo{m.Prefix, err})
}
return res
}

func (bs *multiblockstore) Put(blk blocks.Block) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I add a file with the filestore, then I add another file normally that overlaps a block with it, one block from my 'normal' add will be referencing a file on disk?

That feels odd to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but rather difficult to avoid.

// First call Has() to make sure the block doesn't exist in any of
// the sub-blockstores, otherwise we could end with data being
// duplicated in two blockstores.
exists, err := bs.Has(blk.Cid())
if err == nil && exists {
return nil // already stored
}
return bs.mounts[0].Blocks.Put(blk)
}

// PutMany stores blks in the first mount, skipping every block that some
// mount already reports via Has, so data is not duplicated across
// sub-blockstores. A block whose Has check fails with an error is still
// written. Nothing is forwarded to the first mount when every block is
// already present.
func (bs *multiblockstore) PutMany(blks []blocks.Block) error {
	pending := make([]blocks.Block, 0, len(blks))
	for i := range blks {
		if found, err := bs.Has(blks[i].Cid()); err == nil && found {
			continue // already stored
		}
		pending = append(pending, blks[i])
	}
	if len(pending) > 0 {
		return bs.mounts[0].Blocks.PutMany(pending)
	}
	return nil
}

func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
//return bs.mounts[0].Blocks.AllKeysChan(ctx)
//return nil, errors.New("Unimplemented")
in := make([]<-chan *cid.Cid, 0, len(bs.mounts))
for _, m := range bs.mounts {
ch, err := m.Blocks.AllKeysChan(ctx)
if err != nil {
return nil, err
}
in = append(in, ch)
}
out := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer close(out)
for _, in0 := range in {
for key := range in0 {
out <- key
}
}
}()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If @whyrusleeping agrees with me I would open a goroutine per BS and make them pipe from one AllKeysChan to one external. This way if first BS is slow and second is fast, the first won't slow down whole process.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return out, nil
}
29 changes: 24 additions & 5 deletions blocks/blockstore/util/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,23 @@ type RmBlocksOpts struct {
Force bool
}

func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error {
func RmBlocks(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error {
prefix := opts.Prefix
if prefix == "" {
prefix = mbs.Mounts()[0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this only remove from one blockstore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

}
blocks := mbs.Mount(prefix)
if blocks == nil {
return fmt.Errorf("Could not find blockstore: %s\n", prefix)
}

go func() {
defer close(out)

unlocker := blocks.GCLock()
unlocker := mbs.GCLock()
defer unlocker.Unlock()

stillOkay := FilterPinned(pins, out, cids)
stillOkay := FilterPinned(mbs, pins, out, cids, prefix)

for _, c := range stillOkay {
err := blocks.DeleteBlock(c)
Expand All @@ -50,15 +59,15 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, c
return nil
}

func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid {
func FilterPinned(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, prefix string) []*cid.Cid {
stillOkay := make([]*cid.Cid, 0, len(cids))
res, err := pins.CheckIfPinned(cids...)
if err != nil {
out <- &RemovedBlock{Error: fmt.Sprintf("pin check failed: %s", err)}
return nil
}
for _, r := range res {
if !r.Pinned() {
if !r.Pinned() || AvailableElsewhere(mbs, prefix, r.Key) {
stillOkay = append(stillOkay, r.Key)
} else {
out <- &RemovedBlock{
Expand All @@ -70,6 +79,16 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c
return stillOkay
}

// AvailableElsewhere reports whether the block for c can be located,
// without error, in at least one mount of mbs whose prefix differs from
// prefix.
func AvailableElsewhere(mbs bs.MultiBlockstore, prefix string, c *cid.Cid) bool {
	for _, info := range mbs.Locate(c) {
		if info.Prefix == prefix || info.Error != nil {
			continue
		}
		return true
	}
	return false
}

func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error {
someFailed := false
for res := range in {
Expand Down
7 changes: 5 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"

context "context"
retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore"
Expand Down Expand Up @@ -167,7 +168,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}

var err error
bs := bstore.NewBlockstore(rds)
bs := bstore.NewBlockstoreWPrefix(rds, fsrepo.CacheMount)
opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand All @@ -184,7 +185,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}

n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())
mounts := []bstore.Mount{{fsrepo.CacheMount, cbs}}

n.Blockstore = bstore.NewMultiBlockstore(mounts...)

rcfg, err := n.Repo.Config()
if err != nil {
Expand Down
Loading