Skip to content

Commit

Permalink
Update txsub queue to account for new CAP-21 preconditions
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Mar 22, 2022
1 parent 5580e6e commit b5d5182
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
// being managed, queued submissions that can be acted upon will be unblocked.
//
type AccountTxSubmissionQueue struct {
lastActiveAt time.Time
timeout time.Duration
nextSequence uint64
queue pqueue
lastActiveAt time.Time
timeout time.Duration
lastSeenAccountSequence uint64
queue pqueue
}

// NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue
Expand Down Expand Up @@ -51,9 +51,19 @@ func (q *AccountTxSubmissionQueue) Size() int {
// 2. Load the current sequence number for the source account from the DB
// 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if
// possible
func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error {
ch := make(chan error, 1)
heap.Push(&q.queue, item{sequence, ch})
// From CAP 19: If minSeqNum is nil, the tx is only valid when sourceAccount's sequence number is seqNum - 1.
// Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < tx.seqNum.
effectiveMinSeqNum := sequence - 1
if minSeqNum != nil {
effectiveMinSeqNum = *minSeqNum
}
heap.Push(&q.queue, item{
MinAccSeqNum: effectiveMinSeqNum,
MaxAccSeqNum: sequence - 1,
Chan: ch,
})
return ch
}

Expand All @@ -63,35 +73,34 @@ func (q *AccountTxSubmissionQueue) Push(sequence uint64) <-chan error {
// This function is monotonic... calling it with a sequence number lower than
// the latest seen sequence number is a noop.
func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) {
if q.nextSequence <= sequence {
q.nextSequence = sequence + 1
if q.lastSeenAccountSequence <= sequence {
q.lastSeenAccountSequence = sequence
}

wasChanged := false

for {
if q.Size() == 0 {
break
}
// We need to traverse the full queue (ordered by MaxAccSeqNum)
// in case there is a transaction with a submittable MinSeqNum we can use later on.
for i := 0; i < q.Size(); {
tx := q.queue[i]

ch, hseq := q.head()
// if the next queued transaction has a sequence higher than the account's
// current sequence, stop removing entries
if hseq > q.nextSequence {
break
removeWithErr := func(err error) {
tx.Chan <- err
close(tx.Chan)
wasChanged = true
heap.Remove(&q.queue, i)
}

// since this entry is unlocked (i.e. it's sequence is the next available
// or in the past we can remove it an mark the queue as changed
q.pop()
wasChanged = true

if hseq < q.nextSequence {
ch <- ErrBadSequence
close(ch)
} else if hseq == q.nextSequence {
ch <- nil
close(ch)
if q.lastSeenAccountSequence > tx.MaxAccSeqNum {
// The transaction and account sequences will never match
removeWithErr(ErrBadSequence)
} else if q.lastSeenAccountSequence >= tx.MinAccSeqNum {
// within range, ready to submit!
removeWithErr(nil)
} else {
// we only need to increment the heap index when we don't remove
// an item
i++
}
}

Expand All @@ -105,33 +114,18 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) {
// it and make room for other's
if time.Since(q.lastActiveAt) > q.timeout {
for q.Size() > 0 {
ch, _ := q.pop()
ch <- ErrBadSequence
close(ch)
entry := q.queue.Pop().(item)
entry.Chan <- ErrBadSequence
close(entry.Chan)
}
}
}

// helper function for interacting with the priority queue
func (q *AccountTxSubmissionQueue) head() (chan error, uint64) {
if len(q.queue) == 0 {
return nil, uint64(0)
}

return q.queue[0].Chan, q.queue[0].Sequence
}

// helper function for interacting with the priority queue
func (q *AccountTxSubmissionQueue) pop() (chan error, uint64) {
i := heap.Pop(&q.queue).(item)

return i.Chan, i.Sequence
}

// item is a element of the priority queue
type item struct {
Sequence uint64
Chan chan error
MinAccSeqNum uint64 // minimum account sequence required to send the transaction
MaxAccSeqNum uint64 // maximum account sequence required to send the transaction
Chan chan error
}

// pqueue is a priority queue used by AccountTxSubmissionQueue to manage buffered submissions. It
Expand All @@ -141,7 +135,14 @@ type pqueue []item
func (pq pqueue) Len() int { return len(pq) }

func (pq pqueue) Less(i, j int) bool {
return pq[i].Sequence < pq[j].Sequence
// To maximize tx submission opportunity, order transactions by the account sequence
// which would result from successful submission (MaxAccSeqNum+1) but,
// if those are the same, by higher minimum sequence since there is less margin to send those
// (a smaller interval).
if pq[i].MaxAccSeqNum != pq[j].MaxAccSeqNum {
return pq[i].MaxAccSeqNum < pq[j].MaxAccSeqNum
}
return pq[i].MinAccSeqNum > pq[j].MinAccSeqNum
}

func (pq pqueue) Swap(i, j int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,52 @@ func (suite *QueueTestSuite) TestQueue_Push() {

assert.Equal(suite.T(), 0, suite.queue.Size())

suite.queue.Push(2)
suite.queue.Push(6, nil)
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(2), s)
entry := suite.queue.queue[0]
assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum)

suite.queue.Push(1)
min := uint64(1)
suite.queue.Push(5, &min)
assert.Equal(suite.T(), 2, suite.queue.Size())
_, s = suite.queue.head()
assert.Equal(suite.T(), uint64(1), s)
entry = suite.queue.queue[0]
assert.Equal(suite.T(), uint64(1), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum)

suite.queue.Push(5, nil)
assert.Equal(suite.T(), 3, suite.queue.Size())
entry = suite.queue.queue[0]
assert.Equal(suite.T(), uint64(4), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum)

suite.queue.Push(4, nil)
assert.Equal(suite.T(), 4, suite.queue.Size())
entry = suite.queue.queue[0]
assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum)
}

// Tests the update method
func (suite *QueueTestSuite) TestQueue_Update() {
// Tests the NotifyLastAccountSequence method
func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() {
// NotifyLastAccountSequence removes sequences that are submittable or in the past
lowMin := uint64(1)
results := []<-chan error{
suite.queue.Push(1),
suite.queue.Push(2),
suite.queue.Push(3),
suite.queue.Push(4),
suite.queue.Push(1, nil),
suite.queue.Push(2, nil),
suite.queue.Push(3, nil),
suite.queue.Push(4, nil),
suite.queue.Push(4, &lowMin),
}

suite.queue.NotifyLastAccountSequence(2)

// the update above signifies that 2 is the accounts current sequence,
// meaning that 3 is submittable, and so only 4 should still be queued
// meaning that 3 is submittable, and so only 4 (Min/MaxAccSeqNum=3) should remain
assert.Equal(suite.T(), 1, suite.queue.Size())
_, s := suite.queue.head()
assert.Equal(suite.T(), uint64(4), s)
entry := suite.queue.queue[0]
assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum)

suite.queue.NotifyLastAccountSequence(4)
assert.Equal(suite.T(), 0, suite.queue.Size())
Expand All @@ -66,10 +84,11 @@ func (suite *QueueTestSuite) TestQueue_Update() {
assert.Equal(suite.T(), ErrBadSequence, <-results[1])
assert.Equal(suite.T(), nil, <-results[2])
assert.Equal(suite.T(), ErrBadSequence, <-results[3])
assert.Equal(suite.T(), nil, <-results[4])

// NotifyLastAccountSequence clears the queue if the head has not been released within the time limit
suite.queue.timeout = 1 * time.Millisecond
result := suite.queue.Push(2)
result := suite.queue.Push(2, nil)
<-time.After(10 * time.Millisecond)
suite.queue.NotifyLastAccountSequence(0)

Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/txsub/sequence/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (m *Manager) String() string {
var addys []string

for addy, q := range m.queues {
addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.nextSequence))
addys = append(addys, fmt.Sprintf("%5s:%d", addy, q.lastSeenAccountSequence))
}

return "[ " + strings.Join(addys, ",") + " ]"
Expand Down Expand Up @@ -60,7 +60,7 @@ func (m *Manager) Addresses() []string {
// Push registers an intent to submit a transaction for the provided address at
// the provided sequence. A channel is returned that will be written to when
// the requester should attempt the submission.
func (m *Manager) Push(address string, sequence uint64) <-chan error {
func (m *Manager) Push(address string, sequence uint64, minSeqNum *uint64) <-chan error {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -74,12 +74,12 @@ func (m *Manager) Push(address string, sequence uint64) <-chan error {
m.queues[address] = aq
}

return aq.Push(sequence)
return aq.Push(sequence, minSeqNum)
}

// Update notifies the manager of newly loaded account sequence information. The manager uses this information
// NotifyLastAccountSequences notifies the manager of newly loaded account sequence information. The manager uses this information
// to notify requests to submit that they should proceed. See AccountTxSubmissionQueue#NotifyLastAccountSequence for the actual meat of the logic.
func (m *Manager) Update(updates map[string]uint64) {
func (m *Manager) NotifyLastAccountSequences(updates map[string]uint64) {
m.mutex.Lock()
defer m.mutex.Unlock()

Expand Down
26 changes: 14 additions & 12 deletions services/horizon/internal/txsub/sequence/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,28 @@ import (
func TestManager_Push(t *testing.T) {
mgr := NewManager()

mgr.Push("1", 2)
mgr.Push("1", 2)
mgr.Push("1", 3)
mgr.Push("2", 2)
minSeq := uint64(1)
mgr.Push("1", 2, nil)
mgr.Push("1", 2, nil)
mgr.Push("1", 3, &minSeq)
mgr.Push("2", 2, nil)

assert.Equal(t, 4, mgr.Size())
assert.Equal(t, 3, mgr.queues["1"].Size())
assert.Equal(t, 1, mgr.queues["2"].Size())
}

// Test the NotifyLastAccountSequence method
func TestManager_Update(t *testing.T) {
// Test the NotifyLastAccountSequences method
func TestManager_NotifyLastAccountSequences(t *testing.T) {
mgr := NewManager()
minSeq := uint64(1)
results := []<-chan error{
mgr.Push("1", 2),
mgr.Push("1", 3),
mgr.Push("2", 2),
mgr.Push("1", 4, &minSeq),
mgr.Push("1", 3, nil),
mgr.Push("2", 2, nil),
}

mgr.Update(map[string]uint64{
mgr.NotifyLastAccountSequences(map[string]uint64{
"1": 1,
"2": 1,
})
Expand All @@ -47,9 +49,9 @@ func TestManager_Update(t *testing.T) {
func TestManager_PushNoMoreRoom(t *testing.T) {
mgr := NewManager()
for i := 0; i < mgr.MaxSize; i++ {
mgr.Push("1", 2)
mgr.Push("1", 2, nil)
}

assert.Equal(t, 1024, mgr.Size())
assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2))
assert.Equal(t, ErrNoMoreRoom, <-mgr.Push("1", 2, nil))
}
8 changes: 4 additions & 4 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ func (sys *System) Submit(

// queue the submission and get the channel that will emit when
// submission is valid
seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum()))
seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum()), nil)

// update the submission queue with the source accounts current sequence value
// which will cause the channel returned by Push() to emit if possible.
sys.SubmissionQueue.Update(map[string]uint64{
sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{
sourceAddress: sequenceNumber,
})

Expand All @@ -160,7 +160,7 @@ func (sys *System) Submit(
// add transactions to open list
sys.Pending.Add(ctx, hash, response)
// update the submission queue, allowing the next submission to proceed
sys.SubmissionQueue.Update(map[string]uint64{
sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{
sourceAddress: uint64(envelope.SeqNum()),
})
return
Expand Down Expand Up @@ -326,7 +326,7 @@ func (sys *System) Tick(ctx context.Context) {
logger.WithStack(err).Error(err)
return
} else {
sys.SubmissionQueue.Update(curSeq)
sys.SubmissionQueue.NotifyLastAccountSequences(curSeq)
}
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (suite *SystemTestSuite) TestTick_Deadlock() {
suite.db.On("Rollback").Return(nil).Once()

// Start first Tick
suite.system.SubmissionQueue.Push("address", 0)
suite.system.SubmissionQueue.Push("address", 0, nil)
suite.db.On("GetSequenceNumbers", suite.ctx, []string{"address"}).
Return(map[string]uint64{}, nil).
Run(func(args mock.Arguments) {
Expand Down

0 comments on commit b5d5182

Please sign in to comment.