Skip to content

Commit

Permalink
horizon: Small transaction submission refactoring (#4344)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Apr 21, 2022
1 parent b4d5609 commit 18ab1d8
Showing 1 changed file with 40 additions and 42 deletions.
82 changes: 40 additions & 42 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func (sys *System) Submit(
rawTx string,
envelope xdr.TransactionEnvelope,
hash string,
) (result <-chan Result) {
) (resultReadCh <-chan Result) {
sys.Init()
response := make(chan Result, 1)
result = response
resultCh := make(chan Result, 1)
resultReadCh = resultCh

db := sys.DB(ctx)
// The database doesn't (yet) store muxed accounts, so we query
Expand All @@ -117,19 +117,19 @@ func (sys *System) Submit(
minSeqNum := envelope.MinSeqNum()
// Ensure sequence numbers make sense
if seqNum < 0 || (minSeqNum != nil && (*minSeqNum < 0 || *minSeqNum >= seqNum)) {
sys.finish(ctx, hash, response, Result{Err: ErrBadSequence})
sys.finish(ctx, hash, resultCh, Result{Err: ErrBadSequence})
return
}

tx, sequenceNumber, err := checkTxAlreadyExists(ctx, db, hash, sourceAddress)
if err == nil {
sys.Log.Ctx(ctx).WithField("hash", hash).Info("Found submission result in a DB")
sys.finish(ctx, hash, response, Result{Transaction: tx})
sys.finish(ctx, hash, resultCh, Result{Transaction: tx})
return
}
if err != ErrNoResults {
sys.Log.Ctx(ctx).WithField("hash", hash).Info("Error getting submission result from a DB")
sys.finish(ctx, hash, response, Result{Transaction: tx, Err: err})
sys.finish(ctx, hash, resultCh, Result{Transaction: tx, Err: err})
return
}

Expand All @@ -140,7 +140,7 @@ func (sys *System) Submit(
uMinSeqNum := uint64(*minSeqNum)
pMinSeqNum = &uMinSeqNum
}
seq := sys.SubmissionQueue.Push(sourceAddress, uint64(seqNum), pMinSeqNum)
submissionWait := sys.SubmissionQueue.Push(sourceAddress, uint64(seqNum), pMinSeqNum)

// update the submission queue with the source accounts current sequence value
// which will cause the channel returned by Push() to emit if possible.
Expand All @@ -149,59 +149,57 @@ func (sys *System) Submit(
})

select {
case err := <-seq:
case err := <-submissionWait:
if err == sequence.ErrBadSequence {
// convert the internal only ErrBadSequence into the FailedTransactionError
err = ErrBadSequence
}

if err != nil {
sys.finish(ctx, hash, response, Result{Err: err})
sys.finish(ctx, hash, resultCh, Result{Err: err})
return
}

sr := sys.submitOnce(ctx, rawTx)
sys.updateTransactionTypeMetrics(envelope)

// if submission succeeded
if sr.Err == nil {
// add transactions to open list
sys.Pending.Add(ctx, hash, response)
// update the submission queue, allowing the next submission to proceed
sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{
sourceAddress: uint64(envelope.SeqNum()),
})
return
}
if sr.Err != nil {
// any error other than "txBAD_SEQ" is a failure
isBad, err := sr.IsBadSeq()
if err != nil {
sys.finish(ctx, hash, resultCh, Result{Err: err})
return
}
if !isBad {
sys.finish(ctx, hash, resultCh, Result{Err: sr.Err})
return
}

// any error other than "txBAD_SEQ" is a failure
isBad, err := sr.IsBadSeq()
if err != nil {
sys.finish(ctx, hash, response, Result{Err: err})
return
}
if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil {
sys.finish(ctx, hash, resultCh, Result{Err: err})
return
}

if !isBad {
sys.finish(ctx, hash, response, Result{Err: sr.Err})
return
}
if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil {
sys.finish(ctx, hash, response, Result{Err: err})
// If error is txBAD_SEQ, check for the result again
tx, err = txResultByHash(ctx, db, hash)
if err != nil {
// finally, return the bad_seq error if no result was found on 2nd attempt
sys.finish(ctx, hash, resultCh, Result{Err: sr.Err})
return
}
// If we found the result, use it as the result
sys.finish(ctx, hash, resultCh, Result{Transaction: tx})
return
}

// If error is txBAD_SEQ, check for the result again
tx, err = txResultByHash(ctx, db, hash)
if err == nil {
// If the found use it as the result
sys.finish(ctx, hash, response, Result{Transaction: tx})
} else {
// finally, return the bad_seq error if no result was found on 2nd attempt
sys.finish(ctx, hash, response, Result{Err: sr.Err})
}

// add transactions to open list
sys.Pending.Add(ctx, hash, resultCh)
// update the submission queue, allowing the next submission to proceed
sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{
sourceAddress: uint64(envelope.SeqNum()),
})
case <-ctx.Done():
sys.finish(ctx, hash, response, Result{Err: sys.deriveTxSubError(ctx)})
sys.finish(ctx, hash, resultCh, Result{Err: sys.deriveTxSubError(ctx)})
}

return
Expand Down

0 comments on commit 18ab1d8

Please sign in to comment.