Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Use COPY for inserting into offers table #5111

Merged
merged 2 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockOffersBatchInsertBuilder struct {
mock.Mock
}

func (m *MockOffersBatchInsertBuilder) Add(offer Offer) error {
a := m.Called(offer)
return a.Error(0)
}

func (m *MockOffersBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,8 @@ func (m *MockQOffers) CompactOffers(ctx context.Context, cutOffSequence uint32)
a := m.Called(ctx, cutOffSequence)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQOffers) NewOffersBatchInsertBuilder() OffersBatchInsertBuilder {
a := m.Called()
return a.Get(0).(OffersBatchInsertBuilder)
}
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type QOffers interface {
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
UpsertOffers(ctx context.Context, offers []Offer) error
CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error)
NewOffersBatchInsertBuilder() OffersBatchInsertBuilder
}

func (q *Q) CountOffers(ctx context.Context) (int, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package history

Check failure on line 1 in services/horizon/internal/db2/history/offers_batch_insert_builder.go

View workflow job for this annotation

GitHub Actions / golangci

1-39 lines are duplicate of `services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go:1-40` (dupl)

import (
"context"

"github.com/stellar/go/support/db"
)

// OffersBatchInsertBuilder is used to insert offers into the offers table
type OffersBatchInsertBuilder interface {
Add(offer Offer) error
Exec(ctx context.Context) error
}

// OffersBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type offersBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewOffersBatchInsertBuilder constructs a new OffersBatchInsertBuilder instance
func (q *Q) NewOffersBatchInsertBuilder() OffersBatchInsertBuilder {
return &offersBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "offers",
}
}

// Add adds a new offer to the batch
func (i *offersBatchInsertBuilder) Add(offer Offer) error {
return i.builder.RowStruct(offer)
}

// Exec writes the batch of offers to the database.
func (i *offersBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
19 changes: 19 additions & 0 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -150,6 +154,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -203,6 +211,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -248,6 +260,9 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder).Twice()

runner := ProcessorRunner{
ctx: ctx,
historyQ: q,
Expand Down Expand Up @@ -587,6 +602,10 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont

mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil)
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder)

q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})

