From a719c5c92368f6fb4f6b7931125358e77b3fcbda Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Fri, 20 Dec 2024 15:38:12 -0600 Subject: [PATCH 1/8] Fix processing of transactions in expressLaneService --- execution/gethexec/express_lane_service.go | 108 +++++++++++------- .../gethexec/express_lane_service_test.go | 72 ++++++++---- execution/gethexec/sequencer.go | 22 +++- system_tests/timeboost_test.go | 7 ++ timeboost/errors.go | 1 + 5 files changed, 144 insertions(+), 66 deletions(-) diff --git a/execution/gethexec/express_lane_service.go b/execution/gethexec/express_lane_service.go index c981160814..d13f7daacc 100644 --- a/execution/gethexec/express_lane_service.go +++ b/execution/gethexec/express_lane_service.go @@ -39,22 +39,28 @@ type expressLaneControl struct { } type transactionPublisher interface { - PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions) error + PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions, chan struct{}) error +} + +type msgAndResult struct { + msg *timeboost.ExpressLaneSubmission + resultChan chan error } type expressLaneService struct { stopwaiter.StopWaiter sync.RWMutex - transactionPublisher transactionPublisher - auctionContractAddr common.Address - apiBackend *arbitrum.APIBackend - roundTimingInfo timeboost.RoundTimingInfo - earlySubmissionGrace time.Duration - chainConfig *params.ChainConfig - logs chan []*types.Log - auctionContract *express_lane_auctiongen.ExpressLaneAuction - roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe - messagesBySequenceNumber map[uint64]*timeboost.ExpressLaneSubmission + transactionPublisher transactionPublisher + auctionContractAddr common.Address + apiBackend *arbitrum.APIBackend + roundTimingInfo timeboost.RoundTimingInfo + earlySubmissionGrace time.Duration + txQueueTimeout time.Duration + chainConfig *params.ChainConfig + logs chan []*types.Log + auctionContract *express_lane_auctiongen.ExpressLaneAuction + roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe + msgAndResultBySequenceNumber map[uint64]*msgAndResult } type contractAdapter struct { @@ -127,6 +133,7 @@ func newExpressLaneService( auctionContractAddr common.Address, bc *core.BlockChain, earlySubmissionGrace time.Duration, + txQueueTimeout time.Duration, ) (*expressLaneService, error) { chainConfig := bc.Config() @@ -158,16 +165,17 @@ pending: } return &expressLaneService{ - transactionPublisher: transactionPublisher, - auctionContract: auctionContract, - apiBackend: apiBackend, - chainConfig: chainConfig, - roundTimingInfo: *roundTimingInfo, - earlySubmissionGrace: earlySubmissionGrace, - roundControl: lru.NewCache[uint64, *expressLaneControl](8), // Keep 8 rounds cached. - auctionContractAddr: auctionContractAddr, - logs: make(chan []*types.Log, 10_000), - messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), + transactionPublisher: transactionPublisher, + auctionContract: auctionContract, + apiBackend: apiBackend, + chainConfig: chainConfig, + roundTimingInfo: *roundTimingInfo, + earlySubmissionGrace: earlySubmissionGrace, + roundControl: lru.NewCache[uint64, *expressLaneControl](8), // Keep 8 rounds cached. + auctionContractAddr: auctionContractAddr, + logs: make(chan []*types.Log, 10_000), + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + txQueueTimeout: txQueueTimeout, }, nil } @@ -204,7 +212,7 @@ func (es *expressLaneService) Start(ctxIn context.Context) { ) es.Lock() // Reset the sequence numbers map for the new round. - es.messagesBySequenceNumber = make(map[uint64]*timeboost.ExpressLaneSubmission) + es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult) es.Unlock() } } @@ -296,14 +304,14 @@ func (es *expressLaneService) Start(ctxIn context.Context) { es.Lock() // Changes to roundControl by itself are atomic but we need to udpate both roundControl - // and messagesBySequenceNumber atomically here. + // and msgAndResultBySequenceNumber atomically here. es.roundControl.Add(round, &expressLaneControl{ controller: setExpressLaneIterator.Event.NewExpressLaneController, sequence: 0, }) // Since the sequence number for this round has been reset to zero, the map of messages // by sequence number must be reset otherwise old messages would be replayed. - es.messagesBySequenceNumber = make(map[uint64]*timeboost.ExpressLaneSubmission) + es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult) es.Unlock() } fromBlock = toBlock @@ -326,10 +334,15 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( ctx context.Context, msg *timeboost.ExpressLaneSubmission, ) error { + unlockByDefer := true es.Lock() - defer es.Unlock() + defer func() { + if unlockByDefer { + es.Unlock() + } + }() // Although access to roundControl by itself is thread-safe, when the round control is transferred - // we need to reset roundControl and messagesBySequenceNumber atomically, so the following access + // we need to reset roundControl and msgAndResultBySequenceNumber atomically, so the following access // must be within the lock. control, ok := es.roundControl.Get(msg.Round) if !ok { @@ -341,7 +354,7 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( return timeboost.ErrSequenceNumberTooLow } // Check if a duplicate submission exists already, and reject if so. - if _, exists := es.messagesBySequenceNumber[msg.SequenceNumber]; exists { + if _, exists := es.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists { return timeboost.ErrDuplicateSequenceNumber } // Log an informational warning if the message's sequence number is in the future. @@ -349,27 +362,44 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( log.Info("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber) } // Put into the sequence number map. - es.messagesBySequenceNumber[msg.SequenceNumber] = msg + resultChan := make(chan error, 1) + es.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan} - for { + now := time.Now() + for es.roundTimingInfo.RoundNumber() == msg.Round { // This is an important check that mitigates a security concern // Get the next message in the sequence. - nextMsg, exists := es.messagesBySequenceNumber[control.sequence] + nextMsgAndResult, exists := es.msgAndResultBySequenceNumber[control.sequence] if !exists { break } - delete(es.messagesBySequenceNumber, nextMsg.SequenceNumber) - if err := es.transactionPublisher.PublishTimeboostedTransaction( - ctx, - nextMsg.Transaction, - msg.Options, - ); err != nil { - // If the tx fails we return an error with all the necessary info for the controller to successfully try again - return fmt.Errorf("express lane transaction of sequence number: %d and transaction hash: %v, failed with an error: %w", nextMsg.SequenceNumber, nextMsg.Transaction.Hash(), err) - } + delete(es.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber) + txIsQueued := make(chan struct{}) + es.LaunchThread(func(ctx context.Context) { + nextMsgAndResult.resultChan <- es.transactionPublisher.PublishTimeboostedTransaction(ctx, nextMsgAndResult.msg.Transaction, nextMsgAndResult.msg.Options, txIsQueued) + }) + <-txIsQueued // Increase the global round sequence number. control.sequence += 1 } es.roundControl.Add(msg.Round, control) + unlockByDefer = false + es.Unlock() // Release lock so that other timeboost txs can be processed + + abortCtx, cancel := ctxWithTimeout(ctx, es.txQueueTimeout*2) // We use the same timeout value that sequencer imposes + defer cancel() + var err error + select { + case err = <-resultChan: + case <-abortCtx.Done(): + if ctx.Err() == nil { + log.Warn("Transaction sequencing hit abort deadline", "err", abortCtx.Err(), "submittedAt", now, "TxProcessingTimeout", es.txQueueTimeout*2, "txHash", msg.Transaction.Hash()) + } + err = fmt.Errorf("Transaction sequencing hit timeout: %w", abortCtx.Err()) + } + if err != nil { + // If the tx fails we return an error with all the necessary info for the controller + return fmt.Errorf("%w: Sequence number: %d, Transaction hash: %v, Error: %w", timeboost.ErrAcceptedTxFailed, msg.SequenceNumber, msg.Transaction.Hash(), err) + } return nil } diff --git a/execution/gethexec/express_lane_service_test.go b/execution/gethexec/express_lane_service_test.go index 0c69c341a0..0053e340e8 100644 --- a/execution/gethexec/express_lane_service_test.go +++ b/execution/gethexec/express_lane_service_test.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "math/big" + "sync" "testing" "time" @@ -297,8 +298,9 @@ func makeStubPublisher(els *expressLaneService) *stubPublisher { } } -func (s *stubPublisher) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { - if tx == nil { +func (s *stubPublisher) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, txIsQueuedNotifier chan struct{}) error { + defer close(txIsQueuedNotifier) + if tx.CalldataUnits != 0 { return errors.New("oops, bad tx") } control, _ := s.els.roundControl.Get(0) @@ -311,9 +313,10 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testin ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), - roundControl: lru.NewCache[uint64, *expressLaneControl](8), + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + roundControl: lru.NewCache[uint64, *expressLaneControl](8), } + els.StopWaiter.Start(ctx, els) stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher els.roundControl.Add(0, &expressLaneControl{ @@ -331,9 +334,11 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), + roundControl: lru.NewCache[uint64, *expressLaneControl](8), + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), } + els.StopWaiter.Start(ctx, els) stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher els.roundControl.Add(0, &expressLaneControl{ @@ -342,13 +347,15 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes msg := &timeboost.ExpressLaneSubmission{ SequenceNumber: 2, } - err := els.sequenceExpressLaneSubmission(ctx, msg) - require.NoError(t, err) + go func() { + _ = els.sequenceExpressLaneSubmission(ctx, msg) + }() + time.Sleep(time.Second) // wait for the above tx to go through // Because the message is for a future sequence number, it // should get queued, but not yet published. require.Equal(t, 0, len(stubPublisher.publishedTxOrder)) // Sending it again should give us an error. - err = els.sequenceExpressLaneSubmission(ctx, msg) + err := els.sequenceExpressLaneSubmission(ctx, msg) require.ErrorIs(t, err, timeboost.ErrDuplicateSequenceNumber) } @@ -356,9 +363,11 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), + roundControl: lru.NewCache[uint64, *expressLaneControl](8), + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), } + els.StopWaiter.Start(ctx, els) stubPublisher := makeStubPublisher(els) els.transactionPublisher = stubPublisher @@ -388,26 +397,41 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing Transaction: &types.Transaction{}, }, } + var wg sync.WaitGroup + wg.Add(7) for _, msg := range messages { - err := els.sequenceExpressLaneSubmission(ctx, msg) - require.NoError(t, err) + go func(w *sync.WaitGroup) { + w.Done() + err := els.sequenceExpressLaneSubmission(ctx, msg) + require.NoError(t, err) + w.Done() + }(&wg) } + wg.Wait() + // We should have only published 2, as we are missing sequence number 3. + time.Sleep(2 * time.Second) require.Equal(t, 2, len(stubPublisher.publishedTxOrder)) - require.Equal(t, len(messages), len(els.messagesBySequenceNumber)) + require.Equal(t, 3, len(els.msgAndResultBySequenceNumber)) // Processed txs are deleted + wg.Add(2) // 4 & 5 should be able to get in after 3 err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3, Transaction: &types.Transaction{}}) require.NoError(t, err) + wg.Wait() require.Equal(t, 5, len(stubPublisher.publishedTxOrder)) + + require.Equal(t, 1, len(els.msgAndResultBySequenceNumber)) // Tx with seq num 10 should still be present } func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() els := &expressLaneService{ - roundControl: lru.NewCache[uint64, *expressLaneControl](8), - messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission), + roundControl: lru.NewCache[uint64, *expressLaneControl](8), + msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult), + roundTimingInfo: defaultTestRoundTimingInfo(time.Now()), } + els.StopWaiter.Start(ctx, els) els.roundControl.Add(0, &expressLaneControl{ sequence: 1, }) @@ -420,20 +444,21 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing. Transaction: &types.Transaction{}, }, { - SequenceNumber: 3, - Transaction: &types.Transaction{}, + SequenceNumber: 2, + Transaction: types.NewTx(&types.DynamicFeeTx{Data: []byte{1}}), }, { - SequenceNumber: 2, - Transaction: nil, + SequenceNumber: 3, + Transaction: &types.Transaction{}, }, { - SequenceNumber: 2, + SequenceNumber: 4, Transaction: &types.Transaction{}, }, } + messages[1].Transaction.CalldataUnits = 1 for _, msg := range messages { - if msg.Transaction == nil { + if msg.Transaction.CalldataUnits != 0 { err := els.sequenceExpressLaneSubmission(ctx, msg) require.ErrorContains(t, err, "oops, bad tx") } else { @@ -442,8 +467,9 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing. } } // One tx out of the four should have failed, so we should have only published 3. + // Since sequence number 2 failed after submission stage, that nonce is used up require.Equal(t, 3, len(stubPublisher.publishedTxOrder)) - require.Equal(t, []uint64{1, 2, 3}, stubPublisher.publishedTxOrder) + require.Equal(t, []uint64{1, 3, 4}, stubPublisher.publishedTxOrder) } // TODO this test is just for RoundTimingInfo diff --git a/execution/gethexec/sequencer.go b/execution/gethexec/sequencer.go index 98d6d1d480..bfd672d13d 100644 --- a/execution/gethexec/sequencer.go +++ b/execution/gethexec/sequencer.go @@ -461,14 +461,25 @@ func ctxWithTimeout(ctx context.Context, timeout time.Duration) (context.Context } func (s *Sequencer) PublishTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { - return s.publishTransactionImpl(parentCtx, tx, options, false /* delay tx if express lane is active */) + return s.publishTransactionImpl(parentCtx, tx, options, nil, false /* delay tx if express lane is active */) } -func (s *Sequencer) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error { - return s.publishTransactionImpl(parentCtx, tx, options, true) +func (s *Sequencer) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, txIsQueuedNotifier chan struct{}) error { + return s.publishTransactionImpl(parentCtx, tx, options, txIsQueuedNotifier, true) } -func (s *Sequencer) publishTransactionImpl(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, isExpressLaneController bool) error { +func (s *Sequencer) publishTransactionImpl(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, txIsQueuedNotifier chan struct{}, isExpressLaneController bool) error { + closeNotifier := func() { + if txIsQueuedNotifier != nil { + close(txIsQueuedNotifier) // Notifies express lane service to continue with next tx + } + } + closeByDefer := true + defer func() { + if closeByDefer { + closeNotifier() + } + }() config := s.config() // Only try to acquire Rlock and check for hard threshold if l1reader is not nil // And hard threshold was enabled, this prevents spamming of read locks when not needed @@ -539,6 +550,8 @@ func (s *Sequencer) publishTransactionImpl(parentCtx context.Context, tx *types. } select { case s.txQueue <- queueItem: + closeByDefer = false + closeNotifier() case <-queueCtx.Done(): return queueCtx.Err() } @@ -1299,6 +1312,7 @@ func (s *Sequencer) StartExpressLane( auctionContractAddr, s.execEngine.bc, earlySubmissionGrace, + s.config().QueueTimeout, ) if err != nil { log.Crit("Failed to create express lane service", "err", err, "auctionContractAddr", auctionContractAddr) diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index e8b9b57175..6e741ee874 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -8,6 +8,7 @@ import ( "net" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -283,6 +284,9 @@ func TestSequencerFeed_ExpressLaneAuction_InnerPayloadNoncesAreRespected(t *test if err2 == nil { t.Fatal("Charlie should not be able to send tx with nonce 1") } + if !strings.Contains(err2.Error(), timeboost.ErrAcceptedTxFailed.Error()) { + t.Fatal("Charlie's first tx should've consumed a sequence number as it was initially accepted") + } // After round is done, verify that Charlie beats Alice in the final sequence, and that the emitted txs // for Charlie are correct. aliceReceipt, err := seqClient.TransactionReceipt(ctx, aliceTx.Hash()) @@ -809,6 +813,9 @@ func (elc *expressLaneClient) SendTransaction(ctx context.Context, transaction * msg.Signature = signature promise := elc.sendExpressLaneRPC(msg) if _, err := promise.Await(ctx); err != nil { + if strings.Contains(err.Error(), timeboost.ErrAcceptedTxFailed.Error()) { + elc.sequence += 1 + } return err } elc.sequence += 1 diff --git a/timeboost/errors.go b/timeboost/errors.go index ef8dc2c8dc..1d55cdf201 100644 --- a/timeboost/errors.go +++ b/timeboost/errors.go @@ -16,4 +16,5 @@ var ( ErrDuplicateSequenceNumber = errors.New("SUBMISSION_NONCE_ALREADY_SEEN") ErrSequenceNumberTooLow = errors.New("SUBMISSION_NONCE_TOO_LOW") ErrTooManyBids = errors.New("PER_ROUND_BID_LIMIT_REACHED") + ErrAcceptedTxFailed = errors.New("Accepted timeboost tx failed") ) From 99671209ee35f7769fe5101964111065bc48fe20 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Mon, 23 Dec 2024 10:07:20 -0600 Subject: [PATCH 2/8] fix lint errors --- system_tests/timeboost_test.go | 6 +++++- timeboost/bid_cache_test.go | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 6e741ee874..e103a13f4e 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -843,5 +843,9 @@ func getRandomPort(t testing.TB) int { listener, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) defer listener.Close() - return listener.Addr().(*net.TCPAddr).Port + tcpAddr, ok := listener.Addr().(*net.TCPAddr) + if !ok { + t.Fatalf("failed to cast listener address to *net.TCPAddr") + } + return tcpAddr.Port } diff --git a/timeboost/bid_cache_test.go b/timeboost/bid_cache_test.go index 8266fca202..b28d69dd1c 100644 --- a/timeboost/bid_cache_test.go +++ b/timeboost/bid_cache_test.go @@ -210,5 +210,9 @@ func getRandomPort(t testing.TB) int { listener, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) defer listener.Close() - return listener.Addr().(*net.TCPAddr).Port + tcpAddr, ok := listener.Addr().(*net.TCPAddr) + if !ok { + t.Fatalf("failed to cast listener address to *net.TCPAddr") + } + return tcpAddr.Port } From 72fb72a9f83c0988bbb62e984fa0bb8110b195bb Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Mon, 23 Dec 2024 11:05:44 -0600 Subject: [PATCH 3/8] fix race in expresslaneservice_test file --- execution/gethexec/express_lane_service_test.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/execution/gethexec/express_lane_service_test.go b/execution/gethexec/express_lane_service_test.go index 0053e340e8..8ef699c768 100644 --- a/execution/gethexec/express_lane_service_test.go +++ b/execution/gethexec/express_lane_service_test.go @@ -346,6 +346,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes }) msg := &timeboost.ExpressLaneSubmission{ SequenceNumber: 2, + Transaction: types.NewTx(&types.DynamicFeeTx{Data: []byte{1}}), } go func() { _ = els.sequenceExpressLaneSubmission(ctx, msg) @@ -378,7 +379,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing messages := []*timeboost.ExpressLaneSubmission{ { SequenceNumber: 10, - Transaction: &types.Transaction{}, + Transaction: types.NewTx(&types.DynamicFeeTx{Data: []byte{1}}), }, { SequenceNumber: 5, @@ -403,8 +404,10 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing go func(w *sync.WaitGroup) { w.Done() err := els.sequenceExpressLaneSubmission(ctx, msg) - require.NoError(t, err) - w.Done() + if msg.SequenceNumber != 10 { // Because this go-routine will be interrupted after the test itself ends and 10 will still be waiting for result + require.NoError(t, err) + w.Done() + } }(&wg) } wg.Wait() @@ -412,7 +415,9 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing // We should have only published 2, as we are missing sequence number 3. time.Sleep(2 * time.Second) require.Equal(t, 2, len(stubPublisher.publishedTxOrder)) + els.Lock() require.Equal(t, 3, len(els.msgAndResultBySequenceNumber)) // Processed txs are deleted + els.Unlock() wg.Add(2) // 4 & 5 should be able to get in after 3 err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3, Transaction: &types.Transaction{}}) @@ -420,7 +425,9 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing wg.Wait() require.Equal(t, 5, len(stubPublisher.publishedTxOrder)) + els.Lock() require.Equal(t, 1, len(els.msgAndResultBySequenceNumber)) // Tx with seq num 10 should still be present + els.Unlock() } func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.T) { From a449e925947f870744a2945c6f7657e489fb9209 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Mon, 23 Dec 2024 15:46:35 -0600 Subject: [PATCH 4/8] add a system test to test transaction handling --- system_tests/timeboost_test.go | 149 +++++++++++++++++++++++++++++++-- 1 file changed, 141 insertions(+), 8 deletions(-) diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index e103a13f4e..9fd6837dc3 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -19,6 +19,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" @@ -41,6 +42,134 @@ import ( "github.com/offchainlabs/nitro/util/stopwaiter" ) +func TestExpressLaneTransactionHandling(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpDir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(tmpDir)) + }) + jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt") + + seq, seqClient, seqInfo, auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, cleanupSeq := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath) + defer cleanupSeq() + + auctionContract, err := express_lane_auctiongen.NewExpressLaneAuction(auctionContractAddr, seqClient) + Require(t, err) + rawRoundTimingInfo, err := auctionContract.RoundTimingInfo(&bind.CallOpts{}) + Require(t, err) + roundTimingInfo, err := timeboost.NewRoundTimingInfo(rawRoundTimingInfo) + Require(t, err) + + placeBidsAndDecideWinner(t, ctx, seqClient, seqInfo, auctionContract, "Bob", "Alice", bobBidderClient, aliceBidderClient, roundDuration) + time.Sleep(roundTimingInfo.TimeTilNextRound()) + + chainId, err := seqClient.ChainID(ctx) + Require(t, err) + + // Prepare a client that can submit txs to the sequencer via the express lane. + bobPriv := seqInfo.Accounts["Bob"].PrivateKey + seqDial, err := rpc.Dial(seq.Stack.HTTPEndpoint()) + Require(t, err) + expressLaneClient := newExpressLaneClient( + bobPriv, + chainId, + *roundTimingInfo, + auctionContractAddr, + seqDial, + ) + expressLaneClient.Start(ctx) + + currNonce, err := seqClient.PendingNonceAt(ctx, seqInfo.GetAddress("Alice")) + Require(t, err) + seqInfo.GetInfoWithPrivKey("Alice").Nonce.Store(currNonce) + + // Send txs out of order + var txs types.Transactions + txs = append(txs, seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil)) // currNonce + txs = append(txs, seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil)) // currNonce + 1 + txs = append(txs, seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil)) // currNonce + 2 + + var wg sync.WaitGroup + wg.Add(2) + for i := uint64(2); i > 0; i-- { + go func(w *sync.WaitGroup) { + err := expressLaneClient.SendTransactionWithSequence(ctx, txs[i], i) + Require(t, err) + w.Done() + }(&wg) + } + + time.Sleep(time.Second) // Wait for both txs to be submitted + + // Send the first transaction which will unblock the future ones + err = expressLaneClient.SendTransactionWithSequence(ctx, txs[0], 0) // we'll wait for the result + Require(t, err) + + wg.Wait() // Make sure future sequence number txs that were sent earlier did not error + + var txReceipts types.Receipts + for _, tx := range txs { + receipt, err := seqClient.TransactionReceipt(ctx, tx.Hash()) + Require(t, err) + txReceipts = append(txReceipts, receipt) + } + + if !(txReceipts[0].BlockNumber.Cmp(txReceipts[1].BlockNumber) <= 0 && + txReceipts[1].BlockNumber.Cmp(txReceipts[2].BlockNumber) <= 0) { + t.Fatal("incorrect ordering of txs acceptance, lower sequence number txs should appear earlier block") + } + + if txReceipts[0].BlockNumber.Cmp(txReceipts[1].BlockNumber) == 0 && + txReceipts[0].TransactionIndex > txReceipts[1].TransactionIndex { + t.Fatal("incorrect ordering of txs in a block, lower sequence number txs should appear earlier") + } + + if txReceipts[1].BlockNumber.Cmp(txReceipts[2].BlockNumber) == 0 && + txReceipts[1].TransactionIndex > txReceipts[2].TransactionIndex { + t.Fatal("incorrect ordering of txs in a block, lower sequence number txs should appear earlier") + } + + // Test that failed txs are given responses + passTx := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) // currNonce + 3 + passTx2 := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) // currNonce + 4 + + seqInfo.GetInfoWithPrivKey("Alice").Nonce.Store(20) + failTx := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) + + currSeqNumber := uint64(3) + wg.Add(2) + var failErr error + go func(w *sync.WaitGroup) { + failErr = expressLaneClient.SendTransactionWithSequence(ctx, failTx, currSeqNumber+1) // Should give out nonce too high error + w.Done() + }(&wg) + + time.Sleep(time.Second) + + go func(w *sync.WaitGroup) { + err := expressLaneClient.SendTransactionWithSequence(ctx, passTx2, currSeqNumber+2) + Require(t, err) + w.Done() + }(&wg) + + err = expressLaneClient.SendTransactionWithSequence(ctx, passTx, currSeqNumber) + Require(t, err) + + wg.Wait() + + if failErr == nil { + t.Fatal("incorrect express lane tx didn't fail upon submission") + } + if !strings.Contains(failErr.Error(), timeboost.ErrAcceptedTxFailed.Error()) || // Should be an ErrAcceptedTxFailed error that would consume sequence number + !strings.Contains(failErr.Error(), core.ErrNonceTooHigh.Error()) { // Main error should be ErrNonceTooHigh + t.Fatalf("unexpected error string returned: %s", failErr.Error()) + } +} + func TestExpressLaneControlTransfer(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) @@ -783,9 +912,7 @@ func (elc *expressLaneClient) Start(ctxIn context.Context) { elc.StopWaiter.Start(ctxIn, elc) } -func (elc *expressLaneClient) SendTransaction(ctx context.Context, transaction *types.Transaction) error { - elc.Lock() - defer elc.Unlock() +func (elc *expressLaneClient) SendTransactionWithSequence(ctx context.Context, transaction *types.Transaction, seq uint64) error { encodedTx, err := transaction.MarshalBinary() if err != nil { return err @@ -795,7 +922,7 @@ func (elc *expressLaneClient) SendTransaction(ctx context.Context, transaction * Round: hexutil.Uint64(elc.roundTimingInfo.RoundNumber()), AuctionContractAddress: elc.auctionContractAddr, Transaction: encodedTx, - SequenceNumber: hexutil.Uint64(elc.sequence), + SequenceNumber: hexutil.Uint64(seq), Signature: hexutil.Bytes{}, } msgGo, err := timeboost.JsonSubmissionToGo(msg) @@ -813,15 +940,21 @@ func (elc *expressLaneClient) SendTransaction(ctx context.Context, transaction * msg.Signature = signature promise := elc.sendExpressLaneRPC(msg) if _, err := promise.Await(ctx); err != nil { - if strings.Contains(err.Error(), timeboost.ErrAcceptedTxFailed.Error()) { - elc.sequence += 1 - } return err } - elc.sequence += 1 return nil } +func (elc *expressLaneClient) SendTransaction(ctx context.Context, transaction *types.Transaction) error { + elc.Lock() + defer elc.Unlock() + err := elc.SendTransactionWithSequence(ctx, transaction, elc.sequence) + if err == nil || strings.Contains(err.Error(), timeboost.ErrAcceptedTxFailed.Error()) { + elc.sequence += 1 + } + return err +} + func (elc *expressLaneClient) sendExpressLaneRPC(msg *timeboost.JsonExpressLaneSubmission) containers.PromiseInterface[struct{}] { return stopwaiter.LaunchPromiseThread(elc, func(ctx context.Context) (struct{}, error) { err := elc.client.CallContext(ctx, nil, "timeboost_sendExpressLaneTransaction", msg) From 1837035fea5d78c5e4d553c8a61fafa556935da2 Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Mon, 23 Dec 2024 16:06:27 -0600 Subject: [PATCH 5/8] fix race in TestBidValidatorAuctioneerRedisStream --- timeboost/auctioneer_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/timeboost/auctioneer_test.go b/timeboost/auctioneer_test.go index 3e5e24a829..855ec53687 100644 --- a/timeboost/auctioneer_test.go +++ b/timeboost/auctioneer_test.go @@ -153,7 +153,9 @@ func TestBidValidatorAuctioneerRedisStream(t *testing.T) { // We verify that the auctioneer has consumed all validated bids from the single Redis stream. // We also verify the top two bids are those we expect. + am.bidCache.Lock() require.Equal(t, 3, len(am.bidCache.bidsByExpressLaneControllerAddr)) + am.bidCache.Unlock() result := am.bidCache.topTwoBids() require.Equal(t, big.NewInt(7), result.firstPlace.Amount) // Best bid should be Charlie's last bid 7 require.Equal(t, charlieAddr, result.firstPlace.Bidder) From 68388dd1c44d026f71ec215a4b87291057d8860e Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Thu, 26 Dec 2024 16:41:47 -0600 Subject: [PATCH 6/8] check that timed out express-lane tx returns an error --- system_tests/timeboost_test.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 9fd6837dc3..71a970c0ae 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -139,6 +139,7 @@ func TestExpressLaneTransactionHandling(t *testing.T) { seqInfo.GetInfoWithPrivKey("Alice").Nonce.Store(20) failTx := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) + failTxDueToTimeout := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) currSeqNumber := uint64(3) wg.Add(2) @@ -161,13 +162,25 @@ func TestExpressLaneTransactionHandling(t *testing.T) { wg.Wait() - if failErr == nil { - t.Fatal("incorrect express lane tx didn't fail upon submission") - } - if !strings.Contains(failErr.Error(), timeboost.ErrAcceptedTxFailed.Error()) || // Should be an ErrAcceptedTxFailed error that would consume sequence number - !strings.Contains(failErr.Error(), core.ErrNonceTooHigh.Error()) { // Main error should be ErrNonceTooHigh - t.Fatalf("unexpected error string returned: %s", failErr.Error()) + checkFailErr := func(reason string) { + if failErr == nil { + t.Fatal("incorrect express lane tx didn't fail upon submission") + } + if !strings.Contains(failErr.Error(), timeboost.ErrAcceptedTxFailed.Error()) || // Should be an ErrAcceptedTxFailed error that would consume sequence number + !strings.Contains(failErr.Error(), reason) { + t.Fatalf("unexpected error string returned: %s", failErr.Error()) + } } + checkFailErr(core.ErrNonceTooHigh.Error()) + + wg.Add(1) + go func(w *sync.WaitGroup) { + failErr = expressLaneClient.SendTransactionWithSequence(ctx, failTxDueToTimeout, currSeqNumber+4) // Should give out a tx aborted error as this tx is never processed + w.Done() + }(&wg) + wg.Wait() + + checkFailErr("Transaction sequencing hit timeout") } func TestExpressLaneControlTransfer(t *testing.T) { From e775ef20f9cf107f26dfba3d362c3ad7ee3f520c Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Fri, 27 Dec 2024 18:19:31 -0600 Subject: [PATCH 7/8] test that express lane control switch happens seamlessly in extreme conditions --- system_tests/timeboost_test.go | 98 ++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 64d9b7b737..414a3f94dd 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -45,6 +45,104 @@ import ( "github.com/offchainlabs/nitro/util/stopwaiter" ) +func TestExpressLaneTransactionHandlingComplex(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpDir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(tmpDir)) + }) + jwtSecretPath := filepath.Join(tmpDir, "sequencer.jwt") + + seq, seqClient, seqInfo, auctionContractAddr, aliceBidderClient, bobBidderClient, roundDuration, cleanupSeq, _, cleanupFeedListener := setupExpressLaneAuction(t, tmpDir, ctx, jwtSecretPath) + defer cleanupSeq() + defer cleanupFeedListener() + + auctionContract, err := express_lane_auctiongen.NewExpressLaneAuction(auctionContractAddr, seqClient) + Require(t, err) + rawRoundTimingInfo, err := auctionContract.RoundTimingInfo(&bind.CallOpts{}) + Require(t, err) + roundTimingInfo, err := timeboost.NewRoundTimingInfo(rawRoundTimingInfo) + Require(t, err) + + // Prepare clients that can submit txs to the sequencer via the express lane. + chainId, err := seqClient.ChainID(ctx) + Require(t, err) + seqDial, err := rpc.Dial(seq.Stack.HTTPEndpoint()) + Require(t, err) + createExpressLaneClientFor := func(name string) (*expressLaneClient, bind.TransactOpts) { + priv := seqInfo.Accounts[name].PrivateKey + expressLaneClient := newExpressLaneClient( + priv, + chainId, + *roundTimingInfo, + auctionContractAddr, + seqDial, + ) + expressLaneClient.Start(ctx) + transacOpts := seqInfo.GetDefaultTransactOpts(name, ctx) + transacOpts.NoSend = true + return expressLaneClient, transacOpts + } + bobExpressLaneClient, _ := createExpressLaneClientFor("Bob") + aliceExpressLaneClient, _ := createExpressLaneClientFor("Alice") + + // Bob will win the auction and become controller for next round = x + placeBidsAndDecideWinner(t, ctx, seqClient, seqInfo, auctionContract, "Bob", "Alice", bobBidderClient, aliceBidderClient, roundDuration) + time.Sleep(roundTimingInfo.TimeTilNextRound()) + + // Check that Bob's tx gets priority since he's the controller + verifyControllerAdvantage(t, ctx, seqClient, bobExpressLaneClient, seqInfo, "Bob", "Alice") + + currNonce, err := seqClient.PendingNonceAt(ctx, seqInfo.GetAddress("Alice")) + Require(t, err) + seqInfo.GetInfoWithPrivKey("Alice").Nonce.Store(currNonce) + unblockingTx := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) + + bobExpressLaneClient.Lock() + currSeq := bobExpressLaneClient.sequence + bobExpressLaneClient.Unlock() + + // Send bunch of future txs so that they are queued up waiting for the unblocking seq num tx + var bobExpressLaneTxs types.Transactions + for i := currSeq + 1; i < 1000; i++ { + futureSeqTx := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) + bobExpressLaneTxs = append(bobExpressLaneTxs, futureSeqTx) + go func(tx *types.Transaction, seqNum uint64) { + err := bobExpressLaneClient.SendTransactionWithSequence(ctx, tx, seqNum) + t.Logf("got error for tx: hash-%s, seqNum-%d, err-%s", tx.Hash(), seqNum, err.Error()) + }(futureSeqTx, i) + } + + // Alice will win the auction for next round = x+1 + placeBidsAndDecideWinner(t, ctx, seqClient, seqInfo, auctionContract, "Alice", "Bob", aliceBidderClient, bobBidderClient, roundDuration) + + time.Sleep(roundTimingInfo.TimeTilNextRound() - 500*time.Millisecond) // we'll wait till the 1/2 second mark to the next round and then send the unblocking tx + + Require(t, bobExpressLaneClient.SendTransactionWithSequence(ctx, unblockingTx, currSeq)) // the unblockingTx itself should ideally pass, but the released 1000 txs shouldn't affect the round for which alice has won the bid for + + time.Sleep(500 * time.Millisecond) // Wait for controller change after the current round's end + + // Check that Alice's tx gets priority since she's the controller + verifyControllerAdvantage(t, ctx, seqClient, aliceExpressLaneClient, seqInfo, "Alice", "Bob") + + // Binary search and find how many of bob's futureSeqTxs were able to go through + s, f := 0, len(bobExpressLaneTxs) + for s < f { + m := (s + f + 1) / 2 + _, err := seqClient.TransactionReceipt(ctx, bobExpressLaneTxs[m].Hash()) + if err != nil { + f = m - 1 + } else { + s = m + } + } + t.Logf("%d of the total %d bob's pending txs were accepted", s+1, len(bobExpressLaneTxs)) +} + func TestExpressLaneTransactionHandling(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) From f17852cef79a56d116970483a4c40d688e9df2dd Mon Sep 17 00:00:00 2001 From: Ganesh Vanahalli <gvanahalli@offchainlabs.com> Date: Mon, 30 Dec 2024 09:26:21 -0600 Subject: [PATCH 8/8] address PR comments --- execution/gethexec/express_lane_service.go | 6 ++-- .../gethexec/express_lane_service_test.go | 36 +++++++++++++------ system_tests/timeboost_test.go | 4 +-- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/execution/gethexec/express_lane_service.go b/execution/gethexec/express_lane_service.go index d13f7daacc..a65c8bf268 100644 --- a/execution/gethexec/express_lane_service.go +++ b/execution/gethexec/express_lane_service.go @@ -366,7 +366,7 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( es.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan} now := time.Now() - for es.roundTimingInfo.RoundNumber() == msg.Round { // This is an important check that mitigates a security concern + for es.roundTimingInfo.RoundNumber() == msg.Round { // This check ensures that the controller for this round is not allowed to send transactions from msgAndResultBySequenceNumber map once the next round starts // Get the next message in the sequence. nextMsgAndResult, exists := es.msgAndResultBySequenceNumber[control.sequence] if !exists { @@ -394,11 +394,11 @@ func (es *expressLaneService) sequenceExpressLaneSubmission( if ctx.Err() == nil { log.Warn("Transaction sequencing hit abort deadline", "err", abortCtx.Err(), "submittedAt", now, "TxProcessingTimeout", es.txQueueTimeout*2, "txHash", msg.Transaction.Hash()) } - err = fmt.Errorf("Transaction sequencing hit timeout: %w", abortCtx.Err()) + err = fmt.Errorf("Transaction sequencing hit timeout, result for the submitted transaction is not yet available: %w", abortCtx.Err()) } if err != nil { // If the tx fails we return an error with all the necessary info for the controller - return fmt.Errorf("%w: Sequence number: %d, Transaction hash: %v, Error: %w", timeboost.ErrAcceptedTxFailed, msg.SequenceNumber, msg.Transaction.Hash(), err) + return fmt.Errorf("%w: Sequence number: %d (consumed), Transaction hash: %v, Error: %w", timeboost.ErrAcceptedTxFailed, msg.SequenceNumber, msg.Transaction.Hash(), err) } return nil } diff --git a/execution/gethexec/express_lane_service_test.go b/execution/gethexec/express_lane_service_test.go index 07ef45e0ae..0c52272322 100644 --- a/execution/gethexec/express_lane_service_test.go +++ b/execution/gethexec/express_lane_service_test.go @@ -350,16 +350,29 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes SequenceNumber: 2, Transaction: types.NewTx(&types.DynamicFeeTx{Data: []byte{1}}), } - go func() { - _ = els.sequenceExpressLaneSubmission(ctx, msg) - }() - time.Sleep(time.Second) // wait for the above tx to go through - // Because the message is for a future sequence number, it - // should get queued, but not yet published. - require.Equal(t, 0, len(stubPublisher.publishedTxOrder)) - // Sending it again should give us an error. - err := els.sequenceExpressLaneSubmission(ctx, msg) - require.ErrorIs(t, err, timeboost.ErrDuplicateSequenceNumber) + var wg sync.WaitGroup + wg.Add(3) // We expect only of the below two to return with an error here + var err1, err2 error + go func(w *sync.WaitGroup) { + w.Done() + err1 = els.sequenceExpressLaneSubmission(ctx, msg) + wg.Done() + }(&wg) + go func(w *sync.WaitGroup) { + w.Done() + err2 = els.sequenceExpressLaneSubmission(ctx, msg) + wg.Done() + }(&wg) + wg.Wait() + if err1 != nil && err2 != nil || err1 == nil && err2 == nil { + t.Fatalf("cannot have err1 and err2 both nil or non-nil. err1: %v, err2: %v", err1, err2) + } + if err1 != nil { + require.ErrorIs(t, err1, timeboost.ErrDuplicateSequenceNumber) + } else { + require.ErrorIs(t, err2, timeboost.ErrDuplicateSequenceNumber) + } + wg.Add(1) // As the goroutine that's still running will call wg.Done() after the test ends } func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing.T) { @@ -400,6 +413,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing Transaction: emptyTx, }, } + // We launch 5 goroutines out of which 2 would return with a result hence we initially add a delta of 7 var wg sync.WaitGroup wg.Add(7) for _, msg := range messages { @@ -421,7 +435,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing require.Equal(t, 3, len(els.msgAndResultBySequenceNumber)) // Processed txs are deleted els.Unlock() - wg.Add(2) // 4 & 5 should be able to get in after 3 + wg.Add(2) // 4 & 5 should be able to get in after 3 so we add a delta of 2 err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3, Transaction: emptyTx}) require.NoError(t, err) wg.Wait() diff --git a/system_tests/timeboost_test.go b/system_tests/timeboost_test.go index 061840f47f..81b4613077 100644 --- a/system_tests/timeboost_test.go +++ b/system_tests/timeboost_test.go @@ -198,7 +198,7 @@ func TestExpressLaneTransactionHandling(t *testing.T) { txs = append(txs, seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil)) // currNonce + 2 var wg sync.WaitGroup - wg.Add(2) + wg.Add(2) // We send two txs in out of order for i := uint64(2); i > 0; i-- { go func(w *sync.WaitGroup) { err := expressLaneClient.SendTransactionWithSequence(ctx, txs[i], i) @@ -246,7 +246,7 @@ func TestExpressLaneTransactionHandling(t *testing.T) { failTxDueToTimeout := seqInfo.PrepareTx("Alice", "Owner", seqInfo.TransferGas, big.NewInt(1), nil) currSeqNumber := uint64(3) - wg.Add(2) + wg.Add(2) // We send a failing and a passing tx with cummulative future seq numbers, followed by a unblocking seq num tx var failErr error go func(w *sync.WaitGroup) { failErr = expressLaneClient.SendTransactionWithSequence(ctx, failTx, currSeqNumber+1) // Should give out nonce too high error