splitstore sortless compaction #8008

Merged · 50 commits · Feb 9, 2022
45c2f34
refactor marksets for critical section on-disk persistence
vyzo Jan 28, 2022
1bf396f
add test for markset persistence
vyzo Jan 28, 2022
730acea
immediately flush pending writes when entering critical section
vyzo Jan 28, 2022
67fbf9e
improve peristence test
vyzo Jan 28, 2022
f9fd47e
use temporary dir for splitstore test path
vyzo Jan 29, 2022
d140909
add MarkMany to MarkSet interface
vyzo Jan 30, 2022
a4c1a34
check for existence of badger db in recover
vyzo Jan 30, 2022
cf09dd0
moar markset tests
vyzo Jan 30, 2022
322b858
make markSets synchronous in critical section
vyzo Jan 30, 2022
c94eee5
on disk checkpoints
vyzo Jan 30, 2022
6ede77b
checkpoint test
vyzo Jan 30, 2022
64cda4a
on disk coldsets
vyzo Jan 30, 2022
7233314
prettier checkpoint close
vyzo Jan 30, 2022
4b8369c
fix buffered reads
vyzo Jan 30, 2022
a4f720d
coldset test
vyzo Jan 30, 2022
dbc8903
sortless compaction
vyzo Jan 30, 2022
20b7502
fix mockStore for splitstore tests
vyzo Jan 30, 2022
7931f1f
fix lint
vyzo Jan 30, 2022
7b8447a
reinstante waitForMissingRefs
vyzo Jan 30, 2022
a9d4495
use both hot and cold when doing fetches for markset positive objects
vyzo Jan 30, 2022
1900c90
account for missing refs in the markset in Has
vyzo Jan 31, 2022
ee63be2
fix race in protectView
vyzo Jan 31, 2022
c9bd5ec
mark tipset references to protect them during critical section
vyzo Jan 31, 2022
1abfc5b
fix comment
vyzo Jan 31, 2022
2b14bda
recursively mark puts during the critical section
vyzo Jan 31, 2022
710fda4
fix putmany marking
vyzo Jan 31, 2022
5b9ea1b
avoid races in beginCriticalSection
vyzo Jan 31, 2022
877dfbe
hold the lock in the second protect call
vyzo Jan 31, 2022
7896af7
use walkObjectIncomplete for marking live refs
vyzo Jan 31, 2022
37673c6
downgrade marking log to debug
vyzo Jan 31, 2022
3aabb03
synchronously mark live refs on put/putmany
vyzo Feb 1, 2022
11ae856
optimize single object marking in markLiveRefs
vyzo Feb 1, 2022
fd07ca8
wait for the sync gap time befor starting the purge
vyzo Feb 1, 2022
6353fa7
decouple SyncGapTime from wait time
vyzo Feb 1, 2022
578b569
check for closing after the sync wait
vyzo Feb 1, 2022
7b4ab20
wait for sync in a non racey way
vyzo Feb 1, 2022
cd95892
fix test
vyzo Feb 1, 2022
9c92d77
improve robustness of waitForSync
vyzo Feb 1, 2022
b13aa8f
unblock waitForSync on close
vyzo Feb 1, 2022
4b4104e
fix comment
vyzo Feb 1, 2022
c1d8368
share a concurrent visitor between workers in markLiveRefs
vyzo Feb 1, 2022
75ad0c3
badger markset option tweaks
vyzo Feb 2, 2022
049b489
add note about compaction algorithm changes in README
vyzo Feb 2, 2022
03352ea
make badger the default splitstore markset type
vyzo Feb 6, 2022
d45e207
update README for map as the default
vyzo Feb 6, 2022
1221c0b
make gen
vyzo Feb 6, 2022
0ad1f0e
moar make gen
vyzo Feb 6, 2022
966071d
Merge pull request #8034 from filecoin-project/feat/splitstore-defaul…
magik6k Feb 7, 2022
8ddf476
update README
vyzo Feb 8, 2022
e129ae3
refactor nextBatch in badger markset
vyzo Feb 9, 2022
15 changes: 11 additions & 4 deletions blockstore/splitstore/README.md
@@ -49,10 +49,11 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration:
blockstore and discards writes; this is necessary to support syncing from a snapshot.
- `MarkSetType` -- specifies the type of markset to use during compaction.
The markset is the data structure used by compaction/gc to track live objects.
The default value is `"map"`, which will use an in-memory map; if you are limited
in memory (or see compaction run out of memory), you can also specify
`"badger"`, which will use a disk-backed markset built on badger. This uses
much less memory, but will also make compaction slower.
The default value is `"badger"`, which will use a disk-backed markset built on badger.
If you have a lot of memory (48G or more) you can also use `"map"`, which will use
an in-memory markset, speeding up compaction at the cost of higher memory usage.
Note: if you are using a VPS with a network volume, you need to provision at least
3000 IOPS for the badger markset.
- `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4
finalities maintained by default, to maintain messages and message receipts in the
hotstore. This is useful for assistive nodes that want to support syncing for other
@@ -105,6 +106,12 @@ Compaction works transactionally with the following algorithm:
- We delete in small batches while holding a lock; each batch is checked again against the marks from the concurrent transactional marking, so that nothing live is ever deleted
- We then end the transaction and compact/gc the hotstore.

As of [#8008](https://github.com/filecoin-project/lotus/pull/8008) the compaction algorithm has been
modified to eliminate sorting and to maintain the cold object set on disk. This drastically reduces
memory usage; in fact, when using badger as the markset, compaction uses very little memory, and
it should now be possible to run splitstore with 32GB of RAM or less without danger of running out of
memory during compaction.

## Garbage Collection

TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)
118 changes: 118 additions & 0 deletions blockstore/splitstore/checkpoint.go
@@ -0,0 +1,118 @@
package splitstore

import (
	"bufio"
	"io"
	"os"

	"golang.org/x/xerrors"

	cid "github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

type Checkpoint struct {
	file *os.File
	buf  *bufio.Writer
}

func NewCheckpoint(path string) (*Checkpoint, error) {
	file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_SYNC, 0644)
	if err != nil {
		return nil, xerrors.Errorf("error creating checkpoint: %w", err)
	}
	buf := bufio.NewWriter(file)

	return &Checkpoint{
		file: file,
		buf:  buf,
	}, nil
}

func OpenCheckpoint(path string) (*Checkpoint, cid.Cid, error) {
	filein, err := os.Open(path)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for reading: %w", err)
	}
	defer filein.Close() //nolint:errcheck

	bufin := bufio.NewReader(filein)
	start, err := readRawCid(bufin, nil)
	if err != nil && err != io.EOF {
		return nil, cid.Undef, xerrors.Errorf("error reading cid from checkpoint: %w", err)
	}

	fileout, err := os.OpenFile(path, os.O_WRONLY|os.O_SYNC, 0644)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for writing: %w", err)
	}
	bufout := bufio.NewWriter(fileout)

	return &Checkpoint{
		file: fileout,
		buf:  bufout,
	}, start, nil
}

func (cp *Checkpoint) Set(c cid.Cid) error {
	if _, err := cp.file.Seek(0, io.SeekStart); err != nil {
		return xerrors.Errorf("error seeking beginning of checkpoint: %w", err)
	}

	if err := writeRawCid(cp.buf, c, true); err != nil {
		return xerrors.Errorf("error writing cid to checkpoint: %w", err)
	}

	return nil
}

func (cp *Checkpoint) Close() error {
	if cp.file == nil {
		return nil
	}

	err := cp.file.Close()
	cp.file = nil
	cp.buf = nil

	return err
}

func readRawCid(buf *bufio.Reader, hbuf []byte) (cid.Cid, error) {
	sz, err := buf.ReadByte()
	if err != nil {
		return cid.Undef, err // don't wrap EOF as it is not an error here
	}

	if hbuf == nil {
		hbuf = make([]byte, int(sz))
	} else {
		hbuf = hbuf[:int(sz)]
	}

	if _, err := io.ReadFull(buf, hbuf); err != nil {
		return cid.Undef, xerrors.Errorf("error reading hash: %w", err) // wrap EOF, it's corrupt
	}

	hash, err := mh.Cast(hbuf)
	if err != nil {
		return cid.Undef, xerrors.Errorf("error casting multihash: %w", err)
	}

	return cid.NewCidV1(cid.Raw, hash), nil
}

func writeRawCid(buf *bufio.Writer, c cid.Cid, flush bool) error {
	hash := c.Hash()
	if err := buf.WriteByte(byte(len(hash))); err != nil {
		return err
	}
	if _, err := buf.Write(hash); err != nil {
		return err
	}
	if flush {
		return buf.Flush()
	}

	return nil
}
147 changes: 147 additions & 0 deletions blockstore/splitstore/checkpoint_test.go
@@ -0,0 +1,147 @@
package splitstore

import (
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"

	"github.com/ipfs/go-cid"
	"github.com/multiformats/go-multihash"
)

func TestCheckpoint(t *testing.T) {
	dir, err := ioutil.TempDir("", "checkpoint.*")
	if err != nil {
		t.Fatal(err)
	}

	t.Cleanup(func() {
		_ = os.RemoveAll(dir)
	})

	path := filepath.Join(dir, "checkpoint")

	makeCid := func(key string) cid.Cid {
		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}

		return cid.NewCidV1(cid.Raw, h)
	}

	k1 := makeCid("a")
	k2 := makeCid("b")
	k3 := makeCid("c")
	k4 := makeCid("d")

	cp, err := NewCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}

	if err := cp.Set(k1); err != nil {
		t.Fatal(err)
	}
	if err := cp.Set(k2); err != nil {
		t.Fatal(err)
	}

	if err := cp.Close(); err != nil {
		t.Fatal(err)
	}

	cp, start, err := OpenCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}
	if !start.Equals(k2) {
		t.Fatalf("expected start to be %s; got %s", k2, start)
	}

	if err := cp.Set(k3); err != nil {
		t.Fatal(err)
	}
	if err := cp.Set(k4); err != nil {
		t.Fatal(err)
	}

	if err := cp.Close(); err != nil {
		t.Fatal(err)
	}

	cp, start, err = OpenCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}
	if !start.Equals(k4) {
		t.Fatalf("expected start to be %s; got %s", k4, start)
	}

	if err := cp.Close(); err != nil {
		t.Fatal(err)
	}

	// also test correct operation with an empty checkpoint
	cp, err = NewCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}

	if err := cp.Close(); err != nil {
		t.Fatal(err)
	}

	cp, start, err = OpenCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}

	if start.Defined() {
		t.Fatal("expected start to be undefined")
	}

	if err := cp.Set(k1); err != nil {
		t.Fatal(err)
	}
	if err := cp.Set(k2); err != nil {
		t.Fatal(err)
	}

	if err := cp.Close(); err != nil {
		t.Fatal(err)
	}

	cp, start, err = OpenCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}
	if !start.Equals(k2) {
		t.Fatalf("expected start to be %s; got %s", k2, start)
	}

	if err := cp.Set(k3); err != nil {
		t.Fatal(err)
	}
	if err := cp.Set(k4); err != nil {
		t.Fatal(err)
	}

	if err := cp.Close(); err != nil {
		t.Fatal(err)
	}

	cp, start, err = OpenCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}
	if !start.Equals(k4) {
		t.Fatalf("expected start to be %s; got %s", k4, start)
	}

	if err := cp.Close(); err != nil {
		t.Fatal(err)
	}
}
102 changes: 102 additions & 0 deletions blockstore/splitstore/coldset.go
@@ -0,0 +1,102 @@
package splitstore

import (
	"bufio"
	"io"
	"os"

	"golang.org/x/xerrors"

	cid "github.com/ipfs/go-cid"
)

type ColdSetWriter struct {
	file *os.File
	buf  *bufio.Writer
}

type ColdSetReader struct {
	file *os.File
	buf  *bufio.Reader
}

func NewColdSetWriter(path string) (*ColdSetWriter, error) {
	file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	if err != nil {
		return nil, xerrors.Errorf("error creating coldset: %w", err)
	}
	buf := bufio.NewWriter(file)

	return &ColdSetWriter{
		file: file,
		buf:  buf,
	}, nil
}

func NewColdSetReader(path string) (*ColdSetReader, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, xerrors.Errorf("error opening coldset: %w", err)
	}
	buf := bufio.NewReader(file)

	return &ColdSetReader{
		file: file,
		buf:  buf,
	}, nil
}

func (s *ColdSetWriter) Write(c cid.Cid) error {
	return writeRawCid(s.buf, c, false)
}

func (s *ColdSetWriter) Close() error {
	if s.file == nil {
		return nil
	}

	err1 := s.buf.Flush()
	err2 := s.file.Close()
	s.buf = nil
	s.file = nil

	if err1 != nil {
		return err1
	}
	return err2
}

func (s *ColdSetReader) ForEach(f func(cid.Cid) error) error {
	hbuf := make([]byte, 256)
	for {
		next, err := readRawCid(s.buf, hbuf)
		if err != nil {
			if err == io.EOF {
				return nil
			}

			return xerrors.Errorf("error reading coldset: %w", err)
		}

		if err := f(next); err != nil {
			return err
		}
	}
}

func (s *ColdSetReader) Reset() error {
	_, err := s.file.Seek(0, io.SeekStart)
	return err
}

func (s *ColdSetReader) Close() error {
	if s.file == nil {
		return nil
	}

	err := s.file.Close()
	s.file = nil
	s.buf = nil

	return err
}