From b90338c0d1831c15b5151f744903e2df58cffec5 Mon Sep 17 00:00:00 2001 From: urvisavla Date: Tue, 14 Nov 2023 12:19:21 -0800 Subject: [PATCH] services/horizon: Use COPY for inserting into liquidity_pools table (#5110) --- .../liquidity_pool_batch_insert_builder.go | 36 +++++++++ .../internal/db2/history/liquidity_pools.go | 6 +- ...ock_liquidity_pool_batch_insert_builder.go | 21 ++++++ .../db2/history/mock_q_liquidity_pools.go | 5 ++ .../internal/ingest/processor_runner_test.go | 36 +++++++++ .../liquidity_pools_change_processor.go | 20 +++-- .../liquidity_pools_change_processor_test.go | 75 ++++++++++++------- 7 files changed, 164 insertions(+), 35 deletions(-) create mode 100644 services/horizon/internal/db2/history/liquidity_pool_batch_insert_builder.go create mode 100644 services/horizon/internal/db2/history/mock_liquidity_pool_batch_insert_builder.go diff --git a/services/horizon/internal/db2/history/liquidity_pool_batch_insert_builder.go b/services/horizon/internal/db2/history/liquidity_pool_batch_insert_builder.go new file mode 100644 index 0000000000..65472fa657 --- /dev/null +++ b/services/horizon/internal/db2/history/liquidity_pool_batch_insert_builder.go @@ -0,0 +1,36 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" +) + +type LiquidityPoolBatchInsertBuilder interface { + Add(liquidityPool LiquidityPool) error + Exec(ctx context.Context) error +} + +type liquidityPoolBatchInsertBuilder struct { + session db.SessionInterface + builder db.FastBatchInsertBuilder + table string +} + +func (q *Q) NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder { + return &liquidityPoolBatchInsertBuilder{ + session: q, + builder: db.FastBatchInsertBuilder{}, + table: "liquidity_pools", + } +} + +// Add adds a new liquidity pool to the batch +func (i *liquidityPoolBatchInsertBuilder) Add(liquidityPool LiquidityPool) error { + return i.builder.RowStruct(liquidityPool) +} + +// Exec writes the batch of liquidity pools to the database. +func (i *liquidityPoolBatchInsertBuilder) Exec(ctx context.Context) error { + return i.builder.Exec(ctx, i.session, i.table) +} diff --git a/services/horizon/internal/db2/history/liquidity_pools.go b/services/horizon/internal/db2/history/liquidity_pools.go index 46e6ba59d3..c4d0a6e6c9 100644 --- a/services/horizon/internal/db2/history/liquidity_pools.go +++ b/services/horizon/internal/db2/history/liquidity_pools.go @@ -37,7 +37,10 @@ type LiquidityPool struct { type LiquidityPoolAssetReserves []LiquidityPoolAssetReserve func (c LiquidityPoolAssetReserves) Value() (driver.Value, error) { - return json.Marshal(c) + // Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver + // (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436). + val, err := json.Marshal(c) + return string(val), err } func (c *LiquidityPoolAssetReserves) Scan(value interface{}) error { @@ -91,6 +94,7 @@ type QLiquidityPools interface { FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error) GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error) CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (int64, error) + NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder } // UpsertLiquidityPools upserts a batch of liquidity pools in the liquidity_pools table. diff --git a/services/horizon/internal/db2/history/mock_liquidity_pool_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_liquidity_pool_batch_insert_builder.go new file mode 100644 index 0000000000..d0685ea381 --- /dev/null +++ b/services/horizon/internal/db2/history/mock_liquidity_pool_batch_insert_builder.go @@ -0,0 +1,21 @@ +package history + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockLiquidityPoolBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockLiquidityPoolBatchInsertBuilder) Add(liquidityPool LiquidityPool) error { + a := m.Called(liquidityPool) + return a.Error(0) +} + +func (m *MockLiquidityPoolBatchInsertBuilder) Exec(ctx context.Context) error { + a := m.Called(ctx) + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/mock_q_liquidity_pools.go b/services/horizon/internal/db2/history/mock_q_liquidity_pools.go index 7b64b24126..d9ae946cee 100644 --- a/services/horizon/internal/db2/history/mock_q_liquidity_pools.go +++ b/services/horizon/internal/db2/history/mock_q_liquidity_pools.go @@ -45,3 +45,8 @@ func (m *MockQLiquidityPools) CompactLiquidityPools(ctx context.Context, cutOffS a := m.Called(ctx, cutOffSequence) return a.Get(0).(int64), a.Error(1) } + +func (m *MockQLiquidityPools) NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder { + a := m.Called() + return a.Get(0).(LiquidityPoolBatchInsertBuilder) +} diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index f1ac314538..eb1ffa9e70 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -61,6 +61,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} + q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). + Return(mockLiquidityPoolBatchInsertBuilder).Twice() + + mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -138,6 +144,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} + q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). + Return(mockLiquidityPoolBatchInsertBuilder).Twice() + + mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -185,6 +197,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} + q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). + Return(mockLiquidityPoolBatchInsertBuilder).Twice() + + mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -224,6 +242,12 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(mockClaimantsBatchInsertBuilder).Twice() + mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} + q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). + Return(mockLiquidityPoolBatchInsertBuilder).Twice() + + mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + runner := ProcessorRunner{ ctx: ctx, historyQ: q, @@ -360,6 +384,12 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} + q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). + Return(mockLiquidityPoolBatchInsertBuilder).Twice() + + mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) @@ -551,6 +581,12 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont Return(mockClaimableBalanceBatchInsertBuilder) mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil) + mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} + q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). + Return(mockLiquidityPoolBatchInsertBuilder).Twice() + + mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) return []interface{}{mockAccountSignersBatchInsertBuilder, diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go index c5e5252280..2387f77baf 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor.go @@ -10,9 +10,10 @@ import ( ) type LiquidityPoolsChangeProcessor struct { - qLiquidityPools history.QLiquidityPools - cache *ingest.ChangeCompactor - sequence uint32 + qLiquidityPools history.QLiquidityPools + cache *ingest.ChangeCompactor + sequence uint32 + batchInsertBuilder history.LiquidityPoolBatchInsertBuilder } func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32) *LiquidityPoolsChangeProcessor { @@ -26,6 +27,7 @@ func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32 func (p *LiquidityPoolsChangeProcessor) reset() { p.cache = ingest.NewChangeCompactor() + p.batchInsertBuilder = p.qLiquidityPools.NewLiquidityPoolBatchInsertBuilder() } func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -43,13 +45,13 @@ func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, chang if err != nil { return errors.Wrap(err, "error in Commit") } - p.reset() } return nil } func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error { + defer p.reset() changes := p.cache.GetChanges() var lps []history.LiquidityPool @@ -57,7 +59,10 @@ func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error { switch { case change.Pre == nil && change.Post != nil: // Created - lps = append(lps, p.ledgerEntryToRow(change.Post)) + err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post)) + if err != nil { + return errors.Wrap(err, "error adding to LiquidityPoolsBatchInsertBuilder") + } case change.Pre != nil && change.Post == nil: // Removed lp := p.ledgerEntryToRow(change.Pre) @@ -70,6 +75,11 @@ func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error { } } + err := p.batchInsertBuilder.Exec(ctx) + if err != nil { + return errors.Wrap(err, "error executing LiquidityPoolsBatchInsertBuilder") + } + if len(lps) > 0 { if err := p.qLiquidityPools.UpsertLiquidityPools(ctx, lps); err != nil { return errors.Wrap(err, "error upserting liquidity pools") diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go index 4e7383b1fe..47a47ee6c5 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_change_processor_test.go @@ -19,16 +19,22 @@ func TestLiquidityPoolsChangeProcessorTestSuiteState(t *testing.T) { type LiquidityPoolsChangeProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *LiquidityPoolsChangeProcessor - mockQ *history.MockQLiquidityPools - sequence uint32 + ctx context.Context + processor *LiquidityPoolsChangeProcessor + mockQ *history.MockQLiquidityPools + sequence uint32 + mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder } func (s *LiquidityPoolsChangeProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQLiquidityPools{} + s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{} + s.mockQ. + On("NewLiquidityPoolBatchInsertBuilder"). + Return(s.mockLiquidityPoolBatchInsertBuilder) + s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil) s.sequence = 456 s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence) } @@ -85,7 +91,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteState) TestCreatesLiquidityPools( }, LastModifiedLedger: 123, } - s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{lp}).Return(nil).Once() + s.mockLiquidityPoolBatchInsertBuilder.On("Add", lp).Return(nil).Once() s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once() @@ -109,16 +115,23 @@ func TestLiquidityPoolsChangeProcessorTestSuiteLedger(t *testing.T) { type LiquidityPoolsChangeProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *LiquidityPoolsChangeProcessor - mockQ *history.MockQLiquidityPools - sequence uint32 + ctx context.Context + processor *LiquidityPoolsChangeProcessor + mockQ *history.MockQLiquidityPools + sequence uint32 + mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder } func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQLiquidityPools{} + s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{} + s.mockQ. + On("NewLiquidityPoolBatchInsertBuilder"). + Return(s.mockLiquidityPoolBatchInsertBuilder).Twice() + s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.sequence = 456 s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence) } @@ -170,6 +183,28 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() { }, }, } + + liquidityPool := history.LiquidityPool{ + PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000", + Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct, + Fee: 34, + TrustlineCount: 52115, + ShareCount: 412241, + AssetReserves: []history.LiquidityPoolAssetReserve{ + { + xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + 450, + }, + { + xdr.MustNewNativeAsset(), + 500, + }, + }, + LastModifiedLedger: 123, + } + s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool). + Return(nil).Once() + err := s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeLiquidityPool, Pre: nil, @@ -191,7 +226,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() { }, }, } - + s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once() pre.LastModifiedLedgerSeq = pre.LastModifiedLedgerSeq - 1 err = s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeLiquidityPool, @@ -200,25 +235,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() { }) s.Assert().NoError(err) - postLP := history.LiquidityPool{ - PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000", - Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct, - Fee: 34, - TrustlineCount: 52115, - ShareCount: 412241, - AssetReserves: []history.LiquidityPoolAssetReserve{ - { - xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - 450, - }, - { - xdr.MustNewNativeAsset(), - 500, - }, - }, - LastModifiedLedger: 123, - } - s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{postLP}).Return(nil).Once() + s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once() s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once() }