Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some basic splitstore refactors #7999

Merged
merged 7 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions blockstore/splitstore/markset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,20 @@ import (

var errMarkSetClosed = errors.New("markset closed")

// MarkSet is a utility to keep track of seen CID, and later query for them.
//
// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt).
// * If a probabilistic result is acceptable, it can be backed by a bloom filter
// MarkSet is an interface for tracking CIDs during chain and object walks
type MarkSet interface {
ObjectVisitor
Mark(cid.Cid) error
Has(cid.Cid) (bool, error)
Close() error
SetConcurrent()
}

type MarkSetVisitor interface {
MarkSet
ObjectVisitor
}

type MarkSetEnv interface {
// Create creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments
// sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error)
// CreateVisitor is like Create, but returns a wider interface that supports atomic visits.
// It may not be supported by some markset types (e.g. bloom).
CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error)
// SupportsVisitor returns true if the marksets created by this environment support the visitor interface.
SupportsVisitor() bool
// Close closes the markset
Close() error
}

Expand Down
13 changes: 1 addition & 12 deletions blockstore/splitstore/markset_badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type BadgerMarkSet struct {
}

var _ MarkSet = (*BadgerMarkSet)(nil)
var _ MarkSetVisitor = (*BadgerMarkSet)(nil)

var badgerMarkSetBatchSize = 16384

Expand All @@ -48,7 +47,7 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil
}

func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) {
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
name += ".tmp"
path := filepath.Join(e.path, name)

Expand All @@ -68,16 +67,6 @@ func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet,
return ms, nil
}

func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}

func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}

func (e *BadgerMarkSetEnv) SupportsVisitor() bool { return true }

func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}
Expand Down
44 changes: 10 additions & 34 deletions blockstore/splitstore/markset_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,42 +13,27 @@ var _ MarkSetEnv = (*MapMarkSetEnv)(nil)
type MapMarkSet struct {
mx sync.RWMutex
set map[string]struct{}

ts bool
}

var _ MarkSet = (*MapMarkSet)(nil)
var _ MarkSetVisitor = (*MapMarkSet)(nil)

func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil
}

func (e *MapMarkSetEnv) create(name string, sizeHint int64) (*MapMarkSet, error) {
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
}, nil
}

func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}

func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}

func (e *MapMarkSetEnv) SupportsVisitor() bool { return true }

func (e *MapMarkSetEnv) Close() error {
return nil
}

func (s *MapMarkSet) Mark(cid cid.Cid) error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.mx.Lock()
defer s.mx.Unlock()

if s.set == nil {
return errMarkSetClosed
Expand All @@ -59,10 +44,8 @@ func (s *MapMarkSet) Mark(cid cid.Cid) error {
}

func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
if s.ts {
s.mx.RLock()
defer s.mx.RUnlock()
}
s.mx.RLock()
defer s.mx.RUnlock()

if s.set == nil {
return false, errMarkSetClosed
Expand All @@ -73,10 +56,8 @@ func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
}

func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.mx.Lock()
defer s.mx.Unlock()

if s.set == nil {
return false, errMarkSetClosed
Expand All @@ -92,14 +73,9 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
}

func (s *MapMarkSet) Close() error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.mx.Lock()
defer s.mx.Unlock()

s.set = nil
return nil
}

func (s *MapMarkSet) SetConcurrent() {
s.ts = true
}
2 changes: 1 addition & 1 deletion blockstore/splitstore/markset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func testMarkSetVisitor(t *testing.T, lsType string) {
}
defer env.Close() //nolint:errcheck

visitor, err := env.CreateVisitor("test", 0)
visitor, err := env.Create("test", 0)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 0 additions & 4 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,6 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
return nil, err
}

if !markSetEnv.SupportsVisitor() {
return nil, xerrors.Errorf("markset type does not support atomic visitors")
}

// and now we can make a SplitStore
ss := &SplitStore{
cfg: cfg,
Expand Down
17 changes: 11 additions & 6 deletions blockstore/splitstore/splitstore_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -67,7 +68,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
}
defer output.Close() //nolint:errcheck

var mx sync.Mutex
write := func(format string, args ...interface{}) {
mx.Lock()
defer mx.Unlock()
_, err := fmt.Fprintf(output, format+"\n", args...)
if err != nil {
log.Warnf("error writing check output: %s", err)
Expand All @@ -82,9 +86,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
write("compaction index: %d", s.compactionIndex)
write("--")

var coldCnt, missingCnt int64
coldCnt := new(int64)
missingCnt := new(int64)

visitor, err := s.markSetEnv.CreateVisitor("check", 0)
visitor, err := s.markSetEnv.Create("check", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
Expand All @@ -111,10 +116,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
}

if has {
coldCnt++
atomic.AddInt64(coldCnt, 1)
write("cold object reference: %s", c)
} else {
missingCnt++
atomic.AddInt64(missingCnt, 1)
write("missing object reference: %s", c)
return errStopWalk
}
Expand All @@ -128,9 +133,9 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
return err
}

log.Infow("check done", "cold", coldCnt, "missing", missingCnt)
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt)
write("--")
write("cold: %d missing: %d", coldCnt, missingCnt)
write("cold: %d missing: %d", *coldCnt, *missingCnt)
write("DONE")

return nil
Expand Down
Loading