Skip to content

Commit

Permalink
Merge pull request #2850 from OffchainLabs/fix-expresslaneservice-txp…
Browse files Browse the repository at this point in the history
…rocessing

Fix processing of transactions in expressLaneService
  • Loading branch information
ganeshvanahalli authored Dec 30, 2024
2 parents a9bbd0b + 3b96d19 commit 2ac7fea
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 75 deletions.
108 changes: 69 additions & 39 deletions execution/gethexec/express_lane_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,28 @@ type expressLaneControl struct {
}

// transactionPublisher is the dependency expressLaneService calls to publish an
// express lane (timeboosted) transaction.
// NOTE(review): this text is a rendered diff without +/- markers, so two forms of
// PublishTimeboostedTransaction appear. The form taking the extra chan struct{} is
// the one matched by the call in sequenceExpressLaneSubmission and by the test stub.
type transactionPublisher interface {
PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions) error
// The trailing chan struct{} is a notifier: the sequencing loop blocks receiving from it
// before advancing the round's sequence number. The test stub closes it on return;
// presumably a real publisher signals it once the transaction is queued — confirm.
PublishTimeboostedTransaction(context.Context, *types.Transaction, *arbitrum_types.ConditionalOptions, chan struct{}) error
}

// msgAndResult pairs a queued express lane submission with the channel on which
// the outcome of publishing its transaction is delivered to the waiting submitter.
type msgAndResult struct {
// msg is the submission waiting for its sequence number to come up.
msg *timeboost.ExpressLaneSubmission
// resultChan receives the error (or nil) returned by PublishTimeboostedTransaction for msg.
// sequenceExpressLaneSubmission creates it with a buffer of 1.
resultChan chan error
}

// expressLaneService tracks the express lane controller per round and hands
// submitted transactions to a transactionPublisher in sequence-number order.
// NOTE(review): this text is a rendered diff without +/- markers, so the field list
// appears twice. The first listing (ending in messagesBySequenceNumber) is presumably
// the pre-change form and the second (adding txQueueTimeout and
// msgAndResultBySequenceNumber) the post-change form — confirm against the real file.
type expressLaneService struct {
stopwaiter.StopWaiter
// Embedded lock; the visible code holds it whenever roundControl and the
// sequence-number map must be read or reset together.
sync.RWMutex
transactionPublisher transactionPublisher
auctionContractAddr common.Address
apiBackend *arbitrum.APIBackend
roundTimingInfo timeboost.RoundTimingInfo
earlySubmissionGrace time.Duration
chainConfig *params.ChainConfig
logs chan []*types.Log
auctionContract *express_lane_auctiongen.ExpressLaneAuction
roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe
// Queued submissions keyed by sequence number (first listing only).
messagesBySequenceNumber map[uint64]*timeboost.ExpressLaneSubmission
transactionPublisher transactionPublisher
auctionContractAddr common.Address
apiBackend *arbitrum.APIBackend
// Used to check that the current round still equals the submission's round before publishing.
roundTimingInfo timeboost.RoundTimingInfo
earlySubmissionGrace time.Duration
// sequenceExpressLaneSubmission waits up to twice this duration for a publish result.
txQueueTimeout time.Duration
chainConfig *params.ChainConfig
logs chan []*types.Log
auctionContract *express_lane_auctiongen.ExpressLaneAuction
// Maps a round number to its controller address and next expected sequence number.
roundControl *lru.Cache[uint64, *expressLaneControl] // thread safe
// Queued submissions and their result channels keyed by sequence number; replaced with
// a fresh map when a new round starts or the round's controller is set.
msgAndResultBySequenceNumber map[uint64]*msgAndResult
}

