diff --git a/blockstore/splitstore/markset.go b/blockstore/splitstore/markset.go index 218681e13cc..f173be575b4 100644 --- a/blockstore/splitstore/markset.go +++ b/blockstore/splitstore/markset.go @@ -10,20 +10,12 @@ import ( var errMarkSetClosed = errors.New("markset closed") -// MarkSet is a utility to keep track of seen CID, and later query for them. -// -// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt). -// * If a probabilistic result is acceptable, it can be backed by a bloom filter +// MarkSet is an interface for tracking CIDs during chain and object walks type MarkSet interface { + ObjectVisitor Mark(cid.Cid) error Has(cid.Cid) (bool, error) Close() error - SetConcurrent() -} - -type MarkSetVisitor interface { - MarkSet - ObjectVisitor } type MarkSetEnv interface { @@ -31,11 +23,7 @@ type MarkSetEnv interface { // name is a unique name for this markset, mapped to the filesystem in disk-backed environments // sizeHint is a hint about the expected size of the markset Create(name string, sizeHint int64) (MarkSet, error) - // CreateVisitor is like Create, but returns a wider interface that supports atomic visits. - // It may not be supported by some markset types (e.g. bloom). - CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) - // SupportsVisitor returns true if the marksets created by this environment support the visitor interface. - SupportsVisitor() bool + // Close closes the markset Close() error } diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go index ae06a69f87e..e30334b8912 100644 --- a/blockstore/splitstore/markset_badger.go +++ b/blockstore/splitstore/markset_badger.go @@ -34,7 +34,6 @@ type BadgerMarkSet struct { } var _ MarkSet = (*BadgerMarkSet)(nil) -var _ MarkSetVisitor = (*BadgerMarkSet)(nil) var badgerMarkSetBatchSize = 16384 @@ -48,7 +47,7 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) { return &BadgerMarkSetEnv{path: msPath}, nil } -func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) { +func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { name += ".tmp" path := filepath.Join(e.path, name) @@ -68,16 +67,6 @@ func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, return ms, nil } -func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { - return e.create(name, sizeHint) -} - -func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) { - return e.create(name, sizeHint) -} - -func (e *BadgerMarkSetEnv) SupportsVisitor() bool { return true } - func (e *BadgerMarkSetEnv) Close() error { return os.RemoveAll(e.path) } diff --git a/blockstore/splitstore/markset_map.go b/blockstore/splitstore/markset_map.go index 07a7ae70d4c..fda964663ef 100644 --- a/blockstore/splitstore/markset_map.go +++ b/blockstore/splitstore/markset_map.go @@ -13,42 +13,27 @@ var _ MarkSetEnv = (*MapMarkSetEnv)(nil) type MapMarkSet struct { mx sync.RWMutex set map[string]struct{} - - ts bool } var _ MarkSet = (*MapMarkSet)(nil) -var _ MarkSetVisitor = (*MapMarkSet)(nil) func NewMapMarkSetEnv() (*MapMarkSetEnv, error) { return &MapMarkSetEnv{}, nil } -func (e *MapMarkSetEnv) create(name string, sizeHint int64) (*MapMarkSet, error) { +func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { return &MapMarkSet{ set: make(map[string]struct{}, sizeHint), }, nil } -func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { - return e.create(name, sizeHint) -} - -func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) { - return e.create(name, sizeHint) -} - -func (e *MapMarkSetEnv) SupportsVisitor() bool { return true } - func (e *MapMarkSetEnv) Close() error { return nil } func (s *MapMarkSet) Mark(cid cid.Cid) error { - if s.ts { - s.mx.Lock() - defer s.mx.Unlock() - } + s.mx.Lock() + defer s.mx.Unlock() if s.set == nil { return errMarkSetClosed @@ -59,10 +44,8 @@ func (s *MapMarkSet) Mark(cid cid.Cid) error { } func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) { - if s.ts { - s.mx.RLock() - defer s.mx.RUnlock() - } + s.mx.RLock() + defer s.mx.RUnlock() if s.set == nil { return false, errMarkSetClosed @@ -73,10 +56,8 @@ func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) { } func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) { - if s.ts { - s.mx.Lock() - defer s.mx.Unlock() - } + s.mx.Lock() + defer s.mx.Unlock() if s.set == nil { return false, errMarkSetClosed @@ -92,14 +73,9 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) { } func (s *MapMarkSet) Close() error { - if s.ts { - s.mx.Lock() - defer s.mx.Unlock() - } + s.mx.Lock() + defer s.mx.Unlock() + s.set = nil return nil } - -func (s *MapMarkSet) SetConcurrent() { - s.ts = true -} diff --git a/blockstore/splitstore/markset_test.go b/blockstore/splitstore/markset_test.go index a4a42e8601b..de9421f0894 100644 --- a/blockstore/splitstore/markset_test.go +++ b/blockstore/splitstore/markset_test.go @@ -167,7 +167,7 @@ func testMarkSetVisitor(t *testing.T, lsType string) { } defer env.Close() //nolint:errcheck - visitor, err := env.CreateVisitor("test", 0) + visitor, err := env.Create("test", 0) if err != nil { t.Fatal(err) } diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index f6715ea333c..62cb2459e54 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -186,10 +186,6 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co return nil, err } - if !markSetEnv.SupportsVisitor() { - return nil, xerrors.Errorf("markset type does not support atomic visitors") - } - // and now we can make a SplitStore ss := &SplitStore{ cfg: cfg, diff --git a/blockstore/splitstore/splitstore_check.go b/blockstore/splitstore/splitstore_check.go index c83ed7b285a..0b4cfe04472 100644 --- a/blockstore/splitstore/splitstore_check.go +++ b/blockstore/splitstore/splitstore_check.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "sync/atomic" "time" @@ -67,7 +68,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { } defer output.Close() //nolint:errcheck + var mx sync.Mutex write := func(format string, args ...interface{}) { + mx.Lock() + defer mx.Unlock() _, err := fmt.Fprintf(output, format+"\n", args...) if err != nil { log.Warnf("error writing check output: %s", err) @@ -82,9 +86,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { write("compaction index: %d", s.compactionIndex) write("--") - var coldCnt, missingCnt int64 + coldCnt := new(int64) + missingCnt := new(int64) - visitor, err := s.markSetEnv.CreateVisitor("check", 0) + visitor, err := s.markSetEnv.Create("check", 0) if err != nil { return xerrors.Errorf("error creating visitor: %w", err) } @@ -111,10 +116,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { } if has { - coldCnt++ + atomic.AddInt64(coldCnt, 1) write("cold object reference: %s", c) } else { - missingCnt++ + atomic.AddInt64(missingCnt, 1) write("missing object reference: %s", c) return errStopWalk } @@ -128,9 +133,9 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { return err } - log.Infow("check done", "cold", coldCnt, "missing", missingCnt) + log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt) write("--") - write("cold: %d missing: %d", coldCnt, missingCnt) + write("cold: %d missing: %d", *coldCnt, *missingCnt) write("DONE") return nil diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 13ab90ac006..20f99af35c0 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -5,6 +5,7 @@ import ( "errors" "runtime" "sort" + "sync" "sync/atomic" "time" @@ -227,7 +228,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) { } // protect all pending transactional references -func (s *SplitStore) protectTxnRefs(markSet MarkSetVisitor) error { +func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { for { var txnRefs map[cid.Cid]struct{} @@ -299,14 +300,14 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSetVisitor) error { // transactionally protect a reference by walking the object and marking. // concurrent markings are short circuited by checking the markset. -func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSetVisitor) error { +func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error { if err := s.checkClosing(); err != nil { return err } // Note: cold objects are deleted heaviest first, so the consituents of an object // cannot be deleted before the object itself. - return s.walkObjectIncomplete(root, tmpVisitor(), + return s.walkObjectIncomplete(root, newTmpVisitor(), func(c cid.Cid) error { if isUnitaryObject(c) { return errStopWalk @@ -397,7 +398,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex) - markSet, err := s.markSetEnv.CreateVisitor("live", s.markSetSize) + markSet, err := s.markSetEnv.Create("live", s.markSetSize) if err != nil { return xerrors.Errorf("error creating mark set: %w", err) } @@ -602,8 +603,8 @@ func (s *SplitStore) beginTxnProtect() { s.txnMissing = make(map[cid.Cid]struct{}) } -func (s *SplitStore) beginTxnMarking(markSet MarkSetVisitor) { - markSet.SetConcurrent() +func (s *SplitStore) beginTxnMarking(markSet MarkSet) { + log.Info("beginning transactional marking") } func (s *SplitStore) endTxnProtect() { @@ -621,26 +622,33 @@ func (s *SplitStore) endTxnProtect() { func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch, visitor ObjectVisitor, f func(cid.Cid) error) error { - var walked *cid.Set - toWalk := ts.Cids() - walkCnt := 0 - scanCnt := 0 + var walked ObjectVisitor + var mx sync.Mutex + // we copy the tipset first into a new slice, which allows us to reuse it in every epoch. + toWalk := make([]cid.Cid, len(ts.Cids())) + copy(toWalk, ts.Cids()) + walkCnt := new(int64) + scanCnt := new(int64) stopWalk := func(_ cid.Cid) error { return errStopWalk } walkBlock := func(c cid.Cid) error { - if !walked.Visit(c) { + visit, err := walked.Visit(c) + if err != nil { + return err + } + if !visit { return nil } - walkCnt++ + atomic.AddInt64(walkCnt, 1) if err := f(c); err != nil { return err } var hdr types.BlockHeader - err := s.view(c, func(data []byte) error { + err = s.view(c, func(data []byte) error { return hdr.UnmarshalCBOR(bytes.NewBuffer(data)) }) @@ -676,11 +684,13 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil { return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) } - scanCnt++ + atomic.AddInt64(scanCnt, 1) } if hdr.Height > 0 { + mx.Lock() toWalk = append(toWalk, hdr.Parents...) + mx.Unlock() } return nil @@ -692,20 +702,43 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp return err } + workers := len(toWalk) + if workers > runtime.NumCPU()/2 { + workers = runtime.NumCPU() / 2 + } + if workers < 2 { + workers = 2 + } + // the walk is BFS, so we can reset the walked set in every iteration and avoid building up // a set that contains all blocks (1M epochs -> 5M blocks -> 200MB worth of memory and growing // over time) - walked = cid.NewSet() - walking := toWalk - toWalk = nil - for _, c := range walking { - if err := walkBlock(c); err != nil { - return xerrors.Errorf("error walking block (cid: %s): %w", c, err) - } + walked = newConcurrentVisitor() + workch := make(chan cid.Cid, len(toWalk)) + for _, c := range toWalk { + workch <- c + } + close(workch) + toWalk = toWalk[:0] + + g := new(errgroup.Group) + for i := 0; i < workers; i++ { + g.Go(func() error { + for c := range workch { + if err := walkBlock(c); err != nil { + return xerrors.Errorf("error walking block (cid: %s): %w", c, err) + } + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return err } } - log.Infow("chain walk done", "walked", walkCnt, "scanned", scanCnt) + log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt) return nil } @@ -1011,7 +1044,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro return nil } -func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSetVisitor) error { +func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error { deadCids := make([]cid.Cid, 0, batchSize) var purgeCnt, liveCnt int defer func() { @@ -1077,7 +1110,7 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSetVisitor) error { // have this gem[TM]. // My best guess is that they are parent message receipts or yet to be computed state roots; magik // thinks the cause may be block validation. -func (s *SplitStore) waitForMissingRefs(markSet MarkSetVisitor) { +func (s *SplitStore) waitForMissingRefs(markSet MarkSet) { s.txnLk.Lock() missing := s.txnMissing s.txnMissing = nil @@ -1106,7 +1139,7 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSetVisitor) { } towalk := missing - visitor := tmpVisitor() + visitor := newTmpVisitor() missing = make(map[cid.Cid]struct{}) for c := range towalk { diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go index 2a39f6c9d8d..0670bd0f6ff 100644 --- a/blockstore/splitstore/splitstore_warmup.go +++ b/blockstore/splitstore/splitstore_warmup.go @@ -1,6 +1,7 @@ package splitstore import ( + "sync" "sync/atomic" "time" @@ -55,12 +56,13 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { if WarmupBoundary < epoch { boundaryEpoch = epoch - WarmupBoundary } + var mx sync.Mutex batchHot := make([]blocks.Block, 0, batchSize) - count := int64(0) - xcount := int64(0) - missing := int64(0) + count := new(int64) + xcount := new(int64) + missing := new(int64) - visitor, err := s.markSetEnv.CreateVisitor("warmup", 0) + visitor, err := s.markSetEnv.Create("warmup", 0) if err != nil { return xerrors.Errorf("error creating visitor: %w", err) } @@ -73,7 +75,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { return errStopWalk } - count++ + atomic.AddInt64(count, 1) has, err := s.hot.Has(s.ctx, c) if err != nil { @@ -87,22 +89,25 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { blk, err := s.cold.Get(s.ctx, c) if err != nil { if err == bstore.ErrNotFound { - missing++ + atomic.AddInt64(missing, 1) return errStopWalk } return err } - xcount++ + atomic.AddInt64(xcount, 1) + mx.Lock() batchHot = append(batchHot, blk) if len(batchHot) == batchSize { err = s.hot.PutMany(s.ctx, batchHot) if err != nil { + mx.Unlock() return err } batchHot = batchHot[:0] } + mx.Unlock() return nil }) @@ -118,9 +123,9 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { } } - log.Infow("warmup stats", "visited", count, "warm", xcount, "missing", missing) + log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing) - s.markSetSize = count + count>>2 // overestimate a bit + s.markSetSize = *count + *count>>2 // overestimate a bit err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize)) if err != nil { log.Warnf("error saving mark set size: %s", err) diff --git a/blockstore/splitstore/visitor.go b/blockstore/splitstore/visitor.go index f89c8f3894e..9dfbb78e704 100644 --- a/blockstore/splitstore/visitor.go +++ b/blockstore/splitstore/visitor.go @@ -1,6 +1,8 @@ package splitstore import ( + "sync" + cid "github.com/ipfs/go-cid" ) @@ -17,16 +19,34 @@ func (v *noopVisitor) Visit(_ cid.Cid) (bool, error) { return true, nil } -type cidSetVisitor struct { +type tmpVisitor struct { set *cid.Set } -var _ ObjectVisitor = (*cidSetVisitor)(nil) +var _ ObjectVisitor = (*tmpVisitor)(nil) -func (v *cidSetVisitor) Visit(c cid.Cid) (bool, error) { +func (v *tmpVisitor) Visit(c cid.Cid) (bool, error) { return v.set.Visit(c), nil } -func tmpVisitor() ObjectVisitor { - return &cidSetVisitor{set: cid.NewSet()} +func newTmpVisitor() ObjectVisitor { + return &tmpVisitor{set: cid.NewSet()} +} + +type concurrentVisitor struct { + mx sync.Mutex + set *cid.Set +} + +var _ ObjectVisitor = (*concurrentVisitor)(nil) + +func newConcurrentVisitor() *concurrentVisitor { + return &concurrentVisitor{set: cid.NewSet()} +} + +func (v *concurrentVisitor) Visit(c cid.Cid) (bool, error) { + v.mx.Lock() + defer v.mx.Unlock() + + return v.set.Visit(c), nil }