Skip to content

Commit

Permalink
services/horizon: Remove the optimization that uses COPY for liquidit…
Browse files Browse the repository at this point in the history
…y pool insertion and revert to the previous method (#5135)
  • Loading branch information
urvisavla authored Dec 4, 2023
1 parent 0166808 commit 854a8db
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 136 deletions.

This file was deleted.

6 changes: 1 addition & 5 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ type LiquidityPool struct {
type LiquidityPoolAssetReserves []LiquidityPoolAssetReserve

func (c LiquidityPoolAssetReserves) Value() (driver.Value, error) {
// Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver
// (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436).
val, err := json.Marshal(c)
return string(val), err
return json.Marshal(c)
}

func (c *LiquidityPoolAssetReserves) Scan(value interface{}) error {
Expand Down Expand Up @@ -94,7 +91,6 @@ type QLiquidityPools interface {
FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error)
GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error)
CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (int64, error)
NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder
}

// UpsertLiquidityPools upserts a batch of liquidity pools in the liquidity_pools table.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,3 @@ func (m *MockQLiquidityPools) CompactLiquidityPools(ctx context.Context, cutOffS
a := m.Called(ctx, cutOffSequence)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQLiquidityPools) NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder {
a := m.Called()
return a.Get(0).(LiquidityPoolBatchInsertBuilder)
}
8 changes: 0 additions & 8 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,6 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder").
Return(mockClaimableBalanceBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
if mockExec {
mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
if mockExec {
mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
Expand All @@ -630,7 +623,6 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
mockAccountsBatchInsertBuilder,
mockClaimableBalanceBatchInsertBuilder,
mockClaimableBalanceClaimantBatchInsertBuilder,
mockLiquidityPoolBatchInsertBuilder,
mockOfferBatchInsertBuilder,
mockAccountDataBatchInsertBuilder,
mockTrustLinesBatchInsertBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
)

type LiquidityPoolsChangeProcessor struct {
qLiquidityPools history.QLiquidityPools
cache *ingest.ChangeCompactor
sequence uint32
batchInsertBuilder history.LiquidityPoolBatchInsertBuilder
qLiquidityPools history.QLiquidityPools
cache *ingest.ChangeCompactor
sequence uint32
}

func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32) *LiquidityPoolsChangeProcessor {
Expand All @@ -27,7 +26,6 @@ func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32

func (p *LiquidityPoolsChangeProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.batchInsertBuilder = p.qLiquidityPools.NewLiquidityPoolBatchInsertBuilder()
}

func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -45,24 +43,21 @@ func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, chang
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
}

func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error {
defer p.reset()

changes := p.cache.GetChanges()
var lps []history.LiquidityPool
for _, change := range changes {
switch {
case change.Pre == nil && change.Post != nil:
// Created
err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post))
if err != nil {
return errors.Wrap(err, "error adding to LiquidityPoolsBatchInsertBuilder")
}
lps = append(lps, p.ledgerEntryToRow(change.Post))
case change.Pre != nil && change.Post == nil:
// Removed
lp := p.ledgerEntryToRow(change.Pre)
Expand All @@ -75,11 +70,6 @@ func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error {
}
}

err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "error executing LiquidityPoolsBatchInsertBuilder")
}

if len(lps) > 0 {
if err := p.qLiquidityPools.UpsertLiquidityPools(ctx, lps); err != nil {
return errors.Wrap(err, "error upserting liquidity pools")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,16 @@ func TestLiquidityPoolsChangeProcessorTestSuiteState(t *testing.T) {

type LiquidityPoolsChangeProcessorTestSuiteState struct {
suite.Suite
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
}

func (s *LiquidityPoolsChangeProcessorTestSuiteState) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQLiquidityPools{}
s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{}
s.mockQ.
On("NewLiquidityPoolBatchInsertBuilder").
Return(s.mockLiquidityPoolBatchInsertBuilder)

s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil)
s.sequence = 456
s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -91,7 +85,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteState) TestCreatesLiquidityPools(
},
LastModifiedLedger: 123,
}
s.mockLiquidityPoolBatchInsertBuilder.On("Add", lp).Return(nil).Once()
s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{lp}).Return(nil).Once()

s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once()

Expand All @@ -115,23 +109,16 @@ func TestLiquidityPoolsChangeProcessorTestSuiteLedger(t *testing.T) {

type LiquidityPoolsChangeProcessorTestSuiteLedger struct {
suite.Suite
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
}

func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQLiquidityPools{}

s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{}
s.mockQ.
On("NewLiquidityPoolBatchInsertBuilder").
Return(s.mockLiquidityPoolBatchInsertBuilder).Twice()
s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

s.sequence = 456
s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -183,28 +170,6 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
},
},
}

liquidityPool := history.LiquidityPool{
PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 34,
TrustlineCount: 52115,
ShareCount: 412241,
AssetReserves: []history.LiquidityPoolAssetReserve{
{
xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
450,
},
{
xdr.MustNewNativeAsset(),
500,
},
},
LastModifiedLedger: 123,
}
s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).
Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeLiquidityPool,
Pre: nil,
Expand All @@ -226,7 +191,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
},
},
}
s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once()

pre.LastModifiedLedgerSeq = pre.LastModifiedLedgerSeq - 1
err = s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeLiquidityPool,
Expand All @@ -235,7 +200,25 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
})
s.Assert().NoError(err)

s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once()
postLP := history.LiquidityPool{
PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 34,
TrustlineCount: 52115,
ShareCount: 412241,
AssetReserves: []history.LiquidityPoolAssetReserve{
{
xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
450,
},
{
xdr.MustNewNativeAsset(),
500,
},
},
LastModifiedLedger: 123,
}
s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{postLP}).Return(nil).Once()
s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once()
}

Expand Down

0 comments on commit 854a8db

Please sign in to comment.