Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

horizon: Small transaction submission refactoring #4344

Merged
merged 5 commits into from
Apr 21, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 40 additions & 41 deletions services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ func (sys *System) Submit(
rawTx string,
envelope xdr.TransactionEnvelope,
hash string,
) (result <-chan Result) {
) (resultReadCh <-chan Result) {
sys.Init()
response := make(chan Result, 1)
result = response
resultCh := make(chan Result, 1)
resultReadCh = resultCh

db := sys.DB(ctx)
// The database doesn't (yet) store muxed accounts, so we query
Expand All @@ -114,25 +114,25 @@ func (sys *System) Submit(
}).Info("Processing transaction")

if envelope.SeqNum() < 0 {
sys.finish(ctx, hash, response, Result{Err: ErrBadSequence})
sys.finish(ctx, hash, resultCh, Result{Err: ErrBadSequence})
return
}

tx, sequenceNumber, err := checkTxAlreadyExists(ctx, db, hash, sourceAddress)
if err == nil {
sys.Log.Ctx(ctx).WithField("hash", hash).Info("Found submission result in a DB")
sys.finish(ctx, hash, response, Result{Transaction: tx})
sys.finish(ctx, hash, resultCh, Result{Transaction: tx})
return
}
if err != ErrNoResults {
sys.Log.Ctx(ctx).WithField("hash", hash).Info("Error getting submission result from a DB")
sys.finish(ctx, hash, response, Result{Transaction: tx, Err: err})
sys.finish(ctx, hash, resultCh, Result{Transaction: tx, Err: err})
return
}

// queue the submission and get the channel that will emit when
// submission is valid
seq := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum()))
submissionWait := sys.SubmissionQueue.Push(sourceAddress, uint64(envelope.SeqNum()))

// update the submission queue with the source accounts current sequence value
// which will cause the channel returned by Push() to emit if possible.
Expand All @@ -141,59 +141,58 @@ func (sys *System) Submit(
})

select {
case err := <-seq:
case err := <-submissionWait:
if err == sequence.ErrBadSequence {
// convert the internal only ErrBadSequence into the FailedTransactionError
err = ErrBadSequence
}

if err != nil {
sys.finish(ctx, hash, response, Result{Err: err})
sys.finish(ctx, hash, resultCh, Result{Err: err})
return
}

sr := sys.submitOnce(ctx, rawTx)
sys.updateTransactionTypeMetrics(envelope)

// if submission succeeded
if sr.Err == nil {
// add transactions to open list
sys.Pending.Add(ctx, hash, response)
// update the submission queue, allowing the next submission to proceed
sys.SubmissionQueue.Update(map[string]uint64{
sourceAddress: uint64(envelope.SeqNum()),
})
return
}
if sr.Err != nil {
// any error other than "txBAD_SEQ" is a failure
isBad, err := sr.IsBadSeq()
if err != nil {
sys.finish(ctx, hash, resultCh, Result{Err: err})
return
}
if !isBad {
sys.finish(ctx, hash, resultCh, Result{Err: sr.Err})
return
}

// any error other than "txBAD_SEQ" is a failure
isBad, err := sr.IsBadSeq()
if err != nil {
sys.finish(ctx, hash, response, Result{Err: err})
return
}
if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil {
sys.finish(ctx, hash, resultCh, Result{Err: err})
return
}

if !isBad {
sys.finish(ctx, hash, response, Result{Err: sr.Err})
return
}
if err = sys.waitUntilAccountSequence(ctx, db, sourceAddress, uint64(envelope.SeqNum())); err != nil {
sys.finish(ctx, hash, response, Result{Err: err})
// If error is txBAD_SEQ, check for the result again
tx, err = txResultByHash(ctx, db, hash)
if err == nil {
// If the found use it as the result
sys.finish(ctx, hash, resultCh, Result{Transaction: tx})
} else {
// finally, return the bad_seq error if no result was found on 2nd attempt
sys.finish(ctx, hash, resultCh, Result{Err: sr.Err})
}
return
}

// If error is txBAD_SEQ, check for the result again
tx, err = txResultByHash(ctx, db, hash)
if err == nil {
// If the found use it as the result
sys.finish(ctx, hash, response, Result{Transaction: tx})
} else {
// finally, return the bad_seq error if no result was found on 2nd attempt
sys.finish(ctx, hash, response, Result{Err: sr.Err})
}
// add transactions to open list
sys.Pending.Add(ctx, hash, resultCh)
// update the submission queue, allowing the next submission to proceed
sys.SubmissionQueue.Update(map[string]uint64{
sourceAddress: uint64(envelope.SeqNum()),
})

case <-ctx.Done():
sys.finish(ctx, hash, response, Result{Err: sys.deriveTxSubError(ctx)})
sys.finish(ctx, hash, resultCh, Result{Err: sys.deriveTxSubError(ctx)})
}

return
Expand Down