From 0fa7d228536045bfc1dc1983d553e58aba3d32af Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Sat, 3 Feb 2024 00:13:37 -0500 Subject: [PATCH] Fix for transaction submission timeout (#5191) * Add check for ledger state in txsub * Add test for badSeq * Fix failing unittest * Update system_test.go * Small changes * Update main.go --- services/horizon/internal/init.go | 1 + ...eq_txsub_test.go => bad_seq_txsub_test.go} | 42 +++++++++ services/horizon/internal/ledger/main.go | 13 ++- .../horizon/internal/txsub/helpers_test.go | 32 +++++++ services/horizon/internal/txsub/system.go | 11 ++- .../horizon/internal/txsub/system_test.go | 93 +++++++++++++++++++ 6 files changed, 188 insertions(+), 4 deletions(-) rename services/horizon/internal/integration/{negative_seq_txsub_test.go => bad_seq_txsub_test.go} (63%) diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 4078c7ad00..d4b34f9f4d 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -235,5 +235,6 @@ func initSubmissionSystem(app *App) { DB: func(ctx context.Context) txsub.HorizonDB { return &history.Q{SessionInterface: app.HorizonSession()} }, + LedgerState: app.ledgerState, } } diff --git a/services/horizon/internal/integration/negative_seq_txsub_test.go b/services/horizon/internal/integration/bad_seq_txsub_test.go similarity index 63% rename from services/horizon/internal/integration/negative_seq_txsub_test.go rename to services/horizon/internal/integration/bad_seq_txsub_test.go index 787ad0645c..2a5f9d13fe 100644 --- a/services/horizon/internal/integration/negative_seq_txsub_test.go +++ b/services/horizon/internal/integration/bad_seq_txsub_test.go @@ -71,3 +71,45 @@ func TestNegativeSequenceTxSubmission(t *testing.T) { tt.Equal("tx_bad_seq", codes.TransactionCode) } + +func TestBadSeqTxSubmission(t *testing.T) { + tt := assert.New(t) + itest := integration.NewTest(t, integration.Config{}) + master := itest.Master() + + account := 
itest.MasterAccount() + seqnum, err := account.GetSequenceNumber() + tt.NoError(err) + + op2 := txnbuild.Payment{ + Destination: master.Address(), + Amount: "10", + Asset: txnbuild.NativeAsset{}, + } + + // Submit a simple payment tx, but with a gapped sequence + // that is intentionally set more than one ahead of current account seq + // this should trigger a tx_bad_seq from core + account = &txnbuild.SimpleAccount{ + AccountID: account.GetAccountID(), + Sequence: seqnum + 10, + } + txParams := txnbuild.TransactionParams{ + SourceAccount: account, + Operations: []txnbuild.Operation{&op2}, + BaseFee: txnbuild.MinBaseFee, + Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewInfiniteTimeout()}, + IncrementSequenceNum: false, + } + tx, err := txnbuild.NewTransaction(txParams) + tt.NoError(err) + tx, err = tx.Sign(integration.StandaloneNetworkPassphrase, master) + tt.NoError(err) + _, err = itest.Client().SubmitTransaction(tx) + tt.Error(err) + clientErr, ok := err.(*horizonclient.Error) + tt.True(ok) + codes, err := clientErr.ResultCodes() + tt.NoError(err) + tt.Equal("tx_bad_seq", codes.TransactionCode) +} diff --git a/services/horizon/internal/ledger/main.go b/services/horizon/internal/ledger/main.go index 1d17e09d67..2101048bad 100644 --- a/services/horizon/internal/ledger/main.go +++ b/services/horizon/internal/ledger/main.go @@ -6,10 +6,9 @@ package ledger import ( + "github.com/prometheus/client_golang/prometheus" "sync" "time" - - "github.com/prometheus/client_golang/prometheus" ) // Status represents a snapshot of both horizon's and stellar-core's view of the @@ -31,7 +30,7 @@ type HorizonStatus struct { } // State is an in-memory data structure which holds a snapshot of both -// horizon's and stellar-core's view of the the network +// horizon's and stellar-core's view of the network type State struct { sync.RWMutex current Status @@ -44,6 +43,14 @@ type State struct { } } +type StateInterface interface { + CurrentStatus() Status + SetStatus(next 
Status) + SetCoreStatus(next CoreStatus) + SetHorizonStatus(next HorizonStatus) + RegisterMetrics(registry *prometheus.Registry) +} + // CurrentStatus returns the cached snapshot of ledger state func (c *State) CurrentStatus() Status { c.RLock() diff --git a/services/horizon/internal/txsub/helpers_test.go b/services/horizon/internal/txsub/helpers_test.go index 0e5a63bca7..3c4cb6cb0b 100644 --- a/services/horizon/internal/txsub/helpers_test.go +++ b/services/horizon/internal/txsub/helpers_test.go @@ -9,6 +9,8 @@ package txsub import ( "context" "database/sql" + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/services/horizon/internal/ledger" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stretchr/testify/mock" @@ -72,3 +74,33 @@ func (m *mockDBQ) TransactionByHash(ctx context.Context, dest interface{}, hash args := m.Called(ctx, dest, hash) return args.Error(0) } + +type MockLedgerState struct { + mock.Mock +} + +// CurrentStatus mocks the CurrentStatus method. +func (m *MockLedgerState) CurrentStatus() ledger.Status { + args := m.Called() + return args.Get(0).(ledger.Status) +} + +// SetStatus mocks the SetStatus method. +func (m *MockLedgerState) SetStatus(next ledger.Status) { + m.Called(next) +} + +// SetCoreStatus mocks the SetCoreStatus method. +func (m *MockLedgerState) SetCoreStatus(next ledger.CoreStatus) { + m.Called(next) +} + +// SetHorizonStatus mocks the SetHorizonStatus method. +func (m *MockLedgerState) SetHorizonStatus(next ledger.HorizonStatus) { + m.Called(next) +} + +// RegisterMetrics mocks the RegisterMetrics method. 
+func (m *MockLedgerState) RegisterMetrics(registry *prometheus.Registry) { + m.Called(registry) +} diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 189f1619ff..31038135f3 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "github.com/stellar/go/services/horizon/internal/ledger" "sync" "time" @@ -40,6 +41,7 @@ type System struct { Submitter Submitter SubmissionTimeout time.Duration Log *log.Entry + LedgerState ledger.StateInterface Metrics struct { // SubmissionDuration exposes timing metrics about the rate and latency of @@ -190,7 +192,7 @@ func (sys *System) waitUntilAccountSequence(ctx context.Context, db HorizonDB, s WithField("sourceAddress", sourceAddress). Warn("missing sequence number for account") } - if num >= seq { + if num >= seq || sys.isSyncedUp() { return nil } } @@ -204,6 +206,13 @@ func (sys *System) waitUntilAccountSequence(ctx context.Context, db HorizonDB, s } } +// isSyncedUp reports whether Horizon and Core have synced up. If they have, there is no need to wait +// for the account sequence, and txBAD_SEQ can be returned right away.
+func (sys *System) isSyncedUp() bool { + currentStatus := sys.LedgerState.CurrentStatus() + return int(currentStatus.CoreLatest) <= int(currentStatus.HistoryLatest) +} + func (sys *System) deriveTxSubError(ctx context.Context) error { if ctx.Err() == context.Canceled { return ErrCanceled diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 816cc28e66..b4a36fb522 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "errors" + "github.com/stellar/go/services/horizon/internal/ledger" "testing" "time" @@ -155,6 +156,17 @@ func (suite *SystemTestSuite) TestTimeoutDuringSequenceLoop() { suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}). Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil) + mockLedgerState := &MockLedgerState{} + mockLedgerState.On("CurrentStatus").Return(ledger.Status{ + CoreStatus: ledger.CoreStatus{ + CoreLatest: 3, + }, + HorizonStatus: ledger.HorizonStatus{ + HistoryLatest: 1, + }, + }).Twice() + suite.system.LedgerState = mockLedgerState + r := <-suite.system.Submit( suite.ctx, suite.successTx.Transaction.TxEnvelope, @@ -187,6 +199,17 @@ func (suite *SystemTestSuite) TestClientDisconnectedDuringSequenceLoop() { suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}). Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil) + mockLedgerState := &MockLedgerState{} + mockLedgerState.On("CurrentStatus").Return(ledger.Status{ + CoreStatus: ledger.CoreStatus{ + CoreLatest: 3, + }, + HorizonStatus: ledger.HorizonStatus{ + HistoryLatest: 1, + }, + }).Once() + suite.system.LedgerState = mockLedgerState + r := <-suite.system.Submit( suite.ctx, suite.successTx.Transaction.TxEnvelope, @@ -253,6 +276,17 @@ func (suite *SystemTestSuite) TestSubmit_BadSeq() { }). 
Return(nil).Once() + mockLedgerState := &MockLedgerState{} + mockLedgerState.On("CurrentStatus").Return(ledger.Status{ + CoreStatus: ledger.CoreStatus{ + CoreLatest: 3, + }, + HorizonStatus: ledger.HorizonStatus{ + HistoryLatest: 1, + }, + }).Twice() + suite.system.LedgerState = mockLedgerState + r := <-suite.system.Submit( suite.ctx, suite.successTx.Transaction.TxEnvelope, @@ -281,6 +315,64 @@ func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() { Return(map[string]uint64{suite.unmuxedSource.Address(): 1}, nil). Once() + mockLedgerState := &MockLedgerState{} + mockLedgerState.On("CurrentStatus").Return(ledger.Status{ + CoreStatus: ledger.CoreStatus{ + CoreLatest: 3, + }, + HorizonStatus: ledger.HorizonStatus{ + HistoryLatest: 1, + }, + }).Times(3) + suite.system.LedgerState = mockLedgerState + + // set poll interval to 1ms so we don't need to wait 3 seconds for the test to complete + suite.system.Init() + suite.system.accountSeqPollInterval = time.Millisecond + + r := <-suite.system.Submit( + suite.ctx, + suite.successTx.Transaction.TxEnvelope, + suite.successXDR, + suite.successTx.Transaction.TransactionHash, + ) + + assert.NotNil(suite.T(), r.Err) + assert.True(suite.T(), suite.submitter.WasSubmittedTo) +} + +// If error is bad_seq and horizon and core are in sync, then return error +func (suite *SystemTestSuite) TestSubmit_BadSeqErrorWhenInSync() { + suite.submitter.R = suite.badSeq + suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Twice() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice() + suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Twice() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice() + suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}). + Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). 
+ Twice() + + mockLedgerState := &MockLedgerState{} + mockLedgerState.On("CurrentStatus").Return(ledger.Status{ + CoreStatus: ledger.CoreStatus{ + CoreLatest: 3, + }, + HorizonStatus: ledger.HorizonStatus{ + HistoryLatest: 1, + }, + }).Once() + mockLedgerState.On("CurrentStatus").Return(ledger.Status{ + CoreStatus: ledger.CoreStatus{ + CoreLatest: 1, + }, + HorizonStatus: ledger.HorizonStatus{ + HistoryLatest: 1, + }, + }).Once() + suite.system.LedgerState = mockLedgerState + // set poll interval to 1ms so we don't need to wait 3 seconds for the test to complete suite.system.Init() suite.system.accountSeqPollInterval = time.Millisecond @@ -293,6 +385,7 @@ func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() { ) assert.NotNil(suite.T(), r.Err) + assert.Equal(suite.T(), r.Err.Error(), "tx failed: AAAAAAAAAAD////7AAAAAA==") // decodes to txBadSeq assert.True(suite.T(), suite.submitter.WasSubmittedTo) }