Merge branch 'master' into feat/fvm
arajasek committed Feb 13, 2022
2 parents 7ef1513 + 2e22781 commit ee69899
Showing 74 changed files with 2,722 additions and 442 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -44,13 +44,13 @@ commands:
       - restore_cache:
           name: Restore parameters cache
           keys:
-            - 'v25-2k-lotus-params'
+            - 'v26-2k-lotus-params'
           paths:
             - /var/tmp/filecoin-proof-parameters/
       - run: ./lotus fetch-params 2048
       - save_cache:
           name: Save parameters cache
-          key: 'v25-2k-lotus-params'
+          key: 'v26-2k-lotus-params'
           paths:
             - /var/tmp/filecoin-proof-parameters/
   install_ipfs:
4 changes: 2 additions & 2 deletions .circleci/template.yml
@@ -44,13 +44,13 @@ commands:
       - restore_cache:
           name: Restore parameters cache
           keys:
-            - 'v25-2k-lotus-params'
+            - 'v26-2k-lotus-params'
           paths:
             - /var/tmp/filecoin-proof-parameters/
       - run: ./lotus fetch-params 2048
       - save_cache:
           name: Save parameters cache
-          key: 'v25-2k-lotus-params'
+          key: 'v26-2k-lotus-params'
           paths:
             - /var/tmp/filecoin-proof-parameters/
   install_ipfs:
3 changes: 3 additions & 0 deletions api/api_storage.go
@@ -113,6 +113,8 @@ type StorageMiner interface {
 	// SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message
 	SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
 	SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error //perm:admin
+	// SectorAbortUpgrade can be called on a sector that is in the process of being upgraded to abort the upgrade
+	SectorAbortUpgrade(context.Context, abi.SectorNumber) error //perm:admin
 
 	// WorkerConnect tells the node to connect to workers RPC
 	WorkerConnect(context.Context, string) error //perm:admin retry:true
@@ -130,6 +132,7 @@ type StorageMiner interface {
 	ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, vanillaProofs storage.ReplicaVanillaProofs, err *storiface.CallError) error //perm:admin retry:true
 	ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storage.ReplicaUpdateProof, err *storiface.CallError) error //perm:admin retry:true
 	ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
+	ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
 	ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
 	ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
 	ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
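The new SectorAbortUpgrade endpoint is an admin-only RPC like its neighbors. Below is a minimal sketch of calling it from Go, assuming a running lotus-miner, an admin token, and that the method is exposed through the standard v0 RPC client; the address, token, and sector number are placeholders:

package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/lotus/api/client"
)

func main() {
	ctx := context.Background()

	// Placeholder endpoint and admin token for a local lotus-miner.
	addr := "ws://127.0.0.1:2345/rpc/v0"
	headers := http.Header{"Authorization": []string{"Bearer PLACEHOLDER-ADMIN-TOKEN"}}

	miner, closer, err := client.NewStorageMinerRPCV0(ctx, addr, headers)
	if err != nil {
		panic(err)
	}
	defer closer()

	// Abort the in-progress snap-deal upgrade of sector 42 (perm:admin).
	if err := miner.SectorAbortUpgrade(ctx, abi.SectorNumber(42)); err != nil {
		fmt.Println("abort failed:", err)
		return
	}
	fmt.Println("upgrade aborted")
}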
1 change: 1 addition & 0 deletions api/api_worker.go
@@ -39,6 +39,7 @@ type Worker interface {
 	SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
 	SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
 	FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
+	FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
 	ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
 	ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
 	ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
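Like the other Worker methods here, FinalizeReplicaUpdate follows lotus's async calling convention: the worker returns a storiface.CallID as soon as the call is scheduled, and the outcome is delivered later through the matching ReturnFinalizeReplicaUpdate method added to StorageMiner above. A hedged sketch of that round trip, with hypothetical, already-connected handles w and m; in lotus proper the worker reports completion itself, so the direct Return* call here only illustrates the return channel:

package example

import (
	"context"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/specs-storage/storage"
)

func finalizeUpdate(ctx context.Context, w api.Worker, m api.StorageMiner, sector storage.SectorRef) error {
	// Schedule finalization on the worker; the CallID comes back as soon
	// as the call is queued, not when the work completes.
	callID, err := w.FinalizeReplicaUpdate(ctx, sector, nil)
	if err != nil {
		return err
	}

	// Report a (here: successful) result back on the miner's return channel.
	return m.ReturnFinalizeReplicaUpdate(ctx, callID, nil)
}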
39 changes: 39 additions & 0 deletions api/proxy_gen.go

(Generated file; diff not rendered.)

15 changes: 11 additions & 4 deletions blockstore/splitstore/README.md
@@ -49,10 +49,11 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration:
   blockstore and discards writes; this is necessary to support syncing from a snapshot.
 - `MarkSetType` -- specifies the type of markset to use during compaction.
   The markset is the data structure used by compaction/gc to track live objects.
-  The default value is `"map"`, which will use an in-memory map; if you are limited
-  in memory (or indeed see compaction run out of memory), you can also specify
-  `"badger"` which will use an disk backed markset, using badger. This will use
-  much less memory, but will also make compaction slower.
+  The default value is "badger", which will use a disk-backed markset using badger.
+  If you have a lot of memory (48G or more) you can also use "map", which will use
+  an in-memory markset, speeding up compaction at the cost of higher memory usage.
+  Note: if you are using a VPS with a network volume, you need to provision at least
+  3000 IOPS with the badger markset.
 - `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4
   finalities maintained by default, to maintain messages and message receipts in the
   hotstore. This is useful for assistive nodes that want to support syncing for other
@@ -105,6 +106,12 @@ Compaction works transactionally with the following algorithm:
 - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
 - We then end the transaction and compact/gc the hotstore.
 
+As of [#8008](https://github.com/filecoin-project/lotus/pull/8008) the compaction algorithm has been
+modified to eliminate sorting and to maintain the cold object set on disk. This drastically reduces
+memory usage; in fact, when using badger as the markset, compaction uses very little memory, and
+it should now be possible to run splitstore with 32GB of RAM or less without danger of running out of
+memory during compaction.
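For reference, a sketch of how the markset options discussed above might be set in the `[Chainstore.Splitstore]` section of config.toml; the EnableSplitstore knob and the values shown are illustrative assumptions, not recommendations:

[Chainstore]
  # Assumed enable knob for the splitstore itself.
  EnableSplitstore = true

  [Chainstore.Splitstore]
    # Disk-backed markset (the default); use "map" only on high-memory nodes.
    MarkSetType = "badger"
    # Retain messages/receipts for 24 extra finalities beyond the default 4.
    HotStoreMessageRetention = 24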

## Garbage Collection

TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)
118 changes: 118 additions & 0 deletions blockstore/splitstore/checkpoint.go
@@ -0,0 +1,118 @@
package splitstore

import (
	"bufio"
	"io"
	"os"

	"golang.org/x/xerrors"

	cid "github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

// Checkpoint is a file-backed cursor recording the last object visited by
// compaction, so that an interrupted run can resume where it left off.
type Checkpoint struct {
	file *os.File
	buf  *bufio.Writer
}

// NewCheckpoint creates (or truncates) a checkpoint file at path.
func NewCheckpoint(path string) (*Checkpoint, error) {
	file, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_SYNC, 0644)
	if err != nil {
		return nil, xerrors.Errorf("error creating checkpoint: %w", err)
	}
	buf := bufio.NewWriter(file)

	return &Checkpoint{
		file: file,
		buf:  buf,
	}, nil
}

// OpenCheckpoint opens an existing checkpoint file and returns the last cid
// recorded in it; if the file is empty, start is cid.Undef with no error.
func OpenCheckpoint(path string) (*Checkpoint, cid.Cid, error) {
	filein, err := os.Open(path)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for reading: %w", err)
	}
	defer filein.Close() //nolint:errcheck

	bufin := bufio.NewReader(filein)
	start, err := readRawCid(bufin, nil)
	if err != nil && err != io.EOF {
		return nil, cid.Undef, xerrors.Errorf("error reading cid from checkpoint: %w", err)
	}

	fileout, err := os.OpenFile(path, os.O_WRONLY|os.O_SYNC, 0644)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for writing: %w", err)
	}
	bufout := bufio.NewWriter(fileout)

	return &Checkpoint{
		file: fileout,
		buf:  bufout,
	}, start, nil
}

// Set overwrites the checkpoint with c; the write is flushed immediately so
// that the checkpoint survives a crash.
func (cp *Checkpoint) Set(c cid.Cid) error {
	if _, err := cp.file.Seek(0, io.SeekStart); err != nil {
		return xerrors.Errorf("error seeking beginning of checkpoint: %w", err)
	}

	if err := writeRawCid(cp.buf, c, true); err != nil {
		return xerrors.Errorf("error writing cid to checkpoint: %w", err)
	}

	return nil
}

// Close closes the underlying file; it is safe to call more than once.
func (cp *Checkpoint) Close() error {
	if cp.file == nil {
		return nil
	}

	err := cp.file.Close()
	cp.file = nil
	cp.buf = nil

	return err
}

// readRawCid reads a length-prefixed multihash and wraps it as a raw CIDv1.
func readRawCid(buf *bufio.Reader, hbuf []byte) (cid.Cid, error) {
	sz, err := buf.ReadByte()
	if err != nil {
		return cid.Undef, err // don't wrap EOF as it is not an error here
	}

	if hbuf == nil {
		hbuf = make([]byte, int(sz))
	} else {
		hbuf = hbuf[:int(sz)]
	}

	if _, err := io.ReadFull(buf, hbuf); err != nil {
		return cid.Undef, xerrors.Errorf("error reading hash: %w", err) // wrap EOF, it's corrupt
	}

	hash, err := mh.Cast(hbuf)
	if err != nil {
		return cid.Undef, xerrors.Errorf("error casting multihash: %w", err)
	}

	return cid.NewCidV1(cid.Raw, hash), nil
}

// writeRawCid writes the cid's multihash with a one-byte length prefix.
func writeRawCid(buf *bufio.Writer, c cid.Cid, flush bool) error {
	hash := c.Hash()
	if err := buf.WriteByte(byte(len(hash))); err != nil {
		return err
	}
	if _, err := buf.Write(hash); err != nil {
		return err
	}
	if flush {
		return buf.Flush()
	}

	return nil
}
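A short usage sketch for the checkpoint, assuming it sits alongside this code in the splitstore package; paths and cids are placeholders. Note that OpenCheckpoint tolerates an empty file: readRawCid returns a bare EOF, which is swallowed, so callers get cid.Undef as the start and can treat "no checkpoint yet" uniformly:

package splitstore

import (
	"fmt"

	cid "github.com/ipfs/go-cid"
)

// startScan creates a fresh checkpoint file for a new scan.
func startScan(path string, first cid.Cid) (*Checkpoint, error) {
	cp, err := NewCheckpoint(path)
	if err != nil {
		return nil, err
	}
	// Set rewinds and flushes, so the file always holds exactly one cid.
	if err := cp.Set(first); err != nil {
		_ = cp.Close()
		return nil, err
	}
	return cp, nil
}

// resumeScan reopens a checkpoint and reports where to pick up from.
func resumeScan(path string) error {
	cp, start, err := OpenCheckpoint(path)
	if err != nil {
		return err
	}
	defer cp.Close() //nolint:errcheck

	if !start.Defined() {
		fmt.Println("no checkpoint recorded; starting from scratch")
		return nil
	}

	fmt.Println("resuming from", start)
	return nil
}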
