Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

markets: OnDealExpiredOrSlashed - get deal by proposal instead of deal ID #5431

Merged
merged 4 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,11 @@ workflows:
suite: itest-deals_concurrent
target: "./itests/deals_concurrent_test.go"

- test:
name: test-itest-deals_expiry
suite: itest-deals_expiry
target: "./itests/deals_expiry_test.go"

- test:
name: test-itest-deals_offline
suite: itest-deals_offline
Expand Down
14 changes: 7 additions & 7 deletions chain/events/events_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
// Apply any queued events and timeouts that were targeted at the
// current chain height
e.applyWithConfidence(ts, at)
e.applyTimeouts(ts)
e.applyTimeouts(at)
}

// Update the latest known tipset
Expand Down Expand Up @@ -273,8 +273,8 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
}

// Apply any timeouts that expire at this height
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
triggers, ok := e.timeouts[ts.Height()]
func (e *hcEvents) applyTimeouts(at abi.ChainEpoch) {
triggers, ok := e.timeouts[at]
if !ok {
return // nothing to do
}
Expand All @@ -288,14 +288,14 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
continue
}

timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
timeoutTs, err := e.tsc.get(at - abi.ChainEpoch(trigger.confidence))
if err != nil {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at)
}

more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
more, err := trigger.handle(nil, nil, timeoutTs, at)
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err)
continue // don't revert failed calls
}

Expand Down
153 changes: 80 additions & 73 deletions chain/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,81 +1293,88 @@ func TestStateChangedRevert(t *testing.T) {
}

func TestStateChangedTimeout(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,

msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))

events := NewEvents(context.Background(), fcs)

called := false

err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, data)
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})

require.NoError(t, err)

fcs.advance(0, 21, nil)
require.False(t, called)

fcs.advance(0, 5, nil)
require.True(t, called)
called = false

// with check func reporting done

fcs = &fakeCS{
t: t,
h: 1,
timeoutHeight := abi.ChainEpoch(20)
confidence := 3

msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
testCases := []struct {
name string
checkFn CheckFunc
nilBlocks []int
expectTimeout bool
}{{
// Verify that the state changed timeout is called at the expected height
name: "state changed timeout",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
},
expectTimeout: true,
}, {
// Verify that the state changed timeout is called even if the timeout
// falls on nil block
name: "state changed timeout falls on nil block",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
},
nilBlocks: []int{20, 21, 22, 23},
expectTimeout: true,
}, {
// Verify that the state changed timeout is not called if the check
// function reports that it's complete
name: "no timeout callback if check func reports done",
checkFn: func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
},
expectTimeout: false,
}}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,

msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))

events := NewEvents(context.Background(), fcs)

// Track whether the callback was called
called := false

// Set up state change tracking that will timeout at the given height
err := events.StateChanged(
tc.checkFn,
func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
// Expect the callback to be called at the timeout height with nil data
called = true
require.Nil(t, data)
require.Equal(t, timeoutHeight, newTs.Height())
require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})

require.NoError(t, err)

// Advance to timeout height
fcs.advance(0, int(timeoutHeight)+1, nil)
require.False(t, called)

// Advance past timeout height
fcs.advance(0, 5, nil, tc.nilBlocks...)
require.Equal(t, tc.expectTimeout, called)
called = false
})
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))

events = NewEvents(context.Background(), fcs)

err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
called = true
require.Nil(t, data)
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
require.Equal(t, abi.ChainEpoch(23), curH)
return false, nil
}, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
return false, nil, nil
})
require.NoError(t, err)

fcs.advance(0, 21, nil)
require.False(t, called)

fcs.advance(0, 5, nil)
require.False(t, called)
}

