Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move to native badger blockstore; leverage zero-copy View() to deserialize in-place #4681

Merged
merged 47 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ce27b13
add a native badger blockstore with View() method.
raulk Nov 1, 2020
d2e2322
make the lotus node use the new native badger blockstore.
raulk Nov 1, 2020
9437136
fixup.
raulk Nov 1, 2020
099c4b5
migrate repo.Datastore(/chain) to repo.Blockstore().
raulk Nov 1, 2020
95f512a
update/add dependencies.
raulk Nov 1, 2020
a16d7f2
Merge branch 'master' into badger-viewable
raulk Nov 1, 2020
d8d8537
fix lotus-shed datastore commands.
raulk Nov 1, 2020
5a98660
make repo.Blockstore() idempotent; wrap in IDStore.
raulk Nov 1, 2020
72e573d
fix lint.
raulk Nov 1, 2020
0b8a21e
badger: restore prev. max table size (64MiB; default).
raulk Nov 1, 2020
634467d
go mod tidy.
raulk Nov 1, 2020
c2355b1
write heap and allocs profile in lotus-bench.
raulk Nov 1, 2020
aece624
lotus-bench: add support for native badger blockstore.
raulk Nov 1, 2020
35ccd73
lotus-bench: improve for observability.
raulk Nov 1, 2020
44e34d9
lotus-bench: native badger SyncWrites=false.
raulk Nov 1, 2020
f4e13ff
actually register prometheus gauge.
raulk Nov 1, 2020
7facdf6
support legacy keying: base32 multihashes *sigh*.
raulk Nov 2, 2020
d1ebf3c
introduce non-pooled StorageKey method.
raulk Nov 2, 2020
842c8ca
improve lotus-bench; support running validation on a store without a …
raulk Nov 3, 2020
581ac5b
lotus-bench: add ability to specify tipsets.
raulk Nov 3, 2020
1841812
lotus-bench: make start tipset walkback start from end tipset.
raulk Nov 3, 2020
8091113
Merge branch 'master' into badger-viewable
raulk Nov 3, 2020
5487356
fix lint errors.
raulk Nov 3, 2020
87ce2f7
lotus-bench: write separate profiles for import and validation.
raulk Nov 3, 2020
fd1439f
fix lint.
raulk Nov 3, 2020
370ef93
chainstore: new ForceHeadSilent to support benchmarks.
raulk Nov 4, 2020
a411342
export more metrics.
raulk Nov 4, 2020
b1aa437
remove file that sneaked in.
raulk Nov 5, 2020
8befc0c
export metrics from bench.
raulk Nov 5, 2020
de4a072
rename files.
raulk Nov 6, 2020
85e37e4
make the value log loading mode mmap, to leverage zero-copy access.
raulk Nov 6, 2020
7c442e1
godocs.
raulk Nov 6, 2020
0b2a02c
remove redundant import.
raulk Nov 6, 2020
3577300
Merge branch 'master' into badger-viewable
raulk Nov 6, 2020
54bf7c9
add the viewable trait to our gazillion blockstores.
raulk Nov 10, 2020
577476b
import Viewable non-terminal blockstores.
raulk Nov 10, 2020
6d78de9
remove unused and misleading cachebs store.
raulk Nov 10, 2020
af7236f
migrate direct usages blockstore.{Get=>View}.
raulk Nov 10, 2020
360194b
fix comment.
raulk Nov 10, 2020
f8c3756
(to be reverted) add debug.PrintStack.
raulk Nov 10, 2020
38c404e
vm: construct CBOR store properly.
raulk Nov 10, 2020
339391e
Revert "(to be reverted) add debug.PrintStack."
raulk Nov 10, 2020
379dd02
upgrade dependencies.
raulk Nov 10, 2020
d79ff24
remove benchmark.
raulk Nov 10, 2020
659ceaa
fix comment.
raulk Nov 10, 2020
8f5847b
fix condition.
raulk Nov 10, 2020
bc6965c
skip callers in badger log for useful line info.
raulk Nov 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"sync/atomic"
"time"
Expand Down Expand Up @@ -138,12 +139,20 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
}

bds, err := lr.Datastore("/chain")
bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil {
return nil, xerrors.Errorf("failed to get blocks datastore: %w", err)
return nil, err
}

bs := mybs{blockstore.NewBlockstore(bds)}
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()

bs = mybs{bs}

ks, err := lr.KeyStore()
if err != nil {
Expand Down
89 changes: 67 additions & 22 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ import (
"github.com/ipfs/go-datastore/query"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
car "github.com/ipld/go-car"
"github.com/ipld/go-car"
carutil "github.com/ipld/go-car/util"
cbg "github.com/whyrusleeping/cbor-gen"
pubsub "github.com/whyrusleeping/pubsub"
"github.com/whyrusleeping/pubsub"
"golang.org/x/xerrors"
)

Expand Down Expand Up @@ -108,6 +108,8 @@ type ChainStore struct {
localbs bstore.Blockstore
ds dstore.Batching

localviewer bstore.Viewer

heaviestLk sync.Mutex
heaviest *types.TipSet

Expand Down Expand Up @@ -150,6 +152,10 @@ func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Ba
journal: j,
}

if v, ok := localbs.(bstore.Viewer); ok {
cs.localviewer = v
}

cs.evtTypes = [1]journal.EventType{
evtTypeHeadChange: j.RegisterEventType("sync", "head_change"),
}
Expand Down Expand Up @@ -365,6 +371,26 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
return nil
}

