diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index 7263e7aeb2..0c279d737f 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -247,17 +247,19 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { hash: "edba3051b2f2d9b713e8a08709d631eccb72c59864ff3c564c68792271bb24a7", }) ctx := context.Background() - insertBuilder := q.NewTransactionBatchInsertBuilder(2) - prefilterInsertBuilder := q.NewTransactionFilteredTmpBatchInsertBuilder(2) + tt.Assert.NoError(q.Begin()) + + insertBuilder := q.NewTransactionBatchInsertBuilder() + prefilterInsertBuilder := q.NewTransactionFilteredTmpBatchInsertBuilder() // include both fee bump and normal transaction in the same batch // to make sure both kinds of transactions can be inserted using a single exec statement - tt.Assert.NoError(insertBuilder.Add(ctx, feeBumpTransaction, sequence)) - tt.Assert.NoError(insertBuilder.Add(ctx, normalTransaction, sequence)) - tt.Assert.NoError(insertBuilder.Exec(ctx)) + tt.Assert.NoError(insertBuilder.Add(feeBumpTransaction, sequence)) + tt.Assert.NoError(insertBuilder.Add(normalTransaction, sequence)) + tt.Assert.NoError(insertBuilder.Exec(ctx, q)) - tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, feeBumpTransaction, sequence)) - tt.Assert.NoError(prefilterInsertBuilder.Add(ctx, normalTransaction, sequence)) - tt.Assert.NoError(prefilterInsertBuilder.Exec(ctx)) + tt.Assert.NoError(prefilterInsertBuilder.Add(feeBumpTransaction, sequence)) + tt.Assert.NoError(prefilterInsertBuilder.Add(normalTransaction, sequence)) + tt.Assert.NoError(prefilterInsertBuilder.Exec(ctx, q)) account := fixture.Envelope.SourceAccount().ToAccountId() feeBumpAccount := fixture.Envelope.FeeBumpAccount().ToAccountId() @@ -299,6 +301,8 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { tt.Assert.NoError(err) tt.Assert.NoError(effectBuilder.Exec(ctx)) + tt.Assert.NoError(q.Commit()) + fixture.Transaction = Transaction{ TransactionWithoutLedger: TransactionWithoutLedger{ TotalOrderID: TotalOrderID{528280981504}, diff --git a/services/horizon/internal/db2/history/mock_q_transactions.go b/services/horizon/internal/db2/history/mock_q_transactions.go index 3bf308128f..064d0e34c4 100644 --- a/services/horizon/internal/db2/history/mock_q_transactions.go +++ b/services/horizon/internal/db2/history/mock_q_transactions.go @@ -7,12 +7,12 @@ type MockQTransactions struct { mock.Mock } -func (m *MockQTransactions) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder { - a := m.Called(maxBatchSize) +func (m *MockQTransactions) NewTransactionBatchInsertBuilder() TransactionBatchInsertBuilder { + a := m.Called() return a.Get(0).(TransactionBatchInsertBuilder) } -func (m *MockQTransactions) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder { - a := m.Called(maxBatchSize) +func (m *MockQTransactions) NewTransactionFilteredTmpBatchInsertBuilder() TransactionBatchInsertBuilder { + a := m.Called() return a.Get(0).(TransactionBatchInsertBuilder) } diff --git a/services/horizon/internal/db2/history/mock_transactions_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_transactions_batch_insert_builder.go index 8e2608d553..db16097a03 100644 --- a/services/horizon/internal/db2/history/mock_transactions_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/mock_transactions_batch_insert_builder.go @@ -6,18 +6,19 @@ import ( "github.com/stretchr/testify/mock" "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" ) type MockTransactionsBatchInsertBuilder struct { mock.Mock } -func (m *MockTransactionsBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error { - a := m.Called(ctx, transaction, sequence) +func (m *MockTransactionsBatchInsertBuilder) Add(transaction ingest.LedgerTransaction, sequence uint32) error { + a := m.Called(transaction, sequence) return a.Error(0) } -func (m *MockTransactionsBatchInsertBuilder) Exec(ctx context.Context) error { - a := m.Called(ctx) +func (m *MockTransactionsBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + a := m.Called(ctx, session) return a.Error(0) } diff --git a/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go index 18b4913a0a..4ad624745c 100644 --- a/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go @@ -16,7 +16,9 @@ func TestAddOperation(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - txBatch := q.NewTransactionBatchInsertBuilder(0) + tt.Assert.NoError(q.Begin()) + + txBatch := q.NewTransactionBatchInsertBuilder() builder := q.NewOperationBatchInsertBuilder(1) @@ -35,8 +37,8 @@ func TestAddOperation(t *testing.T) { ) sequence := int32(56) - tt.Assert.NoError(txBatch.Add(tt.Ctx, transaction, uint32(sequence))) - tt.Assert.NoError(txBatch.Exec(tt.Ctx)) + tt.Assert.NoError(txBatch.Add(transaction, uint32(sequence))) + tt.Assert.NoError(txBatch.Exec(tt.Ctx, q)) details, err := json.Marshal(map[string]string{ "to": "GANFZDRBCNTUXIODCJEYMACPMCSZEVE4WZGZ3CZDZ3P2SXK4KH75IK6Y", @@ -62,6 +64,8 @@ func TestAddOperation(t *testing.T) { err = builder.Exec(tt.Ctx) tt.Assert.NoError(err) + tt.Assert.NoError(q.Commit()) + ops := []Operation{} err = q.Select(tt.Ctx, &ops, selectOperation) diff --git a/services/horizon/internal/db2/history/operation_test.go b/services/horizon/internal/db2/history/operation_test.go index 6fadfde67a..8c28809568 100644 --- a/services/horizon/internal/db2/history/operation_test.go +++ b/services/horizon/internal/db2/history/operation_test.go @@ -77,8 +77,10 @@ func TestOperationByLiquidityPool(t *testing.T) { opID1 := toid.New(sequence, txIndex, 1).ToInt64() opID2 := toid.New(sequence, txIndex, 2).ToInt64() + tt.Assert.NoError(q.Begin()) + // Insert a phony transaction - transactionBuilder := q.NewTransactionBatchInsertBuilder(2) + transactionBuilder := q.NewTransactionBatchInsertBuilder() firstTransaction := buildLedgerTransaction(tt.T, testTransaction{ index: uint32(txIndex), envelopeXDR: "AAAAACiSTRmpH6bHC6Ekna5e82oiGY5vKDEEUgkq9CB//t+rAAAAyAEXUhsAADDRAAAAAAAAAAAAAAABAAAAAAAAAAsBF1IbAABX4QAAAAAAAAAA", @@ -87,9 +89,9 @@ func TestOperationByLiquidityPool(t *testing.T) { metaXDR: "AAAAAQAAAAAAAAAA", hash: "19aaa18db88605aedec04659fb45e06f240b022eb2d429e05133e4d53cd945ba", }) - err := transactionBuilder.Add(tt.Ctx, firstTransaction, uint32(sequence)) + err := transactionBuilder.Add(firstTransaction, uint32(sequence)) tt.Assert.NoError(err) - err = transactionBuilder.Exec(tt.Ctx) + err = transactionBuilder.Exec(tt.Ctx, q) tt.Assert.NoError(err) // Insert a two phony operations @@ -137,6 +139,8 @@ func TestOperationByLiquidityPool(t *testing.T) { err = lpOperationBuilder.Exec(tt.Ctx) tt.Assert.NoError(err) + tt.Assert.NoError(q.Commit()) + // Check ascending order pq := db2.PageQuery{ Cursor: "", diff --git a/services/horizon/internal/db2/history/transaction.go b/services/horizon/internal/db2/history/transaction.go index 0771a626b8..a308ab9ddc 100644 --- a/services/horizon/internal/db2/history/transaction.go +++ b/services/horizon/internal/db2/history/transaction.go @@ -232,8 +232,8 @@ func (q *TransactionsQ) Select(ctx context.Context, dest interface{}) error { // QTransactions defines transaction related queries. type QTransactions interface { - NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder - NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder + NewTransactionBatchInsertBuilder() TransactionBatchInsertBuilder + NewTransactionFilteredTmpBatchInsertBuilder() TransactionBatchInsertBuilder } func selectTransaction(table string) sq.SelectBuilder { diff --git a/services/horizon/internal/db2/history/transaction_batch_insert_builder.go b/services/horizon/internal/db2/history/transaction_batch_insert_builder.go index 2ecb25dbe7..742621cec5 100644 --- a/services/horizon/internal/db2/history/transaction_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/transaction_batch_insert_builder.go @@ -21,50 +21,47 @@ import ( // TransactionBatchInsertBuilder is used to insert transactions into the // history_transactions table type TransactionBatchInsertBuilder interface { - Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error - Exec(ctx context.Context) error + Add(transaction ingest.LedgerTransaction, sequence uint32) error + Exec(ctx context.Context, session db.SessionInterface) error } // transactionBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder type transactionBatchInsertBuilder struct { encodingBuffer *xdr.EncodingBuffer - builder db.BatchInsertBuilder + table string + builder db.FastBatchInsertBuilder } // NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance -func (q *Q) NewTransactionBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder { +func (q *Q) NewTransactionBatchInsertBuilder() TransactionBatchInsertBuilder { return &transactionBatchInsertBuilder{ encodingBuffer: xdr.NewEncodingBuffer(), - builder: db.BatchInsertBuilder{ - Table: q.GetTable("history_transactions"), - MaxBatchSize: maxBatchSize, - }, + table: "history_transactions", + builder: db.FastBatchInsertBuilder{}, } } -// NewTransactionBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance -func (q *Q) NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize int) TransactionBatchInsertBuilder { +// NewTransactionFilteredTmpBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance +func (q *Q) NewTransactionFilteredTmpBatchInsertBuilder() TransactionBatchInsertBuilder { return &transactionBatchInsertBuilder{ encodingBuffer: xdr.NewEncodingBuffer(), - builder: db.BatchInsertBuilder{ - Table: q.GetTable("history_transactions_filtered_tmp"), - MaxBatchSize: maxBatchSize, - }, + table: "history_transactions_filtered_tmp", + builder: db.FastBatchInsertBuilder{}, } } // Add adds a new transaction to the batch -func (i *transactionBatchInsertBuilder) Add(ctx context.Context, transaction ingest.LedgerTransaction, sequence uint32) error { +func (i *transactionBatchInsertBuilder) Add(transaction ingest.LedgerTransaction, sequence uint32) error { row, err := transactionToRow(transaction, sequence, i.encodingBuffer) if err != nil { return err } - return i.builder.RowStruct(ctx, row) + return i.builder.RowStruct(row) } -func (i *transactionBatchInsertBuilder) Exec(ctx context.Context) error { - return i.builder.Exec(ctx) +func (i *transactionBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + return i.builder.Exec(ctx, session, i.table) } func signatures(xdrSignatures []xdr.DecoratedSignature) pq.StringArray { diff --git a/services/horizon/internal/db2/history/transaction_test.go b/services/horizon/internal/db2/history/transaction_test.go index 576b93ffb9..692b720a44 100644 --- a/services/horizon/internal/db2/history/transaction_test.go +++ b/services/horizon/internal/db2/history/transaction_test.go @@ -55,12 +55,13 @@ func TestTransactionByLiquidityPool(t *testing.T) { }, }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) + tt.Assert.NoError(q.Begin()) + tt.Assert.NoError(ledgerBatch.Exec(tt.Ctx, q.SessionInterface)) - tt.Assert.NoError(q.Commit()) // Insert a phony transaction - transactionBuilder := q.NewTransactionBatchInsertBuilder(2) + transactionBuilder := q.NewTransactionBatchInsertBuilder() firstTransaction := buildLedgerTransaction(tt.T, testTransaction{ index: uint32(txIndex), envelopeXDR: "AAAAACiSTRmpH6bHC6Ekna5e82oiGY5vKDEEUgkq9CB//t+rAAAAyAEXUhsAADDRAAAAAAAAAAAAAAABAAAAAAAAAAsBF1IbAABX4QAAAAAAAAAA", @@ -69,9 +70,9 @@ func TestTransactionByLiquidityPool(t *testing.T) { metaXDR: "AAAAAQAAAAAAAAAA", hash: "19aaa18db88605aedec04659fb45e06f240b022eb2d429e05133e4d53cd945ba", }) - err = transactionBuilder.Add(tt.Ctx, firstTransaction, uint32(sequence)) + err = transactionBuilder.Add(firstTransaction, uint32(sequence)) tt.Assert.NoError(err) - err = transactionBuilder.Exec(tt.Ctx) + err = transactionBuilder.Exec(tt.Ctx, q) tt.Assert.NoError(err) // Insert Liquidity Pool history @@ -87,6 +88,8 @@ func TestTransactionByLiquidityPool(t *testing.T) { err = lpTransactionBuilder.Exec(tt.Ctx) tt.Assert.NoError(err) + tt.Assert.NoError(q.Commit()) + var records []Transaction err = q.Transactions().ForLiquidityPool(tt.Ctx, liquidityPoolID).Select(tt.Ctx, &records) tt.Assert.NoError(err) @@ -210,8 +213,10 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} + tt.Assert.NoError(q.Begin()) + sequence := uint32(123) - insertBuilder := q.NewTransactionBatchInsertBuilder(0) + insertBuilder := q.NewTransactionBatchInsertBuilder() firstTransaction := buildLedgerTransaction(tt.T, testTransaction{ index: 1, @@ -230,16 +235,15 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) { hash: "7e2def20d5a21a56be2a457b648f702ee1af889d3df65790e92a05081e9fabf1", }) - tt.Assert.NoError(insertBuilder.Add(tt.Ctx, firstTransaction, sequence)) - tt.Assert.NoError(insertBuilder.Exec(tt.Ctx)) + tt.Assert.NoError(insertBuilder.Add(firstTransaction, sequence)) + tt.Assert.NoError(insertBuilder.Exec(tt.Ctx, q)) + tt.Assert.NoError(q.Commit()) - tt.Assert.NoError(insertBuilder.Add(tt.Ctx, secondTransaction, sequence)) - tt.Assert.EqualError( - insertBuilder.Exec(tt.Ctx), - "error adding values while inserting to history_transactions: "+ - "exec failed: pq: duplicate key value violates unique constraint "+ - "\"hs_transaction_by_id\"", - ) + tt.Assert.NoError(q.Begin()) + insertBuilder = q.NewTransactionBatchInsertBuilder() + tt.Assert.NoError(insertBuilder.Add(secondTransaction, sequence)) + tt.Assert.Error(insertBuilder.Exec(tt.Ctx, q)) + tt.Assert.NoError(q.Rollback()) ledger := Ledger{ Sequence: int32(sequence), @@ -305,8 +309,6 @@ func TestInsertTransaction(t *testing.T) { _, err := q.Exec(tt.Ctx, sq.Insert("history_ledgers").SetMap(ledgerToMap(ledger))) tt.Assert.NoError(err) - insertBuilder := q.NewTransactionBatchInsertBuilder(0) - success := true emptySignatures := []string{} @@ -826,8 +828,11 @@ func TestInsertTransaction(t *testing.T) { }, } { t.Run(testCase.name, func(t *testing.T) { - tt.Assert.NoError(insertBuilder.Add(tt.Ctx, testCase.toInsert, sequence)) - tt.Assert.NoError(insertBuilder.Exec(tt.Ctx)) + insertBuilder := q.NewTransactionBatchInsertBuilder() + tt.Assert.NoError(q.Begin()) + tt.Assert.NoError(insertBuilder.Add(testCase.toInsert, sequence)) + tt.Assert.NoError(insertBuilder.Exec(tt.Ctx, q)) + tt.Assert.NoError(q.Commit()) var transactions []Transaction tt.Assert.NoError(q.Transactions().IncludeFailed().Select(tt.Ctx, &transactions)) diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index c6fcd75f2c..4dc3d1eda9 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -149,7 +149,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( processors.NewOperationProcessor(s.historyQ, sequence), tradeProcessor, processors.NewParticipantsProcessor(s.historyQ, sequence), - processors.NewTransactionProcessor(s.historyQ, sequence), + processors.NewTransactionProcessor(s.session, s.historyQ, sequence), processors.NewClaimableBalancesTransactionProcessor(s.historyQ, sequence), processors.NewLiquidityPoolsTransactionProcessor(s.historyQ, sequence), }) @@ -168,7 +168,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor(ledger xdr.LedgerHeaderHisto // when in online mode, the submission result processor must always run (regardless of filtering) var p []horizonTransactionProcessor if s.config.EnableIngestionFiltering { - txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ, uint32(ledger.Header.LedgerSeq)) + txSubProc := processors.NewTransactionFilteredTmpProcessor(s.session, s.historyQ, uint32(ledger.Header.LedgerSeq)) p = append(p, txSubProc) } diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 14c1daf0df..dbbe3a5191 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -241,7 +241,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize). Return(&history.MockOperationsBatchInsertBuilder{}).Twice() // Twice = with/without failed - q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). Return(&history.MockTransactionsBatchInsertBuilder{}).Twice() q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Twice() @@ -277,6 +277,7 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { } q := &mockDBQ{} + mockSession := &db.MockSession{} defer mock.AssertExpectationsForObjects(t, q) ledger := xdr.LedgerCloseMeta{ @@ -303,12 +304,12 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx).Return(nil).Twice() + mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Twice() - q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). Return(mockTransactionsBatchInsertBuilder) - q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder", maxBatchSize). + q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). Return(mockTransactionsBatchInsertBuilder) q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). @@ -322,7 +323,6 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { mockBatchInsertBuilder.On( "Add", ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil) - mockSession := &db.MockSession{} mockBatchInsertBuilder.On( "Exec", ctx, @@ -349,6 +349,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { NetworkPassphrase: network.PublicNetworkPassphrase, } + mockSession := &db.MockSession{} q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) @@ -376,8 +377,8 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). + mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). Return(mockTransactionsBatchInsertBuilder).Twice() q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). @@ -388,7 +389,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { mockBatchInsertBuilder.On( "Add", ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil) - mockSession := &db.MockSession{} mockBatchInsertBuilder.On( "Exec", ctx, diff --git a/services/horizon/internal/ingest/processors/transactions_processor.go b/services/horizon/internal/ingest/processors/transactions_processor.go index e2a880f296..0e6603f804 100644 --- a/services/horizon/internal/ingest/processors/transactions_processor.go +++ b/services/horizon/internal/ingest/processors/transactions_processor.go @@ -5,33 +5,37 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" ) type TransactionProcessor struct { + session db.SessionInterface transactionsQ history.QTransactions sequence uint32 batch history.TransactionBatchInsertBuilder } -func NewTransactionFilteredTmpProcessor(transactionsQ history.QTransactions, sequence uint32) *TransactionProcessor { +func NewTransactionFilteredTmpProcessor(session db.SessionInterface, transactionsQ history.QTransactions, sequence uint32) *TransactionProcessor { return &TransactionProcessor{ + session: session, transactionsQ: transactionsQ, sequence: sequence, - batch: transactionsQ.NewTransactionFilteredTmpBatchInsertBuilder(maxBatchSize), + batch: transactionsQ.NewTransactionFilteredTmpBatchInsertBuilder(), } } -func NewTransactionProcessor(transactionsQ history.QTransactions, sequence uint32) *TransactionProcessor { +func NewTransactionProcessor(session db.SessionInterface, transactionsQ history.QTransactions, sequence uint32) *TransactionProcessor { return &TransactionProcessor{ + session: session, transactionsQ: transactionsQ, sequence: sequence, - batch: transactionsQ.NewTransactionBatchInsertBuilder(maxBatchSize), + batch: transactionsQ.NewTransactionBatchInsertBuilder(), } } func (p *TransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error { - if err := p.batch.Add(ctx, transaction, p.sequence); err != nil { + if err := p.batch.Add(transaction, p.sequence); err != nil { return errors.Wrap(err, "Error batch inserting transaction rows") } @@ -39,7 +43,7 @@ func (p *TransactionProcessor) ProcessTransaction(ctx context.Context, transacti } func (p *TransactionProcessor) Commit(ctx context.Context) error { - if err := p.batch.Exec(ctx); err != nil { + if err := p.batch.Exec(ctx, p.session); err != nil { return errors.Wrap(err, "Error flushing transaction batch") } diff --git a/services/horizon/internal/ingest/processors/transactions_processor_test.go b/services/horizon/internal/ingest/processors/transactions_processor_test.go index ec1cf105e5..dcaf307729 100644 --- a/services/horizon/internal/ingest/processors/transactions_processor_test.go +++ b/services/horizon/internal/ingest/processors/transactions_processor_test.go @@ -7,7 +7,9 @@ import ( "testing" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" + "github.com/stretchr/testify/suite" ) @@ -15,6 +17,7 @@ type TransactionsProcessorTestSuiteLedger struct { suite.Suite ctx context.Context processor *TransactionProcessor + mockSession *db.MockSession mockQ *history.MockQTransactions mockBatchInsertBuilder *history.MockTransactionsBatchInsertBuilder } @@ -29,10 +32,10 @@ func (s *TransactionsProcessorTestSuiteLedger) SetupTest() { s.mockBatchInsertBuilder = &history.MockTransactionsBatchInsertBuilder{} s.mockQ. - On("NewTransactionBatchInsertBuilder", maxBatchSize). + On("NewTransactionBatchInsertBuilder"). Return(s.mockBatchInsertBuilder).Once() - s.processor = NewTransactionProcessor(s.mockQ, 20) + s.processor = NewTransactionProcessor(s.mockSession, s.mockQ, 20) } func (s *TransactionsProcessorTestSuiteLedger) TearDownTest() { @@ -47,10 +50,10 @@ func (s *TransactionsProcessorTestSuiteLedger) TestAddTransactionsSucceeds() { secondTx := createTransaction(false, 3) thirdTx := createTransaction(true, 4) - s.mockBatchInsertBuilder.On("Add", s.ctx, firstTx, sequence).Return(nil).Once() - s.mockBatchInsertBuilder.On("Add", s.ctx, secondTx, sequence).Return(nil).Once() - s.mockBatchInsertBuilder.On("Add", s.ctx, thirdTx, sequence).Return(nil).Once() - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.mockBatchInsertBuilder.On("Add", firstTx, sequence).Return(nil).Once() + s.mockBatchInsertBuilder.On("Add", secondTx, sequence).Return(nil).Once() + s.mockBatchInsertBuilder.On("Add", thirdTx, sequence).Return(nil).Once() + s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) err := s.processor.ProcessTransaction(s.ctx, firstTx) @@ -66,7 +69,7 @@ func (s *TransactionsProcessorTestSuiteLedger) TestAddTransactionsSucceeds() { func (s *TransactionsProcessorTestSuiteLedger) TestAddTransactionsFails() { sequence := uint32(20) firstTx := createTransaction(true, 1) - s.mockBatchInsertBuilder.On("Add", s.ctx, firstTx, sequence). + s.mockBatchInsertBuilder.On("Add", firstTx, sequence). Return(errors.New("transient error")).Once() err := s.processor.ProcessTransaction(s.ctx, firstTx) @@ -78,8 +81,8 @@ func (s *TransactionsProcessorTestSuiteLedger) TestExecFails() { sequence := uint32(20) firstTx := createTransaction(true, 1) - s.mockBatchInsertBuilder.On("Add", s.ctx, firstTx, sequence).Return(nil).Once() - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(errors.New("transient error")).Once() + s.mockBatchInsertBuilder.On("Add", firstTx, sequence).Return(nil).Once() + s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(errors.New("transient error")).Once() err := s.processor.ProcessTransaction(s.ctx, firstTx) s.Assert().NoError(err)