Skip to content

Commit

Permalink
fix events API timeout handling for nil blocks (#7184)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored Aug 26, 2021
1 parent bfd4e20 commit 78b9929
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 86 deletions.
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,11 @@ workflows:
suite: itest-deals_concurrent
target: "./itests/deals_concurrent_test.go"

- test:
name: test-itest-deals_expiry
suite: itest-deals_expiry
target: "./itests/deals_expiry_test.go"

- test:
name: test-itest-deals_offline
suite: itest-deals_offline
Expand Down
14 changes: 7 additions & 7 deletions chain/events/events_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ts, at)
e.applyTimeouts(ts)
e.applyTimeouts(at)
}

// Update the latest known tipset
Expand Down Expand Up @@ -273,8 +273,8 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
}

// Apply any timeouts that expire at this height
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
triggers, ok := e.timeouts[ts.Height()]
func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) {
triggers, ok := e.timeouts[at]
if !ok {
return // nothing to do
}
Expand All @@ -288,14 +288,14 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
continue
}

timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
timeoutTs, err := e.tsc.get(at - abi.ChainEpoch(trigger.confidence))
if err != nil {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at)
}

more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
more, err := trigger.handle(nil, nil, timeoutTs, at)
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err)
continue // don't revert failed calls
}

Expand Down
153 changes: 80 additions & 73 deletions chain/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,81 +1293,88 @@ func TestStateChangedRevert(t *testing.T) {
}

func TestStateChangedTimeout(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,

msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))

events := NewEvents(context.Background(), fcs)

called := false

err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, data)
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})

require.NoError(t, err)

fcs.advance(0, 21, nil)
require.False(t, called)

fcs.advance(0, 5, nil)
require.True(t, called)
called = false

// with check func reporting done

fcs = &fakeCS{
t: t,
h: 1,
timeoutHeight := abi.ChainEpoch(20)
confidence := 3

msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
testCases := []struct {
name string
checkFn CheckFunc
nilBlocks []int
expectTimeout bool
}{{
// Verify that the state changed timeout is called at the expected height
name: "state changed timeout",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
},
expectTimeout: true,
}, {
// Verify that the state changed timeout is called even if the timeout
// falls on nil block
name: "state changed timeout falls on nil block",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
},
nilBlocks: []int{20, 21, 22, 23},
expectTimeout: true,
}, {
// Verify that the state changed timeout is not called if the check
// function reports that it's complete
name: "no timeout callback if check func reports done",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
},
expectTimeout: false,
}}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,

msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))

events := NewEvents(context.Background(), fcs)

// Track whether the callback was called
called := false

// Set up state change tracking that will timeout at the given height
err := events.StateChanged(
tc.checkFn,
func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
// Expect the callback to be called at the timeout height with nil data
called = true
require.Nil(t, data)
require.Equal(t, timeoutHeight, newTs.Height())
require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})

require.NoError(t, err)

// Advance to timeout height
fcs.advance(0, int(timeoutHeight)+1, nil)
require.False(t, called)

// Advance past timeout height
fcs.advance(0, 5, nil, tc.nilBlocks...)
require.Equal(t, tc.expectTimeout, called)
called = false
})
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))

events = NewEvents(context.Background(), fcs)

err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, data)
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})
require.NoError(t, err)

fcs.advance(0, 21, nil)
require.False(t, called)

fcs.advance(0, 5, nil)
require.False(t, called)
}

func TestCalledMultiplePerEpoch(t *testing.T) {
Expand Down
140 changes: 140 additions & 0 deletions itests/deals_expiry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package itests

import (
"context"
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/storagemarket"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
market3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
market4 "github.com/filecoin-project/specs-actors/v4/actors/builtin/market"
market5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/market"

"github.com/filecoin-project/lotus/itests/kit"
)

// Test that the deal state eventually moves to "Expired" on both client and miner
func TestDealExpiry(t *testing.T) {
kit.QuietMiningLogs()

resetMinDealDuration(t)

ctx := context.Background()

var (
client kit.TestFullNode
miner1 kit.TestMiner
)

ens := kit.NewEnsemble(t, kit.MockProofs())
ens.FullNode(&client)
ens.Miner(&miner1, &client, kit.WithAllSubsystems())
bm := ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond)

dh := kit.NewDealHarness(t, &client, &miner1, &miner1)

client.WaitTillChain(ctx, kit.HeightAtLeast(5))

// Make a deal with a short duration
dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{
Rseed: 0,
FastRet: true,
// Needs to be far enough in the future to ensure the deal has been sealed
StartEpoch: 3000,
// Short deal duration
MinBlocksDuration: 50,
})