func TestCalledMultiplePerEpoch(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.7.6
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.8.1
github.com/filecoin-project/go-fil-markets v1.9.0
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.8.1 h1:nNJB5EIp5c6yo/z51DloVaL7T24SslCoxSDOXwNQr9k=
github.com/filecoin-project/go-fil-markets v1.8.1/go.mod h1:PIPyOhoDLWT5NcciJQeK6Hes7MIeczGLNWVO/2Vy0a4=
github.com/filecoin-project/go-fil-markets v1.9.0 h1:atoORQmjN1SEjB4RKj3uvPCqL9Jcs2RZ1GHKefstkxw=
github.com/filecoin-project/go-fil-markets v1.9.0/go.mod h1:PIPyOhoDLWT5NcciJQeK6Hes7MIeczGLNWVO/2Vy0a4=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
Expand Down
140 changes: 140 additions & 0 deletions itests/deals_expiry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package itests

import (
"context"
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/storagemarket"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
market3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
market4 "github.com/filecoin-project/specs-actors/v4/actors/builtin/market"
market5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/market"

"github.com/filecoin-project/lotus/itests/kit"
)

// Test that the deal state eventually moves to "Expired" on both client and miner
func TestDealExpiry(t *testing.T) {
kit.QuietMiningLogs()

resetMinDealDuration(t)

ctx := context.Background()

var (
client kit.TestFullNode
miner1 kit.TestMiner
)

ens := kit.NewEnsemble(t, kit.MockProofs())
ens.FullNode(&client)
ens.Miner(&miner1, &client, kit.WithAllSubsystems())
bm := ens.Start().InterconnectAll().BeginMining(50 * time.Millisecond)

dh := kit.NewDealHarness(t, &client, &miner1, &miner1)

client.WaitTillChain(ctx, kit.HeightAtLeast(5))

// Make a deal with a short duration
dealProposalCid, _, _ := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{
Rseed: 0,
FastRet: true,
// Needs to be far enough in the future to ensure the deal has been sealed
StartEpoch: 3000,
// Short deal duration
MinBlocksDuration: 50,
})

// Inject null blocks each time the chain advances by a block so as to
// get to deal expiration faster
go func() {
ch, _ := client.ChainNotify(ctx)
for range ch {
bm[0].InjectNulls(10)
}
}()

clientExpired := false
minerExpired := false
for {
ts, err := client.ChainHead(ctx)
require.NoError(t, err)

t.Logf("Chain height: %d", ts.Height())

// Get the miner deal from the proposal CID
minerDeal := getMinerDeal(ctx, t, miner1, *dealProposalCid)

t.Logf("Miner deal:")
t.Logf(" %s -> %s", minerDeal.Proposal.Client, minerDeal.Proposal.Provider)
t.Logf(" StartEpoch: %d", minerDeal.Proposal.StartEpoch)
t.Logf(" EndEpoch: %d", minerDeal.Proposal.EndEpoch)
t.Logf(" State: %s", storagemarket.DealStates[minerDeal.State])
//spew.Dump(d)

// Get the client deal
clientDeals, err := client.ClientListDeals(ctx)
require.NoError(t, err)

t.Logf("Client deal state: %s\n", storagemarket.DealStates[clientDeals[0].State])

// Expect the deal to eventually expire on the client and the miner
if clientDeals[0].State == storagemarket.StorageDealExpired {
t.Logf("Client deal expired")
clientExpired = true
}
if minerDeal.State == storagemarket.StorageDealExpired {
t.Logf("Miner deal expired")
minerExpired = true
}
if clientExpired && minerExpired {
t.Logf("PASS: Client and miner deal expired")
return
}

if ts.Height() > 5000 {
t.Fatalf("Reached height %d without client and miner deals expiring", ts.Height())
}

time.Sleep(2 * time.Second)
}
}

func getMinerDeal(ctx context.Context, t *testing.T, miner1 kit.TestMiner, dealProposalCid cid.Cid) storagemarket.MinerDeal {
minerDeals, err := miner1.MarketListIncompleteDeals(ctx)
require.NoError(t, err)
require.Greater(t, len(minerDeals), 0)

for _, d := range minerDeals {
if d.ProposalCid == dealProposalCid {
return d
}
}
t.Fatalf("miner deal with proposal CID %s not found", dealProposalCid)
return storagemarket.MinerDeal{}
}

// reset minimum deal duration to 0, so we can make very short-lived deals.
// NOTE: this will need updating with every new specs-actors version.
func resetMinDealDuration(t *testing.T) {
m2 := market2.DealMinDuration
m3 := market3.DealMinDuration
m4 := market4.DealMinDuration
m5 := market5.DealMinDuration

market2.DealMinDuration = 0
market3.DealMinDuration = 0
market4.DealMinDuration = 0
market5.DealMinDuration = 0

t.Cleanup(func() {
market2.DealMinDuration = m2
market3.DealMinDuration = m3
market4.DealMinDuration = m4
market5.DealMinDuration = m5
})
}
4 changes: 4 additions & 0 deletions itests/kit/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MakeFullDealParams struct {
Rseed int
FastRet bool
StartEpoch abi.ChainEpoch
MinBlocksDuration uint64
UseCARFileForStorageDeal bool

// SuspendUntilCryptoeconStable suspends deal-making, until cryptoecon
Expand Down Expand Up @@ -97,6 +98,9 @@ func (dh *DealHarness) MakeOnlineDeal(ctx context.Context, params MakeFullDealPa
dp.Data.Root = res.Root
dp.DealStartEpoch = params.StartEpoch
dp.FastRetrieval = params.FastRet
if params.MinBlocksDuration > 0 {
dp.MinBlocksDuration = params.MinBlocksDuration
}
deal = dh.StartDeal(ctx, dp)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
Expand Down
Loading