Skip to content

Commit

Permalink
horizon: Modify tx submission system to work with RO database (stella…
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored and sreuland committed Jul 5, 2022
1 parent f174482 commit b3407fd
Show file tree
Hide file tree
Showing 42 changed files with 703 additions and 986 deletions.
2 changes: 1 addition & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ The migration makes the following schema changes:
* 'whitelist by account id' ([4221](https://github.com/stellar/go/issues/4221))
* 'whitelist by canonical asset id' ([4222](https://github.com/stellar/go/issues/4222))

The filters and their configuration are optional features and must be enabled with horizon command line parameters `admin-port=4200` and `enable-ingestion-filtering=true`
The filters and their configuration are optional features and must be enabled with horizon command line parameters `admin-port=4200` and `exp-enable-ingestion-filtering=true`

Once set, filter configurations and their rules are initially empty and the filters are disabled by default. To enable filters, update the configuration settings, refer to the Admin API Docs which are published on the Admin Port at http://localhost:<admin_port>/, follow details and examples for endpoints:
* `/ingestion/filters/account`
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/actions/submit_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

type NetworkSubmitter interface {
Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string, innerHash string) <-chan txsub.Result
Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string) <-chan txsub.Result
}

type SubmitTransactionHandler struct {
Expand Down Expand Up @@ -155,7 +155,7 @@ func (handler SubmitTransactionHandler) GetResource(w HeaderWriter, r *http.Requ
return nil, hProblem.StaleHistory
}

submission := handler.Submitter.Submit(r.Context(), info.raw, info.parsed, info.hash, info.innerHash)
submission := handler.Submitter.Submit(r.Context(), info.raw, info.parsed, info.hash)

select {
case result := <-submission:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type networkSubmitterMock struct {
mock.Mock
}

func (m *networkSubmitterMock) Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string, innerHash string) <-chan txsub.Result {
func (m *networkSubmitterMock) Submit(ctx context.Context, rawTx string, envelope xdr.TransactionEnvelope, hash string) <-chan txsub.Result {
a := m.Called()
return a.Get(0).(chan txsub.Result)
}
Expand Down
11 changes: 5 additions & 6 deletions services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,17 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
})
ctx := context.Background()
insertBuilder := q.NewTransactionBatchInsertBuilder(2)
prefilterInsertBuilder := q.NewTransactionFilteredTmpBatchInsertBuilder(2)
// include both fee bump and normal transaction in the same batch
// to make sure both kinds of transactions can be inserted using a single exec statement
tt.Assert.NoError(insertBuilder.Add(ctx, feeBumpTransaction, sequence))
tt.Assert.NoError(insertBuilder.Add(ctx, normalTransaction, sequence))
tt.Assert.NoError(insertBuilder.Exec(ctx))

tt.Assert.NoError(q.InitEmptyTxSubmissionResult(ctx, hex.EncodeToString(normalTransaction.Result.TransactionHash[:]), ""))
tt.Assert.NoError(q.InitEmptyTxSubmissionResult(ctx, fixture.OuterHash, fixture.InnerHash))
txs := []ingest.LedgerTransaction{normalTransaction, feeBumpTransaction}
affectedRows, err := q.SetTxSubmissionResults(ctx, txs, uint32(fixture.Ledger.Sequence), fixture.Ledger.ClosedAt)
tt.Assert.NoError(err)
tt.Assert.Equal(int64(2), affectedRows)
tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, feeBumpTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, normalTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Exec(ctx))

account := fixture.Envelope.SourceAccount().ToAccountId()
feeBumpAccount := fixture.Envelope.FeeBumpAccount().ToAccountId()

Expand Down
8 changes: 7 additions & 1 deletion services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ type IngestionQ interface {
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
QTxSubmissionResult

Begin() error
BeginTx(*sql.TxOptions) error
Expand All @@ -282,6 +281,7 @@ type IngestionQ interface {
GetLiquidityPoolCompactionSequence(context.Context) (uint32, error)
TruncateIngestStateTables(context.Context) error
DeleteRangeAll(ctx context.Context, start, end int64) error
DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error)
}

// QAccounts defines account related queries.
Expand Down Expand Up @@ -719,6 +719,12 @@ type Transaction struct {
TransactionWithoutLedger
}

// Transaction is a row of data from the `history_transactions_filtered_tmp` table
type TransactionFilteredTmp struct {
CreatedAt time.Time `db:"created_at"`
TransactionWithoutLedger
}

func (t *Transaction) HasPreconditions() bool {
return !t.TimeBounds.Null ||
!t.LedgerBounds.Null ||
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ func (m *MockQTransactions) NewTransactionBatchInsertBuilder(maxBatchSize int) T
a := m.Called(maxBatchSize)
return a.Get(0).(TransactionBatchInsertBuilder)
}

func (m *MockQTransactions) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
a := m.Called(maxBatchSize)
return a.Get(0).(TransactionBatchInsertBuilder)
}
39 changes: 0 additions & 39 deletions services/horizon/internal/db2/history/mock_q_txsub_result.go

This file was deleted.

12 changes: 6 additions & 6 deletions services/horizon/internal/db2/history/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,13 @@ func TestOperationIncludeTransactions(t *testing.T) {
func TestValidateTransactionForOperation(t *testing.T) {
tt := test.Start(t)
tt.Scenario("failed_transactions")
selectTransactionCopy := selectTransaction
selectTransactionCopy := selectTransactionHistory
defer func() {
selectTransaction = selectTransactionCopy
selectTransactionHistory = selectTransactionCopy
tt.Finish()
}()

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.transaction_hash, " +
"ht.tx_result, " +
"COALESCE(ht.successful, true) as successful").
Expand All @@ -435,7 +435,7 @@ func TestValidateTransactionForOperation(t *testing.T) {
tt.Assert.Error(err)
tt.Assert.EqualError(err, "transaction id 0 does not match transaction id in operation 17179877376")

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.id, " +
"ht.transaction_hash, " +
"COALESCE(ht.successful, true) as successful").
Expand All @@ -452,7 +452,7 @@ func TestValidateTransactionForOperation(t *testing.T) {
tt.Assert.Error(err)
tt.Assert.EqualError(err, "transaction result does not match transaction result in operation AAAAAAAAAGQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAAA=")

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.id, " +
"ht.tx_result, " +
"COALESCE(ht.successful, true) as successful").
Expand All @@ -469,7 +469,7 @@ func TestValidateTransactionForOperation(t *testing.T) {
tt.Assert.Error(err)
tt.Assert.EqualError(err, "transaction hash does not match transaction hash in operation 1c454630267aa8767ec8c8e30450cea6ba660145e9c924abb75d7a6669b6c28a")

selectTransaction = sq.Select(
selectTransactionHistory = sq.Select(
"ht.id, " +
"ht.tx_result, " +
"ht.transaction_hash").
Expand Down
142 changes: 78 additions & 64 deletions services/horizon/internal/db2/history/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"fmt"

sq "github.com/Masterminds/squirrel"

Expand All @@ -14,42 +15,38 @@ import (
// TransactionByHash is a query that loads a single row from the
// `history_transactions` table based upon the provided hash.
func (q *Q) TransactionByHash(ctx context.Context, dest interface{}, hash string) error {
byHash := selectTransaction.
Where("ht.transaction_hash = ?", hash)
byInnerHash := selectTransaction.
Where("ht.inner_transaction_hash = ?", hash)
innerOrOuter := sq.Or{sq.Eq{"ht.transaction_hash": hash}, sq.Eq{"ht.inner_transaction_hash": hash}}
byHashOrInnerHashHistory := selectTransactionHistory.Where(innerOrOuter)

byInnerHashString, args, err := byInnerHash.ToSql()
if err != nil {
return errors.Wrap(err, "could not get string for inner hash sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)
return q.Get(ctx, dest, byHashOrInnerHashHistory)
}

func (q *Q) PreFilteredTransactionByHash(ctx context.Context, dest interface{}, hash string) error {
innerOrOuter := sq.Or{sq.Eq{"ht.transaction_hash": hash}, sq.Eq{"ht.inner_transaction_hash": hash}}
byHashOrInnerHashPreFilter := selectTransactionPreFilteredTmp.Where(innerOrOuter)

return q.Get(ctx, dest, union)
return q.Get(ctx, dest, byHashOrInnerHashPreFilter)
}

// TransactionsByHashesSinceLedger fetches transactions from the `history_transactions`
// TransactionsByHashesSinceLedger fetches transactions from `history_transactions_filtered_tmp`
// table which match the given hash since the given ledger sequence (for perf reasons).
func (q *Q) TransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]Transaction, error) {
func (q *Q) AllTransactionsByHashesSinceLedger(ctx context.Context, hashes []string, sinceLedgerSeq uint32) ([]Transaction, error) {
var dest []Transaction
byHash := selectTransaction.
Where(map[string]interface{}{"ht.transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})
byInnerHash := selectTransaction.
Where(map[string]interface{}{"ht.inner_transaction_hash": hashes}).
Where(sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq})

byInnerHashString, args, err := byInnerHash.ToSql()
innerOrOuterAndSeqGtEq :=
sq.And{sq.GtOrEq{"ht.ledger_sequence": sinceLedgerSeq}, sq.Or{sq.Eq{"ht.transaction_hash": hashes}, sq.Eq{"ht.inner_transaction_hash": hashes}}}

preFilteredTxs := selectTransactionPreFilteredTmp.Where(innerOrOuterAndSeqGtEq)
historyTxs := selectTransactionHistory.Where(innerOrOuterAndSeqGtEq)

preFilteredTxsString, args, err := preFilteredTxs.ToSql()
if err != nil {
return nil, errors.Wrap(err, "could not get string for inner hash sql query")
return nil, errors.Wrap(err, "could not get string for un filtered sql query")
}
union := byHash.Suffix("UNION ALL "+byInnerHashString, args...)

err = q.Select(ctx, &dest, union)
if err != nil {
union := historyTxs.Suffix("UNION ALL "+preFilteredTxsString, args...)
if err := q.Select(ctx, &dest, union); err != nil {
return nil, err
}

return dest, nil
}

Expand All @@ -60,7 +57,7 @@ func (q *Q) TransactionsByIDs(ctx context.Context, ids ...int64) (map[int64]Tran
return nil, errors.New("no id arguments provided")
}

sql := selectTransaction.Where(map[string]interface{}{
sql := selectTransactionHistory.Where(map[string]interface{}{
"ht.id": ids,
})

Expand All @@ -77,13 +74,24 @@ func (q *Q) TransactionsByIDs(ctx context.Context, ids ...int64) (map[int64]Tran
return byID, nil
}

// DeleteTransactionsFilteredTmpOlderThan deletes entries older than certain duration
func (q *Q) DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error) {
sql := sq.Delete("history_transactions_filtered_tmp").
Where(sq.Expr("now() >= (created_at + interval '1 second' * ?)", howOldInSeconds))
result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
}
return result.RowsAffected()
}

// Transactions provides a helper to filter rows from the `history_transactions`
// table with pre-defined filters. See `TransactionsQ` methods for the
// available filters.
func (q *Q) Transactions() *TransactionsQ {
return &TransactionsQ{
parent: q,
sql: selectTransaction,
sql: selectTransactionHistory,
includeFailed: false,
}
}
Expand Down Expand Up @@ -225,42 +233,48 @@ func (q *TransactionsQ) Select(ctx context.Context, dest interface{}) error {
// QTransactions defines transaction related queries.
type QTransactions interface {
NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder
NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder
}

func selectTransaction(table string) sq.SelectBuilder {
return sq.Select(
"ht.id, " +
"ht.transaction_hash, " +
"ht.ledger_sequence, " +
"ht.application_order, " +
"ht.account, " +
"ht.account_muxed, " +
"ht.account_sequence, " +
"ht.max_fee, " +
// `fee_charged` is NULL by default, DB needs to be reingested
// to populate the value. If value is not present display `max_fee`.
"COALESCE(ht.fee_charged, ht.max_fee) as fee_charged, " +
"ht.operation_count, " +
"ht.tx_envelope, " +
"ht.tx_result, " +
"ht.tx_meta, " +
"ht.tx_fee_meta, " +
"ht.created_at, " +
"ht.updated_at, " +
"COALESCE(ht.successful, true) as successful, " +
"ht.signatures, " +
"ht.memo_type, " +
"ht.memo, " +
"ht.time_bounds, " +
"ht.ledger_bounds, " +
"ht.min_account_sequence, " +
"ht.min_account_sequence_age, " +
"ht.min_account_sequence_ledger_gap, " +
"ht.extra_signers, " +
"hl.closed_at AS ledger_close_time, " +
"ht.inner_transaction_hash, " +
"ht.fee_account, " +
"ht.fee_account_muxed, " +
"ht.new_max_fee, " +
"ht.inner_signatures").
From(fmt.Sprintf("%s ht", table)).
LeftJoin("history_ledgers hl ON ht.ledger_sequence = hl.sequence")
}

var selectTransaction = sq.Select(
"ht.id, " +
"ht.transaction_hash, " +
"ht.ledger_sequence, " +
"ht.application_order, " +
"ht.account, " +
"ht.account_muxed, " +
"ht.account_sequence, " +
"ht.max_fee, " +
// `fee_charged` is NULL by default, DB needs to be reingested
// to populate the value. If value is not present display `max_fee`.
"COALESCE(ht.fee_charged, ht.max_fee) as fee_charged, " +
"ht.operation_count, " +
"ht.tx_envelope, " +
"ht.tx_result, " +
"ht.tx_meta, " +
"ht.tx_fee_meta, " +
"ht.created_at, " +
"ht.updated_at, " +
"COALESCE(ht.successful, true) as successful, " +
"ht.signatures, " +
"ht.memo_type, " +
"ht.memo, " +
"ht.time_bounds, " +
"ht.ledger_bounds, " +
"ht.min_account_sequence, " +
"ht.min_account_sequence_age, " +
"ht.min_account_sequence_ledger_gap, " +
"ht.extra_signers, " +
"hl.closed_at AS ledger_close_time, " +
"ht.inner_transaction_hash, " +
"ht.fee_account, " +
"ht.fee_account_muxed, " +
"ht.new_max_fee, " +
"ht.inner_signatures").
From("history_transactions ht").
LeftJoin("history_ledgers hl ON ht.ledger_sequence = hl.sequence")
var selectTransactionHistory = selectTransaction("history_transactions")
var selectTransactionPreFilteredTmp = selectTransaction("history_transactions_filtered_tmp")
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchI
}
}

// NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance
func (q *Q) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
return &transactionBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("history_transactions_filtered_tmp"),
MaxBatchSize: maxBatchSize,
},
}
}

// Add adds a new transaction to the batch
func (i *transactionBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error {
row, err := transactionToRow(transaction, sequence, i.encodingBuffer)
Expand Down
Loading

0 comments on commit b3407fd

Please sign in to comment.