Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Update txsub queue to account for new CAP-21 preconditions #4301

Merged
merged 4 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sequence

import (
"container/heap"
"sort"
"time"
)

Expand All @@ -14,28 +14,31 @@ import (
// being managed, queued submissions that can be acted upon will be unblocked.
//
type AccountTxSubmissionQueue struct {
lastActiveAt time.Time
timeout time.Duration
nextSequence uint64
queue pqueue
lastActiveAt time.Time
timeout time.Duration
lastSeenAccountSequence uint64
transactions []txToSubmit
}

// txToSubmit represents a transaction being tracked by the queue
type txToSubmit struct {
minAccSeqNum uint64 // minimum account sequence required to send the transaction
maxAccSeqNum uint64 // maximum account sequence required to send the transaction
notifyBackChan chan error // submission notification channel
}

// NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue
func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue {
result := &AccountTxSubmissionQueue{
lastActiveAt: time.Now(),
timeout: 10 * time.Second,
queue: nil,
}

heap.Init(&result.queue)

return result
}

// Size returns the count of currently buffered submissions in the queue.
func (q *AccountTxSubmissionQueue) Size() int {
return len(q.queue)
return len(q.transactions)
}

// Push enqueues the intent to submit a transaction at the provided sequence
Expand All @@ -51,9 +54,19 @@ func (q *AccountTxSubmissionQueue) Size() int {
// 2. Load the current sequence number for the source account from the DB
// 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if
// possible
func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error {
// From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1.
// Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum.
effectiveMinSeqNum := sequence - 1
if minSeqNum != nil {
effectiveMinSeqNum = *minSeqNum
}
ch := make(chan error, 1)
heap.Push(&q.queue, item{sequence, ch})
q.transactions = append(q.transactions, txToSubmit{
minAccSeqNum: effectiveMinSeqNum,
maxAccSeqNum: sequence - 1,
notifyBackChan: ch,
})
2opremio marked this conversation as resolved.
Show resolved Hide resolved
return ch
}

Expand All @@ -63,99 +76,58 @@ func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
// This function is monotonic... calling it with a sequence number lower than
// the latest seen sequence number is a noop.
func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) {
if q.nextSequence <= sequence {
q.nextSequence = sequence + 1
if q.lastSeenAccountSequence < sequence {
q.lastSeenAccountSequence = sequence
}

wasChanged := false

for {
if q.Size() == 0 {
break
queueWasChanged := false

txsToSubmit := make([]txToSubmit, 0, len(q.transactions))
// Extract transactions ready to submit and notify those which are un-submittable.
for i := 0; i < len(q.transactions); {
candidate := q.transactions[i]
removeCandidateFromQueue := false
if q.lastSeenAccountSequence > candidate.maxAccSeqNum {
2opremio marked this conversation as resolved.
Show resolved Hide resolved
// this transaction can never be submitted because account sequence numbers only grow
candidate.notifyBackChan <- ErrBadSequence
close(candidate.notifyBackChan)
removeCandidateFromQueue = true
} else if q.lastSeenAccountSequence >= candidate.minAccSeqNum {
txsToSubmit = append(txsToSubmit, candidate)
removeCandidateFromQueue = true
}

ch, hseq := q.head()
// if the next queued transaction has a sequence higher than the account's
// current sequence, stop removing entries
if hseq > q.nextSequence {
break
if removeCandidateFromQueue {
q.transactions = append(q.transactions[:i], q.transactions[i+1:]...)
queueWasChanged = true
} else {
// only increment the index if there was no removal
i++
}
}

// since this entry is unlocked (i.e. it's sequence is the next available
// or in the past we can remove it an mark the queue as changed
q.pop()
wasChanged = true

if hseq < q.nextSequence {
ch <- ErrBadSequence
close(ch)
} else if hseq == q.nextSequence {
ch <- nil
close(ch)
}
// To maximize successful submission opportunity, submit transactions by the account sequence
// which would result from a successful submission (i.e. maxAccSeqNum+1)
sort.Slice(txsToSubmit, func(i, j int) bool {
return txsToSubmit[i].maxAccSeqNum < txsToSubmit[j].maxAccSeqNum
})
2opremio marked this conversation as resolved.
Show resolved Hide resolved
for _, tx := range txsToSubmit {
tx.notifyBackChan <- nil
close(tx.notifyBackChan)
}

// if we modified the queue, bump the timeout for this queue
if wasChanged {
if queueWasChanged {
q.lastActiveAt = time.Now()
return
}

// if the queue wasn't changed, see if it is too old, clear
// it and make room for other's
// it and make room for other submissions
if time.Since(q.lastActiveAt) > q.timeout {
for q.Size() > 0 {
ch, _ := q.pop()
ch <- ErrBadSequence
close(ch)
for _, tx := range q.transactions {
tx.notifyBackChan <- ErrBadSequence
close(tx.notifyBackChan)
}
q.transactions = nil
}
}

// helper function for interacting with the priority queue
func (q *AccountTxSubmissionQueue) head() (chan error, uint64) {
if len(q.queue) == 0 {
return nil, uint64(0)
}

return q.queue[0].Chan, q.queue[0].Sequence
}

// helper function for interacting with the priority queue
func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) {
i := heap.Pop(&q.queue).(item)

return i.Chan, i.Sequence
}

// item is a element of the priority queue
type item struct {
Sequence uint64
Chan chan error
}

// pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It
// implements heap.Interface.
type pqueue []item

func (pq pqueue) Len() int { return len(pq) }

func (pq pqueue) Less(i, j int) bool {
return pq[i].Sequence < pq[j].Sequence
}

func (pq pqueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}

func (pq *pqueue) Push(x interface{}) {
*pq = append(*pq, x.(item))
}

func (pq *pqueue) Pop() interface{} {
old := *pq
n := len(old)
result := old[n-1]
*pq = old[0 : n-1]
return result
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,26 @@ func (suite *QueueTestSuite) SetupTest() {
suite.queue = NewAccountTxSubmissionQueue()
}

//Push adds the provided channel on to the priority queue
func (suite *QueueTestSuite) TestQueue_Push() {
ctx := test.Context()
_ = ctx

assert.Equal(suite.T(), 0, suite.queue.Size())

suite.queue.Push(2)
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(2), s)

suite.queue.Push(1)
assert.Equal(suite.T(), 2, suite.queue.Size())
_, s = suite.queue.head()
assert.Equal(suite.T(), uint64(1), s)
}

// Tests the update method
func (suite *QueueTestSuite) TestQueue_Update() {
// Tests the NotifyLastAccountSequence method
func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() {
// NotifyLastAccountSequence removes sequences that are submittable or in the past
lowMin := uint64(1)
results := []<-chan error{
suite.queue.Push(1),
suite.queue.Push(2),
suite.queue.Push(3),
suite.queue.Push(4),
suite.queue.Push(1, nil),
suite.queue.Push(2, nil),
suite.queue.Push(3, nil),
suite.queue.Push(4, nil),
suite.queue.Push(4, &lowMin),
}

suite.queue.NotifyLastAccountSequence(2)

// the update above signifies that 2 is the accounts current sequence,
// meaning that 3 is submittable, and so only 4 should still be queued
// meaning that 3 is submittable, and so only 4 (Min/maxAccSeqNum=3) should remain
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(4), s)
entry := suite.queue.transactions[0]
assert.Equal(suite.T(), uint64(3), entry.minAccSeqNum)
assert.Equal(suite.T(), uint64(3), entry.maxAccSeqNum)

suite.queue.NotifyLastAccountSequence(4)
assert.Equal(suite.T(), 0, suite.queue.Size())
Expand All @@ -66,10 +51,11 @@ func (suite *QueueTestSuite) TestQueue_Update() {
assert.Equal(suite.T(), ErrBadSequence, <-results[1])
assert.Equal(suite.T(), nil, <-results[2])
assert.Equal(suite.T(), ErrBadSequence, <-results[3])
assert.Equal(suite.T(), nil, <-results[4])

// NotifyLastAccountSequence clears the queue if the head has not been released within the time limit
suite.queue.timeout = 1 * time.Millisecond
result := suite.queue.Push(2)
result := suite.queue.Push(2, nil)
<-time.After(10 * time.Millisecond)
suite.queue.NotifyLastAccountSequence(0)

Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/txsub/sequence/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (m *Manager) String() string {
var addys []string

for addy, q := range m.queues {
addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.nextSequence))
addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.lastSeenAccountSequence))
}

return "[ " + strings.Join(addys, ",") + " ]"
Expand Down Expand Up @@ -60,7 +60,7 @@ func (m *Manager) Addresses() []string {
// Push registers an intent to submit a transaction for the provided address at
// the provided sequence. A channel is returned that will be written to when
// the requester should attempt the submission.
func (m *Manager) Push(address string, sequence uint64) <-chan error {
func (m *Manager) Push(address string, sequence uint64, minSeqNum *uint64) <-chan error {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -74,12 +74,12 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error {
m.queues[address] = aq
}

return aq.Push(sequence)
return aq.Push(sequence, minSeqNum)
}

// Update notifies the manager of newly loaded account sequence information. The manager uses this information
// NotifyLastAccountSequences notifies the manager of newly loaded account sequence information. The manager uses this information
// to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic.
func (m *Manager) Update(updates map[string]uint64) {
func (m *Manager) NotifyLastAccountSequences(updates map[string]uint64) {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand Down
26 changes: 14 additions & 12 deletions services/horizon/internal/txsub/sequence/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,28 @@ import (
func TestManager_Push(t *testing.T) {
mgr := NewManager()

mgr.Push("1", 2)
mgr.Push("1", 2)
mgr.Push("1", 3)
mgr.Push("2", 2)
minSeq := uint64(1)
mgr.Push("1", 2, nil)
mgr.Push("1", 2, nil)
mgr.Push("1", 3, &minSeq)
mgr.Push("2", 2, nil)

assert.Equal(t, 4, mgr.Size())
assert.Equal(t, 3, mgr.queues["1"].Size())
assert.Equal(t, 1, mgr.queues["2"].Size())
}

// Test the NotifyLastAccountSequence method
func TestManager_Update(t *testing.T) {
// Test the NotifyLastAccountSequences method
func TestManager_NotifyLastAccountSequences(t *testing.T) {
mgr := NewManager()
minSeq := uint64(1)
results := []<-chan error{
mgr.Push("1", 2),
mgr.Push("1", 3),
mgr.Push("2", 2),
mgr.Push("1", 4, &minSeq),
mgr.Push("1", 3, nil),
mgr.Push("2", 2, nil),
}

mgr.Update(map[string]uint64{
mgr.NotifyLastAccountSequences(map[string]uint64{
"1": 1,
"2": 1,
})
Expand All @@ -47,9 +49,9 @@ func TestManager_Update(t *testing.T) {
func TestManager_PushNoMoreRoom(t *testing.T) {
mgr := NewManager()
for i := 0; i < mgr.MaxSize; i++ {
mgr.Push("1", 2)
mgr.Push("1", 2, nil)
}

assert.Equal(t, 1024, mgr.Size())
assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2))
assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2, nil))
}
Loading