Splitstore: support on-disk marksets using badger #6833

Merged
merged 16 commits into from
Jul 23, 2021
37 changes: 33 additions & 4 deletions blockstore/splitstore/README.md
@@ -27,9 +27,38 @@ If you intend to use the discard coldstore, you also need to add the following:
```
ColdStoreType = "discard"
```
In general you _should not_ have to use the discard store, unless you
are running a network assistive node (like a bootstrapper or booster)
or have very constrained hardware with not enough disk space to
maintain a coldstore, even with garbage collection. It is also appropriate
for small nodes that are simply watching the chain.

*Warning:* Using the discard store for a general purpose node is discouraged, unless
you really know what you are doing. Use it at your own risk.

## Configuration Options

These are options in the `[Chainstore.Splitstore]` section of the configuration:

- `HotStoreType` -- specifies the type of hotstore to use.
The only currently supported option is `"badger"`.
- `ColdStoreType` -- specifies the type of coldstore to use.
The default value is `"universal"`, which will use the initial monolith blockstore
as the coldstore.
The other possible value is `"discard"`, as outlined above, which is specialized for
running without a coldstore. Note that the discard store wraps the initial monolith
blockstore and discards writes; this is necessary to support syncing from a snapshot.
- `MarkSetType` -- specifies the type of markset to use during compaction.
The markset is the data structure used by compaction/gc to track live objects.
The default value is `"map"`, which uses an in-memory map; if you are limited
in memory (or see compaction run out of memory), you can instead specify
`"badger"`, which uses a disk-backed markset built on badger. This uses
much less memory, but also makes compaction slower.
- `HotStoreMessageRetention` -- specifies for how many finalities, beyond the 4
maintained by default, messages and message receipts are retained in the
hotstore. This is useful for assistive nodes that want to support syncing for other
nodes beyond 4 finalities, while running with the discard coldstore option.
It is also useful for miners who accept deals and need to look back at messages
beyond the 4 finalities, which would otherwise hit the coldstore.
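
Putting these options together, a splitstore section in the node config might look like the sketch below. This is illustrative only: the surrounding `[Chainstore]` key and the retention value chosen here are assumptions, not prescriptions from this PR.

```toml
[Chainstore]
  # Assumed enabling flag; check your node's config reference.
  EnableSplitstore = true

  [Chainstore.Splitstore]
    # Only currently supported hotstore type.
    HotStoreType = "badger"
    # "universal" (default) or "discard".
    ColdStoreType = "universal"
    # "map" (default, in-memory) or "badger" (disk-backed, slower compaction).
    MarkSetType = "badger"
    # Extra finalities of messages/receipts kept in the hotstore beyond the default 4.
    HotStoreMessageRetention = 2
```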


## Operation
@@ -67,6 +96,6 @@ Compaction works transactionally with the following algorithm:
- We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
- We then end the transaction and compact/gc the hotstore.

## Coldstore Garbage Collection
## Garbage Collection

TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)
2 changes: 2 additions & 0 deletions blockstore/splitstore/markset.go
@@ -32,6 +32,8 @@ func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
return NewBloomMarkSetEnv()
case "map":
return NewMapMarkSetEnv()
case "badger":
return NewBadgerMarkSetEnv(path)
default:
return nil, xerrors.Errorf("unknown mark set type %s", mtype)
}
230 changes: 230 additions & 0 deletions blockstore/splitstore/markset_badger.go
@@ -0,0 +1,230 @@
package splitstore

import (
"os"
"path/filepath"
"sync"

"golang.org/x/xerrors"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"go.uber.org/zap"

cid "github.com/ipfs/go-cid"
)

type BadgerMarkSetEnv struct {
path string
}

var _ MarkSetEnv = (*BadgerMarkSetEnv)(nil)

type BadgerMarkSet struct {
mx sync.RWMutex
cond sync.Cond
pend map[string]struct{}
writing map[int]map[string]struct{}
writers int
seqno int

db *badger.DB
path string
}

var _ MarkSet = (*BadgerMarkSet)(nil)

var badgerMarkSetBatchSize = 16384

func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
msPath := filepath.Join(path, "markset.badger")
err := os.MkdirAll(msPath, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}

return &BadgerMarkSetEnv{path: msPath}, nil
}

func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)

// clean up first
err := os.RemoveAll(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}

err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}

opts := badger.DefaultOptions(path)
opts.SyncWrites = false
opts.CompactL0OnClose = false
opts.Compression = options.None
// Note: We use FileIO for loading modes to avoid memory thrashing and interference
// between the system blockstore and the markset.
// It was observed that using the default memory mapped option resulted in
// significant interference and unacceptably high block validation times once the markset
// exceeded 1GB in size.
opts.TableLoadingMode = options.FileIO
opts.ValueLogLoadingMode = options.FileIO
opts.Logger = &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}

db, err := badger.Open(opts)
if err != nil {
return nil, xerrors.Errorf("error creating badger markset: %w", err)
}

ms := &BadgerMarkSet{
pend: make(map[string]struct{}),
writing: make(map[int]map[string]struct{}),
db: db,
path: path,
}
ms.cond.L = &ms.mx

return ms, nil
}

func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}

func (s *BadgerMarkSet) Mark(c cid.Cid) error {
s.mx.Lock()

if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}

s.pend[string(c.Hash())] = struct{}{}

if len(s.pend) < badgerMarkSetBatchSize {
s.mx.Unlock()
return nil
}

pend := s.pend
seqno := s.seqno
s.seqno++
s.writing[seqno] = pend
s.pend = make(map[string]struct{})
s.writers++
s.mx.Unlock()

defer func() {
s.mx.Lock()
defer s.mx.Unlock()

delete(s.writing, seqno)
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()

empty := []byte{} // not nil

batch := s.db.NewWriteBatch()
defer batch.Cancel()

for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
return err
}
}

err := batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}

return nil
}

func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock()
defer s.mx.RUnlock()

if s.pend == nil {
return false, errMarkSetClosed
}

key := c.Hash()
pendKey := string(key)
_, ok := s.pend[pendKey]
if ok {
return true, nil
}

for _, wr := range s.writing {
_, ok := wr[pendKey]
if ok {
return true, nil
}
}

err := s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})

switch err {
case nil:
return true, nil

case badger.ErrKeyNotFound:
return false, nil

default:
return false, xerrors.Errorf("error checking badger markset: %w", err)
}
}

func (s *BadgerMarkSet) Close() error {
s.mx.Lock()
defer s.mx.Unlock()

if s.pend == nil {
return nil
}

for s.writers > 0 {
s.cond.Wait()
}

s.pend = nil
db := s.db
s.db = nil

err := db.Close()
if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err)
}

err = os.RemoveAll(s.path)
if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err)
}

return nil
}

func (s *BadgerMarkSet) SetConcurrent() {}

// badger logging through go-log
type badgerLogger struct {
*zap.SugaredLogger
skip2 *zap.SugaredLogger
}

func (b *badgerLogger) Warningf(format string, args ...interface{}) {}
func (b *badgerLogger) Infof(format string, args ...interface{}) {}
func (b *badgerLogger) Debugf(format string, args ...interface{}) {}
9 changes: 9 additions & 0 deletions blockstore/splitstore/markset_test.go
@@ -16,6 +16,15 @@ func TestBloomMarkSet(t *testing.T) {
testMarkSet(t, "bloom")
}

func TestBadgerMarkSet(t *testing.T) {
bs := badgerMarkSetBatchSize
badgerMarkSetBatchSize = 1
t.Cleanup(func() {
badgerMarkSetBatchSize = bs
})
testMarkSet(t, "badger")
}

func testMarkSet(t *testing.T, lsType string) {
t.Helper()

7 changes: 5 additions & 2 deletions blockstore/splitstore/splitstore.go
@@ -62,8 +62,11 @@ func init() {
type Config struct {
// MarkSetType is the type of mark set to use.
//
// The default value is "map", which uses an in-memory map-backed markset.
// If you are constrained in memory (i.e. compaction runs out of memory), you
// can use "badger", which will use a disk-backed markset using badger.
// Note that compaction will take quite a bit longer when using the "badger" option,
// but that shouldn't really matter (as long as it is under 7.5hrs).
MarkSetType string

// DiscardColdBlocks indicates whether to skip moving cold blocks to the coldstore.
26 changes: 0 additions & 26 deletions blockstore/splitstore/splitstore_compact.go
@@ -184,16 +184,6 @@ func (s *SplitStore) trackTxnRef(c cid.Cid) {
return
}

if s.txnProtect != nil {
mark, err := s.txnProtect.Has(c)
if err != nil {
log.Warnf("error checking markset: %s", err)
// track it anyways
} else if mark {
return
}
}

s.txnRefsMx.Lock()
s.txnRefs[c] = struct{}{}
s.txnRefsMx.Unlock()
@@ -209,27 +199,11 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
s.txnRefsMx.Lock()
defer s.txnRefsMx.Unlock()

quiet := false
for _, c := range cids {
if isUnitaryObject(c) {
continue
}

if s.txnProtect != nil {
mark, err := s.txnProtect.Has(c)
if err != nil {
if !quiet {
quiet = true
log.Warnf("error checking markset: %s", err)
}
// track it anyways
}

if mark {
continue
}
}

s.txnRefs[c] = struct{}{}
}

9 changes: 9 additions & 0 deletions blockstore/splitstore/splitstore_test.go
@@ -210,6 +210,15 @@ func TestSplitStoreCompaction(t *testing.T) {
testSplitStore(t, &Config{MarkSetType: "map"})
}

func TestSplitStoreCompactionWithBadger(t *testing.T) {
bs := badgerMarkSetBatchSize
badgerMarkSetBatchSize = 1
t.Cleanup(func() {
badgerMarkSetBatchSize = bs
})
testSplitStore(t, &Config{MarkSetType: "badger"})
}

type mockChain struct {
t testing.TB
