diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 795ff54318..2de770423a 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -57,7 +57,7 @@ jobs: matrix: os: [ubuntu-20.04] go: [1.19.6, 1.20.1] - pg: [9.6.5, 10] + pg: [10] runs-on: ${{ matrix.os }} services: postgres: diff --git a/.github/workflows/horizon.yml b/.github/workflows/horizon.yml index 68fa123de3..933252ce0c 100644 --- a/.github/workflows/horizon.yml +++ b/.github/workflows/horizon.yml @@ -13,7 +13,7 @@ jobs: matrix: os: [ubuntu-20.04] go: [1.19.6, 1.20.1] - pg: [9.6.5] + pg: [10] ingestion-backend: [db, captive-core, captive-core-remote-storage] protocol-version: [19] runs-on: ${{ matrix.os }} diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index 0c279d737f..46cc423227 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -264,14 +264,13 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { account := fixture.Envelope.SourceAccount().ToAccountId() feeBumpAccount := fixture.Envelope.FeeBumpAccount().ToAccountId() - opBuilder := q.NewOperationBatchInsertBuilder(1) + opBuilder := q.NewOperationBatchInsertBuilder() details, err := json.Marshal(map[string]string{ "bump_to": "98", }) tt.Assert.NoError(err) tt.Assert.NoError(opBuilder.Add( - ctx, toid.New(fixture.Ledger.Sequence, 1, 1).ToInt64(), toid.New(fixture.Ledger.Sequence, 1, 0).ToInt64(), 1, @@ -280,7 +279,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { account.Address(), null.String{}, )) - tt.Assert.NoError(opBuilder.Exec(ctx)) + tt.Assert.NoError(opBuilder.Exec(ctx, q)) effectBuilder := q.NewEffectBatchInsertBuilder(2) details, err = json.Marshal(map[string]interface{}{"new_seq": 98}) diff --git a/services/horizon/internal/db2/history/mock_operations_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_operations_batch_insert_builder.go index e57eb93db9..34cda0d3f8 100644 --- a/services/horizon/internal/db2/history/mock_operations_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/mock_operations_batch_insert_builder.go @@ -4,6 +4,7 @@ import ( "context" "github.com/guregu/null" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" "github.com/stretchr/testify/mock" ) @@ -14,7 +15,7 @@ type MockOperationsBatchInsertBuilder struct { } // Add mock -func (m *MockOperationsBatchInsertBuilder) Add(ctx context.Context, +func (m *MockOperationsBatchInsertBuilder) Add( id int64, transactionID int64, applicationOrder uint32, @@ -23,7 +24,7 @@ func (m *MockOperationsBatchInsertBuilder) Add(ctx context.Context, sourceAccount string, sourceAccountMuxed null.String, ) error { - a := m.Called(ctx, + a := m.Called( id, transactionID, applicationOrder, @@ -36,7 +37,7 @@ func (m *MockOperationsBatchInsertBuilder) Add(ctx context.Context, } // Exec mock -func (m *MockOperationsBatchInsertBuilder) Exec(ctx context.Context) error { - a := m.Called(ctx) +func (m *MockOperationsBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + a := m.Called(ctx, session) return a.Error(0) } diff --git a/services/horizon/internal/db2/history/mock_q_operations.go b/services/horizon/internal/db2/history/mock_q_operations.go index 08a97c6da9..6c2741ad3e 100644 --- a/services/horizon/internal/db2/history/mock_q_operations.go +++ b/services/horizon/internal/db2/history/mock_q_operations.go @@ -8,7 +8,7 @@ type MockQOperations struct { } // NewOperationBatchInsertBuilder mock -func (m *MockQOperations) NewOperationBatchInsertBuilder(maxBatchSize int) OperationBatchInsertBuilder { - a := m.Called(maxBatchSize) +func (m *MockQOperations) NewOperationBatchInsertBuilder() OperationBatchInsertBuilder { + a := m.Called() return a.Get(0).(OperationBatchInsertBuilder) } diff --git a/services/horizon/internal/db2/history/operation.go b/services/horizon/internal/db2/history/operation.go index af1741b0d0..c1b5d71bb9 100644 --- a/services/horizon/internal/db2/history/operation.go +++ b/services/horizon/internal/db2/history/operation.go @@ -379,7 +379,7 @@ func validateTransactionForOperation(transaction Transaction, operation Operatio // QOperations defines history_operation related queries. type QOperations interface { - NewOperationBatchInsertBuilder(maxBatchSize int) OperationBatchInsertBuilder + NewOperationBatchInsertBuilder() OperationBatchInsertBuilder } var selectOperation = sq.Select( diff --git a/services/horizon/internal/db2/history/operation_batch_insert_builder.go b/services/horizon/internal/db2/history/operation_batch_insert_builder.go index a3baee8863..9922ee7d7c 100644 --- a/services/horizon/internal/db2/history/operation_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/operation_batch_insert_builder.go @@ -12,7 +12,6 @@ import ( // history_operations table type OperationBatchInsertBuilder interface { Add( - ctx context.Context, id int64, transactionID int64, applicationOrder uint32, @@ -21,27 +20,25 @@ type OperationBatchInsertBuilder interface { sourceAccount string, sourceAcccountMuxed null.String, ) error - Exec(ctx context.Context) error + Exec(ctx context.Context, session db.SessionInterface) error } // operationBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder type operationBatchInsertBuilder struct { - builder db.BatchInsertBuilder + builder db.FastBatchInsertBuilder + table string } // NewOperationBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance -func (q *Q) NewOperationBatchInsertBuilder(maxBatchSize int) OperationBatchInsertBuilder { +func (q *Q) NewOperationBatchInsertBuilder() OperationBatchInsertBuilder { return &operationBatchInsertBuilder{ - builder: db.BatchInsertBuilder{ - Table: q.GetTable("history_operations"), - MaxBatchSize: maxBatchSize, - }, + table: "history_operations", + builder: db.FastBatchInsertBuilder{}, } } // Add adds a transaction's operations to the batch func (i *operationBatchInsertBuilder) Add( - ctx context.Context, id int64, transactionID int64, applicationOrder uint32, @@ -50,7 +47,7 @@ func (i *operationBatchInsertBuilder) Add( sourceAccount string, sourceAccountMuxed null.String, ) error { - return i.builder.Row(ctx, map[string]interface{}{ + return i.builder.Row(map[string]interface{}{ "id": id, "transaction_id": transactionID, "application_order": applicationOrder, @@ -62,6 +59,6 @@ func (i *operationBatchInsertBuilder) Add( } -func (i *operationBatchInsertBuilder) Exec(ctx context.Context) error { - return i.builder.Exec(ctx) +func (i *operationBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + return i.builder.Exec(ctx, session, i.table) } diff --git a/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go index 4ad624745c..cf10c19d2b 100644 --- a/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_batch_insert_builder_test.go @@ -20,7 +20,7 @@ func TestAddOperation(t *testing.T) { txBatch := q.NewTransactionBatchInsertBuilder() - builder := q.NewOperationBatchInsertBuilder(1) + builder := q.NewOperationBatchInsertBuilder() transactionHash := "2a805712c6d10f9e74bb0ccf54ae92a2b4b1e586451fe8133a2433816f6b567c" transactionResult := "AAAAAAAAAGQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAAA=" @@ -50,7 +50,7 @@ func TestAddOperation(t *testing.T) { sourceAccount := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" sourceAccountMuxed := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - err = builder.Add(tt.Ctx, + err = builder.Add( toid.New(sequence, 1, 1).ToInt64(), toid.New(sequence, 1, 0).ToInt64(), 1, @@ -61,7 +61,7 @@ func TestAddOperation(t *testing.T) { ) tt.Assert.NoError(err) - err = builder.Exec(tt.Ctx) + err = builder.Exec(tt.Ctx, q) tt.Assert.NoError(err) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/db2/history/operation_test.go b/services/horizon/internal/db2/history/operation_test.go index 8c28809568..a010dda56f 100644 --- a/services/horizon/internal/db2/history/operation_test.go +++ b/services/horizon/internal/db2/history/operation_test.go @@ -95,9 +95,8 @@ func TestOperationByLiquidityPool(t *testing.T) { tt.Assert.NoError(err) // Insert a two phony operations - operationBuilder := q.NewOperationBatchInsertBuilder(2) + operationBuilder := q.NewOperationBatchInsertBuilder() err = operationBuilder.Add( - tt.Ctx, opID1, txID, 1, @@ -107,11 +106,8 @@ func TestOperationByLiquidityPool(t *testing.T) { null.String{}, ) tt.Assert.NoError(err) - err = operationBuilder.Exec(tt.Ctx) - tt.Assert.NoError(err) err = operationBuilder.Add( - tt.Ctx, opID2, txID, 1, @@ -121,7 +117,7 @@ func TestOperationByLiquidityPool(t *testing.T) { null.String{}, ) tt.Assert.NoError(err) - err = operationBuilder.Exec(tt.Ctx) + err = operationBuilder.Exec(tt.Ctx, q) tt.Assert.NoError(err) // Insert Liquidity Pool history diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 4dc3d1eda9..b64e9db245 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -146,7 +146,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( statsLedgerTransactionProcessor, processors.NewEffectProcessor(s.historyQ, sequence), processors.NewLedgerProcessor(s.session, s.historyQ, ledger, CurrentVersion), - processors.NewOperationProcessor(s.historyQ, sequence), + processors.NewOperationProcessor(s.session, s.historyQ, sequence), tradeProcessor, processors.NewParticipantsProcessor(s.historyQ, sequence), processors.NewTransactionProcessor(s.session, s.historyQ, sequence), diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index dbbe3a5191..c01ee53730 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -239,7 +239,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) - q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize). + q.MockQOperations.On("NewOperationBatchInsertBuilder"). Return(&history.MockOperationsBatchInsertBuilder{}).Twice() // Twice = with/without failed q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). Return(&history.MockTransactionsBatchInsertBuilder{}).Twice() @@ -298,8 +298,8 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize). + mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQOperations.On("NewOperationBatchInsertBuilder"). Return(mockOperationsBatchInsertBuilder).Twice() mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} @@ -371,8 +371,8 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize). + mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQOperations.On("NewOperationBatchInsertBuilder"). Return(mockOperationsBatchInsertBuilder).Twice() mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} diff --git a/services/horizon/internal/ingest/processors/operations_processor.go b/services/horizon/internal/ingest/processors/operations_processor.go index a194d5472c..1d6972f91f 100644 --- a/services/horizon/internal/ingest/processors/operations_processor.go +++ b/services/horizon/internal/ingest/processors/operations_processor.go @@ -11,6 +11,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/protocols/horizon/base" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" @@ -18,17 +19,19 @@ import ( // OperationProcessor operations processor type OperationProcessor struct { + session db.SessionInterface operationsQ history.QOperations sequence uint32 batch history.OperationBatchInsertBuilder } -func NewOperationProcessor(operationsQ history.QOperations, sequence uint32) *OperationProcessor { +func NewOperationProcessor(session db.SessionInterface, operationsQ history.QOperations, sequence uint32) *OperationProcessor { return &OperationProcessor{ + session: session, operationsQ: operationsQ, sequence: sequence, - batch: operationsQ.NewOperationBatchInsertBuilder(maxBatchSize), + batch: operationsQ.NewOperationBatchInsertBuilder(), } } @@ -57,7 +60,7 @@ func (p *OperationProcessor) ProcessTransaction(ctx context.Context, transaction if source.Type == xdr.CryptoKeyTypeKeyTypeMuxedEd25519 { sourceAccountMuxed = null.StringFrom(source.Address()) } - if err := p.batch.Add(ctx, + if err := p.batch.Add( operation.ID(), operation.TransactionID(), operation.Order(), @@ -74,7 +77,7 @@ func (p *OperationProcessor) ProcessTransaction(ctx context.Context, transaction } func (p *OperationProcessor) Commit(ctx context.Context) error { - return p.batch.Exec(ctx) + return p.batch.Exec(ctx, p.session) } // transactionOperationWrapper represents the data for a single operation within a transaction diff --git a/services/horizon/internal/ingest/processors/operations_processor_test.go b/services/horizon/internal/ingest/processors/operations_processor_test.go index 79b94b1f7f..af5e585ddd 100644 --- a/services/horizon/internal/ingest/processors/operations_processor_test.go +++ b/services/horizon/internal/ingest/processors/operations_processor_test.go @@ -13,6 +13,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -21,6 +22,7 @@ type OperationsProcessorTestSuiteLedger struct { suite.Suite ctx context.Context processor *OperationProcessor + mockSession *db.MockSession mockQ *history.MockQOperations mockBatchInsertBuilder *history.MockOperationsBatchInsertBuilder } @@ -34,10 +36,11 @@ func (s *OperationsProcessorTestSuiteLedger) SetupTest() { s.mockQ = &history.MockQOperations{} s.mockBatchInsertBuilder = &history.MockOperationsBatchInsertBuilder{} s.mockQ. - On("NewOperationBatchInsertBuilder", maxBatchSize). + On("NewOperationBatchInsertBuilder"). Return(s.mockBatchInsertBuilder).Once() s.processor = NewOperationProcessor( + s.mockSession, s.mockQ, 56, ) @@ -74,7 +77,6 @@ func (s *OperationsProcessorTestSuiteLedger) mockBatchInsertAdds(txs []ingest.Le } s.mockBatchInsertBuilder.On( "Add", - s.ctx, expected.ID(), expected.TransactionID(), expected.Order(), @@ -122,7 +124,7 @@ func (s *OperationsProcessorTestSuiteLedger) TestAddOperationSucceeds() { err = s.mockBatchInsertAdds(txs, uint32(56)) s.Assert().NoError(err) - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) for _, tx := range txs { @@ -136,7 +138,7 @@ func (s *OperationsProcessorTestSuiteLedger) TestAddOperationFails() { s.mockBatchInsertBuilder. On( - "Add", s.ctx, + "Add", mock.Anything, mock.Anything, mock.Anything, @@ -152,7 +154,7 @@ func (s *OperationsProcessorTestSuiteLedger) TestAddOperationFails() { } func (s *OperationsProcessorTestSuiteLedger) TestExecFails() { - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(errors.New("transient error")).Once() + s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(errors.New("transient error")).Once() err := s.processor.Commit(s.ctx) s.Assert().Error(err) s.Assert().EqualError(err, "transient error")