Skip to content

Commit

Permalink
services/horizon: Use FastBatchInsertBuilder(COPY) for inserting into…
Browse files Browse the repository at this point in the history
… offers table
  • Loading branch information
urvisavla committed Nov 14, 2023
1 parent 7d248ce commit 6a5491e
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockOffersBatchInsertBuilder struct {
mock.Mock
}

func (m *MockOffersBatchInsertBuilder) Add(offer Offer) error {
a := m.Called(offer)
return a.Error(0)
}

func (m *MockOffersBatchInsertBuilder) Exec(ctx context.Context) error {
a := m.Called(ctx)
return a.Error(0)
}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,8 @@ func (m *MockQOffers) CompactOffers(ctx context.Context, cutOffSequence uint32)
a := m.Called(ctx, cutOffSequence)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockQOffers) NewOffersBatchInsertBuilder() OffersBatchInsertBuilder {
a := m.Called()
return a.Get(0).(OffersBatchInsertBuilder)
}
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type QOffers interface {
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
UpsertOffers(ctx context.Context, offers []Offer) error
CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error)
NewOffersBatchInsertBuilder() OffersBatchInsertBuilder
}

func (q *Q) CountOffers(ctx context.Context) (int, error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package history

Check failure on line 1 in services/horizon/internal/db2/history/offers_batch_insert_builder.go

View workflow job for this annotation

GitHub Actions / golangci

1-39 lines are duplicate of `services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go:1-40` (dupl)

import (
"context"

"github.com/stellar/go/support/db"
)

// OffersBatchInsertBuilder is used to insert offers into the offers table
type OffersBatchInsertBuilder interface {
Add(offer Offer) error
Exec(ctx context.Context) error
}

// OffersBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder
type offersBatchInsertBuilder struct {
session db.SessionInterface
builder db.FastBatchInsertBuilder
table string
}

// NewOffersBatchInsertBuilder constructs a new OffersBatchInsertBuilder instance
func (q *Q) NewOffersBatchInsertBuilder() OffersBatchInsertBuilder {
return &offersBatchInsertBuilder{
session: q,
builder: db.FastBatchInsertBuilder{},
table: "offers",
}
}

// Add adds a new claimant to the batch
func (i *offersBatchInsertBuilder) Add(offer Offer) error {
return i.builder.RowStruct(offer)
}

// Exec writes the batch of claimants to the database.
func (i *offersBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx, i.session, i.table)
}
19 changes: 19 additions & 0 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -138,6 +142,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -185,6 +193,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t
mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()

mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once()
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Once()

q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000).
Return(nil)

Expand Down Expand Up @@ -224,6 +236,9 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder").
Return(mockClaimantsBatchInsertBuilder).Twice()

mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder).Twice()

runner := ProcessorRunner{
ctx: ctx,
historyQ: q,
Expand Down Expand Up @@ -551,6 +566,10 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont
Return(mockClaimableBalanceBatchInsertBuilder)
mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil)

mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{}
mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil)
q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder)

q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{})

return []interface{}{mockAccountSignersBatchInsertBuilder,
Expand Down
22 changes: 18 additions & 4 deletions services/horizon/internal/ingest/processors/offers_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type OffersProcessor struct {
offersQ history.QOffers
sequence uint32

cache *ingest.ChangeCompactor
cache *ingest.ChangeCompactor
insertBatchBuilder history.OffersBatchInsertBuilder
}

func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcessor {
Expand All @@ -28,6 +29,7 @@ func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcess

func (p *OffersProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.insertBatchBuilder = p.offersQ.NewOffersBatchInsertBuilder()
}

func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
Expand All @@ -43,7 +45,6 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang
if err := p.flushCache(ctx); err != nil {
return errors.Wrap(err, "error in Commit")
}
p.reset()
}

return nil
Expand All @@ -67,12 +68,20 @@ func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer
}

