diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go index b43e60d908..30e0a4cd0d 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue.go @@ -14,10 +14,10 @@ import ( // being managed, queued submissions that can be acted upon will be unblocked. // type AccountTxSubmissionQueue struct { - lastActiveAt time.Time - timeout time.Duration - nextSequence uint64 - queue pqueue + lastActiveAt time.Time + timeout time.Duration + lastSeenAccountSequence uint64 + queue pqueue } // NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue @@ -51,9 +51,19 @@ func (q *AccountTxSubmissionQueue) Size() int { // 2. Load the current sequence number for the source account from the DB // 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if // possible -func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error { +func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error { ch := make(chan error, 1) - heap.Push(&q.queue, item{sequence, ch}) + // From CAP 19: If minSeqNum is nil, the tx is only valid when sourceAccount's sequence number is seqNum - 1. + // Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < tx.seqNum. + effectiveMinSeqNum := sequence - 1 + if minSeqNum != nil { + effectiveMinSeqNum = *minSeqNum + } + heap.Push(&q.queue, item{ + MinAccSeqNum: effectiveMinSeqNum, + MaxAccSeqNum: sequence - 1, + Chan: ch, + }) return ch } @@ -63,35 +73,34 @@ func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error { // This function is monotonic... calling it with a sequence number lower than // the latest seen sequence number is a noop. func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { - if q.nextSequence <= sequence { - q.nextSequence = sequence + 1 + if q.lastSeenAccountSequence <= sequence { + q.lastSeenAccountSequence = sequence } wasChanged := false - for { - if q.Size() == 0 { - break - } + // We need to traverse the full queue (ordered by MaxAccSeqNum) + // in case there is a transaction with a submittable MinSeqNum we can use later on. + for i := 0; i < q.Size(); { + tx := q.queue[i] - ch, hseq := q.head() - // if the next queued transaction has a sequence higher than the account's - // current sequence, stop removing entries - if hseq > q.nextSequence { - break + removeWithErr := func(err error) { + tx.Chan <- err + close(tx.Chan) + wasChanged = true + heap.Remove(&q.queue, i) } - // since this entry is unlocked (i.e. it's sequence is the next available - // or in the past we can remove it an mark the queue as changed - q.pop() - wasChanged = true - - if hseq < q.nextSequence { - ch <- ErrBadSequence - close(ch) - } else if hseq == q.nextSequence { - ch <- nil - close(ch) + if q.lastSeenAccountSequence > tx.MaxAccSeqNum { + // The transaction and account sequences will never match + removeWithErr(ErrBadSequence) + } else if q.lastSeenAccountSequence >= tx.MinAccSeqNum { + // within range, ready to submit! + removeWithErr(nil) + } else { + // we only need to increment the heap index when we don't remove + // an item + i++ } } @@ -105,33 +114,18 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) { // it and make room for other's if time.Since(q.lastActiveAt) > q.timeout { for q.Size() > 0 { - ch, _ := q.pop() - ch <- ErrBadSequence - close(ch) + entry := q.queue.Pop().(item) + entry.Chan <- ErrBadSequence + close(entry.Chan) } } } -// helper function for interacting with the priority queue -func (q *AccountTxSubmissionQueue) head() (chan error, uint64) { - if len(q.queue) == 0 { - return nil, uint64(0) - } - - return q.queue[0].Chan, q.queue[0].Sequence -} - -// helper function for interacting with the priority queue -func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) { - i := heap.Pop(&q.queue).(item) - - return i.Chan, i.Sequence -} - // item is a element of the priority queue type item struct { - Sequence uint64 - Chan chan error + MinAccSeqNum uint64 // minimum account sequence required to send the transaction + MaxAccSeqNum uint64 // maximum account sequence required to send the transaction + Chan chan error } // pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It @@ -141,7 +135,14 @@ type pqueue []item func (pq pqueue) Len() int { return len(pq) } func (pq pqueue) Less(i, j int) bool { - return pq[i].Sequence < pq[j].Sequence + // To maximize tx submission opportunity, order transactions by the account sequence + // which would result from successful submission (MaxAccSeqNum+1) but, + // if those are the same, by higher minimum sequence since there is less margin to send those + // (a smaller interval). + if pq[i].MaxAccSeqNum != pq[j].MaxAccSeqNum { + return pq[i].MaxAccSeqNum < pq[j].MaxAccSeqNum + } + return pq[i].MinAccSeqNum > pq[j].MinAccSeqNum } func (pq pqueue) Swap(i, j int) { diff --git a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go index d7db456aed..2f16a65409 100644 --- a/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go +++ b/services/horizon/internal/txsub/sequence/account_tx_submission_queue_test.go @@ -30,34 +30,52 @@ func (suite *QueueTestSuite) TestQueue_Push() { assert.Equal(suite.T(), 0, suite.queue.Size()) - suite.queue.Push(2) + suite.queue.Push(6, nil) assert.Equal(suite.T(), 1, suite.queue.Size()) - _, s := suite.queue.head() - assert.Equal(suite.T(), uint64(2), s) + entry := suite.queue.queue[0] + assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum) - suite.queue.Push(1) + min := uint64(1) + suite.queue.Push(5, &min) assert.Equal(suite.T(), 2, suite.queue.Size()) - _, s = suite.queue.head() - assert.Equal(suite.T(), uint64(1), s) + entry = suite.queue.queue[0] + assert.Equal(suite.T(), uint64(1), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) + + suite.queue.Push(5, nil) + assert.Equal(suite.T(), 3, suite.queue.Size()) + entry = suite.queue.queue[0] + assert.Equal(suite.T(), uint64(4), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum) + + suite.queue.Push(4, nil) + assert.Equal(suite.T(), 4, suite.queue.Size()) + entry = suite.queue.queue[0] + assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) } -// Tests the update method -func (suite *QueueTestSuite) TestQueue_Update() { +// Tests the NotifyLastAccountSequence method +func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() { // NotifyLastAccountSequence removes sequences that are submittable or in the past + lowMin := uint64(1) results := []<-chan error{ - suite.queue.Push(1), - suite.queue.Push(2), - suite.queue.Push(3), - suite.queue.Push(4), + suite.queue.Push(1, nil), + suite.queue.Push(2, nil), + suite.queue.Push(3, nil), + suite.queue.Push(4, nil), + suite.queue.Push(4, &lowMin), } suite.queue.NotifyLastAccountSequence(2) // the update above signifies that 2 is the accounts current sequence, - // meaning that 3 is submittable, and so only 4 should still be queued + // meaning that 3 is submittable, and so only 4 (Min/MaxAccSeqNum=3) should remain assert.Equal(suite.T(), 1, suite.queue.Size()) - _, s := suite.queue.head() - assert.Equal(suite.T(), uint64(4), s) + entry := suite.queue.queue[0] + assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum) + assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum) suite.queue.NotifyLastAccountSequence(4) assert.Equal(suite.T(), 0, suite.queue.Size()) @@ -66,10 +84,11 @@ func (suite *QueueTestSuite) TestQueue_Update() { assert.Equal(suite.T(), ErrBadSequence, <-results[1]) assert.Equal(suite.T(), nil, <-results[2]) assert.Equal(suite.T(), ErrBadSequence, <-results[3]) + assert.Equal(suite.T(), nil, <-results[4]) // NotifyLastAccountSequence clears the queue if the head has not been released within the time limit suite.queue.timeout = 1 * time.Millisecond - result := suite.queue.Push(2) + result := suite.queue.Push(2, nil) <-time.After(10 * time.Millisecond) suite.queue.NotifyLastAccountSequence(0) diff --git a/services/horizon/internal/txsub/sequence/manager.go b/services/horizon/internal/txsub/sequence/manager.go index 566239d509..1a776f44d8 100644 --- a/services/horizon/internal/txsub/sequence/manager.go +++ b/services/horizon/internal/txsub/sequence/manager.go @@ -31,7 +31,7 @@ func (m *Manager) String() string { var addys []string for addy, q := range m.queues { - addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.nextSequence)) + addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.lastSeenAccountSequence)) } return "[ " + strings.Join(addys, ",") + " ]" @@ -60,7 +60,7 @@ func (m *Manager) Addresses() []string { // Push registers an intent to submit a transaction for the provided address at // the provided sequence. A channel is returned that will be written to when // the requester should attempt the submission. -func (m *Manager) Push(address string, sequence uint64) <-chan error { +func (m *Manager) Push(address string, sequence uint64, minSeqNum *uint64) <-chan error { m.mutex.Lock() defer m.mutex.Unlock() @@ -74,12 +74,12 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error { m.queues[address] = aq } - return aq.Push(sequence) + return aq.Push(sequence, minSeqNum) } -// Update notifies the manager of newly loaded account sequence information. The manager uses this information +// NotifyLastAccountSequences notifies the manager of newly loaded account sequence information. The manager uses this information // to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic. -func (m *Manager) Update(updates map[string]uint64) { +func (m *Manager) NotifyLastAccountSequences(updates map[string]uint64) { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/services/horizon/internal/txsub/sequence/manager_test.go b/services/horizon/internal/txsub/sequence/manager_test.go index eff0e4ec9e..e69b8be6fc 100644 --- a/services/horizon/internal/txsub/sequence/manager_test.go +++ b/services/horizon/internal/txsub/sequence/manager_test.go @@ -10,26 +10,28 @@ import ( func TestManager_Push(t *testing.T) { mgr := NewManager() - mgr.Push("1", 2) - mgr.Push("1", 2) - mgr.Push("1", 3) - mgr.Push("2", 2) + minSeq := uint64(1) + mgr.Push("1", 2, nil) + mgr.Push("1", 2, nil) + mgr.Push("1", 3, &minSeq) + mgr.Push("2", 2, nil) assert.Equal(t, 4, mgr.Size()) assert.Equal(t, 3, mgr.queues["1"].Size()) assert.Equal(t, 1, mgr.queues["2"].Size()) } -// Test the NotifyLastAccountSequence method -func TestManager_Update(t *testing.T) { +// Test the NotifyLastAccountSequences method +func TestManager_NotifyLastAccountSequences(t *testing.T) { mgr := NewManager() + minSeq := uint64(1) results := []<-chan error{ - mgr.Push("1", 2), - mgr.Push("1", 3), - mgr.Push("2", 2), + mgr.Push("1", 4, &minSeq), + mgr.Push("1", 3, nil), + mgr.Push("2", 2, nil), } - mgr.Update(map[string]uint64{ + mgr.NotifyLastAccountSequences(map[string]uint64{ "1": 1, "2": 1, }) @@ -47,9 +49,9 @@ func TestManager_Update(t *testing.T) { func TestManager_PushNoMoreRoom(t *testing.T) { mgr := NewManager() for i := 0; i < mgr.MaxSize; i++ { - mgr.Push("1", 2) + mgr.Push("1", 2, nil) } assert.Equal(t, 1024, mgr.Size()) - assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2)) + assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2, nil)) } diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 70d0a67aa5..44b0876383 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -132,11 +132,11 @@ func (sys *System) Submit( // queue the submission and get the channel that will emit when // submission is valid - seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum())) + seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum()), nil) // update the submission queue with the source accounts current sequence value // which will cause the channel returned by Push() to emit if possible. - sys.SubmissionQueue.Update(map[string]uint64{ + sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ sourceAddress: sequenceNumber, }) @@ -160,7 +160,7 @@ func (sys *System) Submit( // add transactions to open list sys.Pending.Add(ctx, hash, response) // update the submission queue, allowing the next submission to proceed - sys.SubmissionQueue.Update(map[string]uint64{ + sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ sourceAddress: uint64(envelope.SeqNum()), }) return @@ -326,7 +326,7 @@ func (sys *System) Tick(ctx context.Context) { logger.WithStack(err).Error(err) return } else { - sys.SubmissionQueue.Update(curSeq) + sys.SubmissionQueue.NotifyLastAccountSequences(curSeq) } } diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 6f01134f39..acaf6746fa 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -370,7 +370,7 @@ func (suite *SystemTestSuite) TestTick_Deadlock() { suite.db.On("Rollback").Return(nil).Once() // Start first Tick - suite.system.SubmissionQueue.Push("address", 0) + suite.system.SubmissionQueue.Push("address", 0, nil) suite.db.On("GetSequenceNumbers", suite.ctx, []string{"address"}). Return(map[string]uint64{}, nil). Run(func(args mock.Arguments) {