// ForceHeadSilent forces a chain head tipset without triggering a reorg
// operation.
//
// CAUTION: Use it only for testing, such as to teleport the chain to a
// particular tipset to carry out a benchmark, verification, etc. on a chain
// segment.
func (cs *ChainStore) ForceHeadSilent(_ context.Context, ts *types.TipSet) error {
log.Warnf("(!!!) forcing a new head silently; only use this only for testing; new head: %s", ts)

cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
cs.heaviest = ts

err := cs.writeHead(ts)
if err != nil {
err = xerrors.Errorf("failed to write chain head: %s", err)
}
return err
}

type reorg struct {
old *types.TipSet
new *types.TipSet
Expand Down Expand Up @@ -525,12 +551,20 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
// GetBlock fetches a BlockHeader with the supplied CID. It returns
// blockstore.ErrNotFound if the block was not found in the BlockStore.
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
sb, err := cs.localbs.Get(c)
if err != nil {
return nil, err
if cs.localviewer == nil {
sb, err := cs.localbs.Get(c)
if err != nil {
return nil, err
}
return types.DecodeBlock(sb.RawData())
}

return types.DecodeBlock(sb.RawData())
var blk *types.BlockHeader
err := cs.localviewer.View(c, func(b []byte) (err error) {
blk, err = types.DecodeBlock(b)
return err
})
return blk, err
}

func (cs *ChainStore) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
Expand Down Expand Up @@ -775,12 +809,7 @@ func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) {
return nil, err
}

genb, err := cs.bs.Get(c)
if err != nil {
return nil, err
}

return types.DecodeBlock(genb.RawData())
return cs.GetBlock(c)
}

func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
Expand All @@ -796,23 +825,39 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
}

func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
if cs.localviewer == nil {
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
}
return types.DecodeMessage(sb.RawData())
}

return types.DecodeMessage(sb.RawData())
var msg *types.Message
err := cs.localviewer.View(c, func(b []byte) (err error) {
msg, err = types.DecodeMessage(b)
return err
})
return msg, err
}

func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
if cs.localviewer == nil {
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
}
return types.DecodeSignedMessage(sb.RawData())
}

return types.DecodeSignedMessage(sb.RawData())
var msg *types.SignedMessage
err := cs.localviewer.View(c, func(b []byte) (err error) {
msg, err = types.DecodeSignedMessage(b)
return err
})
return msg, err
}

func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) {
Expand Down
13 changes: 10 additions & 3 deletions chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store_test
import (
"bytes"
"context"
"io"
"testing"

datastore "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -51,18 +52,24 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}

bds, err := lr.Datastore("/chain")
bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil {
b.Fatal(err)
}

defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
b.Logf("WARN: failed to close blockstore: %s", err)
}
}
}()

mds, err := lr.Datastore("/metadata")
if err != nil {
b.Fatal(err)
}

bs := blockstore.NewBlockstore(bds)

cs := store.NewChainStore(bs, bs, mds, nil, nil)

b.ResetTimer()
Expand Down
27 changes: 23 additions & 4 deletions chain/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,32 @@ func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Ad
}

var _ cbor.IpldBlockstore = (*gasChargingBlocks)(nil)
var _ blockstore.Viewer = (*gasChargingBlocks)(nil)

type gasChargingBlocks struct {
chargeGas func(GasCharge)
pricelist Pricelist
under cbor.IpldBlockstore
}

func (bs *gasChargingBlocks) View(c cid.Cid, cb func([]byte) error) error {
if v, ok := bs.under.(blockstore.Viewer); ok {
bs.chargeGas(bs.pricelist.OnIpldGet())
return v.View(c, func(b []byte) error {
// we have successfully retrieved the value; charge for it, even if the user-provided function fails.
bs.chargeGas(newGasCharge("OnIpldViewEnd", 0, 0).WithExtra(len(b)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two are only signaling charges, they have no consensus importantce so it is fine if they are inside the View.

bs.chargeGas(gasOnActorExec)
return cb(b)
})
}
// the underlying blockstore doesn't implement the viewer interface, fall back to normal Get behaviour.
blk, err := bs.Get(c)
if err == nil && blk != nil {
return cb(blk.RawData())
}
return err
}

func (bs *gasChargingBlocks) Get(c cid.Cid) (block.Block, error) {
bs.chargeGas(bs.pricelist.OnIpldGet())
blk, err := bs.under.Get(c)
Expand Down Expand Up @@ -130,10 +149,10 @@ func (vm *VM) makeRuntime(ctx context.Context, msg *types.Message, parent *Runti
rt.Abortf(exitcode.SysErrForbidden, "message execution exceeds call depth")
}

rt.cst = &cbor.BasicIpldStore{
Blocks: &gasChargingBlocks{rt.chargeGasFunc(2), rt.pricelist, vm.cst.Blocks},
Atlas: vm.cst.Atlas,
}
cbb := &gasChargingBlocks{rt.chargeGasFunc(2), rt.pricelist, vm.cst.Blocks}
cst := cbor.NewCborStore(cbb)
cst.Atlas = vm.cst.Atlas // associate the atlas.
rt.cst = cst

vmm := *msg
resF, ok := rt.ResolveAddress(msg.From)
Expand Down
Loading