Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

horizon: Modify tx submission system to work with RO database #4418

Merged
merged 16 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
* 'whitelist by account id' ([4221](https://github.com/stellar/go/issues/4221))
* 'whitelist by canonical asset id' ([4222](https://github.com/stellar/go/issues/4222))

The filters and their configuration are optional features and must be enabled with horizon command line parameters `admin-port=4200` and `enable-ingestion-filtering=true`
The filters and their configuration are optional features and must be enabled with horizon command line parameters `admin-port=4200` and `exp-enable-ingestion-filtering=true`

Once set, filter configurations and their rules are initially empty and the filters are disabled by default. To enable filters, update the configuration settings, refer to the Admin API Docs which are published on the Admin Port at http://localhost:<admin_port>/, follow details and examples for endpoints:
* `/ingestion/filters/account`
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/actions/submit_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

type NetworkSubmitter interface {
Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string, innerHash string) <-chan txsub.Result
Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string) <-chan txsub.Result
}

type SubmitTransactionHandler struct {
Expand Down Expand Up @@ -155,7 +155,7 @@ func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Requ
return nil, hProblem.StaleHistory
}

submission := handler.Submitter.Submit(r.Context(), info.raw, info.parsed, info.hash, info.innerHash)
submission := handler.Submitter.Submit(r.Context(), info.raw, info.parsed, info.hash)

select {
case result := <-submission:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type networkSubmitterMock struct {
mock.Mock
}

func (m *networkSubmitterMock) Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string, innerHash string) <-chan txsub.Result {
func (m *networkSubmitterMock) Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string) <-chan txsub.Result {
a := m.Called()
return a.Get(0).(chan txsub.Result)
}
Expand Down
11 changes: 5 additions & 6 deletions services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,17 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
})
ctx := context.Background()
insertBuilder := q.NewTransactionBatchInsertBuilder(2)
prefilterInsertBuilder := q.NewTransactionFilteredTmpBatchInsertBuilder(2)
// include both fee bump and normal transaction in the same batch
// to make sure both kinds of transactions can be inserted using a single exec statement
tt.Assert.NoError(insertBuilder.Add(ctx, feeBumpTransaction, sequence))
tt.Assert.NoError(insertBuilder.Add(ctx, normalTransaction, sequence))
tt.Assert.NoError(insertBuilder.Exec(ctx))

tt.Assert.NoError(q.InitEmptyTxSubmissionResult(ctx, hex.EncodeToString(normalTransaction.Result.TransactionHash[:]), ""))
tt.Assert.NoError(q.InitEmptyTxSubmissionResult(ctx, fixture.OuterHash, fixture.InnerHash))
txs := []ingest.LedgerTransaction{normalTransaction, feeBumpTransaction}
affectedRows, err := q.SetTxSubmissionResults(ctx, txs, uint32(fixture.Ledger.Sequence), fixture.Ledger.ClosedAt)
tt.Assert.NoError(err)
tt.Assert.Equal(int64(2), affectedRows)
tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, feeBumpTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, normalTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Exec(ctx))

account := fixture.Envelope.SourceAccount().ToAccountId()
feeBumpAccount := fixture.Envelope.FeeBumpAccount().ToAccountId()

Expand Down
8 changes: 7 additions & 1 deletion services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ type IngestionQ interface {
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
QTxSubmissionResult

Begin() error
BeginTx(*sql.TxOptions) error
Expand All @@ -282,6 +281,7 @@ type IngestionQ interface {
GetLiquidityPoolCompactionSequence(context.Context) (uint32, error)
TruncateIngestStateTables(context.Context) error
DeleteRangeAll(ctx context.Context, start, end int64) error
DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error)
}

// QAccounts defines account related queries.
Expand Down Expand Up @@ -719,6 +719,12 @@ type Transaction struct {
TransactionWithoutLedger
}

// Transaction is a row of data from the `history_transactions_filtered_tmp` table
type TransactionFilteredTmp struct {
CreatedAt time.Time `db:"created_at"`
TransactionWithoutLedger
}

func (t *Transaction) HasPreconditions() bool {
return !t.TimeBounds.Null ||
!t.LedgerBounds.Null ||
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ func (m *MockQTransactions) NewTransactionBatchInsertBuilder(maxBatchSize int) T
a := m.Called(maxBatchSize)
return a.Get(0).(TransactionBatchInsertBuilder)
}

func (m *MockQTransactions) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
a := m.Called(maxBatchSize)
return a.Get(0).(TransactionBatchInsertBuilder)
}
39 changes: 0 additions & 39 deletions services/horizon/internal/db2/history/mock_q_txsub_result.go

This file was deleted.

