diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index 91ddc2e26e0..a232d82e026 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -3,6 +3,7 @@ package kit import ( "bytes" "context" + "fmt" "sync" "sync/atomic" "testing" @@ -10,6 +11,7 @@ import ( "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/lotus/api" aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" @@ -63,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool { return uint64(len(p.partitions)) == p.count(t) } -func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) { +func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) { defer func() { ret = p.done(t) }() - msg := smsg.Message if !(msg.To == bm.miner.ActorAddr) { return } @@ -82,11 +83,69 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type return } +func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) { + + tracker := newPartitionTracker(ctx, dlinfo.Index, bm) + if !tracker.done(bm.t) { // need to wait for post + bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) + poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events + require.NoError(bm.t, err) + + // First check pending messages we'll mine this epoch + msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) + require.NoError(bm.t, err) + for _, msg := range msgs { + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in mempool pending\n") + } + } + + // Account for included but not yet executed messages + for _, bc := range ts.Cids() { + msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc) + require.NoError(bm.t, err) + for _, msg := range msgs.BlsMessages { + if tracker.recordIfPost(bm.t, bm, msg) { + fmt.Printf("found post in message of prev tipset\n") + } + + } + for _, msg := range msgs.SecpkMessages { + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in message of prev tipset\n") + } + } + } + + // post not yet in mpool, wait for it + if !tracker.done(bm.t) { + bm.t.Logf("post missing from mpool, block mining suspended until it arrives") + POOL: + for { + bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) + select { + case <-ctx.Done(): + return + case evt := <-poolEvts: + bm.t.Logf("pool event: %d", evt.Type) + if evt.Type == api.MpoolAdd { + bm.t.Logf("incoming message %v", evt.Message) + if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) { + fmt.Printf("found post in mempool evt\n") + break POOL + } + } + } + } + bm.t.Logf("done waiting on mpool") + } + } +} + // Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool // and everything shuts down if a post fails. It also enforces that every block mined succeeds func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) { - - time.Sleep(3 * time.Second) + time.Sleep(time.Second) // wrap context in a cancellable context. ctx, bm.cancel = context.WithCancel(ctx) @@ -94,8 +153,6 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur go func() { defer bm.wg.Done() - activeDeadlines := make(map[int]struct{}) - _ = activeDeadlines ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) wait := make(chan bool) @@ -103,7 +160,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur require.NoError(bm.t, err) // read current out curr := <-chg - require.Equal(bm.t, ts.Height(), curr[0].Val.Height()) + require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?") for { select { case <-time.After(blocktime): @@ -111,52 +168,15 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur return } nulls := atomic.SwapInt64(&bm.nextNulls, 0) - require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported") // Wake up and figure out if we are at the end of an active deadline ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) - tsk := ts.Key() - dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk) + dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key()) require.NoError(bm.t, err) - if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted - - tracker := newPartitionTracker(ctx, dlinfo.Index, bm) - if !tracker.done(bm.t) { // need to wait for post - bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) - poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) - require.NoError(bm.t, err) - - // First check pending messages we'll mine this epoch - msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) - require.NoError(bm.t, err) - for _, msg := range msgs { - tracker.recordIfPost(bm.t, bm, msg) - } - - // post not yet in mpool, wait for it - if !tracker.done(bm.t) { - bm.t.Logf("post missing from mpool, block mining suspended until it arrives") - POOL: - for { - bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) - select { - case <-ctx.Done(): - return - case evt := <-poolEvts: - bm.t.Logf("pool event: %d", evt.Type) - if evt.Type == api.MpoolAdd { - bm.t.Logf("incoming message %v", evt.Message) - if tracker.recordIfPost(bm.t, bm, evt.Message) { - break POOL - } - } - } - } - bm.t.Logf("done waiting on mpool") - } - } + if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post + bm.forcePoSt(ctx, ts, dlinfo) } var target abi.ChainEpoch @@ -173,6 +193,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur Done: reportSuccessFn, }) success = <-wait + if !success { + // if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post + if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() { + bm.forcePoSt(ctx, ts, dlinfo) + } + } } // Wait until it shows up on the given full nodes ChainHead diff --git a/itests/paych_api_test.go b/itests/paych_api_test.go index a07c499f9cf..7e135a9bea7 100644 --- a/itests/paych_api_test.go +++ b/itests/paych_api_test.go @@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) { Miner(&miner, &paymentCreator, kit.WithAllSubsystems()). Start(). InterconnectAll() - bms := ens.BeginMining(blockTime) + bms := ens.BeginMiningMustPost(blockTime) bm := bms[0] // send some funds to register the receiver