func (p *OffersProcessor) flushCache(ctx context.Context) error {
defer p.reset()

var batchUpsertOffers []history.Offer
changes := p.cache.GetChanges()
for _, change := range changes {
switch {
case change.Post != nil:
// Created and updated
case change.Pre == nil && change.Post != nil:
// Created
err := p.insertBatchBuilder.Add(p.ledgerEntryToRow(change.Post))
if err != nil {
return errors.New("Error adding to OffersBatchInsertBuilder")
}
case change.Pre != nil && change.Post != nil:
// Updated
row := p.ledgerEntryToRow(change.Post)
batchUpsertOffers = append(batchUpsertOffers, row)
case change.Pre != nil && change.Post == nil:
Expand All @@ -86,6 +95,11 @@ func (p *OffersProcessor) flushCache(ctx context.Context) error {
}
}

err := p.insertBatchBuilder.Exec(ctx)
if err != nil {
return errors.New("Error executing OffersBatchInsertBuilder")
}

if len(batchUpsertOffers) > 0 {
err := p.offersQ.UpsertOffers(ctx, batchUpsertOffers)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ func TestOffersProcessorTestSuiteState(t *testing.T) {

type OffersProcessorTestSuiteState struct {
suite.Suite
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
mockOffersBatchInsertBuilder *history.MockOffersBatchInsertBuilder
}

func (s *OffersProcessorTestSuiteState) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQOffers{}

s.mockOffersBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{}
s.mockQ.On("NewOffersBatchInsertBuilder").Return(s.mockOffersBatchInsertBuilder).Twice()
s.mockOffersBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

s.sequence = 456
s.processor = NewOffersProcessor(s.mockQ, s.sequence)
}
Expand All @@ -57,15 +62,13 @@ func (s *OffersProcessorTestSuiteState) TestCreateOffer() {
LastModifiedLedgerSeq: lastModifiedLedgerSeq,
}

s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{
{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 1,
Pricen: int32(1),
Priced: int32(2),
Price: float64(0.5),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 1,
Pricen: int32(1),
Priced: int32(2),
Price: float64(0.5),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Expand All @@ -82,16 +85,21 @@ func TestOffersProcessorTestSuiteLedger(t *testing.T) {

type OffersProcessorTestSuiteLedger struct {
suite.Suite
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
ctx context.Context
processor *OffersProcessor
mockQ *history.MockQOffers
sequence uint32
mockOffersBatchInsertBuilder *history.MockOffersBatchInsertBuilder
}

func (s *OffersProcessorTestSuiteLedger) SetupTest() {
s.ctx = context.Background()
s.mockQ = &history.MockQOffers{}

s.mockOffersBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{}
s.mockQ.On("NewOffersBatchInsertBuilder").Return(s.mockOffersBatchInsertBuilder).Twice()
s.mockOffersBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

s.sequence = 456
s.processor = NewOffersProcessor(s.mockQ, s.sequence)
}
Expand Down Expand Up @@ -166,15 +174,13 @@ func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() {
s.Assert().NoError(err)

// We use LedgerEntryChangesCache so all changes are squashed
s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{
{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: int32(1),
Priced: int32(6),
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: int32(1),
Priced: int32(6),
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()
}

Expand Down Expand Up @@ -257,6 +263,15 @@ func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() {
})
s.Assert().NoError(err)

s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P",
OfferID: 3,
Pricen: int32(2),
Priced: int32(3),
Price: float64(2) / float64(3),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()

s.mockQ.On("UpsertOffers", s.ctx, mock.Anything).Run(func(args mock.Arguments) {
// To fix order issue due to using ChangeCompactor
offers := args.Get(1).([]history.Offer)
Expand All @@ -271,14 +286,6 @@ func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() {
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
{
SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P",
OfferID: 3,
Pricen: int32(2),
Priced: int32(3),
Price: float64(2) / float64(3),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
},
)
}).Return(nil).Once()
Expand Down Expand Up @@ -367,15 +374,13 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() {
s.Assert().NoError(err)

// We use LedgerEntryChangesCache so all changes are squashed
s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{
{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: 1,
Priced: 6,
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
},
s.mockOffersBatchInsertBuilder.On("Add", history.Offer{
SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML",
OfferID: 2,
Pricen: 1,
Priced: 6,
Price: float64(1) / float64(6),
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
}).Return(nil).Once()

s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once()
Expand Down

0 comments on commit 6a5491e

Please sign in to comment.