Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Use FastBatchInsertBuilder to …
Browse files Browse the repository at this point in the history
…insert operations into history_operations (#4952)
  • Loading branch information
tamirms authored Jul 19, 2023
1 parent af8161b commit e163964
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
matrix:
os: [ubuntu-20.04]
go: [1.19.6, 1.20.1]
pg: [9.6.5, 10]
pg: [10]
runs-on: ${{ matrix.os }}
services:
postgres:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
matrix:
os: [ubuntu-20.04]
go: [1.19.6, 1.20.1]
pg: [9.6.5]
pg: [10]
ingestion-backend: [db, captive-core, captive-core-remote-storage]
protocol-version: [19]
runs-on: ${{ matrix.os }}
Expand Down
5 changes: 2 additions & 3 deletions services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,13 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
account := fixture.Envelope.SourceAccount().ToAccountId()
feeBumpAccount := fixture.Envelope.FeeBumpAccount().ToAccountId()

opBuilder := q.NewOperationBatchInsertBuilder(1)
opBuilder := q.NewOperationBatchInsertBuilder()
details, err := json.Marshal(map[string]string{
"bump_to": "98",
})
tt.Assert.NoError(err)

tt.Assert.NoError(opBuilder.Add(
ctx,
toid.New(fixture.Ledger.Sequence, 1, 1).ToInt64(),
toid.New(fixture.Ledger.Sequence, 1, 0).ToInt64(),
1,
Expand All @@ -280,7 +279,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
account.Address(),
null.String{},
))
tt.Assert.NoError(opBuilder.Exec(ctx))
tt.Assert.NoError(opBuilder.Exec(ctx, q))

effectBuilder := q.NewEffectBatchInsertBuilder(2)
details, err = json.Marshal(map[string]interface{}{"new_seq": 98})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/guregu/null"
"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/mock"
)
Expand All @@ -14,7 +15,7 @@ type MockOperationsBatchInsertBuilder struct {
}

