From 008fbbd652326a878e6848d3a922eb8efbd0f171 Mon Sep 17 00:00:00 2001
From: Darko Brdareski
Date: Wed, 2 Feb 2022 17:08:50 +0100
Subject: [PATCH 1/5] Add unit and integration tests for mempool

---
 chain/messagepool/check_test.go       | 224 +++++++++++++
 chain/messagepool/messagepool_test.go | 299 +++++++++++++++++
 chain/messagepool/repub_test.go       |   3 +
 chain/messagepool/selection_test.go   |  26 ++
 itests/mempool_test.go                | 455 ++++++++++++++++++++++++++
 5 files changed, 1007 insertions(+)
 create mode 100644 chain/messagepool/check_test.go
 create mode 100644 itests/mempool_test.go

diff --git a/chain/messagepool/check_test.go b/chain/messagepool/check_test.go
new file mode 100644
index 00000000000..ffcac74e5d0
--- /dev/null
+++ b/chain/messagepool/check_test.go
@@ -0,0 +1,224 @@
+//stm: #unit
+package messagepool
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/ipfs/go-datastore"
+	logging "github.com/ipfs/go-log/v2"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/chain/consensus/filcns"
+	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/chain/types/mock"
+	"github.com/filecoin-project/lotus/chain/wallet"
+	_ "github.com/filecoin-project/lotus/lib/sigs/bls"
+	_ "github.com/filecoin-project/lotus/lib/sigs/secp"
+)
+
+func init() {
+	_ = logging.SetLogLevel("*", "INFO")
+}
+
+func getCheckMessageStatus(statusCode api.CheckStatusCode, msgStatuses []api.MessageCheckStatus) (*api.MessageCheckStatus, error) {
+	for i := 0; i < len(msgStatuses); i++ {
+		iMsgStatuses := msgStatuses[i]
+		if iMsgStatuses.CheckStatus.Code == statusCode {
+			return &iMsgStatuses, nil
+		}
+	}
+	return nil, fmt.Errorf("could not find CheckStatusCode %s", statusCode)
+}
+
+func TestCheckMessages(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_CHECK_MESSAGES_001
+	tma := newTestMpoolAPI()
+
+	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	tma.setBalance(sender, 1000e15)
+	target := mock.Address(1001)
+
+	var protos []*api.MessagePrototype
+	for i := 0; i < 5; i++ {
+		msg := &types.Message{
+			To:         target,
+			From:       sender,
+			Value:      types.NewInt(1),
+			Nonce:      uint64(i),
+			GasLimit:   50000000,
+			GasFeeCap:  types.NewInt(minimumBaseFee.Uint64()),
+			GasPremium: types.NewInt(1),
+			Params:     make([]byte, 2<<10),
+		}
+		proto := &api.MessagePrototype{
+			Message:    *msg,
+			ValidNonce: true,
+		}
+		protos = append(protos, proto)
+	}
+
+	messageStatuses, err := mp.CheckMessages(context.TODO(), protos)
+	assert.NoError(t, err)
+	for i := 0; i < len(messageStatuses); i++ {
+		iMsgStatuses := messageStatuses[i]
+		for j := 0; j < len(iMsgStatuses); j++ {
+			jStatus := iMsgStatuses[j]
+			assert.True(t, jStatus.OK)
+		}
+	}
+}
+
+func TestCheckPendingMessages(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001
+	tma := newTestMpoolAPI()
+
+	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	
tma.setBalance(sender, 1000e15)
+	target := mock.Address(1001)
+
+	// add a valid message to the pool
+	msg := &types.Message{
+		To:         target,
+		From:       sender,
+		Value:      types.NewInt(1),
+		Nonce:      0,
+		GasLimit:   50000000,
+		GasFeeCap:  types.NewInt(minimumBaseFee.Uint64()),
+		GasPremium: types.NewInt(1),
+		Params:     make([]byte, 2<<10),
+	}
+
+	sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
+	if err != nil {
+		panic(err)
+	}
+	sm := &types.SignedMessage{
+		Message:   *msg,
+		Signature: *sig,
+	}
+	mustAdd(t, mp, sm)
+
+	messageStatuses, err := mp.CheckPendingMessages(context.TODO(), sender)
+	assert.NoError(t, err)
+	for i := 0; i < len(messageStatuses); i++ {
+		iMsgStatuses := messageStatuses[i]
+		for j := 0; j < len(iMsgStatuses); j++ {
+			jStatus := iMsgStatuses[j]
+			assert.True(t, jStatus.OK)
+		}
+	}
+}
+
+func TestCheckReplaceMessages(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_CHECK_REPLACE_MESSAGES_001
+	tma := newTestMpoolAPI()
+
+	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	tma.setBalance(sender, 1000e15)
+	target := mock.Address(1001)
+
+	// add a valid message to the pool
+	msg := &types.Message{
+		To:         target,
+		From:       sender,
+		Value:      types.NewInt(1),
+		Nonce:      0,
+		GasLimit:   50000000,
+		GasFeeCap:  types.NewInt(minimumBaseFee.Uint64()),
+		GasPremium: types.NewInt(1),
+		Params:     make([]byte, 2<<10),
+	}
+
+	sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
+	if err != nil {
+		panic(err)
+	}
+	sm := &types.SignedMessage{
+		Message:   *msg,
+		Signature: *sig,
+	}
+	mustAdd(t, mp, sm)
+
+	// create a new message with the same data, except that it is too big
+	var msgs []*types.Message
+	invalidmsg := &types.Message{
+		To:         target,
+		From:       sender,
+		Value:      types.NewInt(1),
+		Nonce:      0,
+		GasLimit:   50000000,
+		GasFeeCap:  types.NewInt(minimumBaseFee.Uint64()),
+		GasPremium: types.NewInt(1),
+		Params:     make([]byte, 128<<10),
+	}
+	msgs = append(msgs, invalidmsg)
+
+	{
+		messageStatuses, err := mp.CheckReplaceMessages(context.TODO(), msgs)
+		if err != nil {
+			t.Fatal(err)
+		}
+		for i := 0; i < len(messageStatuses); i++ {
+			iMsgStatuses := messageStatuses[i]
+
+			status, err := getCheckMessageStatus(api.CheckStatusMessageSize, iMsgStatuses)
+			if err != nil {
+				t.Fatal(err)
+			}
+			// the replacement message should cause a status error
+			assert.False(t, status.OK)
+		}
+	}
+
+}
diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go
index 6bd60da34b1..86c3a49d147 100644
--- a/chain/messagepool/messagepool_test.go
+++ b/chain/messagepool/messagepool_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/go-state-types/crypto"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
 	logging "github.com/ipfs/go-log/v2"
@@ -226,6 +227,8 @@ func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
 }
 
 func TestMessagePool(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_GET_NONCE_001
+
 	tma := newTestMpoolAPI()
 
 	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
@@ -327,6 +330,7 @@ func TestCheckMessageBig(t *testing.T) {
 		Message:   *msg,
 		Signature: *sig,
 	}
+	//stm: @CHAIN_MEMPOOL_PUSH_001
 	err = 
mp.Add(context.TODO(), sm) assert.ErrorIs(t, err, ErrMessageTooBig) } @@ -760,3 +764,298 @@ func TestUpdates(t *testing.T) { t.Fatal("expected closed channel, but got an update instead") } } + +func TestMessageBelowMinGasFee(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + // fee is just below minimum gas fee + fee := minimumBaseFee.Uint64() - 1 + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(fee), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + err = mp.Add(context.TODO(), sm) + assert.ErrorIs(t, err, ErrGasFeeCapTooLow) + } +} + +func TestMessageValueTooHigh(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + totalFil := types.TotalFilecoinInt + extra := types.NewInt(1) + + value := types.BigAdd(totalFil, extra) + { + msg := &types.Message{ + To: to, + From: from, + Value: value, + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + + err = mp.Add(context.TODO(), sm) + assert.Error(t, err) + } +} + +func TestMessageSignatureInvalid(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + badSig := &crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: make([]byte, 0), + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *badSig, + } + err = mp.Add(context.TODO(), sm) + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid signature length") + } +} + +func TestAddMessageTwice(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + 
assert.NoError(t, err)
+
+	from, err := w.WalletNew(context.Background(), types.KTBLS)
+	assert.NoError(t, err)
+
+	tma.setBalance(from, 1000e9)
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	assert.NoError(t, err)
+
+	to := mock.Address(1001)
+
+	{
+		// create a valid message
+		sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
+		mustAdd(t, mp, sm)
+
+		// try to add it twice
+		err = mp.Add(context.TODO(), sm)
+		assert.Contains(t, err.Error(), "with nonce 0 already in mpool")
+	}
+}
+
+func TestAddMessageTwiceNonceGap(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_PUSH_001
+	tma := newTestMpoolAPI()
+
+	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
+	assert.NoError(t, err)
+
+	from, err := w.WalletNew(context.Background(), types.KTBLS)
+	assert.NoError(t, err)
+
+	tma.setBalance(from, 1000e9)
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	assert.NoError(t, err)
+
+	to := mock.Address(1001)
+
+	{
+		// create message with invalid nonce (1)
+		sm := makeTestMessage(w, from, to, 1, 50_000_000, minimumBaseFee.Uint64())
+		mustAdd(t, mp, sm)
+
+		// then try to add message again
+		err = mp.Add(context.TODO(), sm)
+		assert.Contains(t, err.Error(), "unfulfilled nonce gap")
+	}
+}
+
+func TestAddMessageTwiceCidDiff(t *testing.T) {
+	tma := newTestMpoolAPI()
+
+	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
+	assert.NoError(t, err)
+
+	from, err := w.WalletNew(context.Background(), types.KTBLS)
+	assert.NoError(t, err)
+
+	tma.setBalance(from, 1000e9)
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	assert.NoError(t, err)
+
+	to := mock.Address(1001)
+
+	{
+		sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
+		mustAdd(t, mp, sm)
+
+		// Create message with different data, so CID is different
+		sm2 := makeTestMessage(w, from, to, 0, 50_000_001, minimumBaseFee.Uint64())
+
+		//stm: @CHAIN_MEMPOOL_PUSH_001
+		// then try to add message again
+		err = mp.Add(context.TODO(), sm2)
+		assert.Contains(t, err.Error(), "replace by fee has too low GasPremium")
+	}
+}
+
+func TestAddMessageTwiceCidDiffReplaced(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_PUSH_001
+	tma := newTestMpoolAPI()
+
+	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
+	assert.NoError(t, err)
+
+	from, err := w.WalletNew(context.Background(), types.KTBLS)
+	assert.NoError(t, err)
+
+	tma.setBalance(from, 1000e9)
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	assert.NoError(t, err)
+
+	to := mock.Address(1001)
+
+	{
+		sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
+		mustAdd(t, mp, sm)
+
+		// Create message with different data, so CID is different
+		sm2 := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()*2)
+		mustAdd(t, mp, sm2)
+	}
+}
+
+func TestRemoveMessage(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_PUSH_001
+	tma := newTestMpoolAPI()
+
+	w, err := wallet.NewWallet(wallet.NewMemKeyStore())
+	assert.NoError(t, err)
+
+	from, err := w.WalletNew(context.Background(), types.KTBLS)
+	assert.NoError(t, err)
+
+	tma.setBalance(from, 1000e9)
+
+	ds := datastore.NewMapDatastore()
+
+	mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
+	assert.NoError(t, err)
+
+ to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + //stm: @CHAIN_MEMPOOL_REMOVE_001 + // remove message for sender + mp.Remove(context.TODO(), from, sm.Message.Nonce, true) + + //stm: @CHAIN_MEMPOOL_PENDING_FOR_001 + // check messages in pool: should be none present + msgs := mp.pendingFor(context.TODO(), from) + assert.Len(t, msgs, 0) + } +} diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index de32eaa6bd4..18a75d88181 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -16,6 +17,7 @@ import ( ) func TestRepubMessages(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 oldRepublishBatchDelay := RepublishBatchDelay RepublishBatchDelay = time.Microsecond defer func() { @@ -57,6 +59,7 @@ func TestRepubMessages(t *testing.T) { for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + //stm: @CHAIN_MEMPOOL_PUSH_001 _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 2ae99cd779f..e97d5208ef1 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -74,6 +75,8 @@ func makeTestMpool() (*MessagePool, *testMpoolAPI) { } func TestMessageChains(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 mp, tma := makeTestMpool() // the actors @@ -310,6 +313,8 @@ func TestMessageChains(t *testing.T) { } func TestMessageChainSkipping(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 + // regression test for chain skip bug mp, tma := makeTestMpool() @@ -382,6 +387,7 @@ func TestMessageChainSkipping(t *testing.T) { } func TestBasicMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 oldMaxNonceGap := MaxNonceGap MaxNonceGap = 1000 defer func() { @@ -532,6 +538,7 @@ func TestBasicMessageSelection(t *testing.T) { } func TestMessageSelectionTrimmingGas(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -595,6 +602,7 @@ func TestMessageSelectionTrimmingGas(t *testing.T) { } func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -641,6 +649,7 @@ func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -707,6 +716,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -788,6 +798,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { } func TestPriorityMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -867,6 +878,7 @@ func TestPriorityMessageSelection(t *testing.T) { } func TestPriorityMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -934,6 +946,7 
@@ func TestPriorityMessageSelection2(t *testing.T) { } func TestPriorityMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -1028,6 +1041,8 @@ func TestPriorityMessageSelection3(t *testing.T) { } func TestOptimalMessageSelection1(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses just a single actor sending messages with a low tq // the chain depenent merging algorithm should pick messages from the actor // from the start @@ -1094,6 +1109,8 @@ func TestOptimalMessageSelection1(t *testing.T) { } func TestOptimalMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses two actors sending messages to each other, with the first // actor paying (much) higher gas premium than the second. // We select with a low ticket quality; the chain depenent merging algorithm should pick @@ -1173,6 +1190,8 @@ func TestOptimalMessageSelection2(t *testing.T) { } func TestOptimalMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses 10 actors sending a block of messages to each other, with the the first // actors paying higher gas premium than the subsequent actors. // We select with a low ticket quality; the chain dependent merging algorithm should pick @@ -1416,6 +1435,8 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 { } func TestCompetitiveMessageSelectionExp(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1439,6 +1460,8 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) { } func TestCompetitiveMessageSelectionZipf(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1462,6 +1485,7 @@ func TestCompetitiveMessageSelectionZipf(t *testing.T) { } func TestGasReward(t *testing.T) { + //stm: @CHAIN_MEMPOOL_GET_GAS_REWARD_001 tests := []struct { Premium uint64 FeeCap uint64 @@ -1494,6 +1518,8 @@ func TestGasReward(t *testing.T) { } func TestRealWorldSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001 + // load test-messages.json.gz and rewrite the messages so that // 1) we map each real actor to a test actor so that we can sign the messages // 2) adjust the nonces so that they start from 0 diff --git a/itests/mempool_test.go b/itests/mempool_test.go new file mode 100644 index 00000000000..f5fb408e0aa --- /dev/null +++ b/itests/mempool_test.go @@ -0,0 +1,455 @@ +//stm: #integration +package itests + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" +) + +func TestMemPoolPushSingleNode(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(blockTime) + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, err := firstNode.WalletNew(ctx, types.KTBLS) + 
require.NoError(t, err) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm, err := firstNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + // check pending messages for address + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.Equal(t, totalMessages, len(msgStatuses)) + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + time.Sleep(10 * blockTime) + + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, 0, len(pending)) + + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } +} + +func TestMemPoolPushTwoNodes(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(blockTime) + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + sender2 := secondNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + addr2, _ := secondNode.WalletNew(ctx, types.KTBLS) + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + + const totalMessages = 10 + + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + var sms []*types.SignedMessage + // push messages to message pools of both nodes + for i := 0; i < totalMessages; i++ { + // first + msg1 := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm1.Message.Nonce) + sms = append(sms, sm1) + + // second + msg2 := &types.Message{ + From: sender2, + To: addr2, + Value: each, + } + + sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm2.Message.Nonce) + sms = append(sms, sm2) + } + + time.Sleep(10 * blockTime) + + pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, 0, len(pending1)) + + pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, 0, len(pending2)) + + // Check messages on both nodes + for _, lookMsg := range sms { + msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, 
api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup1) + + msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup2) + } +} + +func TestMemPoolClearPending(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001 + //stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(blockTime) + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // Add single message, then clear the pool + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + } + _, err = firstNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + + err = firstNode.MpoolClear(ctx, true) + require.NoError(t, err) + + // pool should be empty now + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, 0, len(pending)) + + time.Sleep(2 * blockTime) + + // waiting for the message should produce nothing + _, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true) + require.Error(t, err) +} + +func TestMemPoolBatchPush(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_BATCH_PUSH_001 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(blockTime) + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + sms = append(sms, signedMessage) + } + + _, err = firstNode.MpoolBatchPush(ctx, sms) + require.NoError(t, err) + + // check pending messages for address + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + require.Equal(t, totalMessages, len(msgStatuses)) + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + require.NoError(t, err) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + 
} + } + require.True(t, found) + } + + time.Sleep(10 * blockTime) + + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, 0, len(pending)) + + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } +} + +func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_PUSH_003 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(blockTime) + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + // push untrusted messages + pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage) + require.NoError(t, err) + require.Equal(t, msg.Cid(), pushedCid) + + sms = append(sms, signedMessage) + } + + // check pending messages for address + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.Equal(t, totalMessages, len(msgStatuses)) + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + time.Sleep(10 * blockTime) + + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, 0, len(pending)) + + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } +} + +func TestMemPoolBatchPushUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_BATCH_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(blockTime) + kit.QuietMiningLogs() + + sender := 
firstNode.DefaultKey.Address
+
+	addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
+
+	const totalMessages = 10
+
+	bal, err := firstNode.WalletBalance(ctx, sender)
+	require.NoError(t, err)
+	toSend := big.Div(bal, big.NewInt(10))
+	each := big.Div(toSend, big.NewInt(totalMessages))
+
+	// add messages to be mined/published
+	var sms []*types.SignedMessage
+	for i := 0; i < totalMessages; i++ {
+		msg := &types.Message{
+			From:       sender,
+			To:         addr,
+			Value:      each,
+			Nonce:      uint64(i),
+			GasLimit:   50_000_000,
+			GasFeeCap:  types.NewInt(100_000_000),
+			GasPremium: types.NewInt(1),
+		}
+
+		signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
+		require.NoError(t, err)
+
+		sms = append(sms, signedMessage)
+	}
+
+	_, err = firstNode.MpoolBatchPushUntrusted(ctx, sms)
+	require.NoError(t, err)
+
+	// check pending messages for address
+	msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
+	require.NoError(t, err)
+	require.Equal(t, totalMessages, len(msgStatuses))
+	for _, msgStatusList := range msgStatuses {
+		for _, status := range msgStatusList {
+			require.True(t, status.OK)
+		}
+	}
+
+	// verify messages should be the ones included in the next block
+	selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
+	for _, msg := range sms {
+		found := false
+		for _, selectedMsg := range selected {
+			if selectedMsg.Cid() == msg.Cid() {
+				found = true
+				break
+			}
+		}
+		require.True(t, found)
+	}
+
+	time.Sleep(10 * blockTime)
+
+	// pool pending list should be empty
+	pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
+	require.NoError(t, err)
+	require.Equal(t, 0, len(pending))
+
+	// all messages should be added to the chain
+	for _, lookMsg := range sms {
+		msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
+		require.NoError(t, err)
+		require.NotNil(t, msgLookup)
+	}
+}

From 03bc45a26a867a8e17ac9dd3d5c51571754d78f7 Mon Sep 17 00:00:00 2001
From: zenground0
Date: Tue, 8 Feb 2022 12:47:23 -0500
Subject: [PATCH 2/5] Update ci config to match auto gen

---
 .circleci/config.yml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 53611d56515..5672130eb3f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -850,6 +850,11 @@ workflows:
           suite: itest-get_messages_in_ts
           target: "./itests/get_messages_in_ts_test.go"
 
+      - test:
+          name: test-itest-mempool
+          suite: itest-mempool
+          target: "./itests/mempool_test.go"
+
       - test:
           name: test-itest-multisig
           suite: itest-multisig

From 7d2810abbcc312ef970e56f3cb1363c80c12a586 Mon Sep 17 00:00:00 2001
From: Nikola Divic
Date: Wed, 9 Feb 2022 19:54:45 +0100
Subject: [PATCH 3/5] test: don't parse err messages in messagepool_test

Per @vyzo's feedback, we shouldn't parse error messages but figure out
a way to do this smarter.

I updated the code to just check for error existence, and @brdji should
figure out what to do next.
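
One possible direction (a sketch, not a settled design): export sentinel
errors from the mempool and match them with errors.Is, the way these
tests already do for ErrMessageTooBig and ErrGasFeeCapTooLow. The
ErrNonceGap sentinel below is hypothetical, not something the mempool
exports today:

    // hypothetical sentinel in chain/messagepool:
    var ErrNonceGap = errors.New("unfulfilled nonce gap")

    // in the test, instead of matching on the error string:
    err = mp.Add(context.TODO(), sm)
    assert.ErrorIs(t, err, ErrNonceGap)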
---
 chain/messagepool/messagepool_test.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go
index 86c3a49d147..d7f075aabc7 100644
--- a/chain/messagepool/messagepool_test.go
+++ b/chain/messagepool/messagepool_test.go
@@ -901,7 +901,8 @@ func TestMessageSignatureInvalid(t *testing.T) {
 		}
 		err = mp.Add(context.TODO(), sm)
 		assert.Error(t, err)
-		assert.Contains(t, err.Error(), "invalid signature length")
+		// assert.Contains(t, err.Error(), "invalid signature length")
+		assert.Error(t, err)
 	}
 }
 
@@ -931,7 +932,8 @@ func TestAddMessageTwice(t *testing.T) {
 
 		// try to add it twice
 		err = mp.Add(context.TODO(), sm)
-		assert.Contains(t, err.Error(), "with nonce 0 already in mpool")
+		// assert.Contains(t, err.Error(), "with nonce 0 already in mpool")
+		assert.Error(t, err)
 	}
 }
 
@@ -961,7 +963,8 @@ func TestAddMessageTwiceNonceGap(t *testing.T) {
 
 		// then try to add message again
 		err = mp.Add(context.TODO(), sm)
-		assert.Contains(t, err.Error(), "unfulfilled nonce gap")
+		// assert.Contains(t, err.Error(), "unfulfilled nonce gap")
+		assert.Error(t, err)
 	}
 }
 
@@ -993,7 +996,8 @@ func TestAddMessageTwiceCidDiff(t *testing.T) {
 		//stm: @CHAIN_MEMPOOL_PUSH_001
 		// then try to add message again
 		err = mp.Add(context.TODO(), sm2)
-		assert.Contains(t, err.Error(), "replace by fee has too low GasPremium")
+		// assert.Contains(t, err.Error(), "replace by fee has too low GasPremium")
+		assert.Error(t, err)
 	}
 }
 

From aca2a0fd1b8fafb594a3e9375d1d7231aca3f15d Mon Sep 17 00:00:00 2001
From: Nikola Divic
Date: Sat, 12 Feb 2022 17:48:45 +0100
Subject: [PATCH 4/5] test: fix flake in TestMemPoolBatchPushUntrusted
 integration test

The flake was caused by improper waiting for certain chain operations
to finish:
- We didn't wait for messages to be registered as pushed.
- We improperly waited for a fixed time (10 seconds) for messages to be
  mined, which in the best case would wait longer than necessary and in
  the worst case would break the test.

What I did:
- Fixed the flake by waiting in a loop for "just enough time". This
  also made the test run faster, on average, because we no longer wait
  unnecessarily.
- Added a circuit breaker so that the wait loop times out after 10
  seconds.
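
The wait loop has the following general shape (a sketch; the test below
inlines it with concrete conditions, and `done` stands in for whatever
we are waiting on, e.g. "all messages pushed" or "mpool drained"):

    timeout := time.After(time.Second * 10)
    for {
        if done() {
            break
        }
        select {
        case <-timeout:
            t.Fatal("waiting timed out")
        default:
            time.Sleep(time.Millisecond * 100) // throttle the polling
        }
    }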
--- itests/mempool_test.go | 66 ++++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/itests/mempool_test.go b/itests/mempool_test.go index f5fb408e0aa..51e0afafd2a 100644 --- a/itests/mempool_test.go +++ b/itests/mempool_test.go @@ -3,6 +3,7 @@ package itests import ( "context" + "fmt" "testing" "time" @@ -380,7 +381,7 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) { ctx := context.Background() const blockTime = 100 * time.Millisecond firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) + ens.InterconnectAll() kit.QuietMiningLogs() sender := firstNode.DefaultKey.Address @@ -416,18 +417,33 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) { _, err = firstNode.MpoolBatchPushUntrusted(ctx, sms) require.NoError(t, err) - // check pending messages for address - msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) - require.NoError(t, err) - require.Equal(t, totalMessages, len(msgStatuses)) - for _, msgStatusList := range msgStatuses { - for _, status := range msgStatusList { - require.True(t, status.OK) + // check pending messages for address, wait until they are all pushed + timeout := time.After(time.Second * 10) + for { + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + break + } + + select { + case <-timeout: + t.Fatal("waiting for batch push timed out") + default: + fmt.Printf("waiting for %d more messages to be pushed\n", len(msgStatuses)-totalMessages) + time.Sleep(time.Millisecond * 100) } } // verify messages should be the ones included in the next block selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { found := false for _, selectedMsg := range selected { @@ -439,17 +455,31 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) { require.True(t, found) } - time.Sleep(10 * blockTime) + ens.BeginMining(blockTime) - // pool pending list should be empty - pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) - require.NoError(t, err) - require.Equal(t, 0, len(pending)) - - // all messages should be added to the chain - for _, lookMsg := range sms { - msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + // wait until pending messages are mined + timeout = time.After(time.Second * 10) + for { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) require.NoError(t, err) - require.NotNil(t, msgLookup) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + break + } + + select { + case <-timeout: + t.Fatal("waiting for pending messages to be mined timed out") + default: + fmt.Printf("waiting for %d more messages to be mined\n", len(pending)) + time.Sleep(time.Millisecond * 100) + } } } From 34387326d16b14c03c7ad97ce74cc988f3a5dfc8 Mon Sep 17 00:00:00 2001 From: Nikola Divic Date: Sat, 12 Feb 2022 19:52:51 +0100 Subject: [PATCH 5/5] test: fix flaky message pool integration tests Using the same pattern described in my previous commit. 
I also added the CircuitBreaker to the itests kit, as it may be useful
for other integration tests when debugging flakiness caused by
timeouts.
---
 itests/kit/circuit.go  |  34 ++++++
 itests/mempool_test.go | 246 +++++++++++++++++++++++------------
 2 files changed, 175 insertions(+), 105 deletions(-)
 create mode 100644 itests/kit/circuit.go

diff --git a/itests/kit/circuit.go b/itests/kit/circuit.go
new file mode 100644
index 00000000000..d2857010e31
--- /dev/null
+++ b/itests/kit/circuit.go
@@ -0,0 +1,34 @@
+package kit
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+/*
+CircuitBreaker implements a simple time-based circuit breaker used for waiting for async operations to finish.
+
+This is how it works:
+ - It runs the `cb` function until it returns true,
+ - waiting for `throttle` duration between each iteration,
+ - or at most `timeout` duration until it breaks test execution.
+
+You can use it if t.Deadline() is not "granular" enough, and you want to know which specific piece of code timed out,
+or you need to set different deadlines in the same test.
+*/
+func CircuitBreaker(t *testing.T, label string, throttle, timeout time.Duration, cb func() bool) {
+	tmo := time.After(timeout)
+	for {
+		if cb() {
+			break
+		}
+		select {
+		case <-tmo:
+			t.Fatal("timeout: ", label)
+		default:
+			fmt.Printf("waiting: %s\n", label)
+			time.Sleep(throttle)
+		}
+	}
+}
diff --git a/itests/mempool_test.go b/itests/mempool_test.go
index 51e0afafd2a..a1c2a330e50 100644
--- a/itests/mempool_test.go
+++ b/itests/mempool_test.go
@@ -3,7 +3,6 @@ package itests
 
 import (
 	"context"
-	"fmt"
 	"testing"
 	"time"
 
@@ -14,6 +13,9 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+const mPoolThrottle = time.Millisecond * 100
+const mPoolTimeout = time.Second * 10
+
 func TestMemPoolPushSingleNode(t *testing.T) {
 	//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
 	//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
@@ -21,7 +23,7 @@ func TestMemPoolPushSingleNode(t *testing.T) {
 	ctx := context.Background()
 	const blockTime = 100 * time.Millisecond
 	firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
-	ens.InterconnectAll().BeginMining(blockTime)
+	ens.InterconnectAll()
 	kit.QuietMiningLogs()
 
 	sender := firstNode.DefaultKey.Address
@@ -53,13 +55,18 @@ func TestMemPoolPushSingleNode(t *testing.T) {
 	}
 
 	// check pending messages for address
-	msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
-	require.Equal(t, totalMessages, len(msgStatuses))
-	for _, msgStatusList := range msgStatuses {
-		for _, status := range msgStatusList {
-			require.True(t, status.OK)
+	kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool {
+		msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
+		if len(msgStatuses) == totalMessages {
+			for _, msgStatusList := range msgStatuses {
+				for _, status := range msgStatusList {
+					require.True(t, status.OK)
+				}
+			}
+			return true
 		}
-	}
+		return false
+	})
 
 	// verify messages should be the ones included in the next block
 	selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
@@ -74,19 +81,24 @@ func TestMemPoolPushSingleNode(t *testing.T) {
 		require.True(t, found)
 	}
 
-	time.Sleep(10 * blockTime)
-
-	// pool pending list should be empty
-	pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
-	require.NoError(t, err)
-	require.Equal(t, 0, len(pending))
+	ens.BeginMining(blockTime)
 
-	// all messages should be added to the chain
-	for _, lookMsg := range sms {
-		msgLookup, err 
:= firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) require.NoError(t, err) - require.NotNil(t, msgLookup) - } + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) } func TestMemPoolPushTwoNodes(t *testing.T) { @@ -96,7 +108,7 @@ func TestMemPoolPushTwoNodes(t *testing.T) { ctx := context.Background() const blockTime = 100 * time.Millisecond firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) + ens.InterconnectAll() kit.QuietMiningLogs() sender := firstNode.DefaultKey.Address @@ -141,26 +153,30 @@ func TestMemPoolPushTwoNodes(t *testing.T) { sms = append(sms, sm2) } - time.Sleep(10 * blockTime) - - pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) - require.NoError(t, err) - require.Equal(t, 0, len(pending1)) - - pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK) - require.NoError(t, err) - require.Equal(t, 0, len(pending2)) + ens.BeginMining(blockTime) - // Check messages on both nodes - for _, lookMsg := range sms { - msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool { + pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) require.NoError(t, err) - require.NotNil(t, msgLookup1) - msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK) require.NoError(t, err) - require.NotNil(t, msgLookup2) - } + + if len(pending1) == 0 && len(pending2) == 0 { + // Check messages on both nodes + for _, lookMsg := range sms { + msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup1) + + msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup2) + } + return true + } + return false + }) } func TestMemPoolClearPending(t *testing.T) { @@ -169,7 +185,7 @@ func TestMemPoolClearPending(t *testing.T) { ctx := context.Background() const blockTime = 100 * time.Millisecond firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) + ens.InterconnectAll() kit.QuietMiningLogs() sender := firstNode.DefaultKey.Address @@ -192,17 +208,30 @@ func TestMemPoolClearPending(t *testing.T) { _, err = firstNode.MpoolPushMessage(ctx, msg, nil) require.NoError(t, err) + // message should be in the mempool + kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + return len(pending) == 1 + }) + err = firstNode.MpoolClear(ctx, true) require.NoError(t, err) // pool should be empty now - pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) - require.NoError(t, err) - require.Equal(t, 0, len(pending)) + kit.CircuitBreaker(t, "clear 
mempool", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + return len(pending) == 0 + }) - time.Sleep(2 * blockTime) + // mine a couple of blocks + ens.BeginMining(blockTime) + time.Sleep(5 * blockTime) - // waiting for the message should produce nothing + // make sure that the cleared message wasn't picked up and mined _, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true) require.Error(t, err) } @@ -215,7 +244,7 @@ func TestMemPoolBatchPush(t *testing.T) { ctx := context.Background() const blockTime = 100 * time.Millisecond firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) + ens.InterconnectAll() kit.QuietMiningLogs() sender := firstNode.DefaultKey.Address @@ -252,14 +281,20 @@ func TestMemPoolBatchPush(t *testing.T) { require.NoError(t, err) // check pending messages for address - msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) - require.NoError(t, err) - require.Equal(t, totalMessages, len(msgStatuses)) - for _, msgStatusList := range msgStatuses { - for _, status := range msgStatusList { - require.True(t, status.OK) + kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true } - } + return false + }) // verify messages should be the ones included in the next block selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) @@ -275,19 +310,24 @@ func TestMemPoolBatchPush(t *testing.T) { require.True(t, found) } - time.Sleep(10 * blockTime) - - // pool pending list should be empty - pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) - require.NoError(t, err) - require.Equal(t, 0, len(pending)) + ens.BeginMining(blockTime) - // all messages should be added to the chain - for _, lookMsg := range sms { - msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) require.NoError(t, err) - require.NotNil(t, msgLookup) - } + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) } func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { @@ -298,7 +338,7 @@ func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { ctx := context.Background() const blockTime = 100 * time.Millisecond firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blockTime) + ens.InterconnectAll() kit.QuietMiningLogs() sender := firstNode.DefaultKey.Address @@ -336,14 +376,20 @@ func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { sms = append(sms, signedMessage) } - // check pending messages for address - msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) - require.Equal(t, totalMessages, len(msgStatuses)) - for _, msgStatusList := range msgStatuses { - for _, status := range 
msgStatusList { - require.True(t, status.OK) + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // check pending messages for address + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true } - } + return false + }) // verify messages should be the ones included in the next block selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) @@ -358,19 +404,25 @@ func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { require.True(t, found) } - time.Sleep(10 * blockTime) - - // pool pending list should be empty - pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) - require.NoError(t, err) - require.Equal(t, 0, len(pending)) + ens.BeginMining(blockTime) - // all messages should be added to the chain - for _, lookMsg := range sms { - msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) require.NoError(t, err) - require.NotNil(t, msgLookup) - } + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) + } func TestMemPoolBatchPushUntrusted(t *testing.T) { @@ -418,8 +470,7 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) { require.NoError(t, err) // check pending messages for address, wait until they are all pushed - timeout := time.After(time.Second * 10) - for { + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) require.NoError(t, err) @@ -429,17 +480,10 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) { require.True(t, status.OK) } } - break + return true } - - select { - case <-timeout: - t.Fatal("waiting for batch push timed out") - default: - fmt.Printf("waiting for %d more messages to be pushed\n", len(msgStatuses)-totalMessages) - time.Sleep(time.Millisecond * 100) - } - } + return false + }) // verify messages should be the ones included in the next block selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) @@ -457,10 +501,8 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) { ens.BeginMining(blockTime) - // wait until pending messages are mined - timeout = time.After(time.Second * 10) - for { - // pool pending list should be empty + // wait until pending messages are mined, pool pending list should be empty + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) require.NoError(t, err) @@ -471,15 +513,9 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) { require.NoError(t, err) require.NotNil(t, msgLookup) } - break + return true } + return false + }) - select { - case <-timeout: - t.Fatal("waiting for pending messages to be mined timed out") - default: - fmt.Printf("waiting for %d more messages to be mined\n", len(pending)) - time.Sleep(time.Millisecond * 100) - } - } }
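
For reuse in other integration tests: a minimal usage sketch of the new
kit.CircuitBreaker helper introduced in itests/kit/circuit.go above. It
assumes a kit.TestFullNode from an ensemble, as in the tests in this
series; the waitForMpoolDrain wrapper is illustrative and not part of
the patch:

    package itests

    import (
        "context"
        "testing"

        "github.com/filecoin-project/lotus/chain/types"
        "github.com/filecoin-project/lotus/itests/kit"
        "github.com/stretchr/testify/require"
    )

    // waitForMpoolDrain polls the node's pending set every mPoolThrottle
    // and fails the test if it has not emptied within mPoolTimeout.
    func waitForMpoolDrain(t *testing.T, node *kit.TestFullNode) {
        kit.CircuitBreaker(t, "drain mpool", mPoolThrottle, mPoolTimeout, func() bool {
            pending, err := node.MpoolPending(context.TODO(), types.EmptyTSK)
            require.NoError(t, err)
            return len(pending) == 0
        })
    }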