Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:paychan:deflake integration test #8088

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 73 additions & 47 deletions itests/kit/blockminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package kit
import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/api"
aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -63,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool {
return uint64(len(p.partitions)) == p.count(t)
}

func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) {
func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) {
defer func() {
ret = p.done(t)
}()
msg := smsg.Message
if !(msg.To == bm.miner.ActorAddr) {
return
}
Expand All @@ -82,81 +83,100 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type
return
}

func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) {

tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
if !tracker.done(bm.t) { // need to wait for post
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events
require.NoError(bm.t, err)

// First check pending messages we'll mine this epoch
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
require.NoError(bm.t, err)
for _, msg := range msgs {
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
fmt.Printf("found post in mempool pending\n")
}
}

// Account for included but not yet executed messages
for _, bc := range ts.Cids() {
msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc)
require.NoError(bm.t, err)
for _, msg := range msgs.BlsMessages {
if tracker.recordIfPost(bm.t, bm, msg) {
fmt.Printf("found post in message of prev tipset\n")
}

}
for _, msg := range msgs.SecpkMessages {
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
fmt.Printf("found post in message of prev tipset\n")
}
}
}

// post not yet in mpool, wait for it
if !tracker.done(bm.t) {
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
POOL:
for {
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
select {
case <-ctx.Done():
return
case evt := <-poolEvts:
bm.t.Logf("pool event: %d", evt.Type)
if evt.Type == api.MpoolAdd {
bm.t.Logf("incoming message %v", evt.Message)
if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) {
fmt.Printf("found post in mempool evt\n")
break POOL
}
}
}
}
bm.t.Logf("done waiting on mpool")
}
}
}

// Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool
// and everything shuts down if a post fails. It also enforces that every block mined succeeds
func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) {

time.Sleep(3 * time.Second)
time.Sleep(time.Second)

// wrap context in a cancellable context.
ctx, bm.cancel = context.WithCancel(ctx)
bm.wg.Add(1)
go func() {
defer bm.wg.Done()

activeDeadlines := make(map[int]struct{})
_ = activeDeadlines
ts, err := bm.miner.FullNode.ChainHead(ctx)
require.NoError(bm.t, err)
wait := make(chan bool)
chg, err := bm.miner.FullNode.ChainNotify(ctx)
require.NoError(bm.t, err)
// read current out
curr := <-chg
require.Equal(bm.t, ts.Height(), curr[0].Val.Height())
require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?")
for {
select {
case <-time.After(blocktime):
case <-ctx.Done():
return
}
nulls := atomic.SwapInt64(&bm.nextNulls, 0)
require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported")

// Wake up and figure out if we are at the end of an active deadline
ts, err := bm.miner.FullNode.ChainHead(ctx)
require.NoError(bm.t, err)
tsk := ts.Key()

dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk)
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key())
require.NoError(bm.t, err)
if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted

tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
if !tracker.done(bm.t) { // need to wait for post
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx)
require.NoError(bm.t, err)

// First check pending messages we'll mine this epoch
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
require.NoError(bm.t, err)
for _, msg := range msgs {
tracker.recordIfPost(bm.t, bm, msg)
}

// post not yet in mpool, wait for it
if !tracker.done(bm.t) {
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
POOL:
for {
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
select {
case <-ctx.Done():
return
case evt := <-poolEvts:
bm.t.Logf("pool event: %d", evt.Type)
if evt.Type == api.MpoolAdd {
bm.t.Logf("incoming message %v", evt.Message)
if tracker.recordIfPost(bm.t, bm, evt.Message) {
break POOL
}
}
}
}
bm.t.Logf("done waiting on mpool")
}
}
if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post
bm.forcePoSt(ctx, ts, dlinfo)
}

var target abi.ChainEpoch
Expand All @@ -173,6 +193,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
Done: reportSuccessFn,
})
success = <-wait
if !success {
// if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post
if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() {
bm.forcePoSt(ctx, ts, dlinfo)
}
}
}

// Wait until it shows up on the given full nodes ChainHead
Expand Down
2 changes: 1 addition & 1 deletion itests/paych_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) {
Miner(&miner, &paymentCreator, kit.WithAllSubsystems()).
Start().
InterconnectAll()
bms := ens.BeginMining(blockTime)
bms := ens.BeginMiningMustPost(blockTime)
bm := bms[0]

// send some funds to register the receiver
Expand Down