// Add mock
func (m *MockOperationsBatchInsertBuilder) Add(ctx context.Context,
func (m *MockOperationsBatchInsertBuilder) Add(
id int64,
transactionID int64,
applicationOrder uint32,
Expand All @@ -23,7 +24,7 @@ func (m *MockOperationsBatchInsertBuilder) Add(ctx context.Context,
sourceAccount string,
sourceAccountMuxed null.String,
) error {
a := m.Called(ctx,
a := m.Called(
id,
transactionID,
applicationOrder,
Expand All @@ -36,7 +37,7 @@ func (m *MockOperationsBatchInsertBuilder) Add(ctx context.Context,
}

// Exec mock
func (m *MockOperationsBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
func (m *MockOperationsBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
a := m.Called(ctx, session)
return a.Error(0)
}
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/mock_q_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type MockQOperations struct {
}

// NewOperationBatchInsertBuilder mock
func (m *MockQOperations) NewOperationBatchInsertBuilder(maxBatchSize int) OperationBatchInsertBuilder {
a := m.Called(maxBatchSize)
func (m *MockQOperations) NewOperationBatchInsertBuilder() OperationBatchInsertBuilder {
a := m.Called()
return a.Get(0).(OperationBatchInsertBuilder)
}
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func validateTransactionForOperation(transaction Transaction, operation Operatio

// QOperations defines history_operation related queries.
type QOperations interface {
NewOperationBatchInsertBuilder(maxBatchSize int) OperationBatchInsertBuilder
NewOperationBatchInsertBuilder() OperationBatchInsertBuilder
}

var selectOperation = sq.Select(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
// history_operations table
type OperationBatchInsertBuilder interface {
Add(
ctx context.Context,
id int64,
transactionID int64,
applicationOrder uint32,
Expand All @@ -21,27 +20,25 @@ type OperationBatchInsertBuilder interface {
sourceAccount string,
sourceAcccountMuxed null.String,
) error
Exec(ctx context.Context) error
Exec(ctx context.Context, session db.SessionInterface) error
}

// operationBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type operationBatchInsertBuilder struct {
builder db.BatchInsertBuilder
builder db.FastBatchInsertBuilder
table string
}

// NewOperationBatchInsertBuilder constructs a new TransactionBatchInsertBuilder instance
func (q *Q) NewOperationBatchInsertBuilder(maxBatchSize int) OperationBatchInsertBuilder {
func (q *Q) NewOperationBatchInsertBuilder() OperationBatchInsertBuilder {
return &operationBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Table: q.GetTable("history_operations"),
MaxBatchSize: maxBatchSize,
},
table: "history_operations",
builder: db.FastBatchInsertBuilder{},
}
}

// Add adds a transaction's operations to the batch
func (i *operationBatchInsertBuilder) Add(
ctx context.Context,
id int64,
transactionID int64,
applicationOrder uint32,
Expand All @@ -50,7 +47,7 @@ func (i *operationBatchInsertBuilder) Add(
sourceAccount string,
sourceAccountMuxed null.String,
) error {
return i.builder.Row(ctx, map[string]interface{}{
return i.builder.Row(map[string]interface{}{
"id": id,
"transaction_id": transactionID,
"application_order": applicationOrder,
Expand All @@ -62,6 +59,6 @@ func (i *operationBatchInsertBuilder) Add(

}

func (i *operationBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
func (i *operationBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error {
return i.builder.Exec(ctx, session, i.table)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestAddOperation(t *testing.T) {

txBatch := q.NewTransactionBatchInsertBuilder()

builder := q.NewOperationBatchInsertBuilder(1)
builder := q.NewOperationBatchInsertBuilder()

transactionHash := "2a805712c6d10f9e74bb0ccf54ae92a2b4b1e586451fe8133a2433816f6b567c"
transactionResult := "AAAAAAAAAGQAAAAAAAAAAQAAAAAAAAABAAAAAAAAAAA="
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestAddOperation(t *testing.T) {

sourceAccount := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
sourceAccountMuxed := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
err = builder.Add(tt.Ctx,
err = builder.Add(
toid.New(sequence, 1, 1).ToInt64(),
toid.New(sequence, 1, 0).ToInt64(),
1,
Expand All @@ -61,7 +61,7 @@ func TestAddOperation(t *testing.T) {
)
tt.Assert.NoError(err)

err = builder.Exec(tt.Ctx)
err = builder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)

tt.Assert.NoError(q.Commit())
Expand Down
8 changes: 2 additions & 6 deletions services/horizon/internal/db2/history/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ func TestOperationByLiquidityPool(t *testing.T) {
tt.Assert.NoError(err)

// Insert a two phony operations
operationBuilder := q.NewOperationBatchInsertBuilder(2)
operationBuilder := q.NewOperationBatchInsertBuilder()
err = operationBuilder.Add(
tt.Ctx,
opID1,
txID,
1,
Expand All @@ -107,11 +106,8 @@ func TestOperationByLiquidityPool(t *testing.T) {
null.String{},
)
tt.Assert.NoError(err)
err = operationBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)

err = operationBuilder.Add(
tt.Ctx,
opID2,
txID,
1,
Expand All @@ -121,7 +117,7 @@ func TestOperationByLiquidityPool(t *testing.T) {
null.String{},
)
tt.Assert.NoError(err)
err = operationBuilder.Exec(tt.Ctx)
err = operationBuilder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)

// Insert Liquidity Pool history
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *ProcessorRunner) buildTransactionProcessor(
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(s.historyQ, sequence),
processors.NewLedgerProcessor(s.session, s.historyQ, ledger, CurrentVersion),
processors.NewOperationProcessor(s.historyQ, sequence),
processors.NewOperationProcessor(s.session, s.historyQ, sequence),
tradeProcessor,
processors.NewParticipantsProcessor(s.historyQ, sequence),
processors.NewTransactionProcessor(s.session, s.historyQ, sequence),
Expand Down
10 changes: 5 additions & 5 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) {
q := &mockDBQ{}
defer mock.AssertExpectationsForObjects(t, q)

q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize).
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Return(&history.MockOperationsBatchInsertBuilder{}).Twice() // Twice = with/without failed
q.MockQTransactions.On("NewTransactionBatchInsertBuilder").
Return(&history.MockTransactionsBatchInsertBuilder{}).Twice()
Expand Down Expand Up @@ -298,8 +298,8 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder)
mockOperationsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize).
mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Return(mockOperationsBatchInsertBuilder).Twice()

mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
Expand Down Expand Up @@ -371,8 +371,8 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) {

mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{}
defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder)
mockOperationsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize).
mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once()
q.MockQOperations.On("NewOperationBatchInsertBuilder").
Return(mockOperationsBatchInsertBuilder).Twice()

mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,27 @@ import (
"github.com/stellar/go/ingest"
"github.com/stellar/go/protocols/horizon/base"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
)

// OperationProcessor operations processor
type OperationProcessor struct {
session db.SessionInterface
operationsQ history.QOperations

sequence uint32
batch history.OperationBatchInsertBuilder
}

func NewOperationProcessor(operationsQ history.QOperations, sequence uint32) *OperationProcessor {
func NewOperationProcessor(session db.SessionInterface, operationsQ history.QOperations, sequence uint32) *OperationProcessor {
return &OperationProcessor{
session: session,
operationsQ: operationsQ,
sequence: sequence,
batch: operationsQ.NewOperationBatchInsertBuilder(maxBatchSize),
batch: operationsQ.NewOperationBatchInsertBuilder(),
}
}

Expand Down Expand Up @@ -57,7 +60,7 @@ func (p *OperationProcessor) ProcessTransaction(ctx context.Context, transaction
if source.Type == xdr.CryptoKeyTypeKeyTypeMuxedEd25519 {
sourceAccountMuxed = null.StringFrom(source.Address())
}
if err := p.batch.Add(ctx,
if err := p.batch.Add(
operation.ID(),
operation.TransactionID(),
operation.Order(),
Expand All @@ -74,7 +77,7 @@ func (p *OperationProcessor) ProcessTransaction(ctx context.Context, transaction
}

func (p *OperationProcessor) Commit(ctx context.Context) error {
return p.batch.Exec(ctx)
return p.batch.Exec(ctx, p.session)
}

// transactionOperationWrapper represents the data for a single operation within a transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand All @@ -21,6 +22,7 @@ type OperationsProcessorTestSuiteLedger struct {
suite.Suite
ctx context.Context
processor *OperationProcessor
mockSession *db.MockSession
mockQ *history.MockQOperations
mockBatchInsertBuilder *history.MockOperationsBatchInsertBuilder
}
Expand All @@ -34,10 +36,11 @@ func (s *OperationsProcessorTestSuiteLedger) SetupTest() {
s.mockQ = &history.MockQOperations{}
s.mockBatchInsertBuilder = &history.MockOperationsBatchInsertBuilder{}
s.mockQ.
On("NewOperationBatchInsertBuilder", maxBatchSize).
On("NewOperationBatchInsertBuilder").
Return(s.mockBatchInsertBuilder).Once()

s.processor = NewOperationProcessor(
s.mockSession,
s.mockQ,
56,
)
Expand Down Expand Up @@ -74,7 +77,6 @@ func (s *OperationsProcessorTestSuiteLedger) mockBatchInsertAdds(txs []ingest.Le
}
s.mockBatchInsertBuilder.On(
"Add",
s.ctx,
expected.ID(),
expected.TransactionID(),
expected.Order(),
Expand Down Expand Up @@ -122,7 +124,7 @@ func (s *OperationsProcessorTestSuiteLedger) TestAddOperationSucceeds() {

err = s.mockBatchInsertAdds(txs, uint32(56))
s.Assert().NoError(err)
s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()
s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()
s.Assert().NoError(s.processor.Commit(s.ctx))

for _, tx := range txs {
Expand All @@ -136,7 +138,7 @@ func (s *OperationsProcessorTestSuiteLedger) TestAddOperationFails() {

s.mockBatchInsertBuilder.
On(
"Add", s.ctx,
"Add",
mock.Anything,
mock.Anything,
mock.Anything,
Expand All @@ -152,7 +154,7 @@ func (s *OperationsProcessorTestSuiteLedger) TestAddOperationFails() {
}

func (s *OperationsProcessorTestSuiteLedger) TestExecFails() {
s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(errors.New("transient error")).Once()
s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(errors.New("transient error")).Once()
err := s.processor.Commit(s.ctx)
s.Assert().Error(err)
s.Assert().EqualError(err, "transient error")
Expand Down

0 comments on commit e163964

Please sign in to comment.