Skip to content

Commit

Permalink
services/horizon: Use COPY for inserting into liquidity_pools table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla authored Nov 14, 2023
1 parent 7d248ce commit b90338c
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
)

type LiquidityPoolBatchInsertBuilder interface {
Add(liquidityPool LiquidityPool) error
Exec(ctx context.Context) error
}

type liquidityPoolBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

func (q *Q) NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder {
return &liquidityPoolBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "liquidity_pools",
}
}

// Add adds a new liquidity pool to the batch
func (i *liquidityPoolBatchInsertBuilder) Add(liquidityPool LiquidityPool) error {
return i.builder.RowStruct(liquidityPool)
}

// Exec writes the batch of liquidity pools to the database.
func (i *liquidityPoolBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
6 changes: 5 additions & 1 deletion services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ type LiquidityPool struct {
type LiquidityPoolAssetReserves []LiquidityPoolAssetReserve

func (c LiquidityPoolAssetReserves) Value() (driver.Value, error) {
return json.Marshal(c)
// Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver
// (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436).
val, err := json.Marshal(c)
return string(val), err
}

func (c *LiquidityPoolAssetReserves) Scan(value interface{}) error {
Expand Down Expand Up @@ -91,6 +94,7 @@ type QLiquidityPools interface {
FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error)
GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error)
CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (int64, error)
NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder
}

// UpsertLiquidityPools upserts a batch of liquidity pools in the liquidity_pools table.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockLiquidityPoolBatchInsertBuilder struct {
mock.Mock
}

func (m *MockLiquidityPoolBatchInsertBuilder) Add(liquidityPool LiquidityPool) error {
a := m.Called(liquidityPool)
return a.Error(0)
}

func (m *MockLiquidityPoolBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,8 @@ func (m *MockQLiquidityPools) CompactLiquidityPools(ctx context.Context, cutOffS
a := m.Called(ctx, cutOffSequence)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQLiquidityPools) NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder {
a := m.Called()
return a.Get(0).(LiquidityPoolBatchInsertBuilder)
}
36 changes: 36 additions & 0 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -138,6 +144,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -185,6 +197,12 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t
mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -224,6 +242,12 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder").
Return(mockClaimantsBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

runner := ProcessorRunner{
ctx: ctx,
historyQ: q,
Expand Down Expand Up @@ -360,6 +384,12 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {
mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")).
Return(int64(0), nil)

Expand Down Expand Up @@ -551,6 +581,12 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont
Return(mockClaimableBalanceBatchInsertBuilder)
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil)

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})

return []interface{}{mockAccountSignersBatchInsertBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
)

type LiquidityPoolsChangeProcessor struct {
qLiquidityPools history.QLiquidityPools
cache *ingest.ChangeCompactor
sequence uint32
qLiquidityPools history.QLiquidityPools
cache *ingest.ChangeCompactor
sequence uint32
batchInsertBuilder history.LiquidityPoolBatchInsertBuilder
}

func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32) *LiquidityPoolsChangeProcessor {
Expand All @@ -26,6 +27,7 @@ func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32

func (p *LiquidityPoolsChangeProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.batchInsertBuilder = p.qLiquidityPools.NewLiquidityPoolBatchInsertBuilder()
}

func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -43,21 +45,24 @@ func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, chang
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
}

func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error {
defer p.reset()

changes := p.cache.GetChanges()
var lps []history.LiquidityPool
for _, change := range changes {
switch {
case change.Pre == nil && change.Post != nil:
// Created
lps = append(lps, p.ledgerEntryToRow(change.Post))
err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post))
if err != nil {
return errors.Wrap(err, "error adding to LiquidityPoolsBatchInsertBuilder")
}
case change.Pre != nil && change.Post == nil:
// Removed
lp := p.ledgerEntryToRow(change.Pre)
Expand All @@ -70,6 +75,11 @@ func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error {
}
}

err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "error executing LiquidityPoolsBatchInsertBuilder")
}

if len(lps) > 0 {
if err := p.qLiquidityPools.UpsertLiquidityPools(ctx, lps); err != nil {
return errors.Wrap(err, "error upserting liquidity pools")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ func TestLiquidityPoolsChangeProcessorTestSuiteState(t *testing.T) {

type LiquidityPoolsChangeProcessorTestSuiteState struct {
suite.Suite
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder
}

func (s *LiquidityPoolsChangeProcessorTestSuiteState) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQLiquidityPools{}
s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{}
s.mockQ.
On("NewLiquidityPoolBatchInsertBuilder").
Return(s.mockLiquidityPoolBatchInsertBuilder)

s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil)
s.sequence = 456
s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -85,7 +91,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteState) TestCreatesLiquidityPools(
},
LastModifiedLedger: 123,
}
s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{lp}).Return(nil).Once()
s.mockLiquidityPoolBatchInsertBuilder.On("Add", lp).Return(nil).Once()

s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once()

Expand All @@ -109,16 +115,23 @@ func TestLiquidityPoolsChangeProcessorTestSuiteLedger(t *testing.T) {

type LiquidityPoolsChangeProcessorTestSuiteLedger struct {
suite.Suite
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder
}

func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQLiquidityPools{}

s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{}
s.mockQ.
On("NewLiquidityPoolBatchInsertBuilder").
Return(s.mockLiquidityPoolBatchInsertBuilder).Twice()
s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

s.sequence = 456
s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -170,6 +183,28 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
},
},
}

liquidityPool := history.LiquidityPool{
PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 34,
TrustlineCount: 52115,
ShareCount: 412241,
AssetReserves: []history.LiquidityPoolAssetReserve{
{
xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
450,
},
{
xdr.MustNewNativeAsset(),
500,
},
},
LastModifiedLedger: 123,
}
s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).
Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeLiquidityPool,
Pre: nil,
Expand All @@ -191,7 +226,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
},
},
}

s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once()
pre.LastModifiedLedgerSeq = pre.LastModifiedLedgerSeq - 1
err = s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeLiquidityPool,
Expand All @@ -200,25 +235,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
})
s.Assert().NoError(err)

postLP := history.LiquidityPool{
PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 34,
TrustlineCount: 52115,
ShareCount: 412241,
AssetReserves: []history.LiquidityPoolAssetReserve{
{
xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
450,
},
{
xdr.MustNewNativeAsset(),
500,
},
},
LastModifiedLedger: 123,
}
s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{postLP}).Return(nil).Once()
s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once()
s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once()
}

Expand Down

0 comments on commit b90338c

Please sign in to comment.