Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Remove the optimization that uses COPY for liquidity pool insertion #5135

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

6 changes: 1 addition & 5 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ type LiquidityPool struct {
type LiquidityPoolAssetReserves []LiquidityPoolAssetReserve

func (c LiquidityPoolAssetReserves) Value() (driver.Value, error) {
// Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver
// (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436).
val, err := json.Marshal(c)
return string(val), err
return json.Marshal(c)
}

func (c *LiquidityPoolAssetReserves) Scan(value interface{}) error {
Expand Down Expand Up @@ -94,7 +91,6 @@ type QLiquidityPools interface {
FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error)
GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error)
CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (int64, error)
NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder
}

// UpsertLiquidityPools upserts a batch of liquidity pools in the liquidity_pools table.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,3 @@ func (m *MockQLiquidityPools) CompactLiquidityPools(ctx context.Context, cutOffS
a := m.Called(ctx, cutOffSequence)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQLiquidityPools) NewLiquidityPoolBatchInsertBuilder() LiquidityPoolBatchInsertBuilder {
a := m.Called()
return a.Get(0).(LiquidityPoolBatchInsertBuilder)
}
8 changes: 0 additions & 8 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,6 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder").
Return(mockClaimableBalanceBatchInsertBuilder).Twice()

mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{}
if mockExec {
mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
}
q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder").
Return(mockLiquidityPoolBatchInsertBuilder).Twice()

mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
if mockExec {
mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
Expand All @@ -630,7 +623,6 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec
mockAccountsBatchInsertBuilder,
mockClaimableBalanceBatchInsertBuilder,
mockClaimableBalanceClaimantBatchInsertBuilder,
mockLiquidityPoolBatchInsertBuilder,
mockOfferBatchInsertBuilder,
mockAccountDataBatchInsertBuilder,
mockTrustLinesBatchInsertBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
)

type LiquidityPoolsChangeProcessor struct {
qLiquidityPools history.QLiquidityPools
cache *ingest.ChangeCompactor
sequence uint32
batchInsertBuilder history.LiquidityPoolBatchInsertBuilder
qLiquidityPools history.QLiquidityPools
cache *ingest.ChangeCompactor
sequence uint32
}

func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32) *LiquidityPoolsChangeProcessor {
Expand All @@ -27,7 +26,6 @@ func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32

func (p *LiquidityPoolsChangeProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.batchInsertBuilder = p.qLiquidityPools.NewLiquidityPoolBatchInsertBuilder()
}

func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -45,24 +43,21 @@ func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, chang
if err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
urvisavla marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error {
defer p.reset()

changes := p.cache.GetChanges()
var lps []history.LiquidityPool
for _, change := range changes {
switch {
case change.Pre == nil && change.Post != nil:
// Created
err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post))
if err != nil {
return errors.Wrap(err, "error adding to LiquidityPoolsBatchInsertBuilder")
}
lps = append(lps, p.ledgerEntryToRow(change.Post))
case change.Pre != nil && change.Post == nil:
// Removed
lp := p.ledgerEntryToRow(change.Pre)
Expand All @@ -75,11 +70,6 @@ func (p *LiquidityPoolsChangeProcessor) Commit(ctx context.Context) error {
}
}

err := p.batchInsertBuilder.Exec(ctx)
if err != nil {
return errors.Wrap(err, "error executing LiquidityPoolsBatchInsertBuilder")
}

if len(lps) > 0 {
if err := p.qLiquidityPools.UpsertLiquidityPools(ctx, lps); err != nil {
return errors.Wrap(err, "error upserting liquidity pools")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,16 @@ func TestLiquidityPoolsChangeProcessorTestSuiteState(t *testing.T) {

type LiquidityPoolsChangeProcessorTestSuiteState struct {
suite.Suite
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
}

func (s *LiquidityPoolsChangeProcessorTestSuiteState) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQLiquidityPools{}
s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{}
s.mockQ.
On("NewLiquidityPoolBatchInsertBuilder").
Return(s.mockLiquidityPoolBatchInsertBuilder)

s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil)
s.sequence = 456
s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -91,7 +85,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteState) TestCreatesLiquidityPools(
},
LastModifiedLedger: 123,
}
s.mockLiquidityPoolBatchInsertBuilder.On("Add", lp).Return(nil).Once()
s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{lp}).Return(nil).Once()

s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once()

Expand All @@ -115,23 +109,16 @@ func TestLiquidityPoolsChangeProcessorTestSuiteLedger(t *testing.T) {

type LiquidityPoolsChangeProcessorTestSuiteLedger struct {
suite.Suite
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
mockLiquidityPoolBatchInsertBuilder *history.MockLiquidityPoolBatchInsertBuilder
ctx context.Context
processor *LiquidityPoolsChangeProcessor
mockQ *history.MockQLiquidityPools
sequence uint32
}

func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQLiquidityPools{}

s.mockLiquidityPoolBatchInsertBuilder = &history.MockLiquidityPoolBatchInsertBuilder{}
s.mockQ.
On("NewLiquidityPoolBatchInsertBuilder").
Return(s.mockLiquidityPoolBatchInsertBuilder).Twice()
s.mockLiquidityPoolBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

s.sequence = 456
s.processor = NewLiquidityPoolsChangeProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -183,28 +170,6 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
},
},
}

liquidityPool := history.LiquidityPool{
PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 34,
TrustlineCount: 52115,
ShareCount: 412241,
AssetReserves: []history.LiquidityPoolAssetReserve{
{
xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
450,
},
{
xdr.MustNewNativeAsset(),
500,
},
},
LastModifiedLedger: 123,
}
s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).
Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeLiquidityPool,
Pre: nil,
Expand All @@ -226,7 +191,7 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
},
},
}
s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once()

pre.LastModifiedLedgerSeq = pre.LastModifiedLedgerSeq - 1
err = s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeLiquidityPool,
Expand All @@ -235,7 +200,25 @@ func (s *LiquidityPoolsChangeProcessorTestSuiteLedger) TestNewLiquidityPool() {
})
s.Assert().NoError(err)

s.mockLiquidityPoolBatchInsertBuilder.On("Add", liquidityPool).Return(nil).Once()
postLP := history.LiquidityPool{
PoolID: "cafebabedeadbeef000000000000000000000000000000000000000000000000",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 34,
TrustlineCount: 52115,
ShareCount: 412241,
AssetReserves: []history.LiquidityPoolAssetReserve{
{
xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
450,
},
{
xdr.MustNewNativeAsset(),
500,
},
},
LastModifiedLedger: 123,
}
s.mockQ.On("UpsertLiquidityPools", s.ctx, []history.LiquidityPool{postLP}).Return(nil).Once()
s.mockQ.On("CompactLiquidityPools", s.ctx, s.sequence-100).Return(int64(0), nil).Once()
}

Expand Down
Loading