12 changes: 6 additions & 6 deletions services/horizon/internal/db2/history/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,13 @@ func TestOperationIncludeTransactions(t *testing.T) {
func TestValidateTransactionForOperation(t *testing.T) {
tt := test.Start(t)
tt.Scenario("failed_transactions")
selectTransactionCopy := selectTransaction
selectTransactionCopy := selectTransactionHistory
defer func() {
selectTransaction = selectTransactionCopy
selectTransactionHistory = selectTransactionCopy
tt.Finish()
}()

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.transaction_hash, " +
"ht.tx_result, " +
"COALESCE(ht.successful, true) as successful").
Expand All @@ -435,7 +435,7 @@ func TestValidateTransactionForOperation(t *testing.T) {
tt.Assert.Error(err)
tt.Assert.EqualError(err, "transaction id 0 does not match transaction id in operation 17179877376")

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.id, " +
"ht.transaction_hash, " +
"COALESCE(ht.successful, true) as successful").
Expand All @@ -452,7 +452,7 @@ func TestValidateTransactionForOperation(t *testing.T) {
tt.Assert.Error(err)
tt.Assert.EqualError(err, "transaction result does not match transaction result in operation AAAAAAAAAGQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAAA=")

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.id, " +
"ht.tx_result, " +
"COALESCE(ht.successful, true) as successful").
Expand All @@ -469,7 +469,7 @@ func TestValidateTransactionForOperation(t *testing.T) {
tt.Assert.Error(err)
tt.Assert.EqualError(err, "transaction hash does not match transaction hash in operation 1c454630267aa8767ec8c8e30450cea6ba660145e9c924abb75d7a6669b6c28a")

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.id, " +
"ht.tx_result, " +
"ht.transaction_hash").
Expand Down
142 changes: 78 additions & 64 deletions services/horizon/internal/db2/history/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"fmt"

sq "github.com/Masterminds/squirrel"

Expand All @@ -14,42 +15,38 @@ import (
// TransactionByHash is a query that loads a single row from the
// `history_transactions` table based upon the provided hash.
func (q *Q) TransactionByHash(ctx context.Context, dest interface{}, hash string) error {
byHash := selectTransaction.
Where("ht.transaction_hash = ?", hash)
byInnerHash := selectTransaction.
Where("ht.inner_transaction_hash = ?", hash)
innerOrOuter := sq.Or{sq.Eq{"ht.transaction_hash": hash}, sq.Eq{"ht.inner_transaction_hash": hash}}
byHashOrInnerHashHistory := selectTransactionHistory.Where(innerOrOuter)

byInnerHashString, args, err := byInnerHash.ToSql()
if err != nil {
return errors.Wrap(err, "could not get string for inner hash sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)
return q.Get(ctx, dest, byHashOrInnerHashHistory)
}

func (q *Q) PreFilteredTransactionByHash(ctx context.Context, dest interface{}, hash string) error {
innerOrOuter := sq.Or{sq.Eq{"ht.transaction_hash": hash}, sq.Eq{"ht.inner_transaction_hash": hash}}
byHashOrInnerHashPreFilter := selectTransactionPreFilteredTmp.Where(innerOrOuter)

return q.Get(ctx, dest, union)
return q.Get(ctx, dest, byHashOrInnerHashPreFilter)
}

// TransactionsByHashesSinceLedger fetches transactions from the `history_transactions`
// TransactionsByHashesSinceLedger fetches transactions from `history_transactions_filtered_tmp`
// table which match the given hash since the given ledger sequence (for perf reasons).
func (q *Q) TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]Transaction, error) {
func (q *Q) AllTransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]Transaction, error) {
var dest []Transaction
byHash := selectTransaction.
Where(map[string]interface{}{"ht.transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})
byInnerHash := selectTransaction.
Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})

byInnerHashString, args, err := byInnerHash.ToSql()
innerOrOuterAndSeqGtEq :=
sq.And{sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq}, sq.Or{sq.Eq{"ht.transaction_hash": hashes}, sq.Eq{"ht.inner_transaction_hash": hashes}}}

preFilteredTxs := selectTransactionPreFilteredTmp.Where(innerOrOuterAndSeqGtEq)
historyTxs := selectTransactionHistory.Where(innerOrOuterAndSeqGtEq)

preFilteredTxsString, args, err := preFilteredTxs.ToSql()
if err != nil {
return nil, errors.Wrap(err, "could not get string for inner hash sql query")
return nil, errors.Wrap(err, "could not get string for un filtered sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)

err = q.Select(ctx, &dest, union)
if err != nil {
union := historyTxs.Suffix("UNION ALL "+preFilteredTxsString, args...)
if err := q.Select(ctx, &dest, union); err != nil {
return nil, err
}

return dest, nil
}

Expand All @@ -60,7 +57,7 @@ func (q *Q) TransactionsByIDs(ctx context.Context, ids ...int64) (map[int64]Tran
return nil, errors.New("no id arguments provided")
}

sql := selectTransaction.Where(map[string]interface{}{
sql := selectTransactionHistory.Where(map[string]interface{}{
"ht.id": ids,
})

Expand All @@ -77,13 +74,24 @@ func (q *Q) TransactionsByIDs(ctx context.Context, ids ...int64) (map[int64]Tran
return byID, nil
}

// DeleteTransactionsFilteredTmpOlderThan deletes entries older than certain duration
func (q *Q) DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error) {
sql := sq.Delete("history_transactions_filtered_tmp").
Where(sq.Expr("now() >= (created_at + interval '1 second' * ?)", howOldInSeconds))
result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
}
return result.RowsAffected()
}

// Transactions provides a helper to filter rows from the `history_transactions`
// table with pre-defined filters. See `TransactionsQ` methods for the
// available filters.
func (q *Q) Transactions() *TransactionsQ {
return &TransactionsQ{
parent: q,
sql: selectTransaction,
sql: selectTransactionHistory,
includeFailed: false,
}
}
Expand Down Expand Up @@ -225,42 +233,48 @@ func (q *TransactionsQ) Select(ctx context.Context, dest interface{}) error {
// QTransactions defines transaction related queries.
type QTransactions interface {
NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder
NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder
}

func selectTransaction(table string) sq.SelectBuilder {
return sq.Select(
"ht.id, " +
"ht.transaction_hash, " +
"ht.ledger_sequence, " +
"ht.application_order, " +
"ht.account, " +
"ht.account_muxed, " +
"ht.account_sequence, " +
"ht.max_fee, " +
// `fee_charged` is NULL by default, DB needs to be reingested
// to populate the value. If value is not present display `max_fee`.
"COALESCE(ht.fee_charged, ht.max_fee) as fee_charged, " +
"ht.operation_count, " +
"ht.tx_envelope, " +
"ht.tx_result, " +
"ht.tx_meta, " +
"ht.tx_fee_meta, " +
"ht.created_at, " +
"ht.updated_at, " +
"COALESCE(ht.successful, true) as successful, " +
"ht.signatures, " +
"ht.memo_type, " +
"ht.memo, " +
"ht.time_bounds, " +
"ht.ledger_bounds, " +
"ht.min_account_sequence, " +
"ht.min_account_sequence_age, " +
"ht.min_account_sequence_ledger_gap, " +
"ht.extra_signers, " +
"hl.closed_at AS ledger_close_time, " +
"ht.inner_transaction_hash, " +
"ht.fee_account, " +
"ht.fee_account_muxed, " +
"ht.new_max_fee, " +
"ht.inner_signatures").
From(fmt.Sprintf("%s ht", table)).
LeftJoin("history_ledgers hl ON ht.ledger_sequence = hl.sequence")
}

var selectTransaction = sq.Select(
"ht.id, " +
"ht.transaction_hash, " +
"ht.ledger_sequence, " +
"ht.application_order, " +
"ht.account, " +
"ht.account_muxed, " +
"ht.account_sequence, " +
"ht.max_fee, " +
// `fee_charged` is NULL by default, DB needs to be reingested
// to populate the value. If value is not present display `max_fee`.
"COALESCE(ht.fee_charged, ht.max_fee) as fee_charged, " +
"ht.operation_count, " +
"ht.tx_envelope, " +
"ht.tx_result, " +
"ht.tx_meta, " +
"ht.tx_fee_meta, " +
"ht.created_at, " +
"ht.updated_at, " +
"COALESCE(ht.successful, true) as successful, " +
"ht.signatures, " +
"ht.memo_type, " +
"ht.memo, " +
"ht.time_bounds, " +
"ht.ledger_bounds, " +
"ht.min_account_sequence, " +
"ht.min_account_sequence_age, " +
"ht.min_account_sequence_ledger_gap, " +
"ht.extra_signers, " +
"hl.closed_at AS ledger_close_time, " +
"ht.inner_transaction_hash, " +
"ht.fee_account, " +
"ht.fee_account_muxed, " +
"ht.new_max_fee, " +
"ht.inner_signatures").
From("history_transactions ht").
LeftJoin("history_ledgers hl ON ht.ledger_sequence = hl.sequence")
var selectTransactionHistory = selectTransaction("history_transactions")
var selectTransactionPreFilteredTmp = selectTransaction("history_transactions_filtered_tmp")
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchI
}
}

// NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance
func (q *Q) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
return &transactionBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("history_transactions_filtered_tmp"),
MaxBatchSize: maxBatchSize,
},
}
}

// Add adds a new transaction to the batch
func (i *transactionBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error {
row, err := transactionToRow(transaction, sequence, i.encodingBuffer)
Expand Down
Loading