diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 4d9716ba4b..3b47c054c0 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -96,10 +96,10 @@ func (sys *System) Submit( rawTx string, envelope xdr.TransactionEnvelope, hash string, -) (result <-chan Result) { +) (resultReadCh <-chan Result) { sys.Init() - response := make(chan Result, 1) - result = response + resultCh := make(chan Result, 1) + resultReadCh = resultCh db := sys.DB(ctx) // The database doesn't (yet) store muxed accounts, so we query @@ -117,19 +117,19 @@ func (sys *System) Submit( minSeqNum := envelope.MinSeqNum() // Ensure sequence numbers make sense if seqNum < 0 || (minSeqNum != nil && (*minSeqNum < 0 || *minSeqNum >= seqNum)) { - sys.finish(ctx, hash, response, Result{Err: ErrBadSequence}) + sys.finish(ctx, hash, resultCh, Result{Err: ErrBadSequence}) return } tx, sequenceNumber, err := checkTxAlreadyExists(ctx, db, hash, sourceAddress) if err == nil { sys.Log.Ctx(ctx).WithField("hash", hash).Info("Found submission result in a DB") - sys.finish(ctx, hash, response, Result{Transaction: tx}) + sys.finish(ctx, hash, resultCh, Result{Transaction: tx}) return } if err != ErrNoResults { sys.Log.Ctx(ctx).WithField("hash", hash).Info("Error getting submission result from a DB") - sys.finish(ctx, hash, response, Result{Transaction: tx, Err: err}) + sys.finish(ctx, hash, resultCh, Result{Transaction: tx, Err: err}) return } @@ -140,7 +140,7 @@ func (sys *System) Submit( uMinSeqNum := uint64(*minSeqNum) pMinSeqNum = &uMinSeqNum } - seq := sys.SubmissionQueue.Push(sourceAddress, uint64(seqNum), pMinSeqNum) + submissionWait := sys.SubmissionQueue.Push(sourceAddress, uint64(seqNum), pMinSeqNum) // update the submission queue with the source accounts current sequence value // which will cause the channel returned by Push() to emit if possible. @@ -149,59 +149,57 @@ func (sys *System) Submit( }) select { - case err := <-seq: + case err := <-submissionWait: if err == sequence.ErrBadSequence { // convert the internal only ErrBadSequence into the FailedTransactionError err = ErrBadSequence } if err != nil { - sys.finish(ctx, hash, response, Result{Err: err}) + sys.finish(ctx, hash, resultCh, Result{Err: err}) return } sr := sys.submitOnce(ctx, rawTx) sys.updateTransactionTypeMetrics(envelope) - // if submission succeeded - if sr.Err == nil { - // add transactions to open list - sys.Pending.Add(ctx, hash, response) - // update the submission queue, allowing the next submission to proceed - sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ - sourceAddress: uint64(envelope.SeqNum()), - }) - return - } + if sr.Err != nil { + // any error other than "txBAD_SEQ" is a failure + isBad, err := sr.IsBadSeq() + if err != nil { + sys.finish(ctx, hash, resultCh, Result{Err: err}) + return + } + if !isBad { + sys.finish(ctx, hash, resultCh, Result{Err: sr.Err}) + return + } - // any error other than "txBAD_SEQ" is a failure - isBad, err := sr.IsBadSeq() - if err != nil { - sys.finish(ctx, hash, response, Result{Err: err}) - return - } + if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil { + sys.finish(ctx, hash, resultCh, Result{Err: err}) + return + } - if !isBad { - sys.finish(ctx, hash, response, Result{Err: sr.Err}) - return - } - if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil { - sys.finish(ctx, hash, response, Result{Err: err}) + // If error is txBAD_SEQ, check for the result again + tx, err = txResultByHash(ctx, db, hash) + if err != nil { + // finally, return the bad_seq error if no result was found on 2nd attempt + sys.finish(ctx, hash, resultCh, Result{Err: sr.Err}) + return + } + // If we found the result, use it as the result + sys.finish(ctx, hash, resultCh, Result{Transaction: tx}) return } - // If error is txBAD_SEQ, check for the result again - tx, err = txResultByHash(ctx, db, hash) - if err == nil { - // If the found use it as the result - sys.finish(ctx, hash, response, Result{Transaction: tx}) - } else { - // finally, return the bad_seq error if no result was found on 2nd attempt - sys.finish(ctx, hash, response, Result{Err: sr.Err}) - } - + // add transactions to open list + sys.Pending.Add(ctx, hash, resultCh) + // update the submission queue, allowing the next submission to proceed + sys.SubmissionQueue.NotifyLastAccountSequences(map[string]uint64{ + sourceAddress: uint64(envelope.SeqNum()), + }) case <-ctx.Done(): - sys.finish(ctx, hash, response, Result{Err: sys.deriveTxSubError(ctx)}) + sys.finish(ctx, hash, resultCh, Result{Err: sys.deriveTxSubError(ctx)}) } return