Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/txsub: Check transaction submission results using repeatable read transaction #2805

Merged
merged 5 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions services/horizon/internal/actions_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/expingest"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/services/horizon/internal/txsub"
"github.com/stellar/go/services/horizon/internal/txsub/sequence"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -277,15 +275,6 @@ func TestTransactionActions_Post(t *testing.T) {
// existing transaction
w := ht.Post("/transactions", form)
ht.Assert.Equal(200, w.Code)

// sequence buffer full
ht.App.submitter.Results = &txsub.MockResultProvider{
Results: []txsub.Result{
{Err: sequence.ErrNoMoreRoom},
},
}
w = ht.Post("/transactions", form)
ht.Assert.Equal(503, w.Code)
}

func TestTransactionActions_PostSuccessful(t *testing.T) {
Expand Down
38 changes: 2 additions & 36 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/stellar/go/services/horizon/internal/expingest"
"github.com/stellar/go/services/horizon/internal/simplepath"
"github.com/stellar/go/services/horizon/internal/txsub"
results "github.com/stellar/go/services/horizon/internal/txsub/results/db"
"github.com/stellar/go/services/horizon/internal/txsub/sequence"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/log"
Expand Down Expand Up @@ -163,45 +162,12 @@ func initWebMetrics(app *App) {
}

func initSubmissionSystem(app *App) {
// Due to a delay between Stellar-Core closing a ledger and Horizon
// ingesting it, it's possible that results of transaction submitted to
// Stellar-Core via Horizon may not be immediately visible. This is
// happening because `txsub` package checks two sources when checking a
// transaction result: Stellar-Core and Horizon DB.
//
// The extreme case is https://github.com/stellar/go/issues/2272 where
// results of transaction creating an account are not visible: requesting
// account details in Horizon returns `404 Not Found`:
//
// ```
// Horizon DB Core DB User
// | | |
// | | |
// | | <------- Submit create_account op
// | | |
// | Insert transaction |
// | | |
// | Tx found -----------------> |
// | | |
// | | |
// | <--------------------------------------- Get account info
// | | |
// | | |
// Account NOT found ------------------------------------> |
// | | |
// Insert account | |
// ```
//
// To fix this skip checking Stellar-Core DB for transaction results if
// Horizon is ingesting failed transactions.

app.submitter = &txsub.System{
Pending: txsub.NewDefaultSubmissionList(),
tamirms marked this conversation as resolved.
Show resolved Hide resolved
Submitter: txsub.NewDefaultSubmitter(http.DefaultClient, app.config.StellarCoreURL),
SubmissionQueue: sequence.NewManager(),
Results: &results.DB{
History: &history.Q{Session: app.HorizonSession(context.Background())},
DB: func(ctx context.Context) txsub.HorizonDB {
return &history.Q{Session: app.HorizonSession(ctx)}
},
Sequences: &history.Q{Session: app.HorizonSession(context.Background())},
}
}
54 changes: 54 additions & 0 deletions services/horizon/internal/txsub/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package txsub

// This file provides mock implementations for the txsub interfaces
// which are useful in a testing context.
//
// NOTE: this file is not a test file so that other packages may import
// txsub and use these mocks in their own tests

import (
"context"
"database/sql"
"github.com/stretchr/testify/mock"
)

// MockSubmitter is a test helper that simplements the Submitter interface
type MockSubmitter struct {
R SubmissionResult
WasSubmittedTo bool
}

// Submit implements `txsub.Submitter`
func (sub *MockSubmitter) Submit(ctx context.Context, env string) SubmissionResult {
sub.WasSubmittedTo = true
return sub.R
}

type mockDBQ struct {
mock.Mock
}

func (m *mockDBQ) BeginTx(txOpts *sql.TxOptions) error {
args := m.Called(txOpts)
return args.Error(0)
}

func (m *mockDBQ) Rollback() error {
args := m.Called()
return args.Error(0)
}

func (m *mockDBQ) NoRows(err error) bool {
args := m.Called(err)
return args.Bool(0)
}

func (m *mockDBQ) GetSequenceNumbers(addresses []string) (map[string]uint64, error) {
args := m.Called(addresses)
return args.Get(0).(map[string]uint64), args.Error(1)
}

func (m *mockDBQ) TransactionByHash(dest interface{}, hash string) error {
args := m.Called(dest, hash)
return args.Error(0)
}
17 changes: 0 additions & 17 deletions services/horizon/internal/txsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,6 @@ import (
"github.com/stellar/go/xdr"
)

// ResultProvider represents an abstract store that can lookup Result objects
// by transaction hash or by [address,sequence] pairs. A ResultProvider is
// used within the transaction submission system to decide whether a submission should
// be submitted to the backing stellar-core process, as well as looking up the status
// of each transaction in the open submission list at each tick (i.e. ledger close)
type ResultProvider interface {
// Look up a result by transaction hash
ResultByHash(context.Context, string) Result
}

// SequenceProvider represents an abstract store that can lookup the current
// sequence number of an account. It is used by the SequenceLock to
type SequenceProvider interface {
// Look up a sequence by address
GetSequenceNumbers(addresses []string) (map[string]uint64, error)
}

// Listener represents some client who is interested in retrieving the result
// of a specific transaction.
type Listener chan<- Result
Expand Down
73 changes: 73 additions & 0 deletions services/horizon/internal/txsub/results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package txsub

import (
"database/sql"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

func txResultByHash(db HorizonDB, hash string) (history.Transaction, error) {
// query history database
var hr history.Transaction
err := db.TransactionByHash(&hr, hash)
if err == nil {
return txResultFromHistory(hr)
}

if !db.NoRows(err) {
return hr, errors.Wrap(err, "could not lookup transaction by hash")
}

// if no result was found in either db, return ErrNoResults
return hr, ErrNoResults
}

func txResultFromHistory(tx history.Transaction) (history.Transaction, error) {
var txResult xdr.TransactionResult
err := xdr.SafeUnmarshalBase64(tx.TxResult, &txResult)
if err == nil {
if !txResult.Successful() {
err = &FailedTransactionError{
ResultXDR: tx.TxResult,
}
}
} else {
err = errors.Wrap(err, "could not unmarshall transaction result")
}

return tx, err
}

// checkTxAlreadyExists uses a repeatable read transaction to look up both transaction results
// and sequence numbers. Without the repeatable read transaction it is possible that the two database
// queries execute on different ledgers. In this case, txsub can mistakenly respond with a bad_seq error
// because the first query occurs when the tx is not yet ingested and the second query occurs when the tx
// is ingested.
func checkTxAlreadyExists(db HorizonDB, hash, sourceAddress string) (history.Transaction, uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a short comment explaining why this has to be done in a repeatable read tx. And let's error.Wrap all errors in this function.

err := db.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
return history.Transaction{}, 0, errors.Wrap(err, "cannot start repeatable read tx")
}
defer db.Rollback()

tx, err := txResultByHash(db, hash)
if err == ErrNoResults {
var sequenceNumbers map[string]uint64
sequenceNumbers, err = db.GetSequenceNumbers([]string{sourceAddress})
if err != nil {
return tx, 0, errors.Wrapf(err, "cannot fetch sequence number for %v", sourceAddress)
}

num, ok := sequenceNumbers[sourceAddress]
if !ok {
return tx, 0, ErrNoAccount
}
return tx, num, ErrNoResults
}
return tx, 0, err
}
49 changes: 0 additions & 49 deletions services/horizon/internal/txsub/results/db/main.go

This file was deleted.

66 changes: 0 additions & 66 deletions services/horizon/internal/txsub/results/db/main_test.go

This file was deleted.

38 changes: 38 additions & 0 deletions services/horizon/internal/txsub/results_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package txsub

import (
"testing"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestGetIngestedTx(t *testing.T) {
tt := test.Start(t).Scenario("base")
defer tt.Finish()
q := &history.Q{Session: tt.HorizonSession()}
hash := "2374e99349b9ef7dba9a5db3339b78fda8f34777b1af33ba468ad5c0df946d4d"
tx, err := txResultByHash(q, hash)
tt.Assert.NoError(err)
tt.Assert.Equal(hash, tx.TransactionHash)
}

func TestGetMissingTx(t *testing.T) {
tt := test.Start(t).Scenario("base")
defer tt.Finish()
q := &history.Q{Session: tt.HorizonSession()}
hash := "adf1efb9fd253f53cbbe6230c131d2af19830328e52b610464652d67d2fb7195"

_, err := txResultByHash(q, hash)
tt.Assert.Equal(ErrNoResults, err)
}

func TestGetFailedTx(t *testing.T) {
tt := test.Start(t).Scenario("failed_transactions")
defer tt.Finish()
q := &history.Q{Session: tt.HorizonSession()}
hash := "aa168f12124b7c196c0adaee7c73a64d37f99428cacb59a91ff389626845e7cf"

_, err := txResultByHash(q, hash)
tt.Assert.Equal("AAAAAAAAAGT/////AAAAAQAAAAAAAAAB/////gAAAAA=", err.(*FailedTransactionError).ResultXDR)
}
Loading