// Inject null blocks each time the chain advances by a block so as to
// get to deal expiration faster
go func() {
ch, _ := client.ChainNotify(ctx)
for range ch {
bm[0].InjectNulls(10)
}
}()

clientExpired := false
minerExpired := false
for {
ts, err := client.ChainHead(ctx)
require.NoError(t, err)

t.Logf("Chain height: %d", ts.Height())

// Get the miner deal from the proposal CID
minerDeal := getMinerDeal(ctx, t, miner1, *dealProposalCid)

t.Logf("Miner deal:")
t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider)
t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch)
t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch)
t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State])
//spew.Dump(d)

// Get the client deal
clientDeals, err := client.ClientListDeals(ctx)
require.NoError(t, err)

t.Logf("Client deal state: %s\n", storagemarket.DealStates[clientDeals[0].State])

// Expect the deal to eventually expire on the client and the miner
if clientDeals[0].State == storagemarket.StorageDealExpired {
t.Logf("Client deal expired")
clientExpired = true
}
if minerDeal.State == storagemarket.StorageDealExpired {
t.Logf("Miner deal expired")
minerExpired = true
}
if clientExpired && minerExpired {
t.Logf("PASS: Client and miner deal expired")
return
}

if ts.Height() > 5000 {
t.Fatalf("Reached height %d without client and miner deals expiring", ts.Height())
}

time.Sleep(2 * time.Second)
}
}

func getMinerDeal(ctx context.Context, t *testing.T, miner1 kit.TestMiner, dealProposalCid cid.Cid) storagemarket.MinerDeal {
minerDeals, err := miner1.MarketListIncompleteDeals(ctx)
require.NoError(t, err)
require.Greater(t, len(minerDeals), 0)

for _, d := range minerDeals {
if d.ProposalCid == dealProposalCid {
return d
}
}
t.Fatalf("miner deal with proposal CID %s not found", dealProposalCid)
return storagemarket.MinerDeal{}
}

// reset minimum deal duration to 0, so we can make very short-lived deals.
// NOTE: this will need updating with every new specs-actors version.
func resetMinDealDuration(t *testing.T) {
m2 := market2.DealMinDuration
m3 := market3.DealMinDuration
m4 := market4.DealMinDuration
m5 := market5.DealMinDuration

market2.DealMinDuration = 0
market3.DealMinDuration = 0
market4.DealMinDuration = 0
market5.DealMinDuration = 0

t.Cleanup(func() {
market2.DealMinDuration = m2
market3.DealMinDuration = m3
market4.DealMinDuration = m4
market5.DealMinDuration = m5
})
}
10 changes: 7 additions & 3 deletions itests/kit/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ type DealHarness struct {
}

type MakeFullDealParams struct {
Rseed int
FastRet bool
StartEpoch abi.ChainEpoch
Rseed int
FastRet bool
StartEpoch abi.ChainEpoch
MinBlocksDuration uint64

// SuspendUntilCryptoeconStable suspends deal-making, until cryptoecon
// parameters are stabilised. This affects projected collateral, and tests
Expand Down Expand Up @@ -92,6 +93,9 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealPa
dp.Data.Root = res.Root
dp.DealStartEpoch = params.StartEpoch
dp.FastRetrieval = params.FastRet
if params.MinBlocksDuration > 0 {
dp.MinBlocksDuration = params.MinBlocksDuration
}
deal = dh.StartDeal(ctx, dp)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
Expand Down
4 changes: 2 additions & 2 deletions markets/storageadapter/ondealexpired.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (mgr *DealExpiryManager) OnDealExpiredOrSlashed(ctx context.Context, publis
// and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
// Check if the deal has already expired
if ts2 == nil || res.MarketDeal.Proposal.EndEpoch <= ts2.Height() {
if res.MarketDeal.Proposal.EndEpoch <= h {
onDealExpired(nil)
return false, nil
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func (mgr *DealExpiryManager) OnDealExpiredOrSlashed(ctx context.Context, publis
match := mgr.dsMatcher.matcher(ctx, res.DealID)

// Wait until after the end epoch for the deal and then timeout
timeout := (res.MarketDeal.Proposal.EndEpoch - head.Height()) + 1
timeout := res.MarketDeal.Proposal.EndEpoch + 1
if err := mgr.demAPI.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err)
}
Expand Down
Loading

0 comments on commit 78b9929

Please sign in to comment.