type contractAdapter struct {
Expand Down Expand Up @@ -127,6 +133,7 @@ func newExpressLaneService(
auctionContractAddr common.Address,
bc *core.BlockChain,
earlySubmissionGrace time.Duration,
txQueueTimeout time.Duration,
) (*expressLaneService, error) {
chainConfig := bc.Config()

Expand Down Expand Up @@ -158,16 +165,17 @@ pending:
}

return &expressLaneService{
transactionPublisher: transactionPublisher,
auctionContract: auctionContract,
apiBackend: apiBackend,
chainConfig: chainConfig,
roundTimingInfo: *roundTimingInfo,
earlySubmissionGrace: earlySubmissionGrace,
roundControl: lru.NewCache[uint64, *expressLaneControl](8), // Keep 8 rounds cached.
auctionContractAddr: auctionContractAddr,
logs: make(chan []*types.Log, 10_000),
messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission),
transactionPublisher: transactionPublisher,
auctionContract: auctionContract,
apiBackend: apiBackend,
chainConfig: chainConfig,
roundTimingInfo: *roundTimingInfo,
earlySubmissionGrace: earlySubmissionGrace,
roundControl: lru.NewCache[uint64, *expressLaneControl](8), // Keep 8 rounds cached.
auctionContractAddr: auctionContractAddr,
logs: make(chan []*types.Log, 10_000),
msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult),
txQueueTimeout: txQueueTimeout,
}, nil
}

Expand Down Expand Up @@ -204,7 +212,7 @@ func (es *expressLaneService) Start(ctxIn context.Context) {
)
es.Lock()
// Reset the sequence numbers map for the new round.
es.messagesBySequenceNumber = make(map[uint64]*timeboost.ExpressLaneSubmission)
es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult)
es.Unlock()
}
}
Expand Down Expand Up @@ -296,14 +304,14 @@ func (es *expressLaneService) Start(ctxIn context.Context) {

es.Lock()
// Changes to roundControl by itself are atomic but we need to update both roundControl
// and messagesBySequenceNumber atomically here.
// and msgAndResultBySequenceNumber atomically here.
es.roundControl.Add(round, &expressLaneControl{
controller: setExpressLaneIterator.Event.NewExpressLaneController,
sequence: 0,
})
// Since the sequence number for this round has been reset to zero, the map of messages
// by sequence number must be reset otherwise old messages would be replayed.
es.messagesBySequenceNumber = make(map[uint64]*timeboost.ExpressLaneSubmission)
es.msgAndResultBySequenceNumber = make(map[uint64]*msgAndResult)
es.Unlock()
}
fromBlock = toBlock
Expand All @@ -326,10 +334,15 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
ctx context.Context,
msg *timeboost.ExpressLaneSubmission,
) error {
unlockByDefer := true
es.Lock()
defer es.Unlock()
defer func() {
if unlockByDefer {
es.Unlock()
}
}()
// Although access to roundControl by itself is thread-safe, when the round control is transferred
// we need to reset roundControl and messagesBySequenceNumber atomically, so the following access
// we need to reset roundControl and msgAndResultBySequenceNumber atomically, so the following access
// must be within the lock.
control, ok := es.roundControl.Get(msg.Round)
if !ok {
Expand All @@ -341,35 +354,52 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
return timeboost.ErrSequenceNumberTooLow
}
// Check if a duplicate submission exists already, and reject if so.
if _, exists := es.messagesBySequenceNumber[msg.SequenceNumber]; exists {
if _, exists := es.msgAndResultBySequenceNumber[msg.SequenceNumber]; exists {
return timeboost.ErrDuplicateSequenceNumber
}
// Log an informational warning if the message's sequence number is in the future.
if msg.SequenceNumber > control.sequence {
log.Info("Received express lane submission with future sequence number", "SequenceNumber", msg.SequenceNumber)
}
// Put into the sequence number map.
es.messagesBySequenceNumber[msg.SequenceNumber] = msg
resultChan := make(chan error, 1)
es.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan}

for {
now := time.Now()
for es.roundTimingInfo.RoundNumber() == msg.Round { // This check ensures that the controller for this round is not allowed to send transactions from msgAndResultBySequenceNumber map once the next round starts
// Get the next message in the sequence.
nextMsg, exists := es.messagesBySequenceNumber[control.sequence]
nextMsgAndResult, exists := es.msgAndResultBySequenceNumber[control.sequence]
if !exists {
break
}
delete(es.messagesBySequenceNumber, nextMsg.SequenceNumber)
if err := es.transactionPublisher.PublishTimeboostedTransaction(
ctx,
nextMsg.Transaction,
nextMsg.Options,
); err != nil {
// If the tx fails we return an error with all the necessary info for the controller to successfully try again
return fmt.Errorf("express lane transaction of sequence number: %d and transaction hash: %v, failed with an error: %w", nextMsg.SequenceNumber, nextMsg.Transaction.Hash(), err)
}
delete(es.msgAndResultBySequenceNumber, nextMsgAndResult.msg.SequenceNumber)
txIsQueued := make(chan struct{})
es.LaunchThread(func(ctx context.Context) {
nextMsgAndResult.resultChan <- es.transactionPublisher.PublishTimeboostedTransaction(ctx, nextMsgAndResult.msg.Transaction, nextMsgAndResult.msg.Options, txIsQueued)
})
<-txIsQueued
// Increase the global round sequence number.
control.sequence += 1
}
es.roundControl.Add(msg.Round, control)
unlockByDefer = false
es.Unlock() // Release lock so that other timeboost txs can be processed

abortCtx, cancel := ctxWithTimeout(ctx, es.txQueueTimeout*2) // We use the same timeout value that sequencer imposes
defer cancel()
var err error
select {
case err = <-resultChan:
case <-abortCtx.Done():
if ctx.Err() == nil {
log.Warn("Transaction sequencing hit abort deadline", "err", abortCtx.Err(), "submittedAt", now, "TxProcessingTimeout", es.txQueueTimeout*2, "txHash", msg.Transaction.Hash())
}
err = fmt.Errorf("Transaction sequencing hit timeout, result for the submitted transaction is not yet available: %w", abortCtx.Err())
}
if err != nil {
// If the tx fails we return an error with all the necessary info for the controller
return fmt.Errorf("%w: Sequence number: %d (consumed), Transaction hash: %v, Error: %w", timeboost.ErrAcceptedTxFailed, msg.SequenceNumber, msg.Transaction.Hash(), err)
}
return nil
}

Expand Down
100 changes: 73 additions & 27 deletions execution/gethexec/express_lane_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -299,7 +300,8 @@ func makeStubPublisher(els *expressLaneService) *stubPublisher {

var emptyTx = types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), nil)

