Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SplitStore: supress compaction near upgrades #7734

Merged
merged 4 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"github.com/filecoin-project/go-state-types/abi"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"

Expand Down Expand Up @@ -47,6 +49,9 @@ var (
enableDebugLog = false
// set this to true if you want to track origin stack traces in the write log
enableDebugLogWriteTraces = false

// upgradeBoundary is the boundary before and after an upgrade where we suppress compaction
upgradeBoundary = build.Finality
)

func init() {
Expand Down Expand Up @@ -98,6 +103,12 @@ type ChainAccessor interface {
SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
}

// upgradeRange is a precomputed epoch range during which we shouldn't compact so as to not
// interfere with an upgrade
type upgradeRange struct {
start, end abi.ChainEpoch
}

// hotstore is the interface that must be satisfied by the hot blockstore; it is an extension
// of the Blockstore interface with the traits we need for compaction.
type hotstore interface {
Expand Down Expand Up @@ -125,6 +136,8 @@ type SplitStore struct {
cold bstore.Blockstore
hot hotstore

upgrades []upgradeRange

markSetEnv MarkSetEnv
markSetSize int64

Expand Down Expand Up @@ -463,10 +476,27 @@ func (s *SplitStore) isWarm() bool {
}

// State tracking
func (s *SplitStore) Start(chain ChainAccessor) error {
func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error {
s.chain = chain
curTs := chain.GetHeaviestTipSet()

// precompute the upgrade boundaries
s.upgrades = make([]upgradeRange, 0, len(us))
for _, upgrade := range us {
boundary := upgrade.Height
for _, pre := range upgrade.PreMigrations {
preMigrationBoundary := upgrade.Height - pre.StartWithin
if preMigrationBoundary < boundary {
boundary = preMigrationBoundary
}
}

upgradeStart := boundary - upgradeBoundary
upgradeEnd := upgrade.Height + upgradeBoundary

s.upgrades = append(s.upgrades, upgradeRange{start: upgradeStart, end: upgradeEnd})
}

// should we warmup
warmup := false

Expand Down
16 changes: 16 additions & 0 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
return nil
}

if s.isNearUpgrade(epoch) {
// we are near an upgrade epoch, suppress compaction
atomic.StoreInt32(&s.compacting, 0)
return nil
}

if epoch-s.baseEpoch > CompactionThreshold {
// it's time to compact -- prepare the transaction and go!
s.beginTxnProtect()
Expand All @@ -121,6 +127,16 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
return nil
}

func (s *SplitStore) isNearUpgrade(epoch abi.ChainEpoch) bool {
for _, upgrade := range s.upgrades {
if epoch >= upgrade.start && epoch <= upgrade.end {
return true
}
}

return false
}

// transactionally protect incoming tipsets
func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
s.txnLk.RLock()
Expand Down
137 changes: 136 additions & 1 deletion blockstore/splitstore/splitstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"

Expand Down Expand Up @@ -90,7 +91,7 @@ func testSplitStore(t *testing.T, cfg *Config) {
return protect(protected.Cid())
})

err = ss.Start(chain)
err = ss.Start(chain, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -220,6 +221,140 @@ func TestSplitStoreCompactionWithBadger(t *testing.T) {
testSplitStore(t, &Config{MarkSetType: "badger"})
}

func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
chain := &mockChain{t: t}

// the myriads of stores
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := newMockStore()
cold := newMockStore()

// this is necessary to avoid the garbage mock puts in the blocks
garbage := blocks.NewBlock([]byte{1, 2, 3})
err := cold.Put(garbage)
if err != nil {
t.Fatal(err)
}

// genesis
genBlock := mock.MkBlock(nil, 0, 0)
genBlock.Messages = garbage.Cid()
genBlock.ParentMessageReceipts = garbage.Cid()
genBlock.ParentStateRoot = garbage.Cid()
genBlock.Timestamp = uint64(time.Now().Unix())

genTs := mock.TipSet(genBlock)
chain.push(genTs)

// put the genesis block to cold store
blk, err := genBlock.ToStorageBlock()
if err != nil {
t.Fatal(err)
}

err = cold.Put(blk)
if err != nil {
t.Fatal(err)
}

// open the splitstore
ss, err := Open("", ds, hot, cold, &Config{MarkSetType: "map"})
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint

// create an upgrade schedule that will suppress compaction during the test
upgradeBoundary = 0
upgrade := stmgr.Upgrade{
Height: 10,
PreMigrations: []stmgr.PreMigration{{StartWithin: 10}},
}

err = ss.Start(chain, []stmgr.Upgrade{upgrade})
if err != nil {
t.Fatal(err)
}

mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet {
blk := mock.MkBlock(curTs, uint64(i), uint64(i))

blk.Messages = garbage.Cid()
blk.ParentMessageReceipts = garbage.Cid()
blk.ParentStateRoot = stateRoot.Cid()
blk.Timestamp = uint64(time.Now().Unix())

sblk, err := blk.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
err = ss.Put(stateRoot)
if err != nil {
t.Fatal(err)
}
err = ss.Put(sblk)
if err != nil {
t.Fatal(err)
}
ts := mock.TipSet(blk)
chain.push(ts)

return ts
}

waitForCompaction := func() {
for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond)
}
}

curTs := genTs
for i := 1; i < 10; i++ {
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction()
}

countBlocks := func(bs blockstore.Blockstore) int {
count := 0
_ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
count++
return nil
})
return count
}

// we should not have compacted due to suppression and everything should still be hot
hotCnt := countBlocks(hot)
coldCnt := countBlocks(cold)

if hotCnt != 20 {
t.Errorf("expected %d blocks, but got %d", 20, hotCnt)
}

if coldCnt != 2 {
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
}

// put some more blocks, now we should compact
for i := 10; i < 20; i++ {
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction()
}

hotCnt = countBlocks(hot)
coldCnt = countBlocks(cold)

if hotCnt != 24 {
t.Errorf("expected %d blocks, but got %d", 24, hotCnt)
}

if coldCnt != 18 {
t.Errorf("expected %d blocks, but got %d", 18, coldCnt)
}
}

type mockChain struct {
t testing.TB

Expand Down
3 changes: 2 additions & 1 deletion node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func ChainStore(lc fx.Lifecycle,
ds dtypes.MetadataDS,
basebs dtypes.BaseBlockstore,
weight store.WeightFunc,
us stmgr.UpgradeSchedule,
j journal.Journal) *store.ChainStore {

chain := store.NewChainStore(cbs, sbs, ds, weight, j)
Expand All @@ -89,7 +90,7 @@ func ChainStore(lc fx.Lifecycle,
var startHook func(context.Context) error
if ss, ok := basebs.(*splitstore.SplitStore); ok {
startHook = func(_ context.Context) error {
err := ss.Start(chain)
err := ss.Start(chain, us)
if err != nil {
err = xerrors.Errorf("error starting splitstore: %w", err)
}
Expand Down