diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go index b43e60d908..cfe53df68e 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go @@ -1,7 +1,7 @@ package sequence import ( - "container/heap" + "sort" "time" ) @@ -14,10 +14,17 @@ import ( // being managed, queued submissions that can be acted upon will be unblocked. // type AccountTxSubmissionQueue struct { - lastActiveAt time.Time - timeout time.Duration - nextSequence uint64 - queue pqueue + lastActiveAt time.Time + timeout time.Duration + lastSeenAccountSequence uint64 + transactions []txToSubmit +} + +// txToSubmit represents a transaction being tracked by the queue +type txToSubmit struct { + minAccSeqNum uint64 // minimum account sequence required to send the transaction + maxAccSeqNum uint64 // maximum account sequence required to send the transaction + notifyBackChan chan error // submission notification channel } // NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue @@ -25,17 +32,13 @@ func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue { result := &AccountTxSubmissionQueue{ lastActiveAt: time.Now(), timeout: 10 * time.Second, - queue: nil, } - - heap.Init(&result.queue) - return result } // Size returns the count of currently buffered submissions in the queue. func (q *AccountTxSubmissionQueue) Size() int { - return len(q.queue) + return len(q.transactions) } // Push enqueues the intent to submit a transaction at the provided sequence @@ -51,9 +54,19 @@ func (q *AccountTxSubmissionQueue) Size() int { // 2. Load the current sequence number for the source account from the DB // 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if // possible -func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error { +func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error { + // From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1. + // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum. + effectiveMinSeqNum := sequence - 1 + if minSeqNum != nil { + effectiveMinSeqNum = *minSeqNum + } ch := make(chan error, 1) - heap.Push(&q.queue, item{sequence, ch}) + q.transactions = append(q.transactions, txToSubmit{ + minAccSeqNum: effectiveMinSeqNum, + maxAccSeqNum: sequence - 1, + notifyBackChan: ch, + }) return ch } @@ -63,99 +76,58 @@ func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error { // This function is monotonic... calling it with a sequence number lower than // the latest seen sequence number is a noop. func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { - if q.nextSequence <= sequence { - q.nextSequence = sequence + 1 + if q.lastSeenAccountSequence < sequence { + q.lastSeenAccountSequence = sequence } - wasChanged := false - - for { - if q.Size() == 0 { - break + queueWasChanged := false + + txsToSubmit := make([]txToSubmit, 0, len(q.transactions)) + // Extract transactions ready to submit and notify those which are un-submittable. + for i := 0; i < len(q.transactions); { + candidate := q.transactions[i] + removeCandidateFromQueue := false + if q.lastSeenAccountSequence > candidate.maxAccSeqNum { + // this transaction can never be submitted because account sequence numbers only grow + candidate.notifyBackChan <- ErrBadSequence + close(candidate.notifyBackChan) + removeCandidateFromQueue = true + } else if q.lastSeenAccountSequence >= candidate.minAccSeqNum { + txsToSubmit = append(txsToSubmit, candidate) + removeCandidateFromQueue = true } - - ch, hseq := q.head() - // if the next queued transaction has a sequence higher than the account's - // current sequence, stop removing entries - if hseq > q.nextSequence { - break + if removeCandidateFromQueue { + q.transactions = append(q.transactions[:i], q.transactions[i+1:]...) + queueWasChanged = true + } else { + // only increment the index if there was no removal + i++ } + } - // since this entry is unlocked (i.e. it's sequence is the next available - // or in the past we can remove it an mark the queue as changed - q.pop() - wasChanged = true - - if hseq < q.nextSequence { - ch <- ErrBadSequence - close(ch) - } else if hseq == q.nextSequence { - ch <- nil - close(ch) - } + // To maximize successful submission opportunity, submit transactions by the account sequence + // which would result from a successful submission (i.e. maxAccSeqNum+1) + sort.Slice(txsToSubmit, func(i, j int) bool { + return txsToSubmit[i].maxAccSeqNum < txsToSubmit[j].maxAccSeqNum + }) + for _, tx := range txsToSubmit { + tx.notifyBackChan <- nil + close(tx.notifyBackChan) } // if we modified the queue, bump the timeout for this queue - if wasChanged { + if queueWasChanged { q.lastActiveAt = time.Now() return } // if the queue wasn't changed, see if it is too old, clear - // it and make room for other's + // it and make room for other submissions if time.Since(q.lastActiveAt) > q.timeout { - for q.Size() > 0 { - ch, _ := q.pop() - ch <- ErrBadSequence - close(ch) + for _, tx := range q.transactions { + tx.notifyBackChan <- ErrBadSequence + close(tx.notifyBackChan) } + q.transactions = nil } } - -// helper function for interacting with the priority queue -func (q *AccountTxSubmissionQueue) head() (chan error, uint64) { - if len(q.queue) == 0 { - return nil, uint64(0) - } - - return q.queue[0].Chan, q.queue[0].Sequence -} - -// helper function for interacting with the priority queue -func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) { - i := heap.Pop(&q.queue).(item) - - return i.Chan, i.Sequence -} - -// item is a element of the priority queue -type item struct { - Sequence uint64 - Chan chan error -} - -// pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It -// implements heap.Interface. -type pqueue []item - -func (pq pqueue) Len() int { return len(pq) } - -func (pq pqueue) Less(i, j int) bool { - return pq[i].Sequence < pq[j].Sequence -} - -func (pq pqueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] -} - -func (pq *pqueue) Push(x interface{}) { - *pq = append(*pq, x.(item)) -} - -func (pq *pqueue) Pop() interface{} { - old := *pq - n := len(old) - result := old[n-1] - *pq = old[0 : n-1] - return result -} diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go index d7db456aed..0b4497f2fd 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go @@ -23,41 +23,26 @@ func (suite *QueueTestSuite) SetupTest() { suite.queue = NewAccountTxSubmissionQueue() } -//Push adds the provided channel on to the priority queue -func (suite *QueueTestSuite) TestQueue_Push() { - ctx := test.Context() - _ = ctx - - assert.Equal(suite.T(), 0, suite.queue.Size()) - - suite.queue.Push(2) - assert.Equal(suite.T(), 1, suite.queue.Size()) - _, s := suite.queue.head() - assert.Equal(suite.T(), uint64(2), s) - - suite.queue.Push(1) - assert.Equal(suite.T(), 2, suite.queue.Size()) - _, s = suite.queue.head() - assert.Equal(suite.T(), uint64(1), s) -} - -// Tests the update method -func (suite *QueueTestSuite) TestQueue_Update() { +// Tests the NotifyLastAccountSequence method +func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { // NotifyLastAccountSequence removes sequences that are submittable or in the past + lowMin := uint64(1) results := []<-chan error{ - suite.queue.Push(1), - suite.queue.Push(2), - suite.queue.Push(3), - suite.queue.Push(4), + suite.queue.Push(1, nil), + suite.queue.Push(2, nil), + suite.queue.Push(3, nil), + suite.queue.Push(4, nil), + suite.queue.Push(4, &lowMin), } suite.queue.NotifyLastAccountSequence(2) // the update above signifies that 2 is the accounts current sequence, - // meaning that 3 is submittable, and so only 4 should still be queued + // meaning that 3 is submittable, and so only 4 (Min/maxAccSeqNum=3) should remain assert.Equal(suite.T(), 1, suite.queue.Size()) - _, s := suite.queue.head() - assert.Equal(suite.T(), uint64(4), s) + entry := suite.queue.transactions[0] + assert.Equal(suite.T(), uint64(3), entry.minAccSeqNum) + assert.Equal(suite.T(), uint64(3), entry.maxAccSeqNum) suite.queue.NotifyLastAccountSequence(4) assert.Equal(suite.T(), 0, suite.queue.Size()) @@ -66,10 +51,11 @@ func (suite *QueueTestSuite) TestQueue_Update() { assert.Equal(suite.T(), ErrBadSequence, <-results[1]) assert.Equal(suite.T(), nil, <-results[2]) assert.Equal(suite.T(), ErrBadSequence, <-results[3]) + assert.Equal(suite.T(), nil, <-results[4]) // NotifyLastAccountSequence clears the queue if the head has not been released within the time limit suite.queue.timeout = 1 * time.Millisecond - result := suite.queue.Push(2) + result := suite.queue.Push(2, nil) <-time.After(10 * time.Millisecond) suite.queue.NotifyLastAccountSequence(0) diff --git a/services/horizon/internal/txsub/sequence/manager.go b/services/horizon/internal/txsub/sequence/manager.go index 566239d509..1a776f44d8 100644 --- a/services/horizon/internal/txsub/sequence/manager.go +++ b/services/horizon/internal/txsub/sequence/manager.go @@ -31,7 +31,7 @@ func (m *Manager) String() string { var addys []string for addy, q := range m.queues { - addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.nextSequence)) + addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.lastSeenAccountSequence)) } return "[ " + strings.Join(addys, ",") + " ]" @@ -60,7 +60,7 @@ func (m *Manager) Addresses() []string { // Push registers an intent to submit a transaction for the provided address at // the provided sequence. A channel is returned that will be written to when // the requester should attempt the submission. -func (m *Manager) Push(address string, sequence uint64) <-chan error { +func (m *Manager) Push(address string, sequence uint64, minSeqNum *uint64) <-chan error { m.mutex.Lock() defer m.mutex.Unlock() @@ -74,12 +74,12 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error { m.queues[address] = aq } - return aq.Push(sequence) + return aq.Push(sequence, minSeqNum) } -// Update notifies the manager of newly loaded account sequence information. The manager uses this information +// NotifyLastAccountSequences notifies the manager of newly loaded account sequence information. The manager uses this information // to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic. -func (m *Manager) Update(updates map[string]uint64) { +func (m *Manager) NotifyLastAccountSequences(updates map[string]uint64) { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/services/horizon/internal/txsub/sequence/manager_test.go b/services/horizon/internal/txsub/sequence/manager_test.go index eff0e4ec9e..e69b8be6fc 100644 --- a/services/horizon/internal/txsub/sequence/manager_test.go +++ b/services/horizon/internal/txsub/sequence/manager_test.go @@ -10,26 +10,28 @@ import ( func TestManager_Push(t *testing.T) { mgr := NewManager() - mgr.Push("1", 2) - mgr.Push("1", 2) - mgr.Push("1", 3) - mgr.Push("2", 2) + minSeq := uint64(1) + mgr.Push("1", 2, nil) + mgr.Push("1", 2, nil) + mgr.Push("1", 3, &minSeq) + mgr.Push("2", 2, nil) assert.Equal(t, 4, mgr.Size()) assert.Equal(t, 3, mgr.queues["1"].Size()) assert.Equal(t, 1, mgr.queues["2"].Size()) } -// Test the NotifyLastAccountSequence method -func TestManager_Update(t *testing.T) { +// Test the NotifyLastAccountSequences method +func TestManager_NotifyLastAccountSequences(t *testing.T) { mgr := NewManager() + minSeq := uint64(1) results := []<-chan error{ - mgr.Push("1", 2), - mgr.Push("1", 3), - mgr.Push("2", 2), + mgr.Push("1", 4, &minSeq), + mgr.Push("1", 3, nil), + mgr.Push("2", 2, nil), } - mgr.Update(map[string]uint64{ + mgr.NotifyLastAccountSequences(map[string]uint64{ "1": 1, "2": 1, }) @@ -47,9 +49,9 @@ func TestManager_Update(t *testing.T) { func TestManager_PushNoMoreRoom(t *testing.T) { mgr := NewManager() for i := 0; i < mgr.MaxSize; i++ { - mgr.Push("1", 2) + mgr.Push("1", 2, nil) } assert.Equal(t, 1024, mgr.Size()) - assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2)) + assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2, nil)) } diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 70d0a67aa5..4d9716ba4b 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -113,7 +113,10 @@ func (sys *System) Submit( "tx": rawTx, }).Info("Processing transaction") - if envelope.SeqNum() < 0 { + seqNum := envelope.SeqNum() + minSeqNum := envelope.MinSeqNum() + // Ensure sequence numbers make sense + if seqNum < 0 || (minSeqNum != nil && (*minSeqNum < 0 || *minSeqNum >= seqNum)) { sys.finish(ctx, hash, response, Result{Err: ErrBadSequence}) return } @@ -132,11 +135,16 @@ func (sys *System) Submit( // queue the submission and get the channel that will emit when // submission is valid - seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum())) + var pMinSeqNum *uint64 + if minSeqNum != nil { + uMinSeqNum := uint64(*minSeqNum) + pMinSeqNum = &uMinSeqNum + } + seq := sys.SubmissionQueue.Push(sourceAddress, uint64(seqNum), pMinSeqNum) // update the submission queue with the source accounts current sequence value // which will cause the channel returned by Push() to emit if possible. - sys.SubmissionQueue.Update(map[string]uint64{ + sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ sourceAddress: sequenceNumber, }) @@ -160,7 +168,7 @@ func (sys *System) Submit( // add transactions to open list sys.Pending.Add(ctx, hash, response) // update the submission queue, allowing the next submission to proceed - sys.SubmissionQueue.Update(map[string]uint64{ + sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ sourceAddress: uint64(envelope.SeqNum()), }) return @@ -326,7 +334,7 @@ func (sys *System) Tick(ctx context.Context) { logger.WithStack(err).Error(err) return } else { - sys.SubmissionQueue.Update(curSeq) + sys.SubmissionQueue.NotifyLastAccountSequences(curSeq) } } diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 6f01134f39..acaf6746fa 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -370,7 +370,7 @@ func (suite *SystemTestSuite) TestTick_Deadlock() { suite.db.On("Rollback").Return(nil).Once() // Start first Tick - suite.system.SubmissionQueue.Push("address", 0) + suite.system.SubmissionQueue.Push("address", 0, nil) suite.db.On("GetSequenceNumbers", suite.ctx, []string{"address"}). Return(map[string]uint64{}, nil). Run(func(args mock.Arguments) { diff --git a/xdr/transaction_envelope.go b/xdr/transaction_envelope.go index e9a4ce0a89..2c2d5cd94e 100644 --- a/xdr/transaction_envelope.go +++ b/xdr/transaction_envelope.go @@ -87,6 +87,32 @@ func (e TransactionEnvelope) SeqNum() int64 { } } +// MinSeqNum returns the minimum sequence number set in the transaction envelope +// +// Note for fee bump transactions, MinSeqNum() returns the sequence number +// of the inner transaction +func (e TransactionEnvelope) MinSeqNum() *int64 { + var p Preconditions + switch e.Type { + case EnvelopeTypeEnvelopeTypeTxFeeBump: + p = e.FeeBump.Tx.InnerTx.V1.Tx.Cond + case EnvelopeTypeEnvelopeTypeTx: + p = e.V1.Tx.Cond + case EnvelopeTypeEnvelopeTypeTxV0: + return nil + default: + panic("unsupported transaction type: " + e.Type.String()) + } + if p.Type != PreconditionTypePrecondV2 { + return nil + } + if p.V2.MinSeqNum == nil { + return nil + } + ret := int64(*p.V2.MinSeqNum) + return &ret +} + // TimeBounds returns the time bounds set in the transaction envelope // Note for fee bump transactions, TimeBounds() returns the time bounds // of the inner transaction