return []interface{}{mockAccountSignersBatchInsertBuilder,
Expand Down
22 changes: 18 additions & 4 deletions services/horizon/internal/ingest/processors/offers_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type OffersProcessor struct {
offersQ history.QOffers
sequence uint32

cache *ingest.ChangeCompactor
cache *ingest.ChangeCompactor
insertBatchBuilder history.OffersBatchInsertBuilder
}

func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcessor {
Expand All @@ -28,6 +29,7 @@ func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcess

func (p *OffersProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.insertBatchBuilder = p.offersQ.NewOffersBatchInsertBuilder()
}

func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -43,7 +45,6 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang
if err := p.flushCache(ctx); err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
Expand All @@ -67,12 +68,20 @@ func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer
}

func (p *OffersProcessor) flushCache(ctx context.Context) error {
defer p.reset()

var batchUpsertOffers []history.Offer
changes := p.cache.GetChanges()
for _, change := range changes {
switch {
case change.Post != nil:
// Created and updated
case change.Pre == nil && change.Post != nil:
// Created
err := p.insertBatchBuilder.Add(p.ledgerEntryToRow(change.Post))
if err != nil {
return errors.New("Error adding to OffersBatchInsertBuilder")
}
case change.Pre != nil && change.Post != nil:
// Updated
row := p.ledgerEntryToRow(change.Post)
batchUpsertOffers = append(batchUpsertOffers, row)
case change.Pre != nil && change.Post == nil:
Expand All @@ -86,6 +95,11 @@ func (p *OffersProcessor) flushCache(ctx context.Context) error {
}
}

err := p.insertBatchBuilder.Exec(ctx)
if err != nil {
return errors.New("Error executing OffersBatchInsertBuilder")
}

if len(batchUpsertOffers) > 0 {
err := p.offersQ.UpsertOffers(ctx, batchUpsertOffers)
if err != nil {
Expand Down
119 changes: 76 additions & 43 deletions services/horizon/internal/ingest/processors/offers_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ func TestOffersProcessorTestSuiteState(t *testing.T) {

type OffersProcessorTestSuiteState struct {
suite.Suite
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
mockOffersBatchInsertBuilder *history.MockOffersBatchInsertBuilder
}

func (s *OffersProcessorTestSuiteState) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQOffers{}

s.mockOffersBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{}
s.mockQ.On("NewOffersBatchInsertBuilder").Return(s.mockOffersBatchInsertBuilder).Twice()
s.mockOffersBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

s.sequence = 456
s.processor = NewOffersProcessor(s.mockQ, s.sequence)
}
Expand All @@ -57,15 +62,13 @@ func (s *OffersProcessorTestSuiteState) TestCreateOffer() {
LastModifiedLedgerSeq: lastModifiedLedgerSeq,
}

s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{
{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 1,
Pricen: int32(1),
Priced: int32(2),
Price: float64(0.5),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
urvisavla marked this conversation as resolved.
Show resolved Hide resolved
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 1,
Pricen: int32(1),
Priced: int32(2),
Price: float64(0.5),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Expand All @@ -82,16 +85,21 @@ func TestOffersProcessorTestSuiteLedger(t *testing.T) {

type OffersProcessorTestSuiteLedger struct {
suite.Suite
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
mockOffersBatchInsertBuilder *history.MockOffersBatchInsertBuilder
}

func (s *OffersProcessorTestSuiteLedger) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQOffers{}

s.mockOffersBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{}
s.mockQ.On("NewOffersBatchInsertBuilder").Return(s.mockOffersBatchInsertBuilder).Twice()
s.mockOffersBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

s.sequence = 456
s.processor = NewOffersProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -166,15 +174,13 @@ func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() {
s.Assert().NoError(err)

// We use LedgerEntryChangesCache so all changes are squashed
s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{
{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: int32(1),
Priced: int32(6),
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: int32(1),
Priced: int32(6),
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()
}

Expand Down Expand Up @@ -223,6 +229,12 @@ func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() {
Price: xdr.Price{2, 3},
}

yetAnotherOffer := xdr.OfferEntry{
SellerId: xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"),
OfferId: xdr.Int64(4),
Price: xdr.Price{2, 6},
}

updatedEntry := xdr.LedgerEntry{
LastModifiedLedgerSeq: lastModifiedLedgerSeq,
Data: xdr.LedgerEntryData{
Expand Down Expand Up @@ -257,6 +269,37 @@ func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() {
})
s.Assert().NoError(err)

err = s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeOffer,
Pre: nil,
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: lastModifiedLedgerSeq,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeOffer,
Offer: &yetAnotherOffer,
},
},
})
s.Assert().NoError(err)

s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P",
OfferID: 3,
Pricen: int32(2),
Priced: int32(3),
Price: float64(2) / float64(3),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()

s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
OfferID: 4,
Pricen: int32(2),
Priced: int32(6),
Price: float64(2) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()

s.mockQ.On("UpsertOffers", s.ctx, mock.Anything).Run(func(args mock.Arguments) {
// To fix order issue due to using ChangeCompactor
offers := args.Get(1).([]history.Offer)
Expand All @@ -271,14 +314,6 @@ func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() {
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
{
SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P",
OfferID: 3,
Pricen: int32(2),
Priced: int32(3),
Price: float64(2) / float64(3),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
},
)
}).Return(nil).Once()
Expand Down Expand Up @@ -367,15 +402,13 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() {
s.Assert().NoError(err)

// We use LedgerEntryChangesCache so all changes are squashed
s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{
{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: 1,
Priced: 6,
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: 1,
Priced: 6,
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()

s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once()
Expand Down
Loading