Skip to content

Commit

Permalink
Use FastBatchInsertBuilder to insert transactions into the history_tr…
Browse files Browse the repository at this point in the history
…ansactions table
  • Loading branch information
tamirms committed Jul 10, 2023
1 parent 2808889 commit 386b660
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 85 deletions.
20 changes: 12 additions & 8 deletions services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,19 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
hash: "edba3051b2f2d9b713e8a08709d631eccb72c59864ff3c564c68792271bb24a7",
})
ctx := context.Background()
insertBuilder := q.NewTransactionBatchInsertBuilder(2)
prefilterInsertBuilder := q.NewTransactionFilteredTmpBatchInsertBuilder(2)
tt.Assert.NoError(q.Begin())

insertBuilder := q.NewTransactionBatchInsertBuilder()
prefilterInsertBuilder := q.NewTransactionFilteredTmpBatchInsertBuilder()
// include both fee bump and normal transaction in the same batch
// to make sure both kinds of transactions can be inserted using a single exec statement
tt.Assert.NoError(insertBuilder.Add(ctx, feeBumpTransaction, sequence))
tt.Assert.NoError(insertBuilder.Add(ctx, normalTransaction, sequence))
tt.Assert.NoError(insertBuilder.Exec(ctx))
tt.Assert.NoError(insertBuilder.Add(feeBumpTransaction, sequence))
tt.Assert.NoError(insertBuilder.Add(normalTransaction, sequence))
tt.Assert.NoError(insertBuilder.Exec(ctx, q))

tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, feeBumpTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, normalTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Exec(ctx))
tt.Assert.NoError(prefilterInsertBuilder.Add(feeBumpTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Add(normalTransaction, sequence))
tt.Assert.NoError(prefilterInsertBuilder.Exec(ctx, q))

account := fixture.Envelope.SourceAccount().ToAccountId()
feeBumpAccount := fixture.Envelope.FeeBumpAccount().ToAccountId()
Expand Down Expand Up @@ -299,6 +301,8 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
tt.Assert.NoError(err)
tt.Assert.NoError(effectBuilder.Exec(ctx))

tt.Assert.NoError(q.Commit())

fixture.Transaction = Transaction{
TransactionWithoutLedger: TransactionWithoutLedger{
TotalOrderID: TotalOrderID{528280981504},
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/db2/history/mock_q_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ type MockQTransactions struct {
mock.Mock
}

func (m *MockQTransactions) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
a := m.Called(maxBatchSize)
func (m *MockQTransactions) NewTransactionBatchInsertBuilder() TransactionBatchInsertBuilder {
a := m.Called()
return a.Get(0).(TransactionBatchInsertBuilder)
}

func (m *MockQTransactions) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
a := m.Called(maxBatchSize)
func (m *MockQTransactions) NewTransactionFilteredTmpBatchInsertBuilder() TransactionBatchInsertBuilder {
a := m.Called()
return a.Get(0).(TransactionBatchInsertBuilder)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"github.com/stretchr/testify/mock"

"github.com/stellar/go/ingest"
"github.com/stellar/go/support/db"
)

type MockTransactionsBatchInsertBuilder struct {
mock.Mock
}

func (m *MockTransactionsBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error {
a := m.Called(ctx, transaction, sequence)
func (m *MockTransactionsBatchInsertBuilder) Add(transaction ingest.LedgerTransaction, sequence uint32) error {
a := m.Called(transaction, sequence)
return a.Error(0)
}

func (m *MockTransactionsBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
func (m *MockTransactionsBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
a := m.Called(ctx, session)
return a.Error(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ func TestAddOperation(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

txBatch := q.NewTransactionBatchInsertBuilder(0)
tt.Assert.NoError(q.Begin())

txBatch := q.NewTransactionBatchInsertBuilder()

builder := q.NewOperationBatchInsertBuilder(1)

Expand All @@ -35,8 +37,8 @@ func TestAddOperation(t *testing.T) {
)

sequence := int32(56)
tt.Assert.NoError(txBatch.Add(tt.Ctx, transaction, uint32(sequence)))
tt.Assert.NoError(txBatch.Exec(tt.Ctx))
tt.Assert.NoError(txBatch.Add(transaction, uint32(sequence)))
tt.Assert.NoError(txBatch.Exec(tt.Ctx, q))

details, err := json.Marshal(map[string]string{
"to": "GANFZDRBCNTUXIODCJEYMACPMCSZEVE4WZGZ3CZDZ3P2SXK4KH75IK6Y",
Expand All @@ -62,6 +64,8 @@ func TestAddOperation(t *testing.T) {
err = builder.Exec(tt.Ctx)
tt.Assert.NoError(err)

tt.Assert.NoError(q.Commit())

ops := []Operation{}
err = q.Select(tt.Ctx, &ops, selectOperation)

Expand Down
10 changes: 7 additions & 3 deletions services/horizon/internal/db2/history/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ func TestOperationByLiquidityPool(t *testing.T) {
opID1 := toid.New(sequence, txIndex, 1).ToInt64()
opID2 := toid.New(sequence, txIndex, 2).ToInt64()

tt.Assert.NoError(q.Begin())

// Insert a phony transaction
transactionBuilder := q.NewTransactionBatchInsertBuilder(2)
transactionBuilder := q.NewTransactionBatchInsertBuilder()
firstTransaction := buildLedgerTransaction(tt.T, testTransaction{
index: uint32(txIndex),
envelopeXDR: "AAAAACiSTRmpH6bHC6Ekna5e82oiGY5vKDEEUgkq9CB//t+rAAAAyAEXUhsAADDRAAAAAAAAAAAAAAABAAAAAAAAAAsBF1IbAABX4QAAAAAAAAAA",
Expand All @@ -87,9 +89,9 @@ func TestOperationByLiquidityPool(t *testing.T) {
metaXDR: "AAAAAQAAAAAAAAAA",
hash: "19aaa18db88605aedec04659fb45e06f240b022eb2d429e05133e4d53cd945ba",
})
err := transactionBuilder.Add(tt.Ctx, firstTransaction, uint32(sequence))
err := transactionBuilder.Add(firstTransaction, uint32(sequence))
tt.Assert.NoError(err)
err = transactionBuilder.Exec(tt.Ctx)
err = transactionBuilder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)

// Insert a two phony operations
Expand Down Expand Up @@ -137,6 +139,8 @@ func TestOperationByLiquidityPool(t *testing.T) {
err = lpOperationBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)

tt.Assert.NoError(q.Commit())

// Check ascending order
pq := db2.PageQuery{
Cursor: "",
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ func (q *TransactionsQ) Select(ctx context.Context, dest interface{}) error {

// QTransactions defines transaction related queries.
type QTransactions interface {
NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder
NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder
NewTransactionBatchInsertBuilder() TransactionBatchInsertBuilder
NewTransactionFilteredTmpBatchInsertBuilder() TransactionBatchInsertBuilder
}

func selectTransaction(table string) sq.SelectBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,47 @@ import (
// TransactionBatchInsertBuilder is used to insert transactions into the
// history_transactions table
type TransactionBatchInsertBuilder interface {
Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error
Exec(ctx context.Context) error
Add(transaction ingest.LedgerTransaction, sequence uint32) error
Exec(ctx context.Context, session db.SessionInterface) error
}

// transactionBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type transactionBatchInsertBuilder struct {
encodingBuffer *xdr.EncodingBuffer
builder db.BatchInsertBuilder
table string
builder db.FastBatchInsertBuilder
}

// NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance
func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
func (q *Q) NewTransactionBatchInsertBuilder() TransactionBatchInsertBuilder {
return &transactionBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("history_transactions"),
MaxBatchSize: maxBatchSize,
},
table: "history_transactions",
builder: db.FastBatchInsertBuilder{},
}
}

// NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance
func (q *Q) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder {
// NewTransactionFilteredTmpBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance
func (q *Q) NewTransactionFilteredTmpBatchInsertBuilder() TransactionBatchInsertBuilder {
return &transactionBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("history_transactions_filtered_tmp"),
MaxBatchSize: maxBatchSize,
},
table: "history_transactions_filtered_tmp",
builder: db.FastBatchInsertBuilder{},
}
}

// Add adds a new transaction to the batch
func (i *transactionBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error {
func (i *transactionBatchInsertBuilder) Add(transaction ingest.LedgerTransaction, sequence uint32) error {
row, err := transactionToRow(transaction, sequence, i.encodingBuffer)
if err != nil {
return err
}

return i.builder.RowStruct(ctx, row)
return i.builder.RowStruct(row)
}

func (i *transactionBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
func (i *transactionBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
return i.builder.Exec(ctx, session, i.table)
}

func signatures(xdrSignatures []xdr.DecoratedSignature) pq.StringArray {
Expand Down
41 changes: 23 additions & 18 deletions services/horizon/internal/db2/history/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ func TestTransactionByLiquidityPool(t *testing.T) {
},
}, 0, 0, 0, 0, 0)
tt.Assert.NoError(err)

tt.Assert.NoError(q.Begin())

tt.Assert.NoError(ledgerBatch.Exec(tt.Ctx, q.SessionInterface))
tt.Assert.NoError(q.Commit())

// Insert a phony transaction
transactionBuilder := q.NewTransactionBatchInsertBuilder(2)
transactionBuilder := q.NewTransactionBatchInsertBuilder()
firstTransaction := buildLedgerTransaction(tt.T, testTransaction{
index: uint32(txIndex),
envelopeXDR: "AAAAACiSTRmpH6bHC6Ekna5e82oiGY5vKDEEUgkq9CB//t+rAAAAyAEXUhsAADDRAAAAAAAAAAAAAAABAAAAAAAAAAsBF1IbAABX4QAAAAAAAAAA",
Expand All @@ -69,9 +70,9 @@ func TestTransactionByLiquidityPool(t *testing.T) {
metaXDR: "AAAAAQAAAAAAAAAA",
hash: "19aaa18db88605aedec04659fb45e06f240b022eb2d429e05133e4d53cd945ba",
})
err = transactionBuilder.Add(tt.Ctx, firstTransaction, uint32(sequence))
err = transactionBuilder.Add(firstTransaction, uint32(sequence))
tt.Assert.NoError(err)
err = transactionBuilder.Exec(tt.Ctx)
err = transactionBuilder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)

// Insert Liquidity Pool history
Expand All @@ -87,6 +88,8 @@ func TestTransactionByLiquidityPool(t *testing.T) {
err = lpTransactionBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)

tt.Assert.NoError(q.Commit())

var records []Transaction
err = q.Transactions().ForLiquidityPool(tt.Ctx, liquidityPoolID).Select(tt.Ctx, &records)
tt.Assert.NoError(err)
Expand Down Expand Up @@ -210,8 +213,10 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

tt.Assert.NoError(q.Begin())

sequence := uint32(123)
insertBuilder := q.NewTransactionBatchInsertBuilder(0)
insertBuilder := q.NewTransactionBatchInsertBuilder()

firstTransaction := buildLedgerTransaction(tt.T, testTransaction{
index: 1,
Expand All @@ -230,16 +235,15 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) {
hash: "7e2def20d5a21a56be2a457b648f702ee1af889d3df65790e92a05081e9fabf1",
})

tt.Assert.NoError(insertBuilder.Add(tt.Ctx, firstTransaction, sequence))
tt.Assert.NoError(insertBuilder.Exec(tt.Ctx))
tt.Assert.NoError(insertBuilder.Add(firstTransaction, sequence))
tt.Assert.NoError(insertBuilder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

tt.Assert.NoError(insertBuilder.Add(tt.Ctx, secondTransaction, sequence))
tt.Assert.EqualError(
insertBuilder.Exec(tt.Ctx),
"error adding values while inserting to history_transactions: "+
"exec failed: pq: duplicate key value violates unique constraint "+
"\"hs_transaction_by_id\"",
)
tt.Assert.NoError(q.Begin())
insertBuilder = q.NewTransactionBatchInsertBuilder()
tt.Assert.NoError(insertBuilder.Add(secondTransaction, sequence))
tt.Assert.Error(insertBuilder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Rollback())

ledger := Ledger{
Sequence: int32(sequence),
Expand Down Expand Up @@ -305,8 +309,6 @@ func TestInsertTransaction(t *testing.T) {
_, err := q.Exec(tt.Ctx, sq.Insert("history_ledgers").SetMap(ledgerToMap(ledger)))
tt.Assert.NoError(err)

insertBuilder := q.NewTransactionBatchInsertBuilder(0)

success := true

emptySignatures := []string{}
Expand Down Expand Up @@ -826,8 +828,11 @@ func TestInsertTransaction(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
tt.Assert.NoError(insertBuilder.Add(tt.Ctx, testCase.toInsert, sequence))
tt.Assert.NoError(insertBuilder.Exec(tt.Ctx))
insertBuilder := q.NewTransactionBatchInsertBuilder()
tt.Assert.NoError(q.Begin())
tt.Assert.NoError(insertBuilder.Add(testCase.toInsert, sequence))
tt.Assert.NoError(insertBuilder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())

var transactions []Transaction
tt.Assert.NoError(q.Transactions().IncludeFailed().Select(tt.Ctx, &transactions))
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *ProcessorRunner) buildTransactionProcessor(
processors.NewOperationProcessor(s.historyQ, sequence),
tradeProcessor,
processors.NewParticipantsProcessor(s.historyQ, sequence),
processors.NewTransactionProcessor(s.historyQ, sequence),
processors.NewTransactionProcessor(s.session, s.historyQ, sequence),
processors.NewClaimableBalancesTransactionProcessor(s.historyQ, sequence),
processors.NewLiquidityPoolsTransactionProcessor(s.historyQ, sequence),
})
Expand All @@ -168,7 +168,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor(ledger xdr.LedgerHeaderHisto
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ, uint32(ledger.Header.LedgerSeq))
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.session, s.historyQ, uint32(ledger.Header.LedgerSeq))
p = append(p, txSubProc)
}

Expand Down
Loading

0 comments on commit 386b660

Please sign in to comment.