diff --git a/.circleci/config.yml b/.circleci/config.yml index 1614daf8e6a..f0262e5b6ea 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -850,6 +850,11 @@ workflows: suite: itest-get_messages_in_ts target: "./itests/get_messages_in_ts_test.go" + - test: + name: test-itest-mempool + suite: itest-mempool + target: "./itests/mempool_test.go" + - test: name: test-itest-multisig suite: itest-multisig diff --git a/chain/messagepool/check_test.go b/chain/messagepool/check_test.go new file mode 100644 index 00000000000..ffcac74e5d0 --- /dev/null +++ b/chain/messagepool/check_test.go @@ -0,0 +1,224 @@ +//stm: #unit +package messagepool + +import ( + "context" + "fmt" + "testing" + + "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/assert" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/consensus/filcns" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" + "github.com/filecoin-project/lotus/chain/wallet" + _ "github.com/filecoin-project/lotus/lib/sigs/bls" + _ "github.com/filecoin-project/lotus/lib/sigs/secp" +) + +func init() { + _ = logging.SetLogLevel("*", "INFO") +} + +func getCheckMessageStatus(statusCode api.CheckStatusCode, msgStatuses []api.MessageCheckStatus) (*api.MessageCheckStatus, error) { + for i := 0; i < len(msgStatuses); i++ { + iMsgStatuses := msgStatuses[i] + if iMsgStatuses.CheckStatus.Code == statusCode { + return &iMsgStatuses, nil + } + } + return nil, fmt.Errorf("Could not find CheckStatusCode %s", statusCode) +} + +func TestCheckMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + var protos []*api.MessagePrototype + for i := 0; i < 5; i++ { + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: uint64(i), + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + proto := &api.MessagePrototype{ + Message: *msg, + ValidNonce: true, + } + protos = append(protos, proto) + } + + messageStatuses, err := mp.CheckMessages(context.TODO(), protos) + assert.NoError(t, err) + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + for j := 0; j < len(iMsgStatuses); j++ { + jStatus := iMsgStatuses[i] + assert.True(t, jStatus.OK) + } + } +} + +func TestCheckPendingMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + // add a valid message to the pool + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + + sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + mustAdd(t, mp, sm) + + messageStatuses, err := mp.CheckPendingMessages(context.TODO(), sender) + assert.NoError(t, err) + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + for j := 0; j < len(iMsgStatuses); j++ { + jStatus := iMsgStatuses[i] + assert.True(t, jStatus.OK) + } + } +} + +func TestCheckReplaceMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_REPLACE_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + // add a valid message to the pool + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + + sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + mustAdd(t, mp, sm) + + // create a new message with the same data, except that it is too big + var msgs []*types.Message + invalidmsg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 128<<10), + } + msgs = append(msgs, invalidmsg) + + { + messageStatuses, err := mp.CheckReplaceMessages(context.TODO(), msgs) + if err != nil { + t.Fatal(err) + } + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + + status, err := getCheckMessageStatus(api.CheckStatusMessageSize, iMsgStatuses) + if err != nil { + t.Fatal(err) + } + // the replacement message should cause a status error + assert.False(t, status.OK) + } + } + +} diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 6bd60da34b1..d7f075aabc7 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" @@ -226,6 +227,8 @@ func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) { } func TestMessagePool(t *testing.T) { + //stm: @CHAIN_MEMPOOL_GET_NONCE_001 + tma := newTestMpoolAPI() w, err := wallet.NewWallet(wallet.NewMemKeyStore()) @@ -327,6 +330,7 @@ func TestCheckMessageBig(t *testing.T) { Message: *msg, Signature: *sig, } + //stm: @CHAIN_MEMPOOL_PUSH_001 err = mp.Add(context.TODO(), sm) assert.ErrorIs(t, err, ErrMessageTooBig) } @@ -760,3 +764,302 @@ func TestUpdates(t *testing.T) { t.Fatal("expected closed channel, but got an update instead") } } + +func TestMessageBelowMinGasFee(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + // fee is just below minimum gas fee + fee := minimumBaseFee.Uint64() - 1 + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(fee), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + err = mp.Add(context.TODO(), sm) + assert.ErrorIs(t, err, ErrGasFeeCapTooLow) + } +} + +func TestMessageValueTooHigh(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + totalFil := types.TotalFilecoinInt + extra := types.NewInt(1) + + value := types.BigAdd(totalFil, extra) + { + msg := &types.Message{ + To: to, + From: from, + Value: value, + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + + err = mp.Add(context.TODO(), sm) + assert.Error(t, err) + } +} + +func TestMessageSignatureInvalid(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + badSig := &crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: make([]byte, 0), + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *badSig, + } + err = mp.Add(context.TODO(), sm) + assert.Error(t, err) + // assert.Contains(t, err.Error(), "invalid signature length") + assert.Error(t, err) + } +} + +func TestAddMessageTwice(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + // create a valid messages + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // try to add it twice + err = mp.Add(context.TODO(), sm) + // assert.Contains(t, err.Error(), "with nonce 0 already in mpool") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceNonceGap(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + // create message with invalid nonce (1) + sm := makeTestMessage(w, from, to, 1, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // then try to add message again + err = mp.Add(context.TODO(), sm) + // assert.Contains(t, err.Error(), "unfulfilled nonce gap") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceCidDiff(t *testing.T) { + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // Create message with different data, so CID is different + sm2 := makeTestMessage(w, from, to, 0, 50_000_001, minimumBaseFee.Uint64()) + + //stm: @CHAIN_MEMPOOL_PUSH_001 + // then try to add message again + err = mp.Add(context.TODO(), sm2) + // assert.Contains(t, err.Error(), "replace by fee has too low GasPremium") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceCidDiffReplaced(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // Create message with different data, so CID is different + sm2 := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()*2) + mustAdd(t, mp, sm2) + } +} + +func TestRemoveMessage(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + //stm: @CHAIN_MEMPOOL_REMOVE_001 + // remove message for sender + mp.Remove(context.TODO(), from, sm.Message.Nonce, true) + + //stm: @CHAIN_MEMPOOL_PENDING_FOR_001 + // check messages in pool: should be none present + msgs := mp.pendingFor(context.TODO(), from) + assert.Len(t, msgs, 0) + } +} diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index de32eaa6bd4..18a75d88181 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -16,6 +17,7 @@ import ( ) func TestRepubMessages(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 oldRepublishBatchDelay := RepublishBatchDelay RepublishBatchDelay = time.Microsecond defer func() { @@ -57,6 +59,7 @@ func TestRepubMessages(t *testing.T) { for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + //stm: @CHAIN_MEMPOOL_PUSH_001 _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 2ae99cd779f..e97d5208ef1 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -74,6 +75,8 @@ func makeTestMpool() (*MessagePool, *testMpoolAPI) { } func TestMessageChains(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 mp, tma := makeTestMpool() // the actors @@ -310,6 +313,8 @@ func TestMessageChains(t *testing.T) { } func TestMessageChainSkipping(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 + // regression test for chain skip bug mp, tma := makeTestMpool() @@ -382,6 +387,7 @@ func TestMessageChainSkipping(t *testing.T) { } func TestBasicMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 oldMaxNonceGap := MaxNonceGap MaxNonceGap = 1000 defer func() { @@ -532,6 +538,7 @@ func TestBasicMessageSelection(t *testing.T) { } func TestMessageSelectionTrimmingGas(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -595,6 +602,7 @@ func TestMessageSelectionTrimmingGas(t *testing.T) { } func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -641,6 +649,7 @@ func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -707,6 +716,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -788,6 +798,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { } func TestPriorityMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -867,6 +878,7 @@ func TestPriorityMessageSelection(t *testing.T) { } func TestPriorityMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -934,6 +946,7 @@ func TestPriorityMessageSelection2(t *testing.T) { } func TestPriorityMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -1028,6 +1041,8 @@ func TestPriorityMessageSelection3(t *testing.T) { } func TestOptimalMessageSelection1(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses just a single actor sending messages with a low tq // the chain depenent merging algorithm should pick messages from the actor // from the start @@ -1094,6 +1109,8 @@ func TestOptimalMessageSelection1(t *testing.T) { } func TestOptimalMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses two actors sending messages to each other, with the first // actor paying (much) higher gas premium than the second. // We select with a low ticket quality; the chain depenent merging algorithm should pick @@ -1173,6 +1190,8 @@ func TestOptimalMessageSelection2(t *testing.T) { } func TestOptimalMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses 10 actors sending a block of messages to each other, with the the first // actors paying higher gas premium than the subsequent actors. // We select with a low ticket quality; the chain dependent merging algorithm should pick @@ -1416,6 +1435,8 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 { } func TestCompetitiveMessageSelectionExp(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1439,6 +1460,8 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) { } func TestCompetitiveMessageSelectionZipf(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1462,6 +1485,7 @@ func TestCompetitiveMessageSelectionZipf(t *testing.T) { } func TestGasReward(t *testing.T) { + //stm: @CHAIN_MEMPOOL_GET_GAS_REWARD_001 tests := []struct { Premium uint64 FeeCap uint64 @@ -1494,6 +1518,8 @@ func TestGasReward(t *testing.T) { } func TestRealWorldSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001 + // load test-messages.json.gz and rewrite the messages so that // 1) we map each real actor to a test actor so that we can sign the messages // 2) adjust the nonces so that they start from 0 diff --git a/itests/kit/circuit.go b/itests/kit/circuit.go new file mode 100644 index 00000000000..d2857010e31 --- /dev/null +++ b/itests/kit/circuit.go @@ -0,0 +1,34 @@ +package kit + +import ( + "fmt" + "testing" + "time" +) + +/* +CircuitBreaker implements a simple time-based circuit breaker used for waiting for async operations to finish. + +This is how it works: + - It runs the `cb` function until it returns true, + - waiting for `throttle` duration between each iteration, + - or at most `timeout` duration until it breaks test execution. + +You can use it if t.Deadline() is not "granular" enough, and you want to know which specific piece of code timed out, +or you need to set different deadlines in the same test. +*/ +func CircuitBreaker(t *testing.T, label string, throttle, timeout time.Duration, cb func() bool) { + tmo := time.After(timeout) + for { + if cb() { + break + } + select { + case <-tmo: + t.Fatal("timeout: ", label) + default: + fmt.Printf("waiting: %s\n", label) + time.Sleep(throttle) + } + } +} diff --git a/itests/mempool_test.go b/itests/mempool_test.go new file mode 100644 index 00000000000..a1c2a330e50 --- /dev/null +++ b/itests/mempool_test.go @@ -0,0 +1,521 @@ +//stm: #integration +package itests + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" +) + +const mPoolThrottle = time.Millisecond * 100 +const mPoolTimeout = time.Second * 10 + +func TestMemPoolPushSingleNode(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, err := firstNode.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm, err := firstNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + // check pending messages for address + kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) +} + +func TestMemPoolPushTwoNodes(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + sender2 := secondNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + addr2, _ := secondNode.WalletNew(ctx, types.KTBLS) + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + + const totalMessages = 10 + + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + var sms []*types.SignedMessage + // push messages to message pools of both nodes + for i := 0; i < totalMessages; i++ { + // first + msg1 := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm1.Message.Nonce) + sms = append(sms, sm1) + + // second + msg2 := &types.Message{ + From: sender2, + To: addr2, + Value: each, + } + + sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm2.Message.Nonce) + sms = append(sms, sm2) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool { + pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending1) == 0 && len(pending2) == 0 { + // Check messages on both nodes + for _, lookMsg := range sms { + msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup1) + + msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup2) + } + return true + } + return false + }) +} + +func TestMemPoolClearPending(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001 + //stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // Add single message, then clear the pool + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + } + _, err = firstNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + + // message should be in the mempool + kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + return len(pending) == 1 + }) + + err = firstNode.MpoolClear(ctx, true) + require.NoError(t, err) + + // pool should be empty now + kit.CircuitBreaker(t, "clear mempool", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + return len(pending) == 0 + }) + + // mine a couple of blocks + ens.BeginMining(blockTime) + time.Sleep(5 * blockTime) + + // make sure that the cleared message wasn't picked up and mined + _, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true) + require.Error(t, err) +} + +func TestMemPoolBatchPush(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_BATCH_PUSH_001 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + sms = append(sms, signedMessage) + } + + _, err = firstNode.MpoolBatchPush(ctx, sms) + require.NoError(t, err) + + // check pending messages for address + kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + require.NoError(t, err) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) +} + +func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_PUSH_003 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + // push untrusted messages + pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage) + require.NoError(t, err) + require.Equal(t, msg.Cid(), pushedCid) + + sms = append(sms, signedMessage) + } + + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // check pending messages for address + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) + +} + +func TestMemPoolBatchPushUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_BATCH_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + sms = append(sms, signedMessage) + } + + _, err = firstNode.MpoolBatchPushUntrusted(ctx, sms) + require.NoError(t, err) + + // check pending messages for address, wait until they are all pushed + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + // wait until pending messages are mined, pool pending list should be empty + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) + +}