Skip to content

Commit

Permalink
Use an unsorted slice
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Mar 23, 2022
1 parent 00d541d commit f6f4e82
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,28 @@ type AccountTxSubmissionQueue struct {
lastActiveAt time.Time
timeout time.Duration
lastSeenAccountSequence uint64
queue submissionQueue
transactions []txToSubmit
}

// txToSubmit represents a transaction being tracked by the queue
type txToSubmit struct {
minAccSeqNum uint64 // minimum account sequence required to send the transaction
maxAccSeqNum uint64 // maximum account sequence required to send the transaction
notifyBackChan chan error // submission notification channel
}

// NewAccountTxSubmissionQueue creates a new *AccountTxSubmissionQueue
func NewAccountTxSubmissionQueue() *AccountTxSubmissionQueue {
result := &AccountTxSubmissionQueue{
lastActiveAt: time.Now(),
timeout: 10 * time.Second,
queue: nil,
}
return result
}

// Size returns the count of currently buffered submissions in the queue.
func (q *AccountTxSubmissionQueue) Size() int {
return len(q.queue)
return len(q.transactions)
}

// Push enqueues the intent to submit a transaction at the provided sequence
Expand All @@ -49,17 +55,17 @@ func (q *AccountTxSubmissionQueue) Size() int {
// 3. Call NotifyLastAccountSequence() with the result from step 2 to trigger the submission if
// possible
func (q *AccountTxSubmissionQueue) Push(sequence uint64, minSeqNum *uint64) <-chan error {
ch := make(chan error, 1)
// From CAP 21: If minSeqNum is nil, the txToSubmit is only valid when sourceAccount's sequence number is seqNum - 1.
// Otherwise, valid when sourceAccount's sequence number n satisfies minSeqNum <= n < txToSubmit.seqNum.
effectiveMinSeqNum := sequence - 1
if minSeqNum != nil {
effectiveMinSeqNum = *minSeqNum
}
q.queue.insert(txToSubmit{
MinAccSeqNum: effectiveMinSeqNum,
MaxAccSeqNum: sequence - 1,
Chan: ch,
ch := make(chan error, 1)
q.transactions = append(q.transactions, txToSubmit{
minAccSeqNum: effectiveMinSeqNum,
maxAccSeqNum: sequence - 1,
notifyBackChan: ch,
})
return ch
}
Expand All @@ -74,83 +80,54 @@ func (q *AccountTxSubmissionQueue) NotifyLastAccountSequence(sequence uint64) {
q.lastSeenAccountSequence = sequence
}

wasChanged := false

// We need to traverse the full queue (ordered by MaxAccSeqNum)
// in case there is a transaction with a submittable MinSeqNum we can use later on.
for i := 0; i < len(q.queue); {
tx := q.queue[i]

removeWithErr := func(err error) {
tx.Chan <- err
close(tx.Chan)
wasChanged = true
q.queue.remove(i)
queueWasChanged := false

txsToSubmit := make([]txToSubmit, 0, len(q.transactions))
// Extract transactions ready to submit and notify those which are un-submittable.
for i := 0; i < len(q.transactions); {
candidate := q.transactions[i]
removeCandidateFromQueue := false
if q.lastSeenAccountSequence > candidate.maxAccSeqNum {
// this transaction can never be submitted because account sequence numbers only grow
candidate.notifyBackChan <- ErrBadSequence
close(candidate.notifyBackChan)
removeCandidateFromQueue = true
} else if q.lastSeenAccountSequence >= candidate.minAccSeqNum {
txsToSubmit = append(txsToSubmit, candidate)
removeCandidateFromQueue = true
}

if q.lastSeenAccountSequence > tx.MaxAccSeqNum {
// The transaction and account sequences will never match
removeWithErr(ErrBadSequence)
} else if q.lastSeenAccountSequence >= tx.MinAccSeqNum {
// within range, ready to submit!
removeWithErr(nil)
if removeCandidateFromQueue {
q.transactions = append(q.transactions[:i], q.transactions[i+1:]...)
queueWasChanged = true
} else {
// we only need to increment the index when we don't remove
// a transaction
// only increment the index if there was no removal
i++
}
}

// To maximize successful submission opportunity, submit transactions by the account sequence
// which would result from a successful submission (i.e. maxAccSeqNum+1)
sort.Slice(txsToSubmit, func(i, j int) bool {
return txsToSubmit[i].maxAccSeqNum < txsToSubmit[j].maxAccSeqNum
})
for _, tx := range txsToSubmit {
tx.notifyBackChan <- nil
close(tx.notifyBackChan)
}

// if we modified the queue, bump the timeout for this queue
if wasChanged {
if queueWasChanged {
q.lastActiveAt = time.Now()
return
}

// if the queue wasn't changed, see if it is too old, clear
// it and make room for other submissions
if time.Since(q.lastActiveAt) > q.timeout {
for i := 0; i < len(q.queue); i++ {
c := q.queue[i].Chan
c <- ErrBadSequence
close(c)
for _, tx := range q.transactions {
tx.notifyBackChan <- ErrBadSequence
close(tx.notifyBackChan)
}
q.queue = nil
q.transactions = nil
}
}

// txToSubmit represents a transaction being tracked in submissionQueue
type txToSubmit struct {
MinAccSeqNum uint64 // minimum account sequence required to send the transaction
MaxAccSeqNum uint64 // maximum account sequence required to send the transaction
Chan chan error
}

// submissionQueue is a priority queue (implemented as a sorted slice),
// used to track the submission order of transactions.
// Lower indices have higher submission priority.
type submissionQueue []txToSubmit

func (sq *submissionQueue) insert(tx txToSubmit) {
txHasLessPriorityThanIth := func(i int) bool {
// To maximize transaction submission opportunity, we prioritize transactions by the account sequence
// which would result from successful submission (i.e. MaxAccSeqNum+1) but,
// if those are the same, by higher minimum sequence since there is less margin to send those
// (a smaller interval).
if tx.MaxAccSeqNum != (*sq)[i].MaxAccSeqNum {
return tx.MaxAccSeqNum < (*sq)[i].MaxAccSeqNum
}
return tx.MinAccSeqNum > (*sq)[i].MinAccSeqNum
}
i := sort.Search(len(*sq), txHasLessPriorityThanIth)
if len(*sq) == i {
*sq = append(*sq, tx)
return
}
*sq = append((*sq)[:i+1], (*sq)[i:]...)
(*sq)[i] = tx
}

func (sq *submissionQueue) remove(i int) {
*sq = append((*sq)[:i], (*sq)[i+1:]...)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,6 @@ func (suite *QueueTestSuite) SetupTest() {
suite.queue = NewAccountTxSubmissionQueue()
}

//Push adds the provided channel on to the priority queue
func (suite *QueueTestSuite) TestQueue_Push() {
ctx := test.Context()
_ = ctx

assert.Equal(suite.T(), 0, suite.queue.Size())

suite.queue.Push(6, nil)
assert.Equal(suite.T(), 1, suite.queue.Size())
entry := suite.queue.queue[0]
assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(5), entry.MinAccSeqNum)

min := uint64(1)
suite.queue.Push(5, &min)
assert.Equal(suite.T(), 2, suite.queue.Size())
entry = suite.queue.queue[0]
assert.Equal(suite.T(), uint64(1), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum)

suite.queue.Push(5, nil)
assert.Equal(suite.T(), 3, suite.queue.Size())
entry = suite.queue.queue[0]
assert.Equal(suite.T(), uint64(4), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(4), entry.MaxAccSeqNum)

suite.queue.Push(4, nil)
assert.Equal(suite.T(), 4, suite.queue.Size())
entry = suite.queue.queue[0]
assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum)
}

// Tests the NotifyLastAccountSequence method
func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() {
// NotifyLastAccountSequence removes sequences that are submittable or in the past
Expand All @@ -71,11 +38,11 @@ func (suite *QueueTestSuite) TestQueue_NotifyLastAccountSequence() {
suite.queue.NotifyLastAccountSequence(2)

// the update above signifies that 2 is the accounts current sequence,
// meaning that 3 is submittable, and so only 4 (Min/MaxAccSeqNum=3) should remain
// meaning that 3 is submittable, and so only 4 (Min/maxAccSeqNum=3) should remain
assert.Equal(suite.T(), 1, suite.queue.Size())
entry := suite.queue.queue[0]
assert.Equal(suite.T(), uint64(3), entry.MinAccSeqNum)
assert.Equal(suite.T(), uint64(3), entry.MaxAccSeqNum)
entry := suite.queue.transactions[0]
assert.Equal(suite.T(), uint64(3), entry.minAccSeqNum)
assert.Equal(suite.T(), uint64(3), entry.maxAccSeqNum)

suite.queue.NotifyLastAccountSequence(4)
assert.Equal(suite.T(), 0, suite.queue.Size())
Expand Down

0 comments on commit f6f4e82

Please sign in to comment.