From b5d5182201880ec0e42a6167b5b7e503787b577c Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 22 Mar 2022 01:11:48 +0100 Subject: [PATCH 1/4] Update txsub queue to account for new CAP-21 preconditions --- .../sequence/account_tx_submission_queue.go | 101 +++++++++--------- .../account_tx_submission_queue_test.go | 51 ++++++--- .../internal/txsub/sequence/manager.go | 10 +- .../internal/txsub/sequence/manager_test.go | 26 ++--- services/horizon/internal/txsub/system.go | 8 +- .../horizon/internal/txsub/system_test.go | 2 +- 6 files changed, 110 insertions(+), 88 deletions(-) diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go index b43e60d908..30e0a4cd0d 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go @@ -14,10 +14,10 @@ import ( // being managed, queued submissions that can be acted upon will be unblocked. // type AccountTxSubmissionQueue struct { - lastActiveAt time.Time - timeout time.Duration - nextSequence uint64 - queue pqueue + lastActiveAt time.Time + timeout time.Duration + lastSeenAccountSequence uint64 + queue pqueue } // NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue @@ -51,9 +51,19 @@ func (q *AccountTxSubmissionQueue) Size() int { // 2. Load the current sequence number for the source account from the DB // 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if // possible -func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error { +func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error { ch := make(chan error, 1) - heap.Push(&q.queue, item{sequence, ch}) + // From CAP 19: If minSeqNum is nil, the tx is only valid when sourceAccount's sequence number is seqNum - 1. + // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < tx.seqNum. + effectiveMinSeqNum := sequence - 1 + if minSeqNum != nil { + effectiveMinSeqNum = *minSeqNum + } + heap.Push(&q.queue, item{ + MinAccSeqNum: effectiveMinSeqNum, + MaxAccSeqNum: sequence - 1, + Chan: ch, + }) return ch } @@ -63,35 +73,34 @@ func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error { // This function is monotonic... calling it with a sequence number lower than // the latest seen sequence number is a noop. func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { - if q.nextSequence <= sequence { - q.nextSequence = sequence + 1 + if q.lastSeenAccountSequence <= sequence { + q.lastSeenAccountSequence = sequence } wasChanged := false - for { - if q.Size() == 0 { - break - } + // We need to traverse the full queue (ordered by MaxAccSeqNum) + // in case there is a transaction with a submittable MinSeqNum we can use later on. + for i := 0; i < q.Size(); { + tx := q.queue[i] - ch, hseq := q.head() - // if the next queued transaction has a sequence higher than the account's - // current sequence, stop removing entries - if hseq > q.nextSequence { - break + removeWithErr := func(err error) { + tx.Chan <- err + close(tx.Chan) + wasChanged = true + heap.Remove(&q.queue, i) } - // since this entry is unlocked (i.e. it's sequence is the next available - // or in the past we can remove it an mark the queue as changed - q.pop() - wasChanged = true - - if hseq < q.nextSequence { - ch <- ErrBadSequence - close(ch) - } else if hseq == q.nextSequence { - ch <- nil - close(ch) + if q.lastSeenAccountSequence > tx.MaxAccSeqNum { + // The transaction and account sequences will never match + removeWithErr(ErrBadSequence) + } else if q.lastSeenAccountSequence >= tx.MinAccSeqNum { + // within range, ready to submit! + removeWithErr(nil) + } else { + // we only need to increment the heap index when we don't remove + // an item + i++ } } @@ -105,33 +114,18 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { // it and make room for other's if time.Since(q.lastActiveAt) > q.timeout { for q.Size() > 0 { - ch, _ := q.pop() - ch <- ErrBadSequence - close(ch) + entry := q.queue.Pop().(item) + entry.Chan <- ErrBadSequence + close(entry.Chan) } } } -// helper function for interacting with the priority queue -func (q *AccountTxSubmissionQueue) head() (chan error, uint64) { - if len(q.queue) == 0 { - return nil, uint64(0) - } - - return q.queue[0].Chan, q.queue[0].Sequence -} - -// helper function for interacting with the priority queue -func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) { - i := heap.Pop(&q.queue).(item) - - return i.Chan, i.Sequence -} - // item is a element of the priority queue type item struct { - Sequence uint64 - Chan chan error + MinAccSeqNum uint64 // minimum account sequence required to send the transaction + MaxAccSeqNum uint64 // maximum account sequence required to send the transaction + Chan chan error } // pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It @@ -141,7 +135,14 @@ type pqueue []item func (pq pqueue) Len() int { return len(pq) } func (pq pqueue) Less(i, j int) bool { - return pq[i].Sequence < pq[j].Sequence + // To maximize tx submission opportunity, order transactions by the account sequence + // which would result from successful submission (MaxAccSeqNum+1) but, + // if those are the same, by higher minimum sequence since there is less margin to send those + // (a smaller interval). + if pq[i].MaxAccSeqNum != pq[j].MaxAccSeqNum { + return pq[i].MaxAccSeqNum < pq[j].MaxAccSeqNum + } + return pq[i].MinAccSeqNum > pq[j].MinAccSeqNum } func (pq pqueue) Swap(i, j int) { diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go index d7db456aed..2f16a65409 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go @@ -30,34 +30,52 @@ func (suite *QueueTestSuite) TestQueue_Push() { assert.Equal(suite.T(), 0, suite.queue.Size()) - suite.queue.Push(2) + suite.queue.Push(6, nil) assert.Equal(suite.T(), 1, suite.queue.Size()) - _, s := suite.queue.head() - assert.Equal(suite.T(), uint64(2), s) + entry := suite.queue.queue[0] + assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) - suite.queue.Push(1) + min := uint64(1) + suite.queue.Push(5, &min) assert.Equal(suite.T(), 2, suite.queue.Size()) - _, s = suite.queue.head() - assert.Equal(suite.T(), uint64(1), s) + entry = suite.queue.queue[0] + assert.Equal(suite.T(), uint64(1), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) + + suite.queue.Push(5, nil) + assert.Equal(suite.T(), 3, suite.queue.Size()) + entry = suite.queue.queue[0] + assert.Equal(suite.T(), uint64(4), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) + + suite.queue.Push(4, nil) + assert.Equal(suite.T(), 4, suite.queue.Size()) + entry = suite.queue.queue[0] + assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) } -// Tests the update method -func (suite *QueueTestSuite) TestQueue_Update() { +// Tests the NotifyLastAccountSequence method +func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { // NotifyLastAccountSequence removes sequences that are submittable or in the past + lowMin := uint64(1) results := []<-chan error{ - suite.queue.Push(1), - suite.queue.Push(2), - suite.queue.Push(3), - suite.queue.Push(4), + suite.queue.Push(1, nil), + suite.queue.Push(2, nil), + suite.queue.Push(3, nil), + suite.queue.Push(4, nil), + suite.queue.Push(4, &lowMin), } suite.queue.NotifyLastAccountSequence(2) // the update above signifies that 2 is the accounts current sequence, - // meaning that 3 is submittable, and so only 4 should still be queued + // meaning that 3 is submittable, and so only 4 (Min/MaxAccSeqNum=3) should remain assert.Equal(suite.T(), 1, suite.queue.Size()) - _, s := suite.queue.head() - assert.Equal(suite.T(), uint64(4), s) + entry := suite.queue.queue[0] + assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) suite.queue.NotifyLastAccountSequence(4) assert.Equal(suite.T(), 0, suite.queue.Size()) @@ -66,10 +84,11 @@ func (suite *QueueTestSuite) TestQueue_Update() { assert.Equal(suite.T(), ErrBadSequence, <-results[1]) assert.Equal(suite.T(), nil, <-results[2]) assert.Equal(suite.T(), ErrBadSequence, <-results[3]) + assert.Equal(suite.T(), nil, <-results[4]) // NotifyLastAccountSequence clears the queue if the head has not been released within the time limit suite.queue.timeout = 1 * time.Millisecond - result := suite.queue.Push(2) + result := suite.queue.Push(2, nil) <-time.After(10 * time.Millisecond) suite.queue.NotifyLastAccountSequence(0) diff --git a/services/horizon/internal/txsub/sequence/manager.go b/services/horizon/internal/txsub/sequence/manager.go index 566239d509..1a776f44d8 100644 --- a/services/horizon/internal/txsub/sequence/manager.go +++ b/services/horizon/internal/txsub/sequence/manager.go @@ -31,7 +31,7 @@ func (m *Manager) String() string { var addys []string for addy, q := range m.queues { - addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.nextSequence)) + addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.lastSeenAccountSequence)) } return "[ " + strings.Join(addys, ",") + " ]" @@ -60,7 +60,7 @@ func (m *Manager) Addresses() []string { // Push registers an intent to submit a transaction for the provided address at // the provided sequence. A channel is returned that will be written to when // the requester should attempt the submission. -func (m *Manager) Push(address string, sequence uint64) <-chan error { +func (m *Manager) Push(address string, sequence uint64, minSeqNum *uint64) <-chan error { m.mutex.Lock() defer m.mutex.Unlock() @@ -74,12 +74,12 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error { m.queues[address] = aq } - return aq.Push(sequence) + return aq.Push(sequence, minSeqNum) } -// Update notifies the manager of newly loaded account sequence information. The manager uses this information +// NotifyLastAccountSequences notifies the manager of newly loaded account sequence information. The manager uses this information // to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic. -func (m *Manager) Update(updates map[string]uint64) { +func (m *Manager) NotifyLastAccountSequences(updates map[string]uint64) { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/services/horizon/internal/txsub/sequence/manager_test.go b/services/horizon/internal/txsub/sequence/manager_test.go index eff0e4ec9e..e69b8be6fc 100644 --- a/services/horizon/internal/txsub/sequence/manager_test.go +++ b/services/horizon/internal/txsub/sequence/manager_test.go @@ -10,26 +10,28 @@ import ( func TestManager_Push(t *testing.T) { mgr := NewManager() - mgr.Push("1", 2) - mgr.Push("1", 2) - mgr.Push("1", 3) - mgr.Push("2", 2) + minSeq := uint64(1) + mgr.Push("1", 2, nil) + mgr.Push("1", 2, nil) + mgr.Push("1", 3, &minSeq) + mgr.Push("2", 2, nil) assert.Equal(t, 4, mgr.Size()) assert.Equal(t, 3, mgr.queues["1"].Size()) assert.Equal(t, 1, mgr.queues["2"].Size()) } -// Test the NotifyLastAccountSequence method -func TestManager_Update(t *testing.T) { +// Test the NotifyLastAccountSequences method +func TestManager_NotifyLastAccountSequences(t *testing.T) { mgr := NewManager() + minSeq := uint64(1) results := []<-chan error{ - mgr.Push("1", 2), - mgr.Push("1", 3), - mgr.Push("2", 2), + mgr.Push("1", 4, &minSeq), + mgr.Push("1", 3, nil), + mgr.Push("2", 2, nil), } - mgr.Update(map[string]uint64{ + mgr.NotifyLastAccountSequences(map[string]uint64{ "1": 1, "2": 1, }) @@ -47,9 +49,9 @@ func TestManager_Update(t *testing.T) { func TestManager_PushNoMoreRoom(t *testing.T) { mgr := NewManager() for i := 0; i < mgr.MaxSize; i++ { - mgr.Push("1", 2) + mgr.Push("1", 2, nil) } assert.Equal(t, 1024, mgr.Size()) - assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2)) + assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2, nil)) } diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 70d0a67aa5..44b0876383 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -132,11 +132,11 @@ func (sys *System) Submit( // queue the submission and get the channel that will emit when // submission is valid - seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum())) + seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum()), nil) // update the submission queue with the source accounts current sequence value // which will cause the channel returned by Push() to emit if possible. - sys.SubmissionQueue.Update(map[string]uint64{ + sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ sourceAddress: sequenceNumber, }) @@ -160,7 +160,7 @@ func (sys *System) Submit( // add transactions to open list sys.Pending.Add(ctx, hash, response) // update the submission queue, allowing the next submission to proceed - sys.SubmissionQueue.Update(map[string]uint64{ + sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ sourceAddress: uint64(envelope.SeqNum()), }) return @@ -326,7 +326,7 @@ func (sys *System) Tick(ctx context.Context) { logger.WithStack(err).Error(err) return } else { - sys.SubmissionQueue.Update(curSeq) + sys.SubmissionQueue.NotifyLastAccountSequences(curSeq) } } diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 6f01134f39..acaf6746fa 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -370,7 +370,7 @@ func (suite *SystemTestSuite) TestTick_Deadlock() { suite.db.On("Rollback").Return(nil).Once() // Start first Tick - suite.system.SubmissionQueue.Push("address", 0) + suite.system.SubmissionQueue.Push("address", 0, nil) suite.db.On("GetSequenceNumbers", suite.ctx, []string{"address"}). Return(map[string]uint64{}, nil). Run(func(args mock.Arguments) { From 00d541d3f6ef75dd835a9e327268f0c0d3a7fcf7 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 23 Mar 2022 15:28:50 +0100 Subject: [PATCH 2/4] Use a sorted slice instead of a heap --- .../sequence/account_tx_submission_queue.go | 88 +++++++++---------- 1 file changed, 41 insertions(+), 47 deletions(-) diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go index 30e0a4cd0d..a9f4e450f8 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go @@ -1,7 +1,7 @@ package sequence import ( - "container/heap" + "sort" "time" ) @@ -17,7 +17,7 @@ type AccountTxSubmissionQueue struct { lastActiveAt time.Time timeout time.Duration lastSeenAccountSequence uint64 - queue pqueue + queue submissionQueue } // NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue @@ -27,9 +27,6 @@ func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue { timeout: 10 * time.Second, queue: nil, } - - heap.Init(&result.queue) - return result } @@ -53,13 +50,13 @@ func (q *AccountTxSubmissionQueue) Size() int { // possible func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error { ch := make(chan error, 1) - // From CAP 19: If minSeqNum is nil, the tx is only valid when sourceAccount's sequence number is seqNum - 1. - // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < tx.seqNum. + // From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1. + // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum. effectiveMinSeqNum := sequence - 1 if minSeqNum != nil { effectiveMinSeqNum = *minSeqNum } - heap.Push(&q.queue, item{ + q.queue.insert(txToSubmit{ MinAccSeqNum: effectiveMinSeqNum, MaxAccSeqNum: sequence - 1, Chan: ch, @@ -81,14 +78,14 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { // We need to traverse the full queue (ordered by MaxAccSeqNum) // in case there is a transaction with a submittable MinSeqNum we can use later on. - for i := 0; i < q.Size(); { + for i := 0; i < len(q.queue); { tx := q.queue[i] removeWithErr := func(err error) { tx.Chan <- err close(tx.Chan) wasChanged = true - heap.Remove(&q.queue, i) + q.queue.remove(i) } if q.lastSeenAccountSequence > tx.MaxAccSeqNum { @@ -98,8 +95,8 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { // within range, ready to submit! removeWithErr(nil) } else { - // we only need to increment the heap index when we don't remove - // an item + // we only need to increment the index when we don't remove + // a transaction i++ } } @@ -111,52 +108,49 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { } // if the queue wasn't changed, see if it is too old, clear - // it and make room for other's + // it and make room for other submissions if time.Since(q.lastActiveAt) > q.timeout { - for q.Size() > 0 { - entry := q.queue.Pop().(item) - entry.Chan <- ErrBadSequence - close(entry.Chan) + for i := 0; i < len(q.queue); i++ { + c := q.queue[i].Chan + c <- ErrBadSequence + close(c) } + q.queue = nil } } -// item is a element of the priority queue -type item struct { +// txToSubmit represents a transaction being tracked in submissionQueue +type txToSubmit struct { MinAccSeqNum uint64 // minimum account sequence required to send the transaction MaxAccSeqNum uint64 // maximum account sequence required to send the transaction Chan chan error } -// pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It -// implements heap.Interface. -type pqueue []item - -func (pq pqueue) Len() int { return len(pq) } - -func (pq pqueue) Less(i, j int) bool { - // To maximize tx submission opportunity, order transactions by the account sequence - // which would result from successful submission (MaxAccSeqNum+1) but, - // if those are the same, by higher minimum sequence since there is less margin to send those - // (a smaller interval). - if pq[i].MaxAccSeqNum != pq[j].MaxAccSeqNum { - return pq[i].MaxAccSeqNum < pq[j].MaxAccSeqNum +// submissionQueue is a priority queue (implemented as a sorted slice), +// used to track the submission order of transactions. +// Lower indices have higher submission priority. +type submissionQueue []txToSubmit + +func (sq *submissionQueue) insert(tx txToSubmit) { + txHasLessPriorityThanIth := func(i int) bool { + // To maximize transaction submission opportunity, we prioritize transactions by the account sequence + // which would result from successful submission (i.e. MaxAccSeqNum+1) but, + // if those are the same, by higher minimum sequence since there is less margin to send those + // (a smaller interval). + if tx.MaxAccSeqNum != (*sq)[i].MaxAccSeqNum { + return tx.MaxAccSeqNum < (*sq)[i].MaxAccSeqNum + } + return tx.MinAccSeqNum > (*sq)[i].MinAccSeqNum } - return pq[i].MinAccSeqNum > pq[j].MinAccSeqNum -} - -func (pq pqueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] -} - -func (pq *pqueue) Push(x interface{}) { - *pq = append(*pq, x.(item)) + i := sort.Search(len(*sq), txHasLessPriorityThanIth) + if len(*sq) == i { + *sq = append(*sq, tx) + return + } + *sq = append((*sq)[:i+1], (*sq)[i:]...) + (*sq)[i] = tx } -func (pq *pqueue) Pop() interface{} { - old := *pq - n := len(old) - result := old[n-1] - *pq = old[0 : n-1] - return result +func (sq *submissionQueue) remove(i int) { + *sq = append((*sq)[:i], (*sq)[i+1:]...) } From f6f4e82d2c18f96f37acf0990d11d0e9ffd625fa Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 23 Mar 2022 16:36:02 +0100 Subject: [PATCH 3/4] Use an unsorted slice --- .../sequence/account_tx_submission_queue.go | 119 +++++++----------- .../account_tx_submission_queue_test.go | 41 +----- 2 files changed, 52 insertions(+), 108 deletions(-) diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go index a9f4e450f8..03be11e92d 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go @@ -17,7 +17,14 @@ type AccountTxSubmissionQueue struct { lastActiveAt time.Time timeout time.Duration lastSeenAccountSequence uint64 - queue submissionQueue + transactions []txToSubmit +} + +// txToSubmit represents a transaction being tracked by the queue +type txToSubmit struct { + minAccSeqNum uint64 // minimum account sequence required to send the transaction + maxAccSeqNum uint64 // maximum account sequence required to send the transaction + notifyBackChan chan error // submission notification channel } // NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue @@ -25,14 +32,13 @@ func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue { result := &AccountTxSubmissionQueue{ lastActiveAt: time.Now(), timeout: 10 * time.Second, - queue: nil, } return result } // Size returns the count of currently buffered submissions in the queue. func (q *AccountTxSubmissionQueue) Size() int { - return len(q.queue) + return len(q.transactions) } // Push enqueues the intent to submit a transaction at the provided sequence @@ -49,17 +55,17 @@ func (q *AccountTxSubmissionQueue) Size() int { // 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if // possible func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error { - ch := make(chan error, 1) // From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1. // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum. effectiveMinSeqNum := sequence - 1 if minSeqNum != nil { effectiveMinSeqNum = *minSeqNum } - q.queue.insert(txToSubmit{ - MinAccSeqNum: effectiveMinSeqNum, - MaxAccSeqNum: sequence - 1, - Chan: ch, + ch := make(chan error, 1) + q.transactions = append(q.transactions, txToSubmit{ + minAccSeqNum: effectiveMinSeqNum, + maxAccSeqNum: sequence - 1, + notifyBackChan: ch, }) return ch } @@ -74,35 +80,43 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { q.lastSeenAccountSequence = sequence } - wasChanged := false - - // We need to traverse the full queue (ordered by MaxAccSeqNum) - // in case there is a transaction with a submittable MinSeqNum we can use later on. - for i := 0; i < len(q.queue); { - tx := q.queue[i] - - removeWithErr := func(err error) { - tx.Chan <- err - close(tx.Chan) - wasChanged = true - q.queue.remove(i) + queueWasChanged := false + + txsToSubmit := make([]txToSubmit, 0, len(q.transactions)) + // Extract transactions ready to submit and notify those which are un-submittable. + for i := 0; i < len(q.transactions); { + candidate := q.transactions[i] + removeCandidateFromQueue := false + if q.lastSeenAccountSequence > candidate.maxAccSeqNum { + // this transaction can never be submitted because account sequence numbers only grow + candidate.notifyBackChan <- ErrBadSequence + close(candidate.notifyBackChan) + removeCandidateFromQueue = true + } else if q.lastSeenAccountSequence >= candidate.minAccSeqNum { + txsToSubmit = append(txsToSubmit, candidate) + removeCandidateFromQueue = true } - - if q.lastSeenAccountSequence > tx.MaxAccSeqNum { - // The transaction and account sequences will never match - removeWithErr(ErrBadSequence) - } else if q.lastSeenAccountSequence >= tx.MinAccSeqNum { - // within range, ready to submit! - removeWithErr(nil) + if removeCandidateFromQueue { + q.transactions = append(q.transactions[:i], q.transactions[i+1:]...) + queueWasChanged = true } else { - // we only need to increment the index when we don't remove - // a transaction + // only increment the index if there was no removal i++ } } + // To maximize successful submission opportunity, submit transactions by the account sequence + // which would result from a successful submission (i.e. maxAccSeqNum+1) + sort.Slice(txsToSubmit, func(i, j int) bool { + return txsToSubmit[i].maxAccSeqNum < txsToSubmit[j].maxAccSeqNum + }) + for _, tx := range txsToSubmit { + tx.notifyBackChan <- nil + close(tx.notifyBackChan) + } + // if we modified the queue, bump the timeout for this queue - if wasChanged { + if queueWasChanged { q.lastActiveAt = time.Now() return } @@ -110,47 +124,10 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { // if the queue wasn't changed, see if it is too old, clear // it and make room for other submissions if time.Since(q.lastActiveAt) > q.timeout { - for i := 0; i < len(q.queue); i++ { - c := q.queue[i].Chan - c <- ErrBadSequence - close(c) + for _, tx := range q.transactions { + tx.notifyBackChan <- ErrBadSequence + close(tx.notifyBackChan) } - q.queue = nil + q.transactions = nil } } - -// txToSubmit represents a transaction being tracked in submissionQueue -type txToSubmit struct { - MinAccSeqNum uint64 // minimum account sequence required to send the transaction - MaxAccSeqNum uint64 // maximum account sequence required to send the transaction - Chan chan error -} - -// submissionQueue is a priority queue (implemented as a sorted slice), -// used to track the submission order of transactions. -// Lower indices have higher submission priority. -type submissionQueue []txToSubmit - -func (sq *submissionQueue) insert(tx txToSubmit) { - txHasLessPriorityThanIth := func(i int) bool { - // To maximize transaction submission opportunity, we prioritize transactions by the account sequence - // which would result from successful submission (i.e. MaxAccSeqNum+1) but, - // if those are the same, by higher minimum sequence since there is less margin to send those - // (a smaller interval). - if tx.MaxAccSeqNum != (*sq)[i].MaxAccSeqNum { - return tx.MaxAccSeqNum < (*sq)[i].MaxAccSeqNum - } - return tx.MinAccSeqNum > (*sq)[i].MinAccSeqNum - } - i := sort.Search(len(*sq), txHasLessPriorityThanIth) - if len(*sq) == i { - *sq = append(*sq, tx) - return - } - *sq = append((*sq)[:i+1], (*sq)[i:]...) - (*sq)[i] = tx -} - -func (sq *submissionQueue) remove(i int) { - *sq = append((*sq)[:i], (*sq)[i+1:]...) -} diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go index 2f16a65409..0b4497f2fd 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go @@ -23,39 +23,6 @@ func (suite *QueueTestSuite) SetupTest() { suite.queue = NewAccountTxSubmissionQueue() } -//Push adds the provided channel on to the priority queue -func (suite *QueueTestSuite) TestQueue_Push() { - ctx := test.Context() - _ = ctx - - assert.Equal(suite.T(), 0, suite.queue.Size()) - - suite.queue.Push(6, nil) - assert.Equal(suite.T(), 1, suite.queue.Size()) - entry := suite.queue.queue[0] - assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) - - min := uint64(1) - suite.queue.Push(5, &min) - assert.Equal(suite.T(), 2, suite.queue.Size()) - entry = suite.queue.queue[0] - assert.Equal(suite.T(), uint64(1), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) - - suite.queue.Push(5, nil) - assert.Equal(suite.T(), 3, suite.queue.Size()) - entry = suite.queue.queue[0] - assert.Equal(suite.T(), uint64(4), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) - - suite.queue.Push(4, nil) - assert.Equal(suite.T(), 4, suite.queue.Size()) - entry = suite.queue.queue[0] - assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) -} - // Tests the NotifyLastAccountSequence method func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { // NotifyLastAccountSequence removes sequences that are submittable or in the past @@ -71,11 +38,11 @@ func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { suite.queue.NotifyLastAccountSequence(2) // the update above signifies that 2 is the accounts current sequence, - // meaning that 3 is submittable, and so only 4 (Min/MaxAccSeqNum=3) should remain + // meaning that 3 is submittable, and so only 4 (Min/maxAccSeqNum=3) should remain assert.Equal(suite.T(), 1, suite.queue.Size()) - entry := suite.queue.queue[0] - assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) - assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) + entry := suite.queue.transactions[0] + assert.Equal(suite.T(), uint64(3), entry.minAccSeqNum) + assert.Equal(suite.T(), uint64(3), entry.maxAccSeqNum) suite.queue.NotifyLastAccountSequence(4) assert.Equal(suite.T(), 0, suite.queue.Size()) From 87cc2e630321ffbe9519a0d43cdb03b1357bfc8e Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 24 Mar 2022 15:30:56 +0100 Subject: [PATCH 4/4] Address review feedback --- .../sequence/account_tx_submission_queue.go | 2 +- services/horizon/internal/txsub/system.go | 12 +++++++-- xdr/transaction_envelope.go | 26 +++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go index 03be11e92d..cfe53df68e 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go @@ -76,7 +76,7 @@ func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-ch // This function is monotonic... calling it with a sequence number lower than // the latest seen sequence number is a noop. func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { - if q.lastSeenAccountSequence <= sequence { + if q.lastSeenAccountSequence < sequence { q.lastSeenAccountSequence = sequence } diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 44b0876383..4d9716ba4b 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -113,7 +113,10 @@ func (sys *System) Submit( "tx": rawTx, }).Info("Processing transaction") - if envelope.SeqNum() < 0 { + seqNum := envelope.SeqNum() + minSeqNum := envelope.MinSeqNum() + // Ensure sequence numbers make sense + if seqNum < 0 || (minSeqNum != nil && (*minSeqNum < 0 || *minSeqNum >= seqNum)) { sys.finish(ctx, hash, response, Result{Err: ErrBadSequence}) return } @@ -132,7 +135,12 @@ func (sys *System) Submit( // queue the submission and get the channel that will emit when // submission is valid - seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum()), nil) + var pMinSeqNum *uint64 + if minSeqNum != nil { + uMinSeqNum := uint64(*minSeqNum) + pMinSeqNum = &uMinSeqNum + } + seq := sys.SubmissionQueue.Push(sourceAddress, uint64(seqNum), pMinSeqNum) // update the submission queue with the source accounts current sequence value // which will cause the channel returned by Push() to emit if possible. diff --git a/xdr/transaction_envelope.go b/xdr/transaction_envelope.go index e9a4ce0a89..2c2d5cd94e 100644 --- a/xdr/transaction_envelope.go +++ b/xdr/transaction_envelope.go @@ -87,6 +87,32 @@ func (e TransactionEnvelope) SeqNum() int64 { } } +// MinSeqNum returns the minimum sequence number set in the transaction envelope +// +// Note for fee bump transactions, MinSeqNum() returns the sequence number +// of the inner transaction +func (e TransactionEnvelope) MinSeqNum() *int64 { + var p Preconditions + switch e.Type { + case EnvelopeTypeEnvelopeTypeTxFeeBump: + p = e.FeeBump.Tx.InnerTx.V1.Tx.Cond + case EnvelopeTypeEnvelopeTypeTx: + p = e.V1.Tx.Cond + case EnvelopeTypeEnvelopeTypeTxV0: + return nil + default: + panic("unsupported transaction type: " + e.Type.String()) + } + if p.Type != PreconditionTypePrecondV2 { + return nil + } + if p.V2.MinSeqNum == nil { + return nil + } + ret := int64(*p.V2.MinSeqNum) + return &ret +} + // TimeBounds returns the time bounds set in the transaction envelope // Note for fee bump transactions, TimeBounds() returns the time bounds // of the inner transaction