func (s *stubPublisher) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions) error {
func (s *stubPublisher) PublishTimeboostedTransaction(parentCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, txIsQueuedNotifier chan struct{}) error {
defer close(txIsQueuedNotifier)
if tx.Hash() != emptyTx.Hash() {
return errors.New("oops, bad tx")
}
Expand All @@ -313,9 +315,10 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testin
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
els := &expressLaneService{
messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission),
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult),
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
}
els.StopWaiter.Start(ctx, els)
stubPublisher := makeStubPublisher(els)
els.transactionPublisher = stubPublisher
els.roundControl.Add(0, &expressLaneControl{
Expand All @@ -333,34 +336,54 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
els := &expressLaneService{
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission),
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
}
els.StopWaiter.Start(ctx, els)
stubPublisher := makeStubPublisher(els)
els.transactionPublisher = stubPublisher
els.roundControl.Add(0, &expressLaneControl{
sequence: 1,
})
msg := &timeboost.ExpressLaneSubmission{
SequenceNumber: 2,
Transaction: types.NewTx(&types.DynamicFeeTx{Data: []byte{1}}),
}
err := els.sequenceExpressLaneSubmission(ctx, msg)
require.NoError(t, err)
// Because the message is for a future sequence number, it
// should get queued, but not yet published.
require.Equal(t, 0, len(stubPublisher.publishedTxOrder))
// Sending it again should give us an error.
err = els.sequenceExpressLaneSubmission(ctx, msg)
require.ErrorIs(t, err, timeboost.ErrDuplicateSequenceNumber)
var wg sync.WaitGroup
wg.Add(3) // We expect only one of the below two to return with an error here
var err1, err2 error
go func(w *sync.WaitGroup) {
w.Done()
err1 = els.sequenceExpressLaneSubmission(ctx, msg)
wg.Done()
}(&wg)
go func(w *sync.WaitGroup) {
w.Done()
err2 = els.sequenceExpressLaneSubmission(ctx, msg)
wg.Done()
}(&wg)
wg.Wait()
if err1 != nil && err2 != nil || err1 == nil && err2 == nil {
t.Fatalf("cannot have err1 and err2 both nil or non-nil. err1: %v, err2: %v", err1, err2)
}
if err1 != nil {
require.ErrorIs(t, err1, timeboost.ErrDuplicateSequenceNumber)
} else {
require.ErrorIs(t, err2, timeboost.ErrDuplicateSequenceNumber)
}
wg.Add(1) // As the goroutine that's still running will call wg.Done() after the test ends
}

func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
els := &expressLaneService{
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission),
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
}
els.StopWaiter.Start(ctx, els)
stubPublisher := makeStubPublisher(els)
els.transactionPublisher = stubPublisher

Expand All @@ -371,7 +394,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
messages := []*timeboost.ExpressLaneSubmission{
{
SequenceNumber: 10,
Transaction: emptyTx,
Transaction: types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1}),
},
{
SequenceNumber: 5,
Expand All @@ -390,26 +413,48 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
Transaction: emptyTx,
},
}
// We launch 5 goroutines out of which 2 would return with a result hence we initially add a delta of 7
var wg sync.WaitGroup
wg.Add(7)
for _, msg := range messages {
err := els.sequenceExpressLaneSubmission(ctx, msg)
require.NoError(t, err)
go func(w *sync.WaitGroup) {
w.Done()
err := els.sequenceExpressLaneSubmission(ctx, msg)
if msg.SequenceNumber != 10 { // Because this go-routine will be interrupted after the test itself ends and 10 will still be waiting for result
require.NoError(t, err)
w.Done()
}
}(&wg)
}
wg.Wait()

// We should have only published 2, as we are missing sequence number 3.
time.Sleep(2 * time.Second)
require.Equal(t, 2, len(stubPublisher.publishedTxOrder))
require.Equal(t, 3, len(els.messagesBySequenceNumber)) // Processed txs are deleted
els.Lock()
require.Equal(t, 3, len(els.msgAndResultBySequenceNumber)) // Processed txs are deleted
els.Unlock()

wg.Add(2) // 4 & 5 should be able to get in after 3 so we add a delta of 2
err := els.sequenceExpressLaneSubmission(ctx, &timeboost.ExpressLaneSubmission{SequenceNumber: 3, Transaction: emptyTx})
require.NoError(t, err)
wg.Wait()
require.Equal(t, 5, len(stubPublisher.publishedTxOrder))

els.Lock()
require.Equal(t, 1, len(els.msgAndResultBySequenceNumber)) // Tx with seq num 10 should still be present
els.Unlock()
}

func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
els := &expressLaneService{
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
messagesBySequenceNumber: make(map[uint64]*timeboost.ExpressLaneSubmission),
roundControl: lru.NewCache[uint64, *expressLaneControl](8),
msgAndResultBySequenceNumber: make(map[uint64]*msgAndResult),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
}
els.StopWaiter.Start(ctx, els)
els.roundControl.Add(0, &expressLaneControl{
sequence: 1,
})
Expand All @@ -421,16 +466,16 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.
SequenceNumber: 1,
Transaction: emptyTx,
},
{
SequenceNumber: 3,
Transaction: emptyTx,
},
{
SequenceNumber: 2,
Transaction: types.NewTransaction(0, common.MaxAddress, big.NewInt(0), 0, big.NewInt(0), []byte{1}),
},
{
SequenceNumber: 2,
SequenceNumber: 3,
Transaction: emptyTx,
},
{
SequenceNumber: 4,
Transaction: emptyTx,
},
}
Expand All @@ -444,8 +489,9 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.
}
}
// One tx out of the four should have failed, so we should have only published 3.
// Since sequence number 2 failed after submission stage, that nonce is used up
require.Equal(t, 3, len(stubPublisher.publishedTxOrder))
require.Equal(t, []uint64{1, 2, 3}, stubPublisher.publishedTxOrder)
require.Equal(t, []uint64{1, 3, 4}, stubPublisher.publishedTxOrder)
}

// TODO this test is just for RoundTimingInfo
Expand Down
Loading

0 comments on commit 2ac7fea

Please sign in to comment.