Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for transaction submission timeout #5191

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,6 @@ func initSubmissionSystem(app *App) {
DB: func(ctx context.Context) txsub.HorizonDB {
return &history.Q{SessionInterface: app.HorizonSession()}
},
LedgerState: app.ledgerState,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,45 @@ func TestNegativeSequenceTxSubmission(t *testing.T) {
tt.Equal("tx_bad_seq", codes.TransactionCode)

}

func TestBadSeqTxSubmission(t *testing.T) {
tt := assert.New(t)
itest := integration.NewTest(t, integration.Config{})
master := itest.Master()

account := itest.MasterAccount()
seqnum, err := account.GetSequenceNumber()
tt.NoError(err)

op2 := txnbuild.Payment{
Destination: master.Address(),
Amount: "10",
Asset: txnbuild.NativeAsset{},
}

// Submit a simple payment tx, but with a gapped sequence
// that is intentionally set more than one ahead of current account seq
// this should trigger a tx_bad_seq from core
account = &txnbuild.SimpleAccount{
AccountID: account.GetAccountID(),
Sequence: seqnum + 10,
}
txParams := txnbuild.TransactionParams{
SourceAccount: account,
Operations: []txnbuild.Operation{&op2},
BaseFee: txnbuild.MinBaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewInfiniteTimeout()},
IncrementSequenceNum: false,
}
tx, err := txnbuild.NewTransaction(txParams)
tt.NoError(err)
tx, err = tx.Sign(integration.StandaloneNetworkPassphrase, master)
tt.NoError(err)
_, err = itest.Client().SubmitTransaction(tx)
tt.Error(err)
clientErr, ok := err.(*horizonclient.Error)
tt.True(ok)
codes, err := clientErr.ResultCodes()
tt.NoError(err)
tt.Equal("tx_bad_seq", codes.TransactionCode)
}
13 changes: 10 additions & 3 deletions services/horizon/internal/ledger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
package ledger

import (
"github.com/prometheus/client_golang/prometheus"

Check failure on line 9 in services/horizon/internal/ledger/main.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed with -local github.com/golangci/golangci-lint (goimports)
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// Status represents a snapshot of both horizon's and stellar-core's view of the
Expand All @@ -31,7 +30,7 @@
}

// State is an in-memory data structure which holds a snapshot of both
// horizon's and stellar-core's view of the the network
// horizon's and stellar-core's view of the network
type State struct {
sync.RWMutex
current Status
Expand All @@ -44,6 +43,14 @@
}
}

type StateInterface interface {
CurrentStatus() Status
SetStatus(next Status)
SetCoreStatus(next CoreStatus)
SetHorizonStatus(next HorizonStatus)
RegisterMetrics(registry *prometheus.Registry)
}

// CurrentStatus returns the cached snapshot of ledger state
func (c *State) CurrentStatus() Status {
c.RLock()
Expand Down
32 changes: 32 additions & 0 deletions services/horizon/internal/txsub/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package txsub
import (
"context"
"database/sql"
"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/services/horizon/internal/ledger"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -72,3 +74,33 @@ func (m *mockDBQ) TransactionByHash(ctx context.Context, dest interface{}, hash
args := m.Called(ctx, dest, hash)
return args.Error(0)
}

type MockLedgerState struct {
mock.Mock
}

// CurrentStatus mocks the CurrentStatus method.
func (m *MockLedgerState) CurrentStatus() ledger.Status {
args := m.Called()
return args.Get(0).(ledger.Status)
}

// SetStatus mocks the SetStatus method.
func (m *MockLedgerState) SetStatus(next ledger.Status) {
m.Called(next)
}

// SetCoreStatus mocks the SetCoreStatus method.
func (m *MockLedgerState) SetCoreStatus(next ledger.CoreStatus) {
m.Called(next)
}

// SetHorizonStatus mocks the SetHorizonStatus method.
func (m *MockLedgerState) SetHorizonStatus(next ledger.HorizonStatus) {
m.Called(next)
}

// RegisterMetrics mocks the RegisterMetrics method.
func (m *MockLedgerState) RegisterMetrics(registry *prometheus.Registry) {
m.Called(registry)
}
11 changes: 10 additions & 1 deletion services/horizon/internal/txsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"database/sql"
"fmt"
"github.com/stellar/go/services/horizon/internal/ledger"

Check failure on line 7 in services/horizon/internal/txsub/system.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed with -local github.com/golangci/golangci-lint (goimports)
"sync"
"time"

Expand Down Expand Up @@ -40,6 +41,7 @@
Submitter Submitter
SubmissionTimeout time.Duration
Log *log.Entry
LedgerState ledger.StateInterface

Metrics struct {
// SubmissionDuration exposes timing metrics about the rate and latency of
Expand Down Expand Up @@ -190,7 +192,7 @@
WithField("sourceAddress", sourceAddress).
Warn("missing sequence number for account")
}
if num >= seq {
if num >= seq || sys.isSyncedUp() {
return nil
}
}
Expand All @@ -204,6 +206,13 @@
}
}

// isSyncedUp Check if Horizon and Core have synced up: If yes, then no need to wait for account sequence
// and send txBAD_SEQ right away.
func (sys *System) isSyncedUp() bool {
currentStatus := sys.LedgerState.CurrentStatus()
return int(currentStatus.CoreLatest) <= int(currentStatus.HistoryLatest)
}

func (sys *System) deriveTxSubError(ctx context.Context) error {
if ctx.Err() == context.Canceled {
return ErrCanceled
Expand Down
93 changes: 93 additions & 0 deletions services/horizon/internal/txsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"context"
"database/sql"
"errors"
"github.com/stellar/go/services/horizon/internal/ledger"

Check failure on line 9 in services/horizon/internal/txsub/system_test.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `goimports`-ed with -local github.com/golangci/golangci-lint (goimports)
"testing"
"time"

Expand Down Expand Up @@ -155,6 +156,17 @@
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil)

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Twice()
suite.system.LedgerState = mockLedgerState

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
Expand Down Expand Up @@ -187,6 +199,17 @@
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil)

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Once()
suite.system.LedgerState = mockLedgerState

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
Expand Down Expand Up @@ -253,6 +276,17 @@
}).
Return(nil).Once()

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Twice()
suite.system.LedgerState = mockLedgerState

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
Expand Down Expand Up @@ -281,6 +315,64 @@
Return(map[string]uint64{suite.unmuxedSource.Address(): 1}, nil).
Once()

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Times(3)
suite.system.LedgerState = mockLedgerState

// set poll interval to 1ms so we don't need to wait 3 seconds for the test to complete
suite.system.Init()
suite.system.accountSeqPollInterval = time.Millisecond

r := <-suite.system.Submit(
suite.ctx,
suite.successTx.Transaction.TxEnvelope,
suite.successXDR,
suite.successTx.Transaction.TransactionHash,
)

assert.NotNil(suite.T(), r.Err)
assert.True(suite.T(), suite.submitter.WasSubmittedTo)
}

// If error is bad_seq and horizon and core are in sync, then return error
func (suite *SystemTestSuite) TestSubmit_BadSeqErrorWhenInSync() {
suite.submitter.R = suite.badSeq
suite.db.On("PreFilteredTransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Twice()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice()
suite.db.On("TransactionByHash", suite.ctx, mock.Anything, suite.successTx.Transaction.TransactionHash).
Return(sql.ErrNoRows).Twice()
suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice()
suite.db.On("GetSequenceNumbers", suite.ctx, []string{suite.unmuxedSource.Address()}).
Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil).
Twice()

mockLedgerState := &MockLedgerState{}
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 3,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Once()
mockLedgerState.On("CurrentStatus").Return(ledger.Status{
CoreStatus: ledger.CoreStatus{
CoreLatest: 1,
},
HorizonStatus: ledger.HorizonStatus{
HistoryLatest: 1,
},
}).Once()
suite.system.LedgerState = mockLedgerState

// set poll interval to 1ms so we don't need to wait 3 seconds for the test to complete
suite.system.Init()
suite.system.accountSeqPollInterval = time.Millisecond
Expand All @@ -293,6 +385,7 @@
)

assert.NotNil(suite.T(), r.Err)
assert.Equal(suite.T(), r.Err.Error(), "tx failed: AAAAAAAAAAD////7AAAAAA==") // decodes to txBadSeq
assert.True(suite.T(), suite.submitter.WasSubmittedTo)
}

Expand Down
Loading