From f6f4e82d2c18f96f37acf0990d11d0e9ffd625fa Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 23 Mar 2022 16:36:02 +0100 Subject: [PATCH] Use an unsorted slice --- .../sequence/account_tx_submission_queue.go | 119 +++++++----------- .../account_tx_submission_queue_test.go | 41 +----- 2 files changed, 52 insertions(+), 108 deletions(-) diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go index a9f4e450f8..03be11e92d 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go @@ -17,7 +17,14 @@ type AccountTxSubmissionQueue struct { lastActiveAt time.Time timeout time.Duration lastSeenAccountSequence uint64 - queue submissionQueue + transactions []txToSubmit +} + +// txToSubmit represents a transaction being tracked by the queue +type txToSubmit struct { + minAccSeqNum uint64 // minimum account sequence required to send the transaction + maxAccSeqNum uint64 // maximum account sequence required to send the transaction + notifyBackChan chan error // submission notification channel } // NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue @@ -25,14 +32,13 @@ func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue { result := &AccountTxSubmissionQueue{ lastActiveAt: time.Now(), timeout: 10 * time.Second, - queue: nil, } return result } // Size returns the count of currently buffered submissions in the queue. func (q *AccountTxSubmissionQueue) Size() int { - return len(q.queue) + return len(q.transactions) } // Push enqueues the intent to submit a transaction at the provided sequence @@ -49,17 +55,17 @@ func (q *AccountTxSubmissionQueue) Size() int { // 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if // possible func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error { - ch := make(chan error, 1) // From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1. // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum. effectiveMinSeqNum := sequence - 1 if minSeqNum != nil { effectiveMinSeqNum = *minSeqNum } - q.queue.insert(txToSubmit{ - MinAccSeqNum: effectiveMinSeqNum, - MaxAccSeqNum: sequence - 1, - Chan: ch, + ch := make(chan error, 1) + q.transactions = append(q.transactions, txToSubmit{ + minAccSeqNum: effectiveMinSeqNum, + maxAccSeqNum: sequence - 1, + notifyBackChan: ch, }) return ch } @@ -74,35 +80,43 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { q.lastSeenAccountSequence = sequence } - wasChanged := false - - // We need to traverse the full queue (ordered by MaxAccSeqNum) - // in case there is a transaction with a submittable MinSeqNum we can use later on. - for i := 0; i < len(q.queue); { - tx := q.queue[i] - - removeWithErr := func(err error) { - tx.Chan <- err - close(tx.Chan) - wasChanged = true - q.queue.remove(i) + queueWasChanged := false + + txsToSubmit := make([]txToSubmit, 0, len(q.transactions)) + // Extract transactions ready to submit and notify those which are un-submittable. + for i := 0; i < len(q.transactions); { + candidate := q.transactions[i] + removeCandidateFromQueue := false + if q.lastSeenAccountSequence > candidate.maxAccSeqNum { + // this transaction can never be submitted because account sequence numbers only grow + candidate.notifyBackChan <- ErrBadSequence + close(candidate.notifyBackChan) + removeCandidateFromQueue = true + } else if q.lastSeenAccountSequence >= candidate.minAccSeqNum { + txsToSubmit = append(txsToSubmit, candidate) + removeCandidateFromQueue = true } - - if q.lastSeenAccountSequence > tx.MaxAccSeqNum { - // The transaction and account sequences will never match - removeWithErr(ErrBadSequence) - } else if q.lastSeenAccountSequence >= tx.MinAccSeqNum { - // within range, ready to submit! - removeWithErr(nil) + if removeCandidateFromQueue { + q.transactions = append(q.transactions[:i], q.transactions[i+1:]...) + queueWasChanged = true } else { - // we only need to increment the index when we don't remove - // a transaction + // only increment the index if there was no removal i++ } } + // To maximize successful submission opportunity, submit transactions by the account sequence + // which would result from a successful submission (i.e. maxAccSeqNum+1) + sort.Slice(txsToSubmit, func(i, j int) bool { + return txsToSubmit[i].maxAccSeqNum < txsToSubmit[j].maxAccSeqNum + }) + for _, tx := range txsToSubmit { + tx.notifyBackChan <- nil + close(tx.notifyBackChan) + } + // if we modified the queue, bump the timeout for this queue - if wasChanged { + if queueWasChanged { q.lastActiveAt = time.Now() return } @@ -110,47 +124,10 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { // if the queue wasn't changed, see if it is too old, clear // it and make room for other submissions if time.Since(q.lastActiveAt) > q.timeout { - for i := 0; i < len(q.queue); i++ { - c := q.queue[i].Chan - c <- ErrBadSequence - close(c) + for _, tx := range q.transactions { + tx.notifyBackChan <- ErrBadSequence + close(tx.notifyBackChan) } - q.queue = nil + q.transactions = nil } } - -// txToSubmit represents a transaction being tracked in submissionQueue -type txToSubmit struct { - MinAccSeqNum uint64 // minimum account sequence required to send the transaction - MaxAccSeqNum uint64 // maximum account sequence required to send the transaction - Chan chan error -} - -// submissionQueue is a priority queue (implemented as a sorted slice), -// used to track the submission order of transactions. -// Lower indices have higher submission priority. -type submissionQueue []txToSubmit - -func (sq *submissionQueue) insert(tx txToSubmit) { - txHasLessPriorityThanIth := func(i int) bool { - // To maximize transaction submission opportunity, we prioritize transactions by the account sequence - // which would result from successful submission (i.e. MaxAccSeqNum+1) but, - // if those are the same, by higher minimum sequence since there is less margin to send those - // (a smaller interval). - if tx.MaxAccSeqNum != (*sq)[i].MaxAccSeqNum { - return tx.MaxAccSeqNum < (*sq)[i].MaxAccSeqNum - } - return tx.MinAccSeqNum > (*sq)[i].MinAccSeqNum - } - i := sort.Search(len(*sq), txHasLessPriorityThanIth) - if len(*sq) == i { - *sq = append(*sq, tx) - return - } - *sq = append((*sq)[:i+1], (*sq)[i:]...) - (*sq)[i] = tx -} - -func (sq *submissionQueue) remove(i int) { - *sq = append((*sq)[:i], (*sq)[i+1:]...) -} diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go index 2f16a65409..0b4497f2fd 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go @@ -23,39 +23,6 @@ func (suite *QueueTestSuite) SetupTest() { suite.queue = NewAccountTxSubmissionQueue() } -//Push adds the provided channel on to the priority queue -func (suite *QueueTestSuite) TestQueue_Push() { - ctx := test.Context() - _ = ctx - - assert.Equal(suite.T(), 0, suite.queue.Size()) - - suite.queue.Push(6, nil) - assert.Equal(suite.T(), 1, suite.queue.Size()) - entry := suite.queue.queue[0] - assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) - - min := uint64(1) - suite.queue.Push(5, &min) - assert.Equal(suite.T(), 2, suite.queue.Size()) - entry = suite.queue.queue[0] - assert.Equal(suite.T(), uint64(1), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) - - suite.queue.Push(5, nil) - assert.Equal(suite.T(), 3, suite.queue.Size()) - entry = suite.queue.queue[0] - assert.Equal(suite.T(), uint64(4), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) - - suite.queue.Push(4, nil) - assert.Equal(suite.T(), 4, suite.queue.Size()) - entry = suite.queue.queue[0] - assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) -} - // Tests the NotifyLastAccountSequence method func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { // NotifyLastAccountSequence removes sequences that are submittable or in the past @@ -71,11 +38,11 @@ func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { suite.queue.NotifyLastAccountSequence(2) // the update above signifies that 2 is the accounts current sequence, - // meaning that 3 is submittable, and so only 4 (Min/MaxAccSeqNum=3) should remain + // meaning that 3 is submittable, and so only 4 (Min/maxAccSeqNum=3) should remain assert.Equal(suite.T(), 1, suite.queue.Size()) - entry := suite.queue.queue[0] - assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) + entry := suite.queue.transactions[0] + assert.Equal(suite.T(), uint64(3), entry.minAccSeqNum) + assert.Equal(suite.T(), uint64(3), entry.maxAccSeqNum) suite.queue.NotifyLastAccountSequence(4) assert.Equal(suite.T(), 0, suite.queue.Size())