Skip to content

Commit

Permalink
Merge pull request #8008 from filecoin-project/feat/splitstore-sortle…
Browse files Browse the repository at this point in the history
…ss-compaction

splitstore sortless compaction
  • Loading branch information
magik6k authored Feb 9, 2022
2 parents 026c510 + e129ae3 commit 44fd0e3
Show file tree
Hide file tree
Showing 18 changed files with 1,744 additions and 233 deletions.
15 changes: 11 additions & 4 deletions blockstore/splitstore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration:
blockstore and discards writes; this is necessary to support syncing from a snapshot.
- `MarkSetType` -- specifies the type of markset to use during compaction.
The markset is the data structure used by compaction/gc to track live objects.
The default value is `"map"`, which will use an in-memory map; if you are limited
in memory (or indeed see compaction run out of memory), you can also specify
`"badger"` which will use a disk-backed markset, using badger. This will use
much less memory, but will also make compaction slower.
The default value is "badger", which will use a disk backed markset using badger.
If you have a lot of memory (48G or more) you can also use "map", which will use
an in memory markset, speeding up compaction at the cost of higher memory usage.
Note: If you are using a VPS with a network volume, you need to provision at least
3000 IOPs with the badger markset.
- `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4
finalities maintained by default, to maintain messages and message receipts in the
hotstore. This is useful for assistive nodes that want to support syncing for other
Expand Down Expand Up @@ -105,6 +106,12 @@ Compaction works transactionally with the following algorithm:
- We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
- We then end the transaction and compact/gc the hotstore.

As of [#8008](https://github.com/filecoin-project/lotus/pull/8008) the compaction algorithm has been
modified to eliminate sorting and maintain the cold object set on disk. This drastically reduces
memory usage; in fact, when using badger as the markset compaction uses very little memory, and
it should now be possible to run splitstore with 32GB of RAM or less without danger of running out of
memory during compaction.

## Garbage Collection

TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)
Expand Down
118 changes: 118 additions & 0 deletions blockstore/splitstore/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package splitstore

import (
	"bufio"
	"fmt"
	"io"
	"os"

	"golang.org/x/xerrors"

	cid "github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

// Checkpoint persists the most recently processed cid to a file so that an
// interrupted compaction can resume where it left off. The file is opened
// with O_SYNC (see NewCheckpoint/OpenCheckpoint), and each Set overwrites
// the previous record in place at the start of the file.
type Checkpoint struct {
	file *os.File
	buf  *bufio.Writer
}

// NewCheckpoint creates (or truncates) the checkpoint file at path and
// returns a Checkpoint ready for writing. The file is opened with O_SYNC so
// that each flushed record is durably persisted immediately.
func NewCheckpoint(path string) (*Checkpoint, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_SYNC, 0644)
	if err != nil {
		return nil, xerrors.Errorf("error creating checkpoint: %w", err)
	}

	return &Checkpoint{file: f, buf: bufio.NewWriter(f)}, nil
}

// OpenCheckpoint opens an existing checkpoint file for resumption. It first
// reads the last recorded cid (cid.Undef if the file is empty), then reopens
// the file for writing and returns a Checkpoint positioned for further Set
// calls together with that start cid.
func OpenCheckpoint(path string) (*Checkpoint, cid.Cid, error) {
	filein, err := os.Open(path)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for reading: %w", err)
	}
	defer filein.Close() //nolint:errcheck

	bufin := bufio.NewReader(filein)
	// io.EOF here means an empty checkpoint; start stays cid.Undef and is
	// not an error.
	start, err := readRawCid(bufin, nil)
	if err != nil && err != io.EOF {
		return nil, cid.Undef, xerrors.Errorf("error reading cid from checkpoint: %w", err)
	}

	// Reopen for writing WITHOUT O_TRUNC: the current record must survive
	// until the next Set overwrites it in place.
	fileout, err := os.OpenFile(path, os.O_WRONLY|os.O_SYNC, 0644)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for writing: %w", err)
	}
	bufout := bufio.NewWriter(fileout)

	return &Checkpoint{
		file: fileout,
		buf:  bufout,
	}, start, nil
}

// Set records c as the current checkpoint, overwriting the previous record.
// It seeks to the beginning of the file and writes the cid with an immediate
// flush. Because every Set flushes, the buffered writer is always empty when
// the seek happens, so the buffered write lands at offset 0 as intended.
func (cp *Checkpoint) Set(c cid.Cid) error {
	if _, err := cp.file.Seek(0, io.SeekStart); err != nil {
		return xerrors.Errorf("error seeking beginning of checkpoint: %w", err)
	}

	if err := writeRawCid(cp.buf, c, true); err != nil {
		return xerrors.Errorf("error writing cid to checkpoint: %w", err)
	}

	return nil
}

// Close flushes any buffered data and closes the checkpoint file.
// It is idempotent: subsequent calls return nil.
func (cp *Checkpoint) Close() error {
	if cp.file == nil {
		return nil
	}

	// Flush before closing. Today this is a no-op because Set always
	// flushes, but it guards against silently dropping buffered data if a
	// future writer buffers without flushing.
	errFlush := cp.buf.Flush()
	errClose := cp.file.Close()
	cp.file = nil
	cp.buf = nil

	if errFlush != nil {
		return errFlush
	}
	return errClose
}

// readRawCid reads a single length-prefixed multihash record from buf and
// returns it as a raw-codec CIDv1. If hbuf is non-nil it is reused as scratch
// space for the hash bytes (it must be large enough for the declared length,
// max 255); otherwise a buffer is allocated. A clean io.EOF (no length byte
// read at all) is returned unwrapped so callers can distinguish end-of-stream
// from corruption.
func readRawCid(buf *bufio.Reader, hbuf []byte) (cid.Cid, error) {
	sz, err := buf.ReadByte()
	if err != nil {
		return cid.Undef, err // don't wrap EOF as it is not an error here
	}

	if hbuf == nil {
		hbuf = make([]byte, int(sz))
	} else {
		hbuf = hbuf[:int(sz)]
	}

	// An EOF in the middle of a record IS corruption, so here it is wrapped.
	if _, err := io.ReadFull(buf, hbuf); err != nil {
		return cid.Undef, xerrors.Errorf("error reading hash: %w", err) // wrap EOF, it's corrupt
	}

	hash, err := mh.Cast(hbuf)
	if err != nil {
		return cid.Undef, xerrors.Errorf("error casting multihash: %w", err)
	}

	return cid.NewCidV1(cid.Raw, hash), nil
}

// writeRawCid writes c's multihash to buf as a single-byte length prefix
// followed by the hash bytes; this is the record format readRawCid consumes.
// If flush is true, the bufio.Writer is flushed so the record reaches the
// underlying file immediately.
func writeRawCid(buf *bufio.Writer, c cid.Cid, flush bool) error {
	hash := c.Hash()
	// The length prefix is a single byte: a longer multihash would have its
	// length silently truncated to len%256, corrupting the stream on read.
	// Reject it explicitly (readRawCid's 256-byte scratch buffer assumes the
	// same bound).
	if len(hash) > 255 {
		return fmt.Errorf("multihash too long for checkpoint record: %d bytes", len(hash))
	}
	if err := buf.WriteByte(byte(len(hash))); err != nil {
		return err
	}
	if _, err := buf.Write(hash); err != nil {
		return err
	}
	if flush {
		return buf.Flush()
	}

	return nil
}
147 changes: 147 additions & 0 deletions blockstore/splitstore/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package splitstore

import (
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
)

// TestCheckpoint exercises the checkpoint write/reopen/resume cycle:
// the cid recovered by OpenCheckpoint must always be the last cid Set
// before the previous Close, and an empty checkpoint must recover as
// cid.Undef while still accepting subsequent Sets.
func TestCheckpoint(t *testing.T) {
	dir, err := ioutil.TempDir("", "checkpoint.*")
	if err != nil {
		t.Fatal(err)
	}

	t.Cleanup(func() {
		_ = os.RemoveAll(dir)
	})

	path := filepath.Join(dir, "checkpoint")

	// makeCid derives a deterministic test cid from a short key.
	makeCid := func(key string) cid.Cid {
		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}

		return cid.NewCidV1(cid.Raw, h)
	}

	// mustSet records c in the checkpoint, failing the test on error.
	mustSet := func(cp *Checkpoint, c cid.Cid) {
		t.Helper()
		if err := cp.Set(c); err != nil {
			t.Fatal(err)
		}
	}

	// mustClose closes the checkpoint, failing the test on error.
	mustClose := func(cp *Checkpoint) {
		t.Helper()
		if err := cp.Close(); err != nil {
			t.Fatal(err)
		}
	}

	// mustOpen reopens the checkpoint and asserts the recovered start cid:
	// it must equal want, or be undefined when want is cid.Undef.
	mustOpen := func(want cid.Cid) *Checkpoint {
		t.Helper()
		cp, start, err := OpenCheckpoint(path)
		if err != nil {
			t.Fatal(err)
		}
		if want.Defined() {
			if !start.Equals(want) {
				t.Fatalf("expected start to be %s; got %s", want, start)
			}
		} else if start.Defined() {
			t.Fatal("expected start to be undefined")
		}
		return cp
	}

	k1 := makeCid("a")
	k2 := makeCid("b")
	k3 := makeCid("c")
	k4 := makeCid("d")

	// Fresh checkpoint: the last Set wins across a reopen.
	cp, err := NewCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}
	mustSet(cp, k1)
	mustSet(cp, k2)
	mustClose(cp)

	cp = mustOpen(k2)
	mustSet(cp, k3)
	mustSet(cp, k4)
	mustClose(cp)

	cp = mustOpen(k4)
	mustClose(cp)

	// Also test correct operation with an empty checkpoint: start must be
	// undefined, and subsequent Sets must round-trip exactly as before.
	cp, err = NewCheckpoint(path)
	if err != nil {
		t.Fatal(err)
	}
	mustClose(cp)

	cp = mustOpen(cid.Undef)
	mustSet(cp, k1)
	mustSet(cp, k2)
	mustClose(cp)

	cp = mustOpen(k2)
	mustSet(cp, k3)
	mustSet(cp, k4)
	mustClose(cp)

	cp = mustOpen(k4)
	mustClose(cp)
}
102 changes: 102 additions & 0 deletions blockstore/splitstore/coldset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package splitstore

import (
"bufio"
"io"
"os"

"golang.org/x/xerrors"

cid "github.com/ipfs/go-cid"
)

// ColdSetWriter appends cold object cids to an on-disk coldset file.
// Writes are buffered; Close flushes them to disk.
type ColdSetWriter struct {
	file *os.File
	buf  *bufio.Writer
}

// ColdSetReader iterates the cids recorded in an on-disk coldset file,
// in the order they were written.
type ColdSetReader struct {
	file *os.File
	buf  *bufio.Reader
}

// NewColdSetWriter creates (or truncates) the coldset file at path and
// returns a writer for appending cold object cids to it.
func NewColdSetWriter(path string) (*ColdSetWriter, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	if err != nil {
		return nil, xerrors.Errorf("error creating coldset: %w", err)
	}

	return &ColdSetWriter{file: f, buf: bufio.NewWriter(f)}, nil
}

// NewColdSetReader opens an existing coldset file at path for iteration.
func NewColdSetReader(path string) (*ColdSetReader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, xerrors.Errorf("error opening coldset: %w", err)
	}

	return &ColdSetReader{file: f, buf: bufio.NewReader(f)}, nil
}

// Write appends c to the coldset. Records are buffered (flush=false); they
// are only guaranteed to reach the file once Close flushes the buffer.
func (s *ColdSetWriter) Write(c cid.Cid) error {
	return writeRawCid(s.buf, c, false)
}

// Close flushes any buffered records and closes the coldset file. A flush
// error takes precedence over a close error. Close is idempotent: subsequent
// calls return nil.
func (s *ColdSetWriter) Close() error {
	if s.file == nil {
		return nil
	}

	file, buf := s.file, s.buf
	s.file, s.buf = nil, nil

	if err := buf.Flush(); err != nil {
		file.Close() //nolint:errcheck
		return err
	}

	return file.Close()
}

// ForEach invokes f for every cid recorded in the coldset, in write order,
// stopping early if f returns an error (propagated as-is). A clean io.EOF
// between records terminates iteration successfully; any other read failure
// is wrapped. A single 256-byte scratch buffer is reused across records to
// avoid a per-record allocation (255 is the maximum record hash length).
func (s *ColdSetReader) ForEach(f func(cid.Cid) error) error {
	hbuf := make([]byte, 256)
	for {
		next, err := readRawCid(s.buf, hbuf)
		if err != nil {
			if err == io.EOF {
				return nil
			}

			return xerrors.Errorf("error reading coldset: %w", err)
		}

		if err := f(next); err != nil {
			return err
		}
	}
}

// Reset rewinds the reader to the beginning of the coldset so ForEach can
// iterate it again.
func (s *ColdSetReader) Reset() error {
	if _, err := s.file.Seek(0, io.SeekStart); err != nil {
		return err
	}
	// Discard any data still buffered from before the seek; otherwise the
	// next read would return stale bytes from the old file position (e.g.
	// when a previous ForEach was aborted by a callback error before EOF).
	s.buf.Reset(s.file)
	return nil
}

// Close closes the underlying coldset file. It is idempotent: subsequent
// calls return nil.
func (s *ColdSetReader) Close() error {
	if s.file == nil {
		return nil
	}

	file := s.file
	s.file, s.buf = nil, nil

	return file.Close()
}
Loading

0 comments on commit 44fd0e3

Please sign in to comment.