diff --git a/blockstore/splitstore/README.md b/blockstore/splitstore/README.md index f69a056ca43..1490004cf03 100644 --- a/blockstore/splitstore/README.md +++ b/blockstore/splitstore/README.md @@ -49,10 +49,11 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration: blockstore and discards writes; this is necessary to support syncing from a snapshot. - `MarkSetType` -- specifies the type of markset to use during compaction. The markset is the data structure used by compaction/gc to track live objects. - The default value is `"map"`, which will use an in-memory map; if you are limited - in memory (or indeed see compaction run out of memory), you can also specify - `"badger"` which will use an disk backed markset, using badger. This will use - much less memory, but will also make compaction slower. + The default value is `"badger"`, which will use a disk-backed markset using badger. + If you have a lot of memory (48GB or more), you can also use `"map"`, which will use + an in-memory markset, speeding up compaction at the cost of higher memory usage. + Note: If you are using a VPS with a network volume, you need to provision at least + 3000 IOPS with the badger markset. - `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4 finalities maintained by default, to maintain messages and message receipts in the hotstore. This is useful for assistive nodes that want to support syncing for other @@ -105,6 +106,12 @@ Compaction works transactionally with the following algorithm: - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live - We then end the transaction and compact/gc the hotstore. +As of [#8008](https://github.com/filecoin-project/lotus/pull/8008), the compaction algorithm has been +modified to eliminate sorting and maintain the cold object set on disk. 
This drastically reduces +memory usage; in fact, when using badger as the markset, compaction uses very little memory, and +it should now be possible to run splitstore with 32GB of RAM or less without danger of running out of +memory during compaction. + ## Garbage Collection TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577) diff --git a/blockstore/splitstore/checkpoint.go b/blockstore/splitstore/checkpoint.go new file mode 100644 index 00000000000..d3cd4cba7dc --- /dev/null +++ b/blockstore/splitstore/checkpoint.go @@ -0,0 +1,118 @@ +package splitstore + +import ( + "bufio" + "io" + "os" + + "golang.org/x/xerrors" + + cid "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" +) + +type Checkpoint struct { + file *os.File + buf *bufio.Writer +} + +func NewCheckpoint(path string) (*Checkpoint, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_SYNC, 0644) + if err != nil { + return nil, xerrors.Errorf("error creating checkpoint: %w", err) + } + buf := bufio.NewWriter(file) + + return &Checkpoint{ + file: file, + buf: buf, + }, nil +} + +func OpenCheckpoint(path string) (*Checkpoint, cid.Cid, error) { + filein, err := os.Open(path) + if err != nil { + return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for reading: %w", err) + } + defer filein.Close() //nolint:errcheck + + bufin := bufio.NewReader(filein) + start, err := readRawCid(bufin, nil) + if err != nil && err != io.EOF { + return nil, cid.Undef, xerrors.Errorf("error reading cid from checkpoint: %w", err) + } + + fileout, err := os.OpenFile(path, os.O_WRONLY|os.O_SYNC, 0644) + if err != nil { + return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for writing: %w", err) + } + bufout := bufio.NewWriter(fileout) + + return &Checkpoint{ + file: fileout, + buf: bufout, + }, start, nil +} + +func (cp *Checkpoint) Set(c cid.Cid) error { + if _, err := cp.file.Seek(0, io.SeekStart); err != nil { + return xerrors.Errorf("error 
seeking beginning of checkpoint: %w", err) + } + + if err := writeRawCid(cp.buf, c, true); err != nil { + return xerrors.Errorf("error writing cid to checkpoint: %w", err) + } + + return nil +} + +func (cp *Checkpoint) Close() error { + if cp.file == nil { + return nil + } + + err := cp.file.Close() + cp.file = nil + cp.buf = nil + + return err +} + +func readRawCid(buf *bufio.Reader, hbuf []byte) (cid.Cid, error) { + sz, err := buf.ReadByte() + if err != nil { + return cid.Undef, err // don't wrap EOF as it is not an error here + } + + if hbuf == nil { + hbuf = make([]byte, int(sz)) + } else { + hbuf = hbuf[:int(sz)] + } + + if _, err := io.ReadFull(buf, hbuf); err != nil { + return cid.Undef, xerrors.Errorf("error reading hash: %w", err) // wrap EOF, it's corrupt + } + + hash, err := mh.Cast(hbuf) + if err != nil { + return cid.Undef, xerrors.Errorf("error casting multihash: %w", err) + } + + return cid.NewCidV1(cid.Raw, hash), nil +} + +func writeRawCid(buf *bufio.Writer, c cid.Cid, flush bool) error { + hash := c.Hash() + if err := buf.WriteByte(byte(len(hash))); err != nil { + return err + } + if _, err := buf.Write(hash); err != nil { + return err + } + if flush { + return buf.Flush() + } + + return nil +} diff --git a/blockstore/splitstore/checkpoint_test.go b/blockstore/splitstore/checkpoint_test.go new file mode 100644 index 00000000000..4fefe40cf6f --- /dev/null +++ b/blockstore/splitstore/checkpoint_test.go @@ -0,0 +1,147 @@ +package splitstore + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" +) + +func TestCheckpoint(t *testing.T) { + dir, err := ioutil.TempDir("", "checkpoint.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(dir) + }) + + path := filepath.Join(dir, "checkpoint") + + makeCid := func(key string) cid.Cid { + h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1) + if err != nil { + t.Fatal(err) + } + + return 
cid.NewCidV1(cid.Raw, h) + } + + k1 := makeCid("a") + k2 := makeCid("b") + k3 := makeCid("c") + k4 := makeCid("d") + + cp, err := NewCheckpoint(path) + if err != nil { + t.Fatal(err) + } + + if err := cp.Set(k1); err != nil { + t.Fatal(err) + } + if err := cp.Set(k2); err != nil { + t.Fatal(err) + } + + if err := cp.Close(); err != nil { + t.Fatal(err) + } + + cp, start, err := OpenCheckpoint(path) + if err != nil { + t.Fatal(err) + } + if !start.Equals(k2) { + t.Fatalf("expected start to be %s; got %s", k2, start) + } + + if err := cp.Set(k3); err != nil { + t.Fatal(err) + } + if err := cp.Set(k4); err != nil { + t.Fatal(err) + } + + if err := cp.Close(); err != nil { + t.Fatal(err) + } + + cp, start, err = OpenCheckpoint(path) + if err != nil { + t.Fatal(err) + } + if !start.Equals(k4) { + t.Fatalf("expected start to be %s; got %s", k4, start) + } + + if err := cp.Close(); err != nil { + t.Fatal(err) + } + + // also test correct operation with an empty checkpoint + cp, err = NewCheckpoint(path) + if err != nil { + t.Fatal(err) + } + + if err := cp.Close(); err != nil { + t.Fatal(err) + } + + cp, start, err = OpenCheckpoint(path) + if err != nil { + t.Fatal(err) + } + + if start.Defined() { + t.Fatal("expected start to be undefined") + } + + if err := cp.Set(k1); err != nil { + t.Fatal(err) + } + if err := cp.Set(k2); err != nil { + t.Fatal(err) + } + + if err := cp.Close(); err != nil { + t.Fatal(err) + } + + cp, start, err = OpenCheckpoint(path) + if err != nil { + t.Fatal(err) + } + if !start.Equals(k2) { + t.Fatalf("expected start to be %s; got %s", k2, start) + } + + if err := cp.Set(k3); err != nil { + t.Fatal(err) + } + if err := cp.Set(k4); err != nil { + t.Fatal(err) + } + + if err := cp.Close(); err != nil { + t.Fatal(err) + } + + cp, start, err = OpenCheckpoint(path) + if err != nil { + t.Fatal(err) + } + if !start.Equals(k4) { + t.Fatalf("expected start to be %s; got %s", k4, start) + } + + if err := cp.Close(); err != nil { + t.Fatal(err) + } + +} 
diff --git a/blockstore/splitstore/coldset.go b/blockstore/splitstore/coldset.go new file mode 100644 index 00000000000..129e2ed9226 --- /dev/null +++ b/blockstore/splitstore/coldset.go @@ -0,0 +1,102 @@ +package splitstore + +import ( + "bufio" + "io" + "os" + + "golang.org/x/xerrors" + + cid "github.com/ipfs/go-cid" +) + +type ColdSetWriter struct { + file *os.File + buf *bufio.Writer +} + +type ColdSetReader struct { + file *os.File + buf *bufio.Reader +} + +func NewColdSetWriter(path string) (*ColdSetWriter, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return nil, xerrors.Errorf("error creating coldset: %w", err) + } + buf := bufio.NewWriter(file) + + return &ColdSetWriter{ + file: file, + buf: buf, + }, nil +} + +func NewColdSetReader(path string) (*ColdSetReader, error) { + file, err := os.Open(path) + if err != nil { + return nil, xerrors.Errorf("error opening coldset: %w", err) + } + buf := bufio.NewReader(file) + + return &ColdSetReader{ + file: file, + buf: buf, + }, nil +} + +func (s *ColdSetWriter) Write(c cid.Cid) error { + return writeRawCid(s.buf, c, false) +} + +func (s *ColdSetWriter) Close() error { + if s.file == nil { + return nil + } + + err1 := s.buf.Flush() + err2 := s.file.Close() + s.buf = nil + s.file = nil + + if err1 != nil { + return err1 + } + return err2 +} + +func (s *ColdSetReader) ForEach(f func(cid.Cid) error) error { + hbuf := make([]byte, 256) + for { + next, err := readRawCid(s.buf, hbuf) + if err != nil { + if err == io.EOF { + return nil + } + + return xerrors.Errorf("error reading coldset: %w", err) + } + + if err := f(next); err != nil { + return err + } + } +} + +func (s *ColdSetReader) Reset() error { + _, err := s.file.Seek(0, io.SeekStart) + return err +} + +func (s *ColdSetReader) Close() error { + if s.file == nil { + return nil + } + + err := s.file.Close() + s.file = nil + s.buf = nil + + return err +} diff --git a/blockstore/splitstore/coldset_test.go 
b/blockstore/splitstore/coldset_test.go new file mode 100644 index 00000000000..60216ebd4e6 --- /dev/null +++ b/blockstore/splitstore/coldset_test.go @@ -0,0 +1,99 @@ +package splitstore + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" +) + +func TestColdSet(t *testing.T) { + dir, err := ioutil.TempDir("", "coldset.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(dir) + }) + + path := filepath.Join(dir, "coldset") + + makeCid := func(i int) cid.Cid { + h, err := multihash.Sum([]byte(fmt.Sprintf("cid.%d", i)), multihash.SHA2_256, -1) + if err != nil { + t.Fatal(err) + } + + return cid.NewCidV1(cid.Raw, h) + } + + const count = 1000 + cids := make([]cid.Cid, 0, count) + for i := 0; i < count; i++ { + cids = append(cids, makeCid(i)) + } + + cw, err := NewColdSetWriter(path) + if err != nil { + t.Fatal(err) + } + + for _, c := range cids { + if err := cw.Write(c); err != nil { + t.Fatal(err) + } + } + + if err := cw.Close(); err != nil { + t.Fatal(err) + } + + cr, err := NewColdSetReader(path) + if err != nil { + t.Fatal(err) + } + + index := 0 + err = cr.ForEach(func(c cid.Cid) error { + if index >= count { + t.Fatal("too many cids") + } + + if !c.Equals(cids[index]) { + t.Fatalf("wrong cid %d; expected %s but got %s", index, cids[index], c) + } + + index++ + return nil + }) + if err != nil { + t.Fatal(err) + } + + if err := cr.Reset(); err != nil { + t.Fatal(err) + } + + index = 0 + err = cr.ForEach(func(c cid.Cid) error { + if index >= count { + t.Fatal("too many cids") + } + + if !c.Equals(cids[index]) { + t.Fatalf("wrong cid; expected %s but got %s", cids[index], c) + } + + index++ + return nil + }) + if err != nil { + t.Fatal(err) + } + +} diff --git a/blockstore/splitstore/markset.go b/blockstore/splitstore/markset.go index f173be575b4..e6749453887 100644 --- a/blockstore/splitstore/markset.go +++ b/blockstore/splitstore/markset.go @@ 
-14,15 +14,24 @@ var errMarkSetClosed = errors.New("markset closed") type MarkSet interface { ObjectVisitor Mark(cid.Cid) error + MarkMany([]cid.Cid) error Has(cid.Cid) (bool, error) Close() error + + // BeginCriticalSection ensures that the markset is persisted to disk for recovery in case + // of abnormal termination during the critical section span. + BeginCriticalSection() error + // EndCriticalSection ends the critical section span. + EndCriticalSection() } type MarkSetEnv interface { - // Create creates a new markset within the environment. - // name is a unique name for this markset, mapped to the filesystem in disk-backed environments + // New creates a new markset within the environment. + // name is a unique name for this markset, mapped to the filesystem for on-disk persistence. // sizeHint is a hint about the expected size of the markset - Create(name string, sizeHint int64) (MarkSet, error) + New(name string, sizeHint int64) (MarkSet, error) + // Recover recovers an existing markset persisted on-disk. 
+ Recover(name string) (MarkSet, error) // Close closes the markset Close() error } @@ -30,7 +39,7 @@ type MarkSetEnv interface { func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) { switch mtype { case "map": - return NewMapMarkSetEnv() + return NewMapMarkSetEnv(path) case "badger": return NewBadgerMarkSetEnv(path) default: diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go index e30334b8912..659d3b5ddc7 100644 --- a/blockstore/splitstore/markset_badger.go +++ b/blockstore/splitstore/markset_badger.go @@ -3,6 +3,7 @@ package splitstore import ( "os" "path/filepath" + "runtime" "sync" "golang.org/x/xerrors" @@ -28,6 +29,7 @@ type BadgerMarkSet struct { writers int seqno int version int + persist bool db *badger.DB path string @@ -47,11 +49,10 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) { return &BadgerMarkSetEnv{path: msPath}, nil } -func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { - name += ".tmp" +func (e *BadgerMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) { path := filepath.Join(e.path, name) - db, err := openTransientBadgerDB(path) + db, err := openBadgerDB(path, false) if err != nil { return nil, xerrors.Errorf("error creating badger db: %w", err) } @@ -67,8 +68,72 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) return ms, nil } +func (e *BadgerMarkSetEnv) Recover(name string) (MarkSet, error) { + path := filepath.Join(e.path, name) + + if _, err := os.Stat(path); err != nil { + return nil, xerrors.Errorf("error stating badger db path: %w", err) + } + + db, err := openBadgerDB(path, true) + if err != nil { + return nil, xerrors.Errorf("error creating badger db: %w", err) + } + + ms := &BadgerMarkSet{ + pend: make(map[string]struct{}), + writing: make(map[int]map[string]struct{}), + db: db, + path: path, + persist: true, + } + ms.cond.L = &ms.mx + + return ms, nil +} + func (e *BadgerMarkSetEnv) 
Close() error { - return os.RemoveAll(e.path) + return nil +} + +func (s *BadgerMarkSet) BeginCriticalSection() error { + s.mx.Lock() + + if s.persist { + s.mx.Unlock() + return nil + } + + var write bool + var seqno int + if len(s.pend) > 0 { + write = true + seqno = s.nextBatch() + } + + s.persist = true + s.mx.Unlock() + + if write { + // all writes sync once persist is true + return s.write(seqno) + } + + // wait for any pending writes and sync + s.mx.Lock() + for s.writers > 0 { + s.cond.Wait() + } + s.mx.Unlock() + + return s.db.Sync() +} + +func (s *BadgerMarkSet) EndCriticalSection() { + s.mx.Lock() + defer s.mx.Unlock() + + s.persist = false } func (s *BadgerMarkSet) Mark(c cid.Cid) error { @@ -88,6 +153,23 @@ func (s *BadgerMarkSet) Mark(c cid.Cid) error { return nil } +func (s *BadgerMarkSet) MarkMany(batch []cid.Cid) error { + s.mx.Lock() + if s.pend == nil { + s.mx.Unlock() + return errMarkSetClosed + } + + write, seqno := s.putMany(batch) + s.mx.Unlock() + + if write { + return s.write(seqno) + } + + return nil +} + func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { s.mx.RLock() defer s.mx.RUnlock() @@ -193,16 +275,34 @@ func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) { // writer holds the exclusive lock func (s *BadgerMarkSet) put(key string) (write bool, seqno int) { s.pend[key] = struct{}{} - if len(s.pend) < badgerMarkSetBatchSize { + if !s.persist && len(s.pend) < badgerMarkSetBatchSize { return false, 0 } - seqno = s.seqno + seqno = s.nextBatch() + return true, seqno +} + +func (s *BadgerMarkSet) putMany(batch []cid.Cid) (write bool, seqno int) { + for _, c := range batch { + key := string(c.Hash()) + s.pend[key] = struct{}{} + } + + if !s.persist && len(s.pend) < badgerMarkSetBatchSize { + return false, 0 + } + + seqno = s.nextBatch() + return true, seqno +} + +func (s *BadgerMarkSet) nextBatch() int { + seqno := s.seqno s.seqno++ s.writing[seqno] = s.pend s.pend = make(map[string]struct{}) - - return true, seqno + return 
seqno } func (s *BadgerMarkSet) write(seqno int) (err error) { @@ -247,6 +347,14 @@ func (s *BadgerMarkSet) write(seqno int) (err error) { return xerrors.Errorf("error flushing batch to badger markset: %w", err) } + s.mx.RLock() + persist := s.persist + s.mx.RUnlock() + + if persist { + return s.db.Sync() + } + return nil } @@ -266,26 +374,29 @@ func (s *BadgerMarkSet) Close() error { db := s.db s.db = nil - return closeTransientBadgerDB(db, s.path) + return closeBadgerDB(db, s.path, s.persist) } -func (s *BadgerMarkSet) SetConcurrent() {} - -func openTransientBadgerDB(path string) (*badger.DB, error) { - // clean up first - err := os.RemoveAll(path) - if err != nil { - return nil, xerrors.Errorf("error clearing markset directory: %w", err) - } +func openBadgerDB(path string, recover bool) (*badger.DB, error) { + // if it is not a recovery, clean up first + if !recover { + err := os.RemoveAll(path) + if err != nil { + return nil, xerrors.Errorf("error clearing markset directory: %w", err) + } - err = os.MkdirAll(path, 0755) //nolint:gosec - if err != nil { - return nil, xerrors.Errorf("error creating markset directory: %w", err) + err = os.MkdirAll(path, 0755) //nolint:gosec + if err != nil { + return nil, xerrors.Errorf("error creating markset directory: %w", err) + } } opts := badger.DefaultOptions(path) + // we manually sync when we are in critical section opts.SyncWrites = false + // no need to do that opts.CompactL0OnClose = false + // we store hashes, not much to gain by compression opts.Compression = options.None // Note: We use FileIO for loading modes to avoid memory thrashing and interference // between the system blockstore and the markset. @@ -294,6 +405,15 @@ func openTransientBadgerDB(path string) (*badger.DB, error) { // exceeded 1GB in size. opts.TableLoadingMode = options.FileIO opts.ValueLogLoadingMode = options.FileIO + // We increase the number of L0 tables before compaction to make it unlikely to + // be necessary. 
+ opts.NumLevelZeroTables = 20 // default is 5 + opts.NumLevelZeroTablesStall = 30 // default is 10 + // increase the number of compactors from default 2 so that if we ever have to + // compact, it is fast + if runtime.NumCPU()/2 > opts.NumCompactors { + opts.NumCompactors = runtime.NumCPU() / 2 + } opts.Logger = &badgerLogger{ SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), @@ -302,12 +422,16 @@ func openTransientBadgerDB(path string) (*badger.DB, error) { return badger.Open(opts) } -func closeTransientBadgerDB(db *badger.DB, path string) error { +func closeBadgerDB(db *badger.DB, path string, persist bool) error { err := db.Close() if err != nil { return xerrors.Errorf("error closing badger markset: %w", err) } + if persist { + return nil + } + err = os.RemoveAll(path) if err != nil { return xerrors.Errorf("error deleting badger markset: %w", err) diff --git a/blockstore/splitstore/markset_map.go b/blockstore/splitstore/markset_map.go index fda964663ef..8216bcd812f 100644 --- a/blockstore/splitstore/markset_map.go +++ b/blockstore/splitstore/markset_map.go @@ -1,37 +1,104 @@ package splitstore import ( + "bufio" + "io" + "os" + "path/filepath" "sync" + "golang.org/x/xerrors" + cid "github.com/ipfs/go-cid" ) -type MapMarkSetEnv struct{} +type MapMarkSetEnv struct { + path string +} var _ MarkSetEnv = (*MapMarkSetEnv)(nil) type MapMarkSet struct { mx sync.RWMutex set map[string]struct{} + + persist bool + file *os.File + buf *bufio.Writer + + path string } var _ MarkSet = (*MapMarkSet)(nil) -func NewMapMarkSetEnv() (*MapMarkSetEnv, error) { - return &MapMarkSetEnv{}, nil +func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) { + msPath := filepath.Join(path, "markset.map") + err := os.MkdirAll(msPath, 0755) //nolint:gosec + if err != nil { + return nil, xerrors.Errorf("error creating markset directory: %w", err) + } + + return &MapMarkSetEnv{path: msPath}, nil } -func (e 
*MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { +func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) { + path := filepath.Join(e.path, name) return &MapMarkSet{ - set: make(map[string]struct{}, sizeHint), + set: make(map[string]struct{}, sizeHint), + path: path, }, nil } +func (e *MapMarkSetEnv) Recover(name string) (MarkSet, error) { + path := filepath.Join(e.path, name) + s := &MapMarkSet{ + set: make(map[string]struct{}), + path: path, + } + + in, err := os.Open(path) + if err != nil { + return nil, xerrors.Errorf("error opening markset file for read: %w", err) + } + defer in.Close() //nolint:errcheck + + // wrap a buffered reader to make this faster + buf := bufio.NewReader(in) + for { + var sz byte + if sz, err = buf.ReadByte(); err != nil { + break + } + + key := make([]byte, int(sz)) + if _, err = io.ReadFull(buf, key); err != nil { + break + } + + s.set[string(key)] = struct{}{} + } + + if err != io.EOF { + return nil, xerrors.Errorf("error reading markset file: %w", err) + } + + file, err := os.OpenFile(s.path, os.O_WRONLY|os.O_APPEND, 0) + if err != nil { + return nil, xerrors.Errorf("error opening markset file for write: %w", err) + } + + s.persist = true + s.file = file + s.buf = bufio.NewWriter(file) + + return s, nil +} + func (e *MapMarkSetEnv) Close() error { return nil } -func (s *MapMarkSet) Mark(cid cid.Cid) error { +func (s *MapMarkSet) BeginCriticalSection() error { s.mx.Lock() defer s.mx.Unlock() @@ -39,7 +106,104 @@ func (s *MapMarkSet) Mark(cid cid.Cid) error { return errMarkSetClosed } - s.set[string(cid.Hash())] = struct{}{} + if s.persist { + return nil + } + + file, err := os.OpenFile(s.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return xerrors.Errorf("error opening markset file: %w", err) + } + + // wrap a buffered writer to make this faster + s.buf = bufio.NewWriter(file) + for key := range s.set { + if err := s.writeKey([]byte(key), false); err != nil { + _ = 
file.Close() + s.buf = nil + return err + } + } + if err := s.buf.Flush(); err != nil { + _ = file.Close() + s.buf = nil + return xerrors.Errorf("error flushing markset file buffer: %w", err) + } + + s.file = file + s.persist = true + + return nil +} + +func (s *MapMarkSet) EndCriticalSection() { + s.mx.Lock() + defer s.mx.Unlock() + + if !s.persist { + return + } + + _ = s.file.Close() + _ = os.Remove(s.path) + s.file = nil + s.buf = nil + s.persist = false +} + +func (s *MapMarkSet) Mark(c cid.Cid) error { + s.mx.Lock() + defer s.mx.Unlock() + + if s.set == nil { + return errMarkSetClosed + } + + hash := c.Hash() + s.set[string(hash)] = struct{}{} + + if s.persist { + if err := s.writeKey(hash, true); err != nil { + return err + } + + if err := s.file.Sync(); err != nil { + return xerrors.Errorf("error syncing markset: %w", err) + } + } + + return nil +} + +func (s *MapMarkSet) MarkMany(batch []cid.Cid) error { + s.mx.Lock() + defer s.mx.Unlock() + + if s.set == nil { + return errMarkSetClosed + } + + for _, c := range batch { + hash := c.Hash() + s.set[string(hash)] = struct{}{} + + if s.persist { + if err := s.writeKey(hash, false); err != nil { + return err + } + } + } + + if s.persist { + if err := s.buf.Flush(); err != nil { + return xerrors.Errorf("error flushing markset buffer to disk: %w", err) + } + + if err := s.file.Sync(); err != nil { + return xerrors.Errorf("error syncing markset: %w", err) + } + } + return nil } @@ -63,12 +227,23 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) { return false, errMarkSetClosed } - key := string(c.Hash()) + hash := c.Hash() + key := string(hash) if _, ok := s.set[key]; ok { return false, nil } s.set[key] = struct{}{} + + if s.persist { + if err := s.writeKey(hash, true); err != nil { + return false, err + } + if err := s.file.Sync(); err != nil { + return false, xerrors.Errorf("error syncing markset: %w", err) + } + } + return true, nil } @@ -76,6 +251,39 @@ func (s *MapMarkSet) Close() error { s.mx.Lock() 
defer s.mx.Unlock() + if s.set == nil { + return nil + } + s.set = nil + + if s.file != nil { + if err := s.file.Close(); err != nil { + log.Warnf("error closing markset file: %s", err) + } + + if !s.persist { + if err := os.Remove(s.path); err != nil { + log.Warnf("error removing markset file: %s", err) + } + } + } + + return nil +} + +func (s *MapMarkSet) writeKey(k []byte, flush bool) error { + if err := s.buf.WriteByte(byte(len(k))); err != nil { + return xerrors.Errorf("error writing markset key length to disk: %w", err) + } + if _, err := s.buf.Write(k); err != nil { + return xerrors.Errorf("error writing markset key to disk: %w", err) + } + if flush { + if err := s.buf.Flush(); err != nil { + return xerrors.Errorf("error flushing markset buffer to disk: %w", err) + } + } + return nil } diff --git a/blockstore/splitstore/markset_test.go b/blockstore/splitstore/markset_test.go index de9421f0894..b4b87160215 100644 --- a/blockstore/splitstore/markset_test.go +++ b/blockstore/splitstore/markset_test.go @@ -11,7 +11,10 @@ import ( func TestMapMarkSet(t *testing.T) { testMarkSet(t, "map") + testMarkSetRecovery(t, "map") + testMarkSetMarkMany(t, "map") testMarkSetVisitor(t, "map") + testMarkSetVisitorRecovery(t, "map") } func TestBadgerMarkSet(t *testing.T) { @@ -21,12 +24,13 @@ func TestBadgerMarkSet(t *testing.T) { badgerMarkSetBatchSize = bs }) testMarkSet(t, "badger") + testMarkSetRecovery(t, "badger") + testMarkSetMarkMany(t, "badger") testMarkSetVisitor(t, "badger") + testMarkSetVisitorRecovery(t, "badger") } func testMarkSet(t *testing.T, lsType string) { - t.Helper() - path, err := ioutil.TempDir("", "markset.*") if err != nil { t.Fatal(err) @@ -42,12 +46,12 @@ func testMarkSet(t *testing.T, lsType string) { } defer env.Close() //nolint:errcheck - hotSet, err := env.Create("hot", 0) + hotSet, err := env.New("hot", 0) if err != nil { t.Fatal(err) } - coldSet, err := env.Create("cold", 0) + coldSet, err := env.New("cold", 0) if err != nil { t.Fatal(err) } @@ 
-62,6 +66,7 @@ func testMarkSet(t *testing.T, lsType string) { } mustHave := func(s MarkSet, cid cid.Cid) { + t.Helper() has, err := s.Has(cid) if err != nil { t.Fatal(err) @@ -73,6 +78,7 @@ func testMarkSet(t *testing.T, lsType string) { } mustNotHave := func(s MarkSet, cid cid.Cid) { + t.Helper() has, err := s.Has(cid) if err != nil { t.Fatal(err) @@ -114,12 +120,12 @@ func testMarkSet(t *testing.T, lsType string) { t.Fatal(err) } - hotSet, err = env.Create("hot", 0) + hotSet, err = env.New("hot", 0) if err != nil { t.Fatal(err) } - coldSet, err = env.Create("cold", 0) + coldSet, err = env.New("cold", 0) if err != nil { t.Fatal(err) } @@ -150,8 +156,75 @@ func testMarkSet(t *testing.T, lsType string) { } func testMarkSetVisitor(t *testing.T, lsType string) { - t.Helper() + path, err := ioutil.TempDir("", "markset.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + env, err := OpenMarkSetEnv(path, lsType) + if err != nil { + t.Fatal(err) + } + defer env.Close() //nolint:errcheck + + visitor, err := env.New("test", 0) + if err != nil { + t.Fatal(err) + } + defer visitor.Close() //nolint:errcheck + + makeCid := func(key string) cid.Cid { + h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1) + if err != nil { + t.Fatal(err) + } + + return cid.NewCidV1(cid.Raw, h) + } + + mustVisit := func(v ObjectVisitor, cid cid.Cid) { + visit, err := v.Visit(cid) + if err != nil { + t.Fatal(err) + } + + if !visit { + t.Fatal("object should be visited") + } + } + + mustNotVisit := func(v ObjectVisitor, cid cid.Cid) { + visit, err := v.Visit(cid) + if err != nil { + t.Fatal(err) + } + + if visit { + t.Fatal("unexpected visit") + } + } + + k1 := makeCid("a") + k2 := makeCid("b") + k3 := makeCid("c") + k4 := makeCid("d") + + mustVisit(visitor, k1) + mustVisit(visitor, k2) + mustVisit(visitor, k3) + mustVisit(visitor, k4) + + mustNotVisit(visitor, k1) + mustNotVisit(visitor, k2) + mustNotVisit(visitor, k3) + mustNotVisit(visitor, 
k4) +} +func testMarkSetVisitorRecovery(t *testing.T, lsType string) { path, err := ioutil.TempDir("", "markset.*") if err != nil { t.Fatal(err) @@ -167,7 +240,7 @@ func testMarkSetVisitor(t *testing.T, lsType string) { } defer env.Close() //nolint:errcheck - visitor, err := env.Create("test", 0) + visitor, err := env.New("test", 0) if err != nil { t.Fatal(err) } @@ -211,6 +284,11 @@ func testMarkSetVisitor(t *testing.T, lsType string) { mustVisit(visitor, k1) mustVisit(visitor, k2) + + if err := visitor.BeginCriticalSection(); err != nil { + t.Fatal(err) + } + mustVisit(visitor, k3) mustVisit(visitor, k4) @@ -218,4 +296,249 @@ func testMarkSetVisitor(t *testing.T, lsType string) { mustNotVisit(visitor, k2) mustNotVisit(visitor, k3) mustNotVisit(visitor, k4) + + if err := visitor.Close(); err != nil { + t.Fatal(err) + } + + visitor, err = env.Recover("test") + if err != nil { + t.Fatal(err) + } + + mustNotVisit(visitor, k1) + mustNotVisit(visitor, k2) + mustNotVisit(visitor, k3) + mustNotVisit(visitor, k4) + + visitor.EndCriticalSection() + + if err := visitor.Close(); err != nil { + t.Fatal(err) + } + + _, err = env.Recover("test") + if err == nil { + t.Fatal("expected recovery to fail") + } +} + +func testMarkSetRecovery(t *testing.T, lsType string) { + path, err := ioutil.TempDir("", "markset.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + env, err := OpenMarkSetEnv(path, lsType) + if err != nil { + t.Fatal(err) + } + defer env.Close() //nolint:errcheck + + markSet, err := env.New("test", 0) + if err != nil { + t.Fatal(err) + } + + makeCid := func(key string) cid.Cid { + h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1) + if err != nil { + t.Fatal(err) + } + + return cid.NewCidV1(cid.Raw, h) + } + + mustHave := func(s MarkSet, cid cid.Cid) { + t.Helper() + has, err := s.Has(cid) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatal("mark not found") + } + } + + mustNotHave := func(s MarkSet, cid 
cid.Cid) { + t.Helper() + has, err := s.Has(cid) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("unexpected mark") + } + } + + k1 := makeCid("a") + k2 := makeCid("b") + k3 := makeCid("c") + k4 := makeCid("d") + + if err := markSet.Mark(k1); err != nil { + t.Fatal(err) + } + if err := markSet.Mark(k2); err != nil { + t.Fatal(err) + } + + mustHave(markSet, k1) + mustHave(markSet, k2) + mustNotHave(markSet, k3) + mustNotHave(markSet, k4) + + if err := markSet.BeginCriticalSection(); err != nil { + t.Fatal(err) + } + + if err := markSet.Mark(k3); err != nil { + t.Fatal(err) + } + if err := markSet.Mark(k4); err != nil { + t.Fatal(err) + } + + mustHave(markSet, k1) + mustHave(markSet, k2) + mustHave(markSet, k3) + mustHave(markSet, k4) + + if err := markSet.Close(); err != nil { + t.Fatal(err) + } + + markSet, err = env.Recover("test") + if err != nil { + t.Fatal(err) + } + + mustHave(markSet, k1) + mustHave(markSet, k2) + mustHave(markSet, k3) + mustHave(markSet, k4) + + markSet.EndCriticalSection() + + if err := markSet.Close(); err != nil { + t.Fatal(err) + } + + _, err = env.Recover("test") + if err == nil { + t.Fatal("expected recovery to fail") + } +} + +func testMarkSetMarkMany(t *testing.T, lsType string) { + path, err := ioutil.TempDir("", "markset.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + env, err := OpenMarkSetEnv(path, lsType) + if err != nil { + t.Fatal(err) + } + defer env.Close() //nolint:errcheck + + markSet, err := env.New("test", 0) + if err != nil { + t.Fatal(err) + } + + makeCid := func(key string) cid.Cid { + h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1) + if err != nil { + t.Fatal(err) + } + + return cid.NewCidV1(cid.Raw, h) + } + + mustHave := func(s MarkSet, cid cid.Cid) { + t.Helper() + has, err := s.Has(cid) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatal("mark not found") + } + } + + mustNotHave := func(s MarkSet, cid cid.Cid) { + t.Helper() + has, 
err := s.Has(cid) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("unexpected mark") + } + } + + k1 := makeCid("a") + k2 := makeCid("b") + k3 := makeCid("c") + k4 := makeCid("d") + + if err := markSet.MarkMany([]cid.Cid{k1, k2}); err != nil { + t.Fatal(err) + } + + mustHave(markSet, k1) + mustHave(markSet, k2) + mustNotHave(markSet, k3) + mustNotHave(markSet, k4) + + if err := markSet.BeginCriticalSection(); err != nil { + t.Fatal(err) + } + + if err := markSet.MarkMany([]cid.Cid{k3, k4}); err != nil { + t.Fatal(err) + } + + mustHave(markSet, k1) + mustHave(markSet, k2) + mustHave(markSet, k3) + mustHave(markSet, k4) + + if err := markSet.Close(); err != nil { + t.Fatal(err) + } + + markSet, err = env.Recover("test") + if err != nil { + t.Fatal(err) + } + + mustHave(markSet, k1) + mustHave(markSet, k2) + mustHave(markSet, k3) + mustHave(markSet, k4) + + markSet.EndCriticalSection() + + if err := markSet.Close(); err != nil { + t.Fatal(err) + } + + _, err = env.Recover("test") + if err == nil { + t.Fatal("expected recovery to fail") + } } diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 62cb2459e54..6a65e01df36 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -129,8 +129,6 @@ type SplitStore struct { headChangeMx sync.Mutex - coldPurgeSize int - chain ChainAccessor ds dstore.Datastore cold bstore.Blockstore @@ -158,6 +156,10 @@ type SplitStore struct { txnRefsMx sync.Mutex txnRefs map[cid.Cid]struct{} txnMissing map[cid.Cid]struct{} + txnMarkSet MarkSet + txnSyncMx sync.Mutex + txnSyncCond sync.Cond + txnSync bool // registered protectors protectors []func(func(cid.Cid) error) error @@ -194,11 +196,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co cold: cold, hot: hots, markSetEnv: markSetEnv, - - coldPurgeSize: defaultColdPurgeSize, } ss.txnViewsCond.L = &ss.txnViewsMx + ss.txnSyncCond.L = &ss.txnSyncMx ss.ctx, ss.cancel = 
context.WithCancel(context.Background()) if enableDebugLog { @@ -208,6 +209,14 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co } } + if ss.checkpointExists() { + log.Info("found compaction checkpoint; resuming compaction") + if err := ss.completeCompaction(); err != nil { + markSetEnv.Close() //nolint:errcheck + return nil, xerrors.Errorf("error resuming compaction: %w", err) + } + } + return ss, nil } @@ -230,6 +239,20 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) { s.txnLk.RLock() defer s.txnLk.RUnlock() + // critical section + if s.txnMarkSet != nil { + has, err := s.txnMarkSet.Has(cid) + if err != nil { + return false, err + } + + if has { + return s.has(cid) + } + + return s.cold.Has(ctx, cid) + } + has, err := s.hot.Has(ctx, cid) if err != nil { @@ -257,6 +280,20 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) s.txnLk.RLock() defer s.txnLk.RUnlock() + // critical section + if s.txnMarkSet != nil { + has, err := s.txnMarkSet.Has(cid) + if err != nil { + return nil, err + } + + if has { + return s.get(cid) + } + + return s.cold.Get(ctx, cid) + } + blk, err := s.hot.Get(ctx, cid) switch err { @@ -294,6 +331,20 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { s.txnLk.RLock() defer s.txnLk.RUnlock() + // critical section + if s.txnMarkSet != nil { + has, err := s.txnMarkSet.Has(cid) + if err != nil { + return 0, err + } + + if has { + return s.getSize(cid) + } + + return s.cold.GetSize(ctx, cid) + } + size, err := s.hot.GetSize(ctx, cid) switch err { @@ -332,6 +383,12 @@ func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error { s.debug.LogWrite(blk) + // critical section + if s.txnMarkSet != nil { + s.markLiveRefs([]cid.Cid{blk.Cid()}) + return nil + } + s.trackTxnRef(blk.Cid()) return nil } @@ -377,6 +434,12 @@ func (s *SplitStore) PutMany(ctx context.Context, blks []blocks.Block) error { s.debug.LogWriteMany(blks) + 
// critical section + if s.txnMarkSet != nil { + s.markLiveRefs(batch) + return nil + } + s.trackTxnRefMany(batch) return nil } @@ -436,6 +499,23 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro return cb(data) } + // critical section + s.txnLk.RLock() // the lock is released in protectView if we are not in critical section + if s.txnMarkSet != nil { + has, err := s.txnMarkSet.Has(cid) + s.txnLk.RUnlock() + + if err != nil { + return err + } + + if has { + return s.view(cid, cb) + } + + return s.cold.View(ctx, cid, cb) + } + // views are (optimistically) protected two-fold: // - if there is an active transaction, then the reference is protected. // - if there is no active transaction, active views are tracked in a @@ -585,6 +665,11 @@ func (s *SplitStore) Close() error { } if atomic.LoadInt32(&s.compacting) == 1 { + s.txnSyncMx.Lock() + s.txnSync = true + s.txnSyncCond.Broadcast() + s.txnSyncMx.Unlock() + log.Warn("close with ongoing compaction in progress; waiting for it to finish...") for atomic.LoadInt32(&s.compacting) == 1 { time.Sleep(time.Second) diff --git a/blockstore/splitstore/splitstore_check.go b/blockstore/splitstore/splitstore_check.go index 0b4cfe04472..d7c9b2ef97b 100644 --- a/blockstore/splitstore/splitstore_check.go +++ b/blockstore/splitstore/splitstore_check.go @@ -89,7 +89,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { coldCnt := new(int64) missingCnt := new(int64) - visitor, err := s.markSetEnv.Create("check", 0) + visitor, err := s.markSetEnv.New("check", 0) if err != nil { return xerrors.Errorf("error creating visitor: %w", err) } diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 20f99af35c0..ae123abc9c3 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -3,8 +3,9 @@ package splitstore import ( "bytes" "errors" + "os" + "path/filepath" "runtime" - "sort" "sync" "sync/atomic" 
"time" @@ -48,6 +49,10 @@ var ( // SyncGapTime is the time delay from a tipset's min timestamp before we decide // there is a sync gap SyncGapTime = time.Minute + + // SyncWaitTime is the time delay from a tipset's min timestamp before we decide + // we have synced. + SyncWaitTime = 30 * time.Second ) var ( @@ -57,8 +62,6 @@ var ( const ( batchSize = 16384 - - defaultColdPurgeSize = 7_000_000 ) func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { @@ -141,9 +144,9 @@ func (s *SplitStore) isNearUpgrade(epoch abi.ChainEpoch) bool { // transactionally protect incoming tipsets func (s *SplitStore) protectTipSets(apply []*types.TipSet) { s.txnLk.RLock() - defer s.txnLk.RUnlock() if !s.txnActive { + s.txnLk.RUnlock() return } @@ -152,12 +155,115 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) { cids = append(cids, ts.Cids()...) } + if len(cids) == 0 { + s.txnLk.RUnlock() + return + } + + // critical section + if s.txnMarkSet != nil { + curTs := apply[len(apply)-1] + timestamp := time.Unix(int64(curTs.MinTimestamp()), 0) + doSync := time.Since(timestamp) < SyncWaitTime + go func() { + if doSync { + defer func() { + s.txnSyncMx.Lock() + defer s.txnSyncMx.Unlock() + s.txnSync = true + s.txnSyncCond.Broadcast() + }() + } + defer s.txnLk.RUnlock() + s.markLiveRefs(cids) + + }() + return + } + s.trackTxnRefMany(cids) + s.txnLk.RUnlock() +} + +func (s *SplitStore) markLiveRefs(cids []cid.Cid) { + log.Debugf("marking %d live refs", len(cids)) + startMark := time.Now() + + count := new(int32) + visitor := newConcurrentVisitor() + walkObject := func(c cid.Cid) error { + return s.walkObjectIncomplete(c, visitor, + func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } + + visit, err := s.txnMarkSet.Visit(c) + if err != nil { + return xerrors.Errorf("error visiting object: %w", err) + } + + if !visit { + return errStopWalk + } + + atomic.AddInt32(count, 1) + return nil + }, + func(missing cid.Cid) error { + log.Warnf("missing object 
reference %s in %s", missing, c) + return errStopWalk + }) + } + + // optimize the common case of single put + if len(cids) == 1 { + if err := walkObject(cids[0]); err != nil { + log.Errorf("error marking tipset refs: %s", err) + } + log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count) + return + } + + workch := make(chan cid.Cid, len(cids)) + for _, c := range cids { + workch <- c + } + close(workch) + + worker := func() error { + for c := range workch { + if err := walkObject(c); err != nil { + return err + } + } + + return nil + } + + workers := runtime.NumCPU() / 2 + if workers < 2 { + workers = 2 + } + if workers > len(cids) { + workers = len(cids) + } + + g := new(errgroup.Group) + for i := 0; i < workers; i++ { + g.Go(worker) + } + + if err := g.Wait(); err != nil { + log.Errorf("error marking tipset refs: %s", err) + } + + log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count) } // transactionally protect a view func (s *SplitStore) protectView(c cid.Cid) { - s.txnLk.RLock() + // the txnLk is held for read defer s.txnLk.RUnlock() if s.txnActive { @@ -387,6 +493,12 @@ func (s *SplitStore) compact(curTs *types.TipSet) { } func (s *SplitStore) doCompact(curTs *types.TipSet) error { + if s.checkpointExists() { + // this really shouldn't happen, but if it somehow does, it means that the hotstore + // might be potentially inconsistent; abort compaction and notify the user to intervene. 
+ return xerrors.Errorf("checkpoint exists; aborting compaction") + } + currentEpoch := curTs.Height() boundaryEpoch := currentEpoch - CompactionBoundary @@ -398,7 +510,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex) - markSet, err := s.markSetEnv.Create("live", s.markSetSize) + markSet, err := s.markSetEnv.New("live", s.markSetSize) if err != nil { return xerrors.Errorf("error creating mark set: %w", err) } @@ -409,9 +521,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return err } - // we are ready for concurrent marking - s.beginTxnMarking(markSet) - // 0. track all protected references at beginning of compaction; anything added later should // be transactionally protected by the write log.Info("protecting references with registered protectors") @@ -425,7 +534,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Info("marking reachable objects") startMark := time.Now() - var count int64 + count := new(int64) err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, func(c cid.Cid) error { if isUnitaryObject(c) { @@ -441,7 +550,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return errStopWalk } - count++ + atomic.AddInt64(count, 1) return nil }) @@ -449,9 +558,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return xerrors.Errorf("error marking: %w", err) } - s.markSetSize = count + count>>2 // overestimate a bit + s.markSetSize = *count + *count>>2 // overestimate a bit - log.Infow("marking done", "took", time.Since(startMark), "marked", count) + log.Infow("marking done", "took", time.Since(startMark), "marked", *count) if err := s.checkClosing(); err != nil { return err @@ -471,10 +580,15 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Info("collecting cold 
objects") startCollect := time.Now() + coldw, err := NewColdSetWriter(s.coldSetPath()) + if err != nil { + return xerrors.Errorf("error creating coldset: %w", err) + } + defer coldw.Close() //nolint:errcheck + // some stats for logging var hotCnt, coldCnt int - cold := make([]cid.Cid, 0, s.coldPurgeSize) err = s.hot.ForEachKey(func(c cid.Cid) error { // was it marked? mark, err := markSet.Has(c) @@ -488,7 +602,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } // it's cold, mark it as candidate for move - cold = append(cold, c) + if err := coldw.Write(c); err != nil { + return xerrors.Errorf("error writing cid to coldstore: %w", err) + } coldCnt++ return nil @@ -498,12 +614,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return xerrors.Errorf("error collecting cold objects: %w", err) } - log.Infow("cold collection done", "took", time.Since(startCollect)) - - if coldCnt > 0 { - s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit + if err := coldw.Close(); err != nil { + return xerrors.Errorf("error closing coldset: %w", err) } + log.Infow("cold collection done", "took", time.Since(startCollect)) + log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt) stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt))) stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt))) @@ -521,11 +637,17 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return err } + coldr, err := NewColdSetReader(s.coldSetPath()) + if err != nil { + return xerrors.Errorf("error opening coldset: %w", err) + } + defer coldr.Close() //nolint:errcheck + // 3. 
copy the cold objects to the coldstore -- if we have one if !s.cfg.DiscardColdBlocks { log.Info("moving cold objects to the coldstore") startMove := time.Now() - err = s.moveColdBlocks(cold) + err = s.moveColdBlocks(coldr) if err != nil { return xerrors.Errorf("error moving cold objects: %w", err) } @@ -534,41 +656,64 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { if err := s.checkClosing(); err != nil { return err } + + if err := coldr.Reset(); err != nil { + return xerrors.Errorf("error resetting coldset: %w", err) + } } - // 4. sort cold objects so that the dags with most references are deleted first - // this ensures that we can't refer to a dag with its consituents already deleted, ie - // we lave no dangling references. - log.Info("sorting cold objects") - startSort := time.Now() - err = s.sortObjects(cold) - if err != nil { - return xerrors.Errorf("error sorting objects: %w", err) + // 4. Purge cold objects with checkpointing for recovery. + // This is the critical section of compaction, whereby any cold object not in the markSet is + // considered already deleted. + // We delete cold objects in batches, holding the transaction lock, where we check the markSet + // again for new references created by the VM. + // After each batch, we write a checkpoint to disk; if the process is interrupted before completion, + // the process will continue from the checkpoint in the next recovery. + if err := s.beginCriticalSection(markSet); err != nil { + return xerrors.Errorf("error beginning critical section: %w", err) } - log.Infow("sorting done", "took", time.Since(startSort)) - // 4.1 protect transactional refs once more - // strictly speaking, this is not necessary as purge will do it before deleting each - // batch. however, there is likely a largish number of references accumulated during - // ths sort and this protects before entering pruge context. 
- err = s.protectTxnRefs(markSet) - if err != nil { - return xerrors.Errorf("error protecting transactional refs: %w", err) + if err := s.checkClosing(); err != nil { + return err } + // wait for the head to catch up so that the current tipset is marked + s.waitForSync() + if err := s.checkClosing(); err != nil { return err } + checkpoint, err := NewCheckpoint(s.checkpointPath()) + if err != nil { + return xerrors.Errorf("error creating checkpoint: %w", err) + } + defer checkpoint.Close() //nolint:errcheck + // 5. purge cold objects from the hotstore, taking protected references into account log.Info("purging cold objects from the hotstore") startPurge := time.Now() - err = s.purge(cold, markSet) + err = s.purge(coldr, checkpoint, markSet) if err != nil { - return xerrors.Errorf("error purging cold blocks: %w", err) + return xerrors.Errorf("error purging cold objects: %w", err) } log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge)) + s.endCriticalSection() + + if err := checkpoint.Close(); err != nil { + log.Warnf("error closing checkpoint: %s", err) + } + if err := os.Remove(s.checkpointPath()); err != nil { + log.Warnf("error removing checkpoint: %s", err) + } + if err := coldr.Close(); err != nil { + log.Warnf("error closing coldset: %s", err) + } + if err := os.Remove(s.coldSetPath()); err != nil { + log.Warnf("error removing coldset: %s", err) + } + // we are done; do some housekeeping s.endTxnProtect() s.gcHotstore() @@ -599,12 +744,51 @@ func (s *SplitStore) beginTxnProtect() { defer s.txnLk.Unlock() s.txnActive = true + s.txnSync = false s.txnRefs = make(map[cid.Cid]struct{}) s.txnMissing = make(map[cid.Cid]struct{}) } -func (s *SplitStore) beginTxnMarking(markSet MarkSet) { - log.Info("beginning transactional marking") +func (s *SplitStore) beginCriticalSection(markSet MarkSet) error { + log.Info("beginning critical section") + + // do that once first to get the bulk before the markset is in critical section + if err := 
s.protectTxnRefs(markSet); err != nil { + return xerrors.Errorf("error protecting transactional references: %w", err) + } + + if err := markSet.BeginCriticalSection(); err != nil { + return xerrors.Errorf("error beginning critical section for markset: %w", err) + } + + s.txnLk.Lock() + defer s.txnLk.Unlock() + + s.txnMarkSet = markSet + + // and do it again while holding the lock to mark references that might have been created + // in the meantime and avoid races of the type Has->txnRef->enterCS->Get fails because + // it's not in the markset + if err := s.protectTxnRefs(markSet); err != nil { + return xerrors.Errorf("error protecting transactional references: %w", err) + } + + return nil +} + +func (s *SplitStore) waitForSync() { + log.Info("waiting for sync") + startWait := time.Now() + defer func() { + log.Infow("waiting for sync done", "took", time.Since(startWait)) + }() + + s.txnSyncMx.Lock() + defer s.txnSyncMx.Unlock() + + for !s.txnSync { + s.txnSyncCond.Wait() + } } func (s *SplitStore) endTxnProtect() { @@ -616,8 +800,20 @@ func (s *SplitStore) endTxnProtect() { } s.txnActive = false + s.txnSync = false s.txnRefs = nil s.txnMissing = nil + s.txnMarkSet = nil +} + +func (s *SplitStore) endCriticalSection() { + log.Info("ending critical section") + + s.txnLk.Lock() + defer s.txnLk.Unlock() + + s.txnMarkSet.EndCriticalSection() + s.txnMarkSet = nil } func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch, @@ -857,7 +1053,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m return nil } -// internal version used by walk +// internal version used during compaction and related operations func (s *SplitStore) view(c cid.Cid, cb func([]byte) error) error { if isIdentiyCid(c) { data, err := decodeIdentityCid(c) @@ -892,10 +1088,34 @@ func (s *SplitStore) has(c cid.Cid) (bool, error) { return s.cold.Has(s.ctx, c) } -func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { +func (s *SplitStore) get(c 
cid.Cid) (blocks.Block, error) { + blk, err := s.hot.Get(s.ctx, c) + switch err { + case nil: + return blk, nil + case bstore.ErrNotFound: + return s.cold.Get(s.ctx, c) + default: + return nil, err + } +} + +func (s *SplitStore) getSize(c cid.Cid) (int, error) { + sz, err := s.hot.GetSize(s.ctx, c) + switch err { + case nil: + return sz, nil + case bstore.ErrNotFound: + return s.cold.GetSize(s.ctx, c) + default: + return 0, err + } +} + +func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error { batch := make([]blocks.Block, 0, batchSize) - for _, c := range cold { + err := coldr.ForEach(func(c cid.Cid) error { if err := s.checkClosing(); err != nil { return err } @@ -904,7 +1124,7 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { if err != nil { if err == bstore.ErrNotFound { log.Warnf("hotstore missing block %s", c) - continue + return nil } return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err) @@ -918,6 +1138,12 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { } batch = batch[:0] } + + return nil + }) + + if err != nil { + return xerrors.Errorf("error iterating coldset: %w", err) } if len(batch) > 0 { @@ -930,177 +1156,202 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { return nil } -// sorts a slice of objects heaviest first -- it's a little expensive but worth the -// guarantee that we don't leave dangling references behind, e.g. if we die in the middle -// of a purge. 
-func (s *SplitStore) sortObjects(cids []cid.Cid) error { - // we cache the keys to avoid making a gazillion of strings - keys := make(map[cid.Cid]string) - key := func(c cid.Cid) string { - s, ok := keys[c] - if !ok { - s = string(c.Hash()) - keys[c] = s - } - return s - } +func (s *SplitStore) purge(coldr *ColdSetReader, checkpoint *Checkpoint, markSet MarkSet) error { + batch := make([]cid.Cid, 0, batchSize) + deadCids := make([]cid.Cid, 0, batchSize) - // compute sorting weights as the cumulative number of DAG links - weights := make(map[string]int) - for _, c := range cids { - // this can take quite a while, so check for shutdown with every opportunity - if err := s.checkClosing(); err != nil { - return err - } + var purgeCnt, liveCnt int + defer func() { + log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt) + }() + + deleteBatch := func() error { + pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet) - w := s.getObjectWeight(c, weights, key) - weights[key(c)] = w + purgeCnt += pc + liveCnt += lc + batch = batch[:0] + + return err } - // sort! 
- sort.Slice(cids, func(i, j int) bool { - wi := weights[key(cids[i])] - wj := weights[key(cids[j])] - if wi == wj { - return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0 + err := coldr.ForEach(func(c cid.Cid) error { + batch = append(batch, c) + if len(batch) == batchSize { + return deleteBatch() } - return wi > wj + return nil }) + if err != nil { + return err + } + + if len(batch) > 0 { + return deleteBatch() + } + return nil } -func (s *SplitStore) getObjectWeight(c cid.Cid, weights map[string]int, key func(cid.Cid) string) int { - w, ok := weights[key(c)] - if ok { - return w +func (s *SplitStore) purgeBatch(batch, deadCids []cid.Cid, checkpoint *Checkpoint, markSet MarkSet) (purgeCnt int, liveCnt int, err error) { + if err := s.checkClosing(); err != nil { + return 0, 0, err } - // we treat block headers specially to avoid walking the entire chain - var hdr types.BlockHeader - err := s.view(c, func(data []byte) error { - return hdr.UnmarshalCBOR(bytes.NewBuffer(data)) - }) - if err == nil { - w1 := s.getObjectWeight(hdr.ParentStateRoot, weights, key) - weights[key(hdr.ParentStateRoot)] = w1 + s.txnLk.Lock() + defer s.txnLk.Unlock() - w2 := s.getObjectWeight(hdr.Messages, weights, key) - weights[key(hdr.Messages)] = w2 + for _, c := range batch { + has, err := markSet.Has(c) + if err != nil { + return 0, 0, xerrors.Errorf("error checking markset for liveness: %w", err) + } - return 1 + w1 + w2 - } + if has { + liveCnt++ + continue + } - var links []cid.Cid - err = s.view(c, func(data []byte) error { - return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { - links = append(links, c) - }) - }) - if err != nil { - return 1 + deadCids = append(deadCids, c) } - w = 1 - for _, c := range links { - // these are internal refs, so dags will be dags - if c.Prefix().Codec != cid.DagCBOR { - w++ - continue + if len(deadCids) == 0 { + if err := checkpoint.Set(batch[len(batch)-1]); err != nil { + return 0, 0, xerrors.Errorf("error setting checkpoint: %w", 
err) } - wc := s.getObjectWeight(c, weights, key) - weights[key(c)] = wc + return 0, liveCnt, nil + } - w += wc + if err := s.hot.DeleteMany(s.ctx, deadCids); err != nil { + return 0, liveCnt, xerrors.Errorf("error purging cold objects: %w", err) } - return w + s.debug.LogDelete(deadCids) + purgeCnt = len(deadCids) + + if err := checkpoint.Set(batch[len(batch)-1]); err != nil { + return purgeCnt, liveCnt, xerrors.Errorf("error setting checkpoint: %w", err) + } + + return purgeCnt, liveCnt, nil } -func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error { - if len(cids) == 0 { - return nil +func (s *SplitStore) coldSetPath() string { + return filepath.Join(s.path, "coldset") +} + +func (s *SplitStore) checkpointPath() string { + return filepath.Join(s.path, "checkpoint") +} + +func (s *SplitStore) checkpointExists() bool { + _, err := os.Stat(s.checkpointPath()) + return err == nil +} + +func (s *SplitStore) completeCompaction() error { + checkpoint, last, err := OpenCheckpoint(s.checkpointPath()) + if err != nil { + return xerrors.Errorf("error opening checkpoint: %w", err) } + defer checkpoint.Close() //nolint:errcheck - // we don't delete one giant batch of millions of objects, but rather do smaller batches - // so that we don't stop the world for an extended period of time - done := false - for i := 0; !done; i++ { - start := i * batchSize - end := start + batchSize - if end >= len(cids) { - end = len(cids) - done = true - } + coldr, err := NewColdSetReader(s.coldSetPath()) + if err != nil { + return xerrors.Errorf("error opening coldset: %w", err) + } + defer coldr.Close() //nolint:errcheck - err := deleteBatch(cids[start:end]) - if err != nil { - return xerrors.Errorf("error deleting batch: %w", err) - } + markSet, err := s.markSetEnv.Recover("live") + if err != nil { + return xerrors.Errorf("error recovering markset: %w", err) } + defer markSet.Close() //nolint:errcheck + // PURGE + log.Info("purging cold objects from the 
hotstore") + startPurge := time.Now() + err = s.completePurge(coldr, checkpoint, last, markSet) + if err != nil { + return xerrors.Errorf("error purging cold objects: %w", err) + } + log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge)) + + markSet.EndCriticalSection() + + if err := checkpoint.Close(); err != nil { + log.Warnf("error closing checkpoint: %s", err) + } + if err := os.Remove(s.checkpointPath()); err != nil { + log.Warnf("error removing checkpoint: %s", err) + } + if err := coldr.Close(); err != nil { + log.Warnf("error closing coldset: %s", err) + } + if err := os.Remove(s.coldSetPath()); err != nil { + log.Warnf("error removing coldset: %s", err) + } + + // Note: at this point we can start the splitstore; a compaction should run on + // the first head change, which will trigger gc on the hotstore. + // We don't mind the second (back-to-back) compaction as the head will + // have advanced during marking and coldset accumulation. return nil } -func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error { +func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint, start cid.Cid, markSet MarkSet) error { + if !start.Defined() { + return s.purge(coldr, checkpoint, markSet) + } + + seeking := true + batch := make([]cid.Cid, 0, batchSize) deadCids := make([]cid.Cid, 0, batchSize) + var purgeCnt, liveCnt int defer func() { log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt) }() - return s.purgeBatch(cids, - func(cids []cid.Cid) error { - deadCids := deadCids[:0] - - for { - if err := s.checkClosing(); err != nil { - return err - } + deleteBatch := func() error { + pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet) - s.txnLk.Lock() - if len(s.txnRefs) == 0 { - // keep the lock! 
- break - } + purgeCnt += pc + liveCnt += lc + batch = batch[:0] - // unlock and protect - s.txnLk.Unlock() + return err + } - err := s.protectTxnRefs(markSet) - if err != nil { - return xerrors.Errorf("error protecting transactional refs: %w", err) - } + err := coldr.ForEach(func(c cid.Cid) error { + if seeking { + if start.Equals(c) { + seeking = false } - defer s.txnLk.Unlock() - - for _, c := range cids { - live, err := markSet.Has(c) - if err != nil { - return xerrors.Errorf("error checking for liveness: %w", err) - } + return nil + } - if live { - liveCnt++ - continue - } + batch = append(batch, c) + if len(batch) == batchSize { + return deleteBatch() + } - deadCids = append(deadCids, c) - } + return nil + }) - err := s.hot.DeleteMany(s.ctx, deadCids) - if err != nil { - return xerrors.Errorf("error purging cold objects: %w", err) - } + if err != nil { + return err + } - s.debug.LogDelete(deadCids) + if len(batch) > 0 { + return deleteBatch() + } - purgeCnt += len(deadCids) - return nil - }) + return nil } // I really don't like having this code, but we seem to have some occasional DAG references with diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 7d84e0a4ca6..27d58bf1043 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "io/ioutil" + "os" "sync" "sync/atomic" "testing" @@ -20,12 +22,14 @@ import ( datastore "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" logging "github.com/ipfs/go-log/v2" + mh "github.com/multiformats/go-multihash" ) func init() { CompactionThreshold = 5 CompactionBoundary = 2 WarmupBoundary = 0 + SyncWaitTime = time.Millisecond logging.SetLogLevel("splitstore", "DEBUG") } @@ -80,8 +84,17 @@ func testSplitStore(t *testing.T, cfg *Config) { t.Fatal(err) } + path, err := ioutil.TempDir("", "splitstore.*") + if err != nil { + t.Fatal(err) + } + + 
t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + // open the splitstore - ss, err := Open("", ds, hot, cold, cfg) + ss, err := Open(path, ds, hot, cold, cfg) if err != nil { t.Fatal(err) } @@ -125,6 +138,10 @@ func testSplitStore(t *testing.T, cfg *Config) { } waitForCompaction := func() { + ss.txnSyncMx.Lock() + ss.txnSync = true + ss.txnSyncCond.Broadcast() + ss.txnSyncMx.Unlock() for atomic.LoadInt32(&ss.compacting) == 1 { time.Sleep(100 * time.Millisecond) } @@ -259,8 +276,17 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { t.Fatal(err) } + path, err := ioutil.TempDir("", "splitstore.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + // open the splitstore - ss, err := Open("", ds, hot, cold, &Config{MarkSetType: "map"}) + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) if err != nil { t.Fatal(err) } @@ -305,6 +331,10 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { } waitForCompaction := func() { + ss.txnSyncMx.Lock() + ss.txnSync = true + ss.txnSyncCond.Broadcast() + ss.txnSyncMx.Unlock() for atomic.LoadInt32(&ss.compacting) == 1 { time.Sleep(100 * time.Millisecond) } @@ -426,17 +456,25 @@ func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, app type mockStore struct { mx sync.Mutex - set map[cid.Cid]blocks.Block + set map[string]blocks.Block } func newMockStore() *mockStore { - return &mockStore{set: make(map[cid.Cid]blocks.Block)} + return &mockStore{set: make(map[string]blocks.Block)} +} + +func (b *mockStore) keyOf(c cid.Cid) string { + return string(c.Hash()) +} + +func (b *mockStore) cidOf(k string) cid.Cid { + return cid.NewCidV1(cid.Raw, mh.Multihash([]byte(k))) } func (b *mockStore) Has(_ context.Context, cid cid.Cid) (bool, error) { b.mx.Lock() defer b.mx.Unlock() - _, ok := b.set[cid] + _, ok := b.set[b.keyOf(cid)] return ok, nil } @@ -446,7 +484,7 @@ func (b *mockStore) Get(_ context.Context, cid cid.Cid) (blocks.Block, 
error) { b.mx.Lock() defer b.mx.Unlock() - blk, ok := b.set[cid] + blk, ok := b.set[b.keyOf(cid)] if !ok { return nil, blockstore.ErrNotFound } @@ -474,7 +512,7 @@ func (b *mockStore) Put(_ context.Context, blk blocks.Block) error { b.mx.Lock() defer b.mx.Unlock() - b.set[blk.Cid()] = blk + b.set[b.keyOf(blk.Cid())] = blk return nil } @@ -483,7 +521,7 @@ func (b *mockStore) PutMany(_ context.Context, blks []blocks.Block) error { defer b.mx.Unlock() for _, blk := range blks { - b.set[blk.Cid()] = blk + b.set[b.keyOf(blk.Cid())] = blk } return nil } @@ -492,7 +530,7 @@ func (b *mockStore) DeleteBlock(_ context.Context, cid cid.Cid) error { b.mx.Lock() defer b.mx.Unlock() - delete(b.set, cid) + delete(b.set, b.keyOf(cid)) return nil } @@ -501,7 +539,7 @@ func (b *mockStore) DeleteMany(_ context.Context, cids []cid.Cid) error { defer b.mx.Unlock() for _, c := range cids { - delete(b.set, c) + delete(b.set, b.keyOf(c)) } return nil } @@ -515,7 +553,7 @@ func (b *mockStore) ForEachKey(f func(cid.Cid) error) error { defer b.mx.Unlock() for c := range b.set { - err := f(c) + err := f(b.cidOf(c)) if err != nil { return err } diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go index 0670bd0f6ff..b564f03c795 100644 --- a/blockstore/splitstore/splitstore_warmup.go +++ b/blockstore/splitstore/splitstore_warmup.go @@ -62,7 +62,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { xcount := new(int64) missing := new(int64) - visitor, err := s.markSetEnv.Create("warmup", 0) + visitor, err := s.markSetEnv.New("warmup", 0) if err != nil { return xerrors.Errorf("error creating visitor: %w", err) } diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 3cb8977b290..0bc22c9d772 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -163,11 +163,11 @@ #HotStoreType = "badger" # MarkSetType specifies the type of the markset. 
- # It can be "map" (default) for in memory marking or "badger" for on-disk marking. + # It can be "map" for in memory marking or "badger" (default) for on-disk marking. # # type: string # env var: LOTUS_CHAINSTORE_SPLITSTORE_MARKSETTYPE - #MarkSetType = "map" + #MarkSetType = "badger" # HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond # the compaction boundary; default is 0. diff --git a/node/config/def.go b/node/config/def.go index eded665fff8..1573508665c 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -85,7 +85,7 @@ func DefaultFullNode() *FullNode { Splitstore: Splitstore{ ColdStoreType: "universal", HotStoreType: "badger", - MarkSetType: "map", + MarkSetType: "badger", HotStoreFullGCFrequency: 20, }, diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 79fcf7e43c6..80748d1e6e7 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -810,7 +810,7 @@ Only currently supported value is "badger".`, Type: "string", Comment: `MarkSetType specifies the type of the markset. -It can be "map" (default) for in memory marking or "badger" for on-disk marking.`, +It can be "map" for in memory marking or "badger" (default) for on-disk marking.`, }, { Name: "HotStoreMessageRetention", diff --git a/node/config/types.go b/node/config/types.go index a03e3d3eade..93f5459e650 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -363,7 +363,7 @@ type Splitstore struct { // Only currently supported value is "badger". HotStoreType string // MarkSetType specifies the type of the markset. - // It can be "map" (default) for in memory marking or "badger" for on-disk marking. + // It can be "map" for in memory marking or "badger" (default) for on-disk marking. MarkSetType string // HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond