Skip to content

Commit

Permalink
services/horizon: Update txsub queue to account for new CAP-21 precon…
Browse files Browse the repository at this point in the history
…ditions (#4301)
  • Loading branch information
2opremio authored Mar 24, 2022
1 parent f6eca07 commit b6605cf
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 142 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sequence

import (
"container/heap"
"sort"
"time"
)

Expand All @@ -14,28 +14,31 @@ import (
// being managed, queued submissions that can be acted upon will be unblocked.
//
type AccountTxSubmissionQueue struct {
lastActiveAt time.Time
timeout time.Duration
nextSequence uint64
queue pqueue
lastActiveAt time.Time
timeout time.Duration
lastSeenAccountSequence uint64
transactions []txToSubmit
}

// txToSubmit represents a transaction being tracked by the queue
type txToSubmit struct {
minAccSeqNum uint64 // minimum account sequence required to send the transaction
maxAccSeqNum uint64 // maximum account sequence required to send the transaction
notifyBackChan chan error // submission notification channel
}

// NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue
func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue {
result := &AccountTxSubmissionQueue{
lastActiveAt: time.Now(),
timeout: 10 * time.Second,
queue: nil,
}

heap.Init(&result.queue)

return result
}

// Size returns the count of currently buffered submissions in the queue.
func (q *AccountTxSubmissionQueue) Size() int {
return len(q.queue)
return len(q.transactions)
}

// Push enqueues the intent to submit a transaction at the provided sequence
Expand All @@ -51,9 +54,19 @@ func (q *AccountTxSubmissionQueue) Size() int {
// 2. Load the current sequence number for the source account from the DB
// 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if
// possible
func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error {
// From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1.
// Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum.
effectiveMinSeqNum := sequence - 1
if minSeqNum != nil {
effectiveMinSeqNum = *minSeqNum
}
ch := make(chan error, 1)
heap.Push(&q.queue, item{sequence, ch})
q.transactions = append(q.transactions, txToSubmit{
minAccSeqNum: effectiveMinSeqNum,
maxAccSeqNum: sequence - 1,
notifyBackChan: ch,
})
return ch
}

Expand All @@ -63,99 +76,58 @@ func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
// This function is monotonic... calling it with a sequence number lower than
// the latest seen sequence number is a noop.
func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) {
if q.nextSequence <= sequence {
q.nextSequence = sequence + 1
if q.lastSeenAccountSequence < sequence {
q.lastSeenAccountSequence = sequence
}

wasChanged := false

for {
if q.Size() == 0 {
break
queueWasChanged := false

txsToSubmit := make([]txToSubmit, 0, len(q.transactions))
// Extract transactions ready to submit and notify those which are un-submittable.
for i := 0; i < len(q.transactions); {
candidate := q.transactions[i]
removeCandidateFromQueue := false
if q.lastSeenAccountSequence > candidate.maxAccSeqNum {
// this transaction can never be submitted because account sequence numbers only grow
candidate.notifyBackChan <- ErrBadSequence
close(candidate.notifyBackChan)
removeCandidateFromQueue = true
} else if q.lastSeenAccountSequence >= candidate.minAccSeqNum {
txsToSubmit = append(txsToSubmit, candidate)
removeCandidateFromQueue = true
}

ch, hseq := q.head()
// if the next queued transaction has a sequence higher than the account's
// current sequence, stop removing entries
if hseq > q.nextSequence {
break
if removeCandidateFromQueue {
q.transactions = append(q.transactions[:i], q.transactions[i+1:]...)
queueWasChanged = true
} else {
// only increment the index if there was no removal
i++
}
}

// since this entry is unlocked (i.e. it's sequence is the next available
// or in the past we can remove it an mark the queue as changed
q.pop()
wasChanged = true

if hseq < q.nextSequence {
ch <- ErrBadSequence
close(ch)
} else if hseq == q.nextSequence {
ch <- nil
close(ch)
}
// To maximize successful submission opportunity, submit transactions by the account sequence
// which would result from a successful submission (i.e. maxAccSeqNum+1)
sort.Slice(txsToSubmit, func(i, j int) bool {
return txsToSubmit[i].maxAccSeqNum < txsToSubmit[j].maxAccSeqNum
})
for _, tx := range txsToSubmit {
tx.notifyBackChan <- nil
close(tx.notifyBackChan)
}

// if we modified the queue, bump the timeout for this queue
if wasChanged {
if queueWasChanged {
q.lastActiveAt = time.Now()
return
}

// if the queue wasn't changed, see if it is too old, clear
// it and make room for other's
// it and make room for other submissions
if time.Since(q.lastActiveAt) > q.timeout {
for q.Size() > 0 {
ch, _ := q.pop()
ch <- ErrBadSequence
close(ch)
for _, tx := range q.transactions {
tx.notifyBackChan <- ErrBadSequence
close(tx.notifyBackChan)
}
q.transactions = nil
}
}

// helper function for interacting with the priority queue
func (q *AccountTxSubmissionQueue) head() (chan error, uint64) {
if len(q.queue) == 0 {
return nil, uint64(0)
}

return q.queue[0].Chan, q.queue[0].Sequence
}

// helper function for interacting with the priority queue
func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) {
i := heap.Pop(&q.queue).(item)

return i.Chan, i.Sequence
}

// item is a element of the priority queue
type item struct {
Sequence uint64
Chan chan error
}

// pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It
// implements heap.Interface.
type pqueue []item

func (pq pqueue) Len() int { return len(pq) }

func (pq pqueue) Less(i, j int) bool {
return pq[i].Sequence < pq[j].Sequence
}

func (pq pqueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}

func (pq *pqueue) Push(x interface{}) {
*pq = append(*pq, x.(item))
}

func (pq *pqueue) Pop() interface{} {
old := *pq
n := len(old)
result := old[n-1]
*pq = old[0 : n-1]
return result
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,26 @@ func (suite *QueueTestSuite) SetupTest() {
suite.queue = NewAccountTxSubmissionQueue()
}

//Push adds the provided channel on to the priority queue
func (suite *QueueTestSuite) TestQueue_Push() {
ctx := test.Context()
_ = ctx

assert.Equal(suite.T(), 0, suite.queue.Size())

suite.queue.Push(2)
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(2), s)

suite.queue.Push(1)
assert.Equal(suite.T(), 2, suite.queue.Size())
_, s = suite.queue.head()
assert.Equal(suite.T(), uint64(1), s)
}

// Tests the update method
func (suite *QueueTestSuite) TestQueue_Update() {
// Tests the NotifyLastAccountSequence method
func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() {
// NotifyLastAccountSequence removes sequences that are submittable or in the past
lowMin := uint64(1)
results := []<-chan error{
suite.queue.Push(1),
suite.queue.Push(2),
suite.queue.Push(3),
suite.queue.Push(4),
suite.queue.Push(1, nil),
suite.queue.Push(2, nil),
suite.queue.Push(3, nil),
suite.queue.Push(4, nil),
suite.queue.Push(4, &lowMin),
}

suite.queue.NotifyLastAccountSequence(2)

// the update above signifies that 2 is the accounts current sequence,
// meaning that 3 is submittable, and so only 4 should still be queued
// meaning that 3 is submittable, and so only 4 (Min/maxAccSeqNum=3) should remain
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(4), s)
entry := suite.queue.transactions[0]
assert.Equal(suite.T(), uint64(3), entry.minAccSeqNum)
assert.Equal(suite.T(), uint64(3), entry.maxAccSeqNum)

suite.queue.NotifyLastAccountSequence(4)
assert.Equal(suite.T(), 0, suite.queue.Size())
Expand All @@ -66,10 +51,11 @@ func (suite *QueueTestSuite) TestQueue_Update() {
assert.Equal(suite.T(), ErrBadSequence, <-results[1])
assert.Equal(suite.T(), nil, <-results[2])
assert.Equal(suite.T(), ErrBadSequence, <-results[3])
assert.Equal(suite.T(), nil, <-results[4])

// NotifyLastAccountSequence clears the queue if the head has not been released within the time limit
suite.queue.timeout = 1 * time.Millisecond
result := suite.queue.Push(2)
result := suite.queue.Push(2, nil)
<-time.After(10 * time.Millisecond)
suite.queue.NotifyLastAccountSequence(0)

Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/txsub/sequence/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (m *Manager) String() string {
var addys []string

for addy, q := range m.queues {
addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.nextSequence))
addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.lastSeenAccountSequence))
}

return "[ " + strings.Join(addys, ",") + " ]"
Expand Down Expand Up @@ -60,7 +60,7 @@ func (m *Manager) Addresses() []string {
// Push registers an intent to submit a transaction for the provided address at
// the provided sequence. A channel is returned that will be written to when
// the requester should attempt the submission.
func (m *Manager) Push(address string, sequence uint64) <-chan error {
func (m *Manager) Push(address string, sequence uint64, minSeqNum *uint64) <-chan error {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -74,12 +74,12 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error {
m.queues[address] = aq
}

return aq.Push(sequence)
return aq.Push(sequence, minSeqNum)
}

// Update notifies the manager of newly loaded account sequence information. The manager uses this information
// NotifyLastAccountSequences notifies the manager of newly loaded account sequence information. The manager uses this information
// to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic.
func (m *Manager) Update(updates map[string]uint64) {
func (m *Manager) NotifyLastAccountSequences(updates map[string]uint64) {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand Down
26 changes: 14 additions & 12 deletions services/horizon/internal/txsub/sequence/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,28 @@ import (
func TestManager_Push(t *testing.T) {
mgr := NewManager()

mgr.Push("1", 2)
mgr.Push("1", 2)
mgr.Push("1", 3)
mgr.Push("2", 2)
minSeq := uint64(1)
mgr.Push("1", 2, nil)
mgr.Push("1", 2, nil)
mgr.Push("1", 3, &minSeq)
mgr.Push("2", 2, nil)

assert.Equal(t, 4, mgr.Size())
assert.Equal(t, 3, mgr.queues["1"].Size())
assert.Equal(t, 1, mgr.queues["2"].Size())
}

// Test the NotifyLastAccountSequence method
func TestManager_Update(t *testing.T) {
// Test the NotifyLastAccountSequences method
func TestManager_NotifyLastAccountSequences(t *testing.T) {
mgr := NewManager()
minSeq := uint64(1)
results := []<-chan error{
mgr.Push("1", 2),
mgr.Push("1", 3),
mgr.Push("2", 2),
mgr.Push("1", 4, &minSeq),
mgr.Push("1", 3, nil),
mgr.Push("2", 2, nil),
}

mgr.Update(map[string]uint64{
mgr.NotifyLastAccountSequences(map[string]uint64{
"1": 1,
"2": 1,
})
Expand All @@ -47,9 +49,9 @@ func TestManager_Update(t *testing.T) {
func TestManager_PushNoMoreRoom(t *testing.T) {
mgr := NewManager()
for i := 0; i < mgr.MaxSize; i++ {
mgr.Push("1", 2)
mgr.Push("1", 2, nil)
}

assert.Equal(t, 1024, mgr.Size())
assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2))
assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2, nil))
}
Loading

0 comments on commit b6605cf

Please sign in to comment.