From 06a6c608a90a1cf3a0a152c059f45e6dba6300ff Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Fri, 10 Jul 2020 19:25:58 +0200 Subject: [PATCH 1/4] Check transaction submission results using repeatable read transaction --- .../internal/actions_transaction_test.go | 11 - services/horizon/internal/init.go | 6 +- .../horizon/internal/txsub/helpers_test.go | 54 +++++ services/horizon/internal/txsub/main.go | 17 -- services/horizon/internal/txsub/results.go | 65 ++++++ .../horizon/internal/txsub/results/db/main.go | 49 ---- .../internal/txsub/results/db/main_test.go | 66 ------ .../horizon/internal/txsub/results_test.go | 38 ++++ services/horizon/internal/txsub/system.go | 92 ++++---- .../horizon/internal/txsub/system_test.go | 213 +++++++++++++----- .../horizon/internal/txsub/test_helpers.go | 59 ----- 11 files changed, 366 insertions(+), 304 deletions(-) create mode 100644 services/horizon/internal/txsub/helpers_test.go create mode 100644 services/horizon/internal/txsub/results.go delete mode 100644 services/horizon/internal/txsub/results/db/main.go delete mode 100644 services/horizon/internal/txsub/results/db/main_test.go create mode 100644 services/horizon/internal/txsub/results_test.go delete mode 100644 services/horizon/internal/txsub/test_helpers.go diff --git a/services/horizon/internal/actions_transaction_test.go b/services/horizon/internal/actions_transaction_test.go index c14ba6b90e..913e0ee6c3 100644 --- a/services/horizon/internal/actions_transaction_test.go +++ b/services/horizon/internal/actions_transaction_test.go @@ -11,8 +11,6 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/expingest" "github.com/stellar/go/services/horizon/internal/test" - "github.com/stellar/go/services/horizon/internal/txsub" - "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/xdr" ) @@ -277,15 +275,6 @@ func TestTransactionActions_Post(t *testing.T) { // existing transaction w := ht.Post("/transactions", form) ht.Assert.Equal(200, w.Code) - - // sequence buffer full - ht.App.submitter.Results = &txsub.MockResultProvider{ - Results: []txsub.Result{ - {Err: sequence.ErrNoMoreRoom}, - }, - } - w = ht.Post("/transactions", form) - ht.Assert.Equal(503, w.Code) } func TestTransactionActions_PostSuccessful(t *testing.T) { diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index b312cbec0a..2b3235bfa7 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -12,7 +12,6 @@ import ( "github.com/stellar/go/services/horizon/internal/expingest" "github.com/stellar/go/services/horizon/internal/simplepath" "github.com/stellar/go/services/horizon/internal/txsub" - results "github.com/stellar/go/services/horizon/internal/txsub/results/db" "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/support/db" "github.com/stellar/go/support/log" @@ -199,9 +198,8 @@ func initSubmissionSystem(app *App) { Pending: txsub.NewDefaultSubmissionList(), Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL), SubmissionQueue: sequence.NewManager(), - Results: &results.DB{ - History: &history.Q{Session: app.HorizonSession(context.Background())}, + DB: func(ctx context.Context) txsub.HorizonDB { + return &history.Q{Session: app.HorizonSession(ctx)} }, - Sequences: &history.Q{Session: app.HorizonSession(context.Background())}, } } diff --git a/services/horizon/internal/txsub/helpers_test.go b/services/horizon/internal/txsub/helpers_test.go new file mode 100644 index 0000000000..3483ca28cb --- /dev/null +++ b/services/horizon/internal/txsub/helpers_test.go @@ -0,0 +1,54 @@ +package txsub + +// This file provides mock implementations for the txsub interfaces +// which are useful in a testing context. +// +// NOTE: this file is not a test file so that other packages may import +// txsub and use these mocks in their own tests + +import ( + "context" + "database/sql" + "github.com/stretchr/testify/mock" +) + +// MockSubmitter is a test helper that simplements the Submitter interface +type MockSubmitter struct { + R SubmissionResult + WasSubmittedTo bool +} + +// Submit implements `txsub.Submitter` +func (sub *MockSubmitter) Submit(ctx context.Context, env string) SubmissionResult { + sub.WasSubmittedTo = true + return sub.R +} + +type mockDBQ struct { + mock.Mock +} + +func (m *mockDBQ) BeginTx(txOpts *sql.TxOptions) error { + args := m.Called(txOpts) + return args.Error(0) +} + +func (m *mockDBQ) Rollback() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockDBQ) NoRows(err error) bool { + args := m.Called(err) + return args.Bool(0) +} + +func (m *mockDBQ) GetSequenceNumbers(addresses []string) (map[string]uint64, error) { + args := m.Called(addresses) + return args.Get(0).(map[string]uint64), args.Error(1) +} + +func (m *mockDBQ) TransactionByHash(dest interface{}, hash string) error { + args := m.Called(dest, hash) + return args.Error(0) +} diff --git a/services/horizon/internal/txsub/main.go b/services/horizon/internal/txsub/main.go index 47613628db..ede2df8dd8 100644 --- a/services/horizon/internal/txsub/main.go +++ b/services/horizon/internal/txsub/main.go @@ -9,23 +9,6 @@ import ( "github.com/stellar/go/xdr" ) -// ResultProvider represents an abstract store that can lookup Result objects -// by transaction hash or by [address,sequence] pairs. A ResultProvider is -// used within the transaction submission system to decide whether a submission should -// be submitted to the backing stellar-core process, as well as looking up the status -// of each transaction in the open submission list at each tick (i.e. ledger close) -type ResultProvider interface { - // Look up a result by transaction hash - ResultByHash(context.Context, string) Result -} - -// SequenceProvider represents an abstract store that can lookup the current -// sequence number of an account. It is used by the SequenceLock to -type SequenceProvider interface { - // Look up a sequence by address - GetSequenceNumbers(addresses []string) (map[string]uint64, error) -} - // Listener represents some client who is interested in retrieving the result // of a specific transaction. type Listener chan<- Result diff --git a/services/horizon/internal/txsub/results.go b/services/horizon/internal/txsub/results.go new file mode 100644 index 0000000000..7b25ea2061 --- /dev/null +++ b/services/horizon/internal/txsub/results.go @@ -0,0 +1,65 @@ +package txsub + +import ( + "database/sql" + + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/xdr" +) + +func txResultByHash(db HorizonDB, hash string) (history.Transaction, error) { + // query history database + var hr history.Transaction + err := db.TransactionByHash(&hr, hash) + if err == nil { + return txResultFromHistory(hr) + } + + if !db.NoRows(err) { + return hr, err + } + + // if no result was found in either db, return ErrNoResults + return hr, ErrNoResults +} + +func txResultFromHistory(tx history.Transaction) (history.Transaction, error) { + var txResult xdr.TransactionResult + err := xdr.SafeUnmarshalBase64(tx.TxResult, &txResult) + if err == nil { + if !txResult.Successful() { + err = &FailedTransactionError{ + ResultXDR: tx.TxResult, + } + } + } + + return tx, err +} + +func checkTxAlreadyExists(db HorizonDB, hash, sourceAddress string) (history.Transaction, uint64, error) { + err := db.BeginTx(&sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }) + if err != nil { + return history.Transaction{}, 0, err + } + defer db.Rollback() + + tx, err := txResultByHash(db, hash) + if err == ErrNoResults { + var sequenceNumbers map[string]uint64 + sequenceNumbers, err = db.GetSequenceNumbers([]string{sourceAddress}) + if err != nil { + return tx, 0, err + } + + num, ok := sequenceNumbers[sourceAddress] + if !ok { + return tx, 0, ErrNoAccount + } + return tx, num, ErrNoResults + } + return tx, 0, err +} diff --git a/services/horizon/internal/txsub/results/db/main.go b/services/horizon/internal/txsub/results/db/main.go deleted file mode 100644 index ea34811628..0000000000 --- a/services/horizon/internal/txsub/results/db/main.go +++ /dev/null @@ -1,49 +0,0 @@ -// Package results provides an implementation of the txsub.ResultProvider interface -// backed using the SQL databases used by both stellar core and horizon -package results - -import ( - "context" - "github.com/stellar/go/services/horizon/internal/db2/history" - "github.com/stellar/go/services/horizon/internal/txsub" - "github.com/stellar/go/xdr" -) - -// DB provides transaction submission results by querying the -// connected horizon and, if set, stellar core databases. -type DB struct { - History *history.Q -} - -var _ txsub.ResultProvider = &DB{} - -// ResultByHash implements txsub.ResultProvider -func (rp *DB) ResultByHash(ctx context.Context, hash string) txsub.Result { - // query history database - var hr history.Transaction - err := rp.History.TransactionByHash(&hr, hash) - if err == nil { - return txResultFromHistory(hr) - } - - if !rp.History.NoRows(err) { - return txsub.Result{Err: err} - } - - // if no result was found in either db, return ErrNoResults - return txsub.Result{Err: txsub.ErrNoResults} -} - -func txResultFromHistory(tx history.Transaction) txsub.Result { - var txResult xdr.TransactionResult - err := xdr.SafeUnmarshalBase64(tx.TxResult, &txResult) - if err == nil { - if !txResult.Successful() { - err = &txsub.FailedTransactionError{ - ResultXDR: tx.TxResult, - } - } - } - - return txsub.Result{Err: err, Transaction: tx} -} diff --git a/services/horizon/internal/txsub/results/db/main_test.go b/services/horizon/internal/txsub/results/db/main_test.go deleted file mode 100644 index 91d5eaf459..0000000000 --- a/services/horizon/internal/txsub/results/db/main_test.go +++ /dev/null @@ -1,66 +0,0 @@ -package results - -import ( - "testing" - - "github.com/stellar/go/services/horizon/internal/db2/history" - "github.com/stellar/go/services/horizon/internal/test" - "github.com/stellar/go/services/horizon/internal/txsub" -) - -func TestResultProvider(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() - - rp := &DB{ - History: &history.Q{Session: tt.HorizonSession()}, - } - - // Regression: ensure a transaction that is not ingested still returns the - // result - hash := "2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d" - ret := rp.ResultByHash(tt.Ctx, hash) - - tt.Require.NoError(ret.Err) - tt.Assert.Equal(hash, ret.Transaction.TransactionHash) -} - -func TestResultProviderHorizonOnly(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() - - rp := &DB{ - History: &history.Q{Session: tt.HorizonSession()}, - } - - hash := "adf1efb9fd253f53cbbe6230c131d2af19830328e52b610464652d67d2fb7195" - _, err := tt.CoreSession().ExecRaw("INSERT INTO txhistory VALUES ('" + hash + "', 5, 1, 'AAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3AAAAZAAAAAAAAAABAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAArqN6LeOagjxMaUP96Bzfs9e0corNZXzBWJkFoK7kvkwAAAAAO5rKAAAAAAAAAAABVvwF9wAAAECDzqvkQBQoNAJifPRXDoLhvtycT3lFPCQ51gkdsFHaBNWw05S/VhW0Xgkr0CBPE4NaFV2Kmcs3ZwLmib4TRrML', 'I3Tpk0m57326ml2zM5t4/ajzR3exrzO6RorVwN+UbU0AAAAAAAAAZAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAA==', 'AAAAAQAAAAAAAAABAAAAAwAAAAMAAAACAAAAAAAAAABi/B0L0JGythwN1lY0aypo19NHxvLCyO5tBEcCVvwF9w3gtrOnY/7UAAAAAAAAAAMAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAEAAAACAAAAAAAAAABi/B0L0JGythwN1lY0aypo19NHxvLCyO5tBEcCVvwF9w3gtrNryTTUAAAAAAAAAAMAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAACuo3ot45qCPExpQ/3oHN+z17Ryis1lfMFYmQWgruS+TAAAAAA7msoAAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAA==');") - tt.Require.NoError(err) - - ret := rp.ResultByHash(tt.Ctx, hash) - - tt.Require.Equal(ret.Err, txsub.ErrNoResults) -} - -func TestResultFailed(t *testing.T) { - tt := test.Start(t).Scenario("failed_transactions") - defer tt.Finish() - - rp := &DB{ - History: &history.Q{Session: tt.HorizonSession()}, - } - - hash := "aa168f12124b7c196c0adaee7c73a64d37f99428cacb59a91ff389626845e7cf" - - // Ignore core db results - _, err := tt.CoreSession().ExecRaw( - `DELETE FROM txhistory WHERE txid = ?`, - hash, - ) - tt.Require.NoError(err) - - ret := rp.ResultByHash(tt.Ctx, hash) - - tt.Require.Error(ret.Err) - tt.Assert.Equal("AAAAAAAAAGT/////AAAAAQAAAAAAAAAB/////gAAAAA=", ret.Err.(*txsub.FailedTransactionError).ResultXDR) -} diff --git a/services/horizon/internal/txsub/results_test.go b/services/horizon/internal/txsub/results_test.go new file mode 100644 index 0000000000..defa22504b --- /dev/null +++ b/services/horizon/internal/txsub/results_test.go @@ -0,0 +1,38 @@ +package txsub + +import ( + "testing" + + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/services/horizon/internal/test" +) + +func TestGetIngestedTx(t *testing.T) { + tt := test.Start(t).Scenario("base") + defer tt.Finish() + q := &history.Q{Session: tt.HorizonSession()} + hash := "2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d" + tx, err := txResultByHash(q, hash) + tt.Assert.NoError(err) + tt.Assert.Equal(hash, tx.TransactionHash) +} + +func TestGetMissingTx(t *testing.T) { + tt := test.Start(t).Scenario("base") + defer tt.Finish() + q := &history.Q{Session: tt.HorizonSession()} + hash := "adf1efb9fd253f53cbbe6230c131d2af19830328e52b610464652d67d2fb7195" + + _, err := txResultByHash(q, hash) + tt.Assert.Equal(ErrNoResults, err) +} + +func TestGetFailedTx(t *testing.T) { + tt := test.Start(t).Scenario("failed_transactions") + defer tt.Finish() + q := &history.Q{Session: tt.HorizonSession()} + hash := "aa168f12124b7c196c0adaee7c73a64d37f99428cacb59a91ff389626845e7cf" + + _, err := txResultByHash(q, hash) + tt.Assert.Equal("AAAAAAAAAGT/////AAAAAQAAAAAAAAAB/////gAAAAA=", err.(*FailedTransactionError).ResultXDR) +} diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 98b3051168..681ee444b6 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -2,16 +2,25 @@ package txsub import ( "context" + "database/sql" "fmt" - "github.com/stellar/go/xdr" "sync" "time" "github.com/rcrowley/go-metrics" "github.com/stellar/go/services/horizon/internal/txsub/sequence" "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" ) +type HorizonDB interface { + TransactionByHash(dest interface{}, hash string) error + GetSequenceNumbers(addresses []string) (map[string]uint64, error) + BeginTx(*sql.TxOptions) error + Rollback() error + NoRows(error) bool +} + // System represents a completely configured transaction submission system. // Its methods tie together the various pieces used to reliably submit transactions // to a stellar-core instance. @@ -21,9 +30,8 @@ type System struct { tickMutex sync.Mutex tickInProgress bool + DB func(context.Context) HorizonDB Pending OpenSubmissionList - Results ResultProvider - Sequences SequenceProvider Submitter Submitter SubmissionQueue *sequence.Manager SubmissionTimeout time.Duration @@ -76,43 +84,27 @@ func (sys *System) Submit( response := make(chan Result, 1) result = response + db := sys.DB(ctx) + // The database doesn't (yet) store muxed accounts, so we query + // the corresponding AccountId + sourceAccount := envelope.SourceAccount().ToAccountId() + sourceAddress := sourceAccount.Address() + sys.Log.Ctx(ctx).WithFields(log.F{ "hash": hash, "tx_type": envelope.Type.String(), "tx": rawTx, }).Info("Processing transaction") - // check the configured result provider for an existing result - r := sys.Results.ResultByHash(ctx, hash) - - if r.Err == nil { + tx, sequenceNumber, err := checkTxAlreadyExists(db, hash, sourceAddress) + if err == nil { sys.Log.Ctx(ctx).WithField("hash", hash).Info("Found submission result in a DB") - sys.finish(ctx, hash, response, r) + sys.finish(ctx, hash, response, Result{Transaction: tx}) return } - - if r.Err != ErrNoResults { + if err != ErrNoResults { sys.Log.Ctx(ctx).WithField("hash", hash).Info("Error getting submission result from a DB") - sys.finish(ctx, hash, response, r) - return - } - - // From now: r.Err == ErrNoResults - sourceAccount := envelope.SourceAccount() - // The database doesn't (yet) store muxed accounts, so we query - // the corresponding AccountId - accid := sourceAccount.ToAccountId() - sourceAddress := accid.Address() - curSeq, err := sys.Sequences.GetSequenceNumbers([]string{sourceAddress}) - if err != nil { - sys.finish(ctx, hash, response, Result{Err: err}) - return - } - - // If account's sequence cannot be found, abort with tx_NO_ACCOUNT - // error code - if _, ok := curSeq[sourceAddress]; !ok { - sys.finish(ctx, hash, response, Result{Err: ErrNoAccount}) + sys.finish(ctx, hash, response, Result{Transaction: tx, Err: err}) return } @@ -122,7 +114,9 @@ func (sys *System) Submit( // update the submission queue with the source accounts current sequence value // which will cause the channel returned by Push() to emit if possible. - sys.SubmissionQueue.Update(curSeq) + sys.SubmissionQueue.Update(map[string]uint64{ + sourceAddress: sequenceNumber, + }) select { case err := <-seq: @@ -163,11 +157,10 @@ func (sys *System) Submit( } // If error is txBAD_SEQ, check for the result again - r = sys.Results.ResultByHash(ctx, hash) - - if r.Err == nil { + tx, err = txResultByHash(db, hash) + if err == nil { // If the found use it as the result - sys.finish(ctx, hash, response, r) + sys.finish(ctx, hash, response, Result{Transaction: tx}) } else { // finally, return the bad_seq error if no result was found on 2nd attempt sys.finish(ctx, hash, response, Result{Err: sr.Err}) @@ -247,9 +240,20 @@ func (sys *System) Tick(ctx context.Context) { WithField("queued", sys.SubmissionQueue.String()). Debug("ticking txsub system") + db := sys.DB(ctx) + options := &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + } + if err := db.BeginTx(options); err != nil { + logger.WithError(err).Warn("could not start repeatable read transaction for txsub tick") + return + } + defer db.Rollback() + addys := sys.SubmissionQueue.Addresses() if len(addys) > 0 { - curSeq, err := sys.Sequences.GetSequenceNumbers(addys) + curSeq, err := db.GetSequenceNumbers(addys) if err != nil { logger.WithStack(err).Error(err) return @@ -259,24 +263,22 @@ func (sys *System) Tick(ctx context.Context) { } for _, hash := range sys.Pending.Pending(ctx) { - r := sys.Results.ResultByHash(ctx, hash) + tx, err := txResultByHash(db, hash) - if r.Err == nil { + if err == nil { logger.WithField("hash", hash).Debug("finishing open submission") - sys.Pending.Finish(ctx, hash, r) + sys.Pending.Finish(ctx, hash, Result{Transaction: tx}) continue } - _, ok := r.Err.(*FailedTransactionError) - - if ok { + if _, ok := err.(*FailedTransactionError); ok { logger.WithField("hash", hash).Debug("finishing open submission") - sys.Pending.Finish(ctx, hash, r) + sys.Pending.Finish(ctx, hash, Result{Transaction: tx, Err: err}) continue } - if r.Err != ErrNoResults { - logger.WithStack(r.Err).Error(r.Err) + if err != ErrNoResults { + logger.WithStack(err).Error(err) } } diff --git a/services/horizon/internal/txsub/system_test.go b/services/horizon/internal/txsub/system_test.go index 64c94d51a1..c298972c31 100644 --- a/services/horizon/internal/txsub/system_test.go +++ b/services/horizon/internal/txsub/system_test.go @@ -2,14 +2,15 @@ package txsub import ( "context" + "database/sql" "errors" - "fmt" "testing" "time" "github.com/guregu/null" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -22,8 +23,7 @@ type SystemTestSuite struct { suite.Suite ctx context.Context submitter *MockSubmitter - results *MockResultProvider - sequences *MockSequenceProvider + db *mockDBQ system *System noResults Result successTx Result @@ -35,18 +35,15 @@ type SystemTestSuite struct { func (suite *SystemTestSuite) SetupTest() { suite.ctx = test.Context() suite.submitter = &MockSubmitter{} - suite.results = &MockResultProvider{ - Results: []Result{}, - ResultForInnerHash: map[string]Result{}, - } - suite.sequences = &MockSequenceProvider{} + suite.db = &mockDBQ{} suite.system = &System{ Pending: NewDefaultSubmissionList(), Submitter: suite.submitter, - Results: suite.results, - Sequences: suite.sequences, SubmissionQueue: sequence.NewManager(), + DB: func(ctx context.Context) HorizonDB { + return suite.db + }, } suite.unmuxedSource = xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") @@ -85,6 +82,16 @@ func (suite *SystemTestSuite) SetupTest() { }, } + result := xdr.TransactionResult{ + FeeCharged: 123, + Result: xdr.TransactionResultResult{ + Code: xdr.TransactionResultCodeTxSuccess, + Results: &[]xdr.OperationResult{}, + }, + } + resultXDR, err := xdr.MarshalBase64(result) + suite.Assert().NoError(err) + suite.noResults = Result{Err: ErrNoResults} envelopeBase64, _ := xdr.MarshalBase64(tx) suite.successTx = Result{ @@ -93,7 +100,7 @@ func (suite *SystemTestSuite) SetupTest() { TransactionHash: "2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d", LedgerSequence: 2, TxEnvelope: envelopeBase64, - TxResult: "I3Tpk0m57326ml2zM5t4/ajzR3exrzO6RorVwN+UbU0AAAAAAAAAZAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAA==", + TxResult: resultXDR, }, }, } @@ -102,15 +109,27 @@ func (suite *SystemTestSuite) SetupTest() { suite.badSeq = SubmissionResult{ Err: ErrBadSequence, } +} - suite.sequences.On("GetSequenceNumbers", []string{suite.unmuxedSource.Address()}). - Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). - Once() +func (suite *SystemTestSuite) TearDownTest() { + t := suite.T() + suite.db.AssertExpectations(t) } // Returns the result provided by the ResultProvider. func (suite *SystemTestSuite) TestSubmit_Basic() { - suite.results.Results = []Result{suite.successTx} + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Run(func(args mock.Arguments) { + ptr := args.Get(0).(*history.Transaction) + *ptr = suite.successTx.Transaction + }). + Return(nil).Once() + r := <-suite.system.Submit( suite.ctx, suite.successTx.Transaction.TxEnvelope, @@ -125,6 +144,18 @@ func (suite *SystemTestSuite) TestSubmit_Basic() { // Returns the error from submission if no result is found by hash and the suite.submitter returns an error. func (suite *SystemTestSuite) TestSubmit_NotFoundError() { + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Once() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() + suite.db.On("GetSequenceNumbers", []string{suite.unmuxedSource.Address()}). + Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). + Once() + suite.submitter.R.Err = errors.New("busted for some reason") r := <-suite.system.Submit( suite.ctx, @@ -143,7 +174,23 @@ func (suite *SystemTestSuite) TestSubmit_NotFoundError() { // If the error is bad_seq and the result at the transaction's sequence number is for the same hash, return result. func (suite *SystemTestSuite) TestSubmit_BadSeq() { suite.submitter.R = suite.badSeq - suite.results.Results = []Result{suite.noResults, suite.successTx} + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Once() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() + suite.db.On("GetSequenceNumbers", []string{suite.unmuxedSource.Address()}). + Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). + Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Run(func(args mock.Arguments) { + ptr := args.Get(0).(*history.Transaction) + *ptr = suite.successTx.Transaction + }). + Return(nil).Once() r := <-suite.system.Submit( suite.ctx, @@ -160,7 +207,17 @@ func (suite *SystemTestSuite) TestSubmit_BadSeq() { // If error is bad_seq and no result is found, return error. func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() { suite.submitter.R = suite.badSeq - suite.submitter.R = suite.badSeq + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Twice() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Twice() + suite.db.On("GetSequenceNumbers", []string{suite.unmuxedSource.Address()}). + Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). + Once() r := <-suite.system.Submit( suite.ctx, suite.successTx.Transaction.TxEnvelope, @@ -174,6 +231,18 @@ func (suite *SystemTestSuite) TestSubmit_BadSeqNotFound() { // If no result found and no error submitting, add to open transaction list. func (suite *SystemTestSuite) TestSubmit_OpenTransactionList() { + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Once() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() + suite.db.On("GetSequenceNumbers", []string{suite.unmuxedSource.Address()}). + Return(map[string]uint64{suite.unmuxedSource.Address(): 0}, nil). + Once() + suite.system.Submit( suite.ctx, suite.successTx.Transaction.TxEnvelope, @@ -190,6 +259,12 @@ func (suite *SystemTestSuite) TestSubmit_OpenTransactionList() { // Tick should be a no-op if there are no open submissions. func (suite *SystemTestSuite) TestTick_Noop() { + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.system.Tick(suite.ctx) } @@ -199,51 +274,56 @@ func (suite *SystemTestSuite) TestTick_Noop() { // `sys.Sequences.Get(addys)` is delayed by 1 second. It allows to simulate two // calls to `Tick()` executed at the same time. func (suite *SystemTestSuite) TestTick_Deadlock() { - secondDone := make(chan bool, 1) - testDone := make(chan bool) - - go func() { - select { - case <-secondDone: - // OK! - case <-time.After(5 * time.Second): - assert.Fail(suite.T(), "Timeout, likely a deadlock in Tick()") - } - - testDone <- true - }() + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() // Start first Tick suite.system.SubmissionQueue.Push("address", 0) - // Configure suite.sequences to return after 1 second in a first call - suite.sequences.On("GetSequenceNumbers", []string{"address"}).After(time.Second).Return(map[string]uint64{}, nil) - - go func() { - fmt.Println("Starting first Tick()") - suite.system.Tick(suite.ctx) - fmt.Println("Finished first Tick()") - }() - - go func() { - // Start second Tick - should be deadlocked if mutex is not Unlock()'ed. - fmt.Println("Starting second Tick()") - suite.system.Tick(suite.ctx) - fmt.Println("Finished second Tick()") - secondDone <- true - }() - - <-testDone + suite.db.On("GetSequenceNumbers", []string{"address"}). + Return(map[string]uint64{}, nil). + Run(func(args mock.Arguments) { + // Start second tick + suite.system.Tick(suite.ctx) + }). + Once() + + suite.system.Tick(suite.ctx) } // Test that Tick finishes any available transactions, func (suite *SystemTestSuite) TestTick_FinishesTransactions() { l := make(chan Result, 1) suite.system.Pending.Add(suite.ctx, suite.successTx.Transaction.TransactionHash, l) + + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Once() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() + suite.system.Tick(suite.ctx) + assert.Equal(suite.T(), 0, len(l)) assert.Equal(suite.T(), 1, len(suite.system.Pending.Pending(suite.ctx))) - suite.results.Results = []Result{suite.successTx} + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Run(func(args mock.Arguments) { + ptr := args.Get(0).(*history.Transaction) + *ptr = suite.successTx.Transaction + }). + Return(nil).Once() + suite.system.Tick(suite.ctx) assert.Equal(suite.T(), 1, len(l)) @@ -270,17 +350,34 @@ func (suite *SystemTestSuite) TestTickFinishFeeBumpTransaction() { }, } - suite.sequences = &MockSequenceProvider{} - suite.sequences.On("GetSequenceNumbers", []string{"GABQGAYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB2MX"}). + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, innerHash). + Return(sql.ErrNoRows).Once() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() + suite.db.On("GetSequenceNumbers", []string{"GABQGAYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB2MX"}). Return(map[string]uint64{"GABQGAYAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB2MX": 96}, nil). Once() - suite.system.Sequences = suite.sequences l := suite.system.Submit(suite.ctx, innerTxEnvelope, parsedInnerTx, innerHash) assert.Equal(suite.T(), 0, len(l)) assert.Equal(suite.T(), 1, len(suite.system.Pending.Pending(suite.ctx))) - suite.results.ResultForInnerHash[innerHash] = feeBumpTx + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, innerHash). + Run(func(args mock.Arguments) { + ptr := args.Get(0).(*history.Transaction) + *ptr = feeBumpTx.Transaction + }). + Return(nil).Once() + suite.system.Tick(suite.ctx) assert.Equal(suite.T(), 1, len(l)) @@ -296,6 +393,16 @@ func (suite *SystemTestSuite) TestTick_RemovesStaleSubmissions() { suite.system.SubmissionTimeout = 100 * time.Millisecond suite.system.Pending.Add(suite.ctx, suite.successTx.Transaction.TransactionHash, l) <-time.After(101 * time.Millisecond) + + suite.db.On("BeginTx", &sql.TxOptions{ + Isolation: sql.LevelRepeatableRead, + ReadOnly: true, + }).Return(nil).Once() + suite.db.On("Rollback").Return(nil).Once() + suite.db.On("TransactionByHash", mock.Anything, suite.successTx.Transaction.TransactionHash). + Return(sql.ErrNoRows).Once() + suite.db.On("NoRows", sql.ErrNoRows).Return(true).Once() + suite.system.Tick(suite.ctx) assert.Equal(suite.T(), 0, len(suite.system.Pending.Pending(suite.ctx))) diff --git a/services/horizon/internal/txsub/test_helpers.go b/services/horizon/internal/txsub/test_helpers.go deleted file mode 100644 index 2495b3c2f5..0000000000 --- a/services/horizon/internal/txsub/test_helpers.go +++ /dev/null @@ -1,59 +0,0 @@ -package txsub - -// This file provides mock implementations for the txsub interfaces -// which are useful in a testing context. -// -// NOTE: this file is not a test file so that other packages may import -// txsub and use these mocks in their own tests - -import ( - "context" - - "github.com/stretchr/testify/mock" -) - -// MockSubmitter is a test helper that simplements the Submitter interface -type MockSubmitter struct { - R SubmissionResult - WasSubmittedTo bool -} - -// Submit implements `txsub.Submitter` -func (sub *MockSubmitter) Submit(ctx context.Context, env string) SubmissionResult { - sub.WasSubmittedTo = true - return sub.R -} - -// MockResultProvider is a test helper that simplements the ResultProvider -// interface -type MockResultProvider struct { - Results []Result - ResultForInnerHash map[string]Result -} - -// ResultByHash implements `txsub.ResultProvider` -func (results *MockResultProvider) ResultByHash(ctx context.Context, hash string) Result { - if r, ok := results.ResultForInnerHash[hash]; ok { - return r - } - - if len(results.Results) > 0 { - r := results.Results[0] - results.Results = results.Results[1:] - return r - } - - return Result{Err: ErrNoResults} -} - -// MockSequenceProvider is a test helper that simplements the SequenceProvider -// interface -type MockSequenceProvider struct { - mock.Mock -} - -// Get implements `txsub.SequenceProvider` -func (o *MockSequenceProvider) GetSequenceNumbers(addresses []string) (map[string]uint64, error) { - args := o.Called(addresses) - return args.Get(0).(map[string]uint64), args.Error(1) -} From abb0cec2759149522127f479eab9558cfde08e77 Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 13 Jul 2020 19:01:15 +0200 Subject: [PATCH 2/4] Update services/horizon/internal/txsub/system.go Co-authored-by: Bartek Nowotarski --- services/horizon/internal/txsub/system.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/txsub/system.go b/services/horizon/internal/txsub/system.go index 681ee444b6..e8b067edee 100644 --- a/services/horizon/internal/txsub/system.go +++ b/services/horizon/internal/txsub/system.go @@ -246,7 +246,7 @@ func (sys *System) Tick(ctx context.Context) { ReadOnly: true, } if err := db.BeginTx(options); err != nil { - logger.WithError(err).Warn("could not start repeatable read transaction for txsub tick") + logger.WithError(err).Error("could not start repeatable read transaction for txsub tick") return } defer db.Rollback() From 0c25b1d166eabec4c6191ecdc087f1f1316bf4cf Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Tue, 14 Jul 2020 16:51:59 +0200 Subject: [PATCH 3/4] Remove comment from initSubmissionSystem --- services/horizon/internal/init.go | 32 ------------------------------- 1 file changed, 32 deletions(-) diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 2b3235bfa7..fa71764cd8 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -162,38 +162,6 @@ func initWebMetrics(app *App) { } func initSubmissionSystem(app *App) { - // Due to a delay between Stellar-Core closing a ledger and Horizon - // ingesting it, it's possible that results of transaction submitted to - // Stellar-Core via Horizon may not be immediately visible. This is - // happening because `txsub` package checks two sources when checking a - // transaction result: Stellar-Core and Horizon DB. - // - // The extreme case is https://github.com/stellar/go/issues/2272 where - // results of transaction creating an account are not visible: requesting - // account details in Horizon returns `404 Not Found`: - // - // ``` - // Horizon DB Core DB User - // | | | - // | | | - // | | <------- Submit create_account op - // | | | - // | Insert transaction | - // | | | - // | Tx found -----------------> | - // | | | - // | | | - // | <--------------------------------------- Get account info - // | | | - // | | | - // Account NOT found ------------------------------------> | - // | | | - // Insert account | | - // ``` - // - // To fix this skip checking Stellar-Core DB for transaction results if - // Horizon is ingesting failed transactions. - app.submitter = &txsub.System{ Pending: txsub.NewDefaultSubmissionList(), Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL), From d40502c3c3da7f1664f325e4155b6644c90c4a76 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Tue, 14 Jul 2020 17:11:45 +0200 Subject: [PATCH 4/4] PR fixes --- services/horizon/internal/txsub/results.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/txsub/results.go b/services/horizon/internal/txsub/results.go index 7b25ea2061..f8cb0099e0 100644 --- a/services/horizon/internal/txsub/results.go +++ b/services/horizon/internal/txsub/results.go @@ -4,6 +4,7 @@ import ( "database/sql" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -16,7 +17,7 @@ func txResultByHash(db HorizonDB, hash string) (history.Transaction, error) { } if !db.NoRows(err) { - return hr, err + return hr, errors.Wrap(err, "could not lookup transaction by hash") } // if no result was found in either db, return ErrNoResults @@ -32,18 +33,25 @@ func txResultFromHistory(tx history.Transaction) (history.Transaction, error) { ResultXDR: tx.TxResult, } } + } else { + err = errors.Wrap(err, "could not unmarshall transaction result") } return tx, err } +// checkTxAlreadyExists uses a repeatable read transaction to look up both transaction results +// and sequence numbers. Without the repeatable read transaction it is possible that the two database +// queries execute on different ledgers. In this case, txsub can mistakenly respond with a bad_seq error +// because the first query occurs when the tx is not yet ingested and the second query occurs when the tx +// is ingested. func checkTxAlreadyExists(db HorizonDB, hash, sourceAddress string) (history.Transaction, uint64, error) { err := db.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, }) if err != nil { - return history.Transaction{}, 0, err + return history.Transaction{}, 0, errors.Wrap(err, "cannot start repeatable read tx") } defer db.Rollback() @@ -52,7 +60,7 @@ func checkTxAlreadyExists(db HorizonDB, hash, sourceAddress string) (history.Tra var sequenceNumbers map[string]uint64 sequenceNumbers, err = db.GetSequenceNumbers([]string{sourceAddress}) if err != nil { - return tx, 0, err + return tx, 0, errors.Wrapf(err, "cannot fetch sequence number for %v", sourceAddress) } num, ok := sequenceNumbers[sourceAddress]