From 78b99291e5d6fe9195caac4f7e2e768c5c08d1f2 Mon Sep 17 00:00:00 2001 From: dirkmc Date: Thu, 26 Aug 2021 17:36:06 +0200 Subject: [PATCH] fix events API timeout handling for nil blocks (#7184) --- .circleci/config.yml | 5 + chain/events/events_called.go | 14 +-- chain/events/events_test.go | 153 +++++++++++++----------- itests/deals_expiry_test.go | 140 ++++++++++++++++++++++ itests/kit/deals.go | 10 +- markets/storageadapter/ondealexpired.go | 4 +- node/modules/storageminer.go | 2 +- 7 files changed, 242 insertions(+), 86 deletions(-) create mode 100644 itests/deals_expiry_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 4d24e25b1f8..48495628964 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -815,6 +815,11 @@ workflows: suite: itest-deals_concurrent target: "./itests/deals_concurrent_test.go" + - test: + name: test-itest-deals_expiry + suite: itest-deals_expiry + target: "./itests/deals_expiry_test.go" + - test: name: test-itest-deals_offline suite: itest-deals_offline diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 1f0b80169e1..e783f78007d 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -157,7 +157,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { // Apply any queued events and timeouts that were targeted at the // current chain height e.applyWithConfidence(ts, at) - e.applyTimeouts(ts) + e.applyTimeouts(at) } // Update the latest known tipset @@ -273,8 +273,8 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) } // Apply any timeouts that expire at this height -func (e *hcEvents) applyTimeouts(ts *types.TipSet) { - triggers, ok := e.timeouts[ts.Height()] +func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) { + triggers, ok := e.timeouts[at] if !ok { return // nothing to do } @@ -288,14 +288,14 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) { continue } - timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence)) + timeoutTs, err := e.tsc.get(at - abi.ChainEpoch(trigger.confidence)) if err != nil { - log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height()) + log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at) } - more, err := trigger.handle(nil, nil, timeoutTs, ts.Height()) + more, err := trigger.handle(nil, nil, timeoutTs, at) if err != nil { - log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err) + log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err) continue // don't revert failed calls } diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 04f938055f1..6f73dfe5863 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1293,81 +1293,88 @@ func TestStateChangedRevert(t *testing.T) { } func TestStateChangedTimeout(t *testing.T) { - fcs := &fakeCS{ - t: t, - h: 1, - - msgs: map[cid.Cid]fakeMsg{}, - blkMsgs: map[cid.Cid]cid.Cid{}, - tsc: newTSCache(2*build.ForkLengthThreshold, nil), - callNumber: map[string]int{}, - } - require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) - - events := NewEvents(context.Background(), fcs) - - called := false - - err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { - return false, true, nil - }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { - called = true - require.Nil(t, data) - require.Equal(t, abi.ChainEpoch(20), newTs.Height()) - require.Equal(t, abi.ChainEpoch(23), curH) - return false, nil - }, func(_ context.Context, ts *types.TipSet) error { - t.Fatal("revert on timeout") - return nil - }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { - return false, nil, nil - }) - - require.NoError(t, err) - - fcs.advance(0, 21, nil) - require.False(t, called) - - fcs.advance(0, 5, nil) - require.True(t, called) - called = false - - // with check func reporting done - - fcs = &fakeCS{ - t: t, - h: 1, + timeoutHeight := abi.ChainEpoch(20) + confidence := 3 - msgs: map[cid.Cid]fakeMsg{}, - blkMsgs: map[cid.Cid]cid.Cid{}, - callNumber: map[string]int{}, - tsc: newTSCache(2*build.ForkLengthThreshold, nil), + testCases := []struct { + name string + checkFn CheckFunc + nilBlocks []int + expectTimeout bool + }{{ + // Verify that the state changed timeout is called at the expected height + name: "state changed timeout", + checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, + expectTimeout: true, + }, { + // Verify that the state changed timeout is called even if the timeout + // falls on nil block + name: "state changed timeout falls on nil block", + checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, + nilBlocks: []int{20, 21, 22, 23}, + expectTimeout: true, + }, { + // Verify that the state changed timeout is not called if the check + // function reports that it's complete + name: "no timeout callback if check func reports done", + checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { + return true, true, nil + }, + expectTimeout: false, + }} + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + callNumber: map[string]int{}, + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + // Track whether the callback was called + called := false + + // Set up state change tracking that will timeout at the given height + err := events.StateChanged( + tc.checkFn, + func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + // Expect the callback to be called at the timeout height with nil data + called = true + require.Nil(t, data) + require.Equal(t, timeoutHeight, newTs.Height()) + require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + return false, nil, nil + }) + + require.NoError(t, err) + + // Advance to timeout height + fcs.advance(0, int(timeoutHeight)+1, nil) + require.False(t, called) + + // Advance past timeout height + fcs.advance(0, 5, nil, tc.nilBlocks...) + require.Equal(t, tc.expectTimeout, called) + called = false + }) } - require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) - - events = NewEvents(context.Background(), fcs) - - err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { - return true, true, nil - }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { - called = true - require.Nil(t, data) - require.Equal(t, abi.ChainEpoch(20), newTs.Height()) - require.Equal(t, abi.ChainEpoch(23), curH) - return false, nil - }, func(_ context.Context, ts *types.TipSet) error { - t.Fatal("revert on timeout") - return nil - }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { - return false, nil, nil - }) - require.NoError(t, err) - - fcs.advance(0, 21, nil) - require.False(t, called) - - fcs.advance(0, 5, nil) - require.False(t, called) } func TestCalledMultiplePerEpoch(t *testing.T) { diff --git a/itests/deals_expiry_test.go b/itests/deals_expiry_test.go new file mode 100644 index 00000000000..b8b3c4b5ab8 --- /dev/null +++ b/itests/deals_expiry_test.go @@ -0,0 +1,140 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-fil-markets/storagemarket" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + market3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" + market4 "github.com/filecoin-project/specs-actors/v4/actors/builtin/market" + market5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/market" + + "github.com/filecoin-project/lotus/itests/kit" +) + +// Test that the deal state eventually moves to "Expired" on both client and miner +func TestDealExpiry(t *testing.T) { + kit.QuietMiningLogs() + + resetMinDealDuration(t) + + ctx := context.Background() + + var ( + client kit.TestFullNode + miner1 kit.TestMiner + ) + + ens := kit.NewEnsemble(t, kit.MockProofs()) + ens.FullNode(&client) + ens.Miner(&miner1, &client, kit.WithAllSubsystems()) + bm := ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond) + + dh := kit.NewDealHarness(t, &client, &miner1, &miner1) + + client.WaitTillChain(ctx, kit.HeightAtLeast(5)) + + // Make a deal with a short duration + dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{ + Rseed: 0, + FastRet: true, + // Needs to be far enough in the future to ensure the deal has been sealed + StartEpoch: 3000, + // Short deal duration + MinBlocksDuration: 50, + }) + + // Inject null blocks each time the chain advances by a block so as to + // get to deal expiration faster + go func() { + ch, _ := client.ChainNotify(ctx) + for range ch { + bm[0].InjectNulls(10) + } + }() + + clientExpired := false + minerExpired := false + for { + ts, err := client.ChainHead(ctx) + require.NoError(t, err) + + t.Logf("Chain height: %d", ts.Height()) + + // Get the miner deal from the proposal CID + minerDeal := getMinerDeal(ctx, t, miner1, *dealProposalCid) + + t.Logf("Miner deal:") + t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider) + t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch) + t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch) + t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State]) + //spew.Dump(d) + + // Get the client deal + clientDeals, err := client.ClientListDeals(ctx) + require.NoError(t, err) + + t.Logf("Client deal state: %s\n", storagemarket.DealStates[clientDeals[0].State]) + + // Expect the deal to eventually expire on the client and the miner + if clientDeals[0].State == storagemarket.StorageDealExpired { + t.Logf("Client deal expired") + clientExpired = true + } + if minerDeal.State == storagemarket.StorageDealExpired { + t.Logf("Miner deal expired") + minerExpired = true + } + if clientExpired && minerExpired { + t.Logf("PASS: Client and miner deal expired") + return + } + + if ts.Height() > 5000 { + t.Fatalf("Reached height %d without client and miner deals expiring", ts.Height()) + } + + time.Sleep(2 * time.Second) + } +} + +func getMinerDeal(ctx context.Context, t *testing.T, miner1 kit.TestMiner, dealProposalCid cid.Cid) storagemarket.MinerDeal { + minerDeals, err := miner1.MarketListIncompleteDeals(ctx) + require.NoError(t, err) + require.Greater(t, len(minerDeals), 0) + + for _, d := range minerDeals { + if d.ProposalCid == dealProposalCid { + return d + } + } + t.Fatalf("miner deal with proposal CID %s not found", dealProposalCid) + return storagemarket.MinerDeal{} +} + +// reset minimum deal duration to 0, so we can make very short-lived deals. +// NOTE: this will need updating with every new specs-actors version. +func resetMinDealDuration(t *testing.T) { + m2 := market2.DealMinDuration + m3 := market3.DealMinDuration + m4 := market4.DealMinDuration + m5 := market5.DealMinDuration + + market2.DealMinDuration = 0 + market3.DealMinDuration = 0 + market4.DealMinDuration = 0 + market5.DealMinDuration = 0 + + t.Cleanup(func() { + market2.DealMinDuration = m2 + market3.DealMinDuration = m3 + market4.DealMinDuration = m4 + market5.DealMinDuration = m5 + }) +} diff --git a/itests/kit/deals.go b/itests/kit/deals.go index 0832447f20b..0db48a26e05 100644 --- a/itests/kit/deals.go +++ b/itests/kit/deals.go @@ -33,9 +33,10 @@ type DealHarness struct { } type MakeFullDealParams struct { - Rseed int - FastRet bool - StartEpoch abi.ChainEpoch + Rseed int + FastRet bool + StartEpoch abi.ChainEpoch + MinBlocksDuration uint64 // SuspendUntilCryptoeconStable suspends deal-making, until cryptoecon // parameters are stabilised. This affects projected collateral, and tests @@ -92,6 +93,9 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealPa dp.Data.Root = res.Root dp.DealStartEpoch = params.StartEpoch dp.FastRetrieval = params.FastRet + if params.MinBlocksDuration > 0 { + dp.MinBlocksDuration = params.MinBlocksDuration + } deal = dh.StartDeal(ctx, dp) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this diff --git a/markets/storageadapter/ondealexpired.go b/markets/storageadapter/ondealexpired.go index e57b396343c..a6c6efd824f 100644 --- a/markets/storageadapter/ondealexpired.go +++ b/markets/storageadapter/ondealexpired.go @@ -101,7 +101,7 @@ func (mgr *DealExpiryManager) OnDealExpiredOrSlashed(ctx context.Context, publis // and the chain has advanced to the confidence height stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) { // Check if the deal has already expired - if ts2 == nil || res.MarketDeal.Proposal.EndEpoch <= ts2.Height() { + if res.MarketDeal.Proposal.EndEpoch <= h { onDealExpired(nil) return false, nil } @@ -143,7 +143,7 @@ func (mgr *DealExpiryManager) OnDealExpiredOrSlashed(ctx context.Context, publis match := mgr.dsMatcher.matcher(ctx, res.DealID) // Wait until after the end epoch for the deal and then timeout - timeout := (res.MarketDeal.Proposal.EndEpoch - head.Height()) + 1 + timeout := res.MarketDeal.Proposal.EndEpoch + 1 if err := mgr.demAPI.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil { return xerrors.Errorf("failed to set up state changed handler: %w", err) } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 5497eab5813..0350f707553 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -572,7 +572,7 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside earliest := abi.ChainEpoch(sealEpochs) + ht if deal.Proposal.StartEpoch < earliest { log.Warnw("proposed deal would start before sealing can be completed; rejecting storage deal proposal from client", "piece_cid", deal.Proposal.PieceCID, "client", deal.Client.String(), "seal_duration", sealDuration, "earliest", earliest, "curepoch", ht) - return false, fmt.Sprintf("cannot seal a sector before %s", deal.Proposal.StartEpoch), nil + return false, fmt.Sprintf("proposed deal start epoch %s too early, cannot seal a sector before %s", deal.Proposal.StartEpoch, earliest), nil } sd, err := startDelay()