Skip to content

Commit

Permalink
Fix for transaction submission timeout (stellar#5191)
Browse files Browse the repository at this point in the history
* Add check for ledger state in txsub

* Add test for badSeq

* Fix failing unittest

* Update system_test.go

* Small changes

* Update main.go
  • Loading branch information
aditya1702 authored and sreuland committed Feb 5, 2024
1 parent 2df2810 commit 2ea3c3d
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 4 deletions.
1 change: 1 addition & 0 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,6 @@ func initSubmissionSystem(app *App) {
DB: func(ctx context.Context) txsub.HorizonDB {
return &history.Q{SessionInterface: app.HorizonSession()}
},
LedgerState: app.ledgerState,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,45 @@ func TestNegativeSequenceTxSubmission(t *testing.T) {
tt.Equal("tx_bad_seq", codes.TransactionCode)

}

func TestBadSeqTxSubmission(t *testing.T) {
tt := assert.New(t)
itest := integration.NewTest(t, integration.Config{})
master := itest.Master()

account := itest.MasterAccount()
seqnum, err := account.GetSequenceNumber()
tt.NoError(err)

op2 := txnbuild.Payment{
Destination: master.Address(),
Amount: "10",
Asset: txnbuild.NativeAsset{},
}

// Submit a simple payment tx, but with a gapped sequence
// that is intentionally set more than one ahead of current account seq
// this should trigger a tx_bad_seq from core
account = &txnbuild.SimpleAccount{
AccountID: account.GetAccountID(),
Sequence: seqnum + 10,
}
txParams := txnbuild.TransactionParams{
SourceAccount: account,
Operations: []txnbuild.Operation{&op2},
BaseFee: txnbuild.MinBaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewInfiniteTimeout()},
IncrementSequenceNum: false,
}
tx, err := txnbuild.NewTransaction(txParams)
tt.NoError(err)
tx, err = tx.Sign(integration.StandaloneNetworkPassphrase, master)
tt.NoError(err)
_, err = itest.Client().SubmitTransaction(tx)
tt.Error(err)
clientErr, ok := err.(*horizonclient.Error)
tt.True(ok)
codes, err := clientErr.ResultCodes()
tt.NoError(err)
tt.Equal("tx_bad_seq", codes.TransactionCode)
}
13 changes: 10 additions & 3 deletions services/horizon/internal/ledger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
package ledger

import (
"github.com/prometheus/client_golang/prometheus"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// Status represents a snapshot of both horizon's and stellar-core's view of the
Expand All @@ -31,7 +30,7 @@ type HorizonStatus struct {
}

// State is an in-memory data structure which holds a snapshot of both
// horizon's and stellar-core's view of the the network
// horizon's and stellar-core's view of the network
type State struct {
sync.RWMutex
current Status
Expand All @@ -44,6 +43,14 @@ type State struct {
}
}

type StateInterface interface {
CurrentStatus() Status
SetStatus(next Status)
SetCoreStatus(next CoreStatus)
SetHorizonStatus(next HorizonStatus)
RegisterMetrics(registry *prometheus.Registry)
}

// CurrentStatus returns the cached snapshot of ledger state
func (c *State) CurrentStatus() Status {
c.RLock()
Expand Down
32 changes: 32 additions & 0 deletions services/horizon/internal/txsub/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package txsub
import (
"context"
"database/sql"
"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/services/horizon/internal/ledger"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -72,3 +74,33 @@ func (m *mockDBQ) TransactionByHash(ctx context.Context, dest interface{}, hash
args := m.Called(ctx, dest, hash)
return args.Error(0)
}

type MockLedgerState struct {
mock.Mock
}

// CurrentStatus mocks the CurrentStatus method.
func (m *MockLedgerState) CurrentStatus() ledger.Status {
args := m.Called()
return args.Get(0).(ledger.Status)
}

// SetStatus mocks the SetStatus method.
func (m *MockLedgerState) SetStatus(next ledger.Status) {
m.Called(next)
}

// SetCoreStatus mocks the SetCoreStatus method.
func (m *MockLedgerState) SetCoreStatus(next ledger.CoreStatus) {
m.Called(next)
}

// SetHorizonStatus mocks the SetHorizonStatus method.
func (m *MockLedgerState) SetHorizonStatus(next ledger.HorizonStatus) {
m.Called(next)
}

// RegisterMetrics mocks the RegisterMetrics method.
func (m *MockLedgerState) RegisterMetrics(registry *prometheus.Registry) {
m.Called(registry)
}
11 changes: 10 additions & 1 deletion services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"github.com/stellar/go/services/horizon/internal/ledger"
"sync"
"time"

Expand Down Expand Up @@ -40,6 +41,7 @@ type System struct {
Submitter Submitter
SubmissionTimeout time.Duration
Log *log.Entry
LedgerState ledger.StateInterface

Metrics struct {
// SubmissionDuration exposes timing metrics about the rate and latency of
Expand Down Expand Up @@ -190,7 +192,7 @@ func (sys *System) waitUntilAccountSequence(ctx context.Context, db HorizonDB, s
WithField("sourceAddress", sourceAddress).
Warn("missing sequence number for account")
}
if num >= seq {
if num >= seq || sys.isSyncedUp() {
return nil
}
}
Expand All @@ -204,6 +206,13 @@ func (sys *System) waitUntilAccountSequence(ctx context.Context, db HorizonDB, s
}
}

// isSyncedUp Check if Horizon and Core have synced up: If yes, then no need to wait for account sequence
// and send txBAD_SEQ right away.
func (sys *System) isSyncedUp() bool {
currentStatus := sys.LedgerState.CurrentStatus()
return int(currentStatus.CoreLatest) <= int(currentStatus.HistoryLatest)
}

func (sys *System) deriveTxSubError(ctx context.Context) error {
if ctx.Err() == context.Canceled {
return ErrCanceled
Expand Down
93 changes: 93 additions & 0 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"errors"
"github.com/stellar/go/services/horizon/internal/ledger"
"testing"
"time"

Expand Down Expand Up @@ -155,6 +156,17 @@ func (suite *SystemTestSuite) TestTimeoutDuringSequenceLoop() {
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil)

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Twice()
suite.system.LedgerState = mockLedgerState

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
Expand Down Expand Up @@ -187,6 +199,17 @@ func (suite *SystemTestSuite) TestClientDisconnectedDuringSequenceLoop() {
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil)

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Once()
suite.system.LedgerState = mockLedgerState

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
Expand Down Expand Up @@ -253,6 +276,17 @@ func (suite *SystemTestSuite) TestSubmit_BadSeq() {
}).
Return(nil).Once()

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Twice()
suite.system.LedgerState = mockLedgerState

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
Expand Down Expand Up @@ -281,6 +315,64 @@ func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() {
Return(map[string]uint64{suite.unmuxedSource.Address(): 1}, nil).
Once()

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Times(3)
suite.system.LedgerState = mockLedgerState

// set poll interval to 1ms so we don't need to wait 3 seconds for the test to complete
suite.system.Init()
suite.system.accountSeqPollInterval = time.Millisecond

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
suite.successXDR,
suite.successTx.Transaction.TransactionHash,
)

assert.NotNil(suite.T(), r.Err)
assert.True(suite.T(), suite.submitter.WasSubmittedTo)
}

// If error is bad_seq and horizon and core are in sync, then return error
func (suite *SystemTestSuite) TestSubmit_BadSeqErrorWhenInSync() {
suite.submitter.R = suite.badSeq
suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Twice()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Twice()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice()
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil).
Twice()

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Once()
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 1,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Once()
suite.system.LedgerState = mockLedgerState

// set poll interval to 1ms so we don't need to wait 3 seconds for the test to complete
suite.system.Init()
suite.system.accountSeqPollInterval = time.Millisecond
Expand All @@ -293,6 +385,7 @@ func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() {
)

assert.NotNil(suite.T(), r.Err)
assert.Equal(suite.T(), r.Err.Error(), "tx failed: AAAAAAAAAAD////7AAAAAA==") // decodes to txBadSeq
assert.True(suite.T(), suite.submitter.WasSubmittedTo)
}

Expand Down

0 comments on commit 2ea3c3d

Please sign in to comment.