diff --git a/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go new file mode 100644 index 0000000000..5caa81f008 --- /dev/null +++ b/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go @@ -0,0 +1,21 @@ +package history + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockOffersBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockOffersBatchInsertBuilder) Add(offer Offer) error { + a := m.Called(offer) + return a.Error(0) +} + +func (m *MockOffersBatchInsertBuilder) Exec(ctx context.Context) error { + a := m.Called(ctx) + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/mock_q_offers.go b/services/horizon/internal/db2/history/mock_q_offers.go index 0c4bc5e9bb..66ae502542 100644 --- a/services/horizon/internal/db2/history/mock_q_offers.go +++ b/services/horizon/internal/db2/history/mock_q_offers.go @@ -40,3 +40,8 @@ func (m *MockQOffers) CompactOffers(ctx context.Context, cutOffSequence uint32) a := m.Called(ctx, cutOffSequence) return a.Get(0).(int64), a.Error(1) } + +func (m *MockQOffers) NewOffersBatchInsertBuilder() OffersBatchInsertBuilder { + a := m.Called() + return a.Get(0).(OffersBatchInsertBuilder) +} diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index c80d67c854..98a08fef87 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -20,6 +20,7 @@ type QOffers interface { GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error) UpsertOffers(ctx context.Context, offers []Offer) error CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error) + NewOffersBatchInsertBuilder() OffersBatchInsertBuilder } func (q *Q) CountOffers(ctx context.Context) (int, error) { diff --git a/services/horizon/internal/db2/history/offers_batch_insert_builder.go b/services/horizon/internal/db2/history/offers_batch_insert_builder.go new file mode 100644 index 0000000000..8a345d08a5 --- /dev/null +++ b/services/horizon/internal/db2/history/offers_batch_insert_builder.go @@ -0,0 +1,39 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" +) + +// OffersBatchInsertBuilder is used to insert offers into the offers table +type OffersBatchInsertBuilder interface { + Add(offer Offer) error + Exec(ctx context.Context) error +} + +// OffersBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder +type offersBatchInsertBuilder struct { + session db.SessionInterface + builder db.FastBatchInsertBuilder + table string +} + +// NewOffersBatchInsertBuilder constructs a new OffersBatchInsertBuilder instance +func (q *Q) NewOffersBatchInsertBuilder() OffersBatchInsertBuilder { + return &offersBatchInsertBuilder{ + session: q, + builder: db.FastBatchInsertBuilder{}, + table: "offers", + } +} + +// Add adds a new offer to the batch +func (i *offersBatchInsertBuilder) Add(offer Offer) error { + return i.builder.RowStruct(offer) +} + +// Exec writes the batch of offers to the database. +func (i *offersBatchInsertBuilder) Exec(ctx context.Context) error { + return i.builder.Exec(ctx, i.session, i.table) +} diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index f1ac314538..a8470556a4 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -61,6 +61,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} + mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -138,6 +142,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} + mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Twice() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -185,6 +193,10 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} + mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOffersBatchInsertBuilder).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -224,6 +236,9 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). Return(mockClaimantsBatchInsertBuilder).Twice() + mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} + q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder).Twice() + runner := ProcessorRunner{ ctx: ctx, historyQ: q, @@ -551,6 +566,10 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont Return(mockClaimableBalanceBatchInsertBuilder) mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil) + mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} + mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil) + q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder) + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) return []interface{}{mockAccountSignersBatchInsertBuilder, diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index af1d80693e..13ee130d9e 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -17,7 +17,8 @@ type OffersProcessor struct { offersQ history.QOffers sequence uint32 - cache *ingest.ChangeCompactor + cache *ingest.ChangeCompactor + insertBatchBuilder history.OffersBatchInsertBuilder } func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcessor { @@ -28,6 +29,7 @@ func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcess func (p *OffersProcessor) reset() { p.cache = ingest.NewChangeCompactor() + p.insertBatchBuilder = p.offersQ.NewOffersBatchInsertBuilder() } func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -43,7 +45,6 @@ func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Chang if err := p.flushCache(ctx); err != nil { return errors.Wrap(err, "error in Commit") } - p.reset() } return nil @@ -67,12 +68,20 @@ func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer } func (p *OffersProcessor) flushCache(ctx context.Context) error { + defer p.reset() + var batchUpsertOffers []history.Offer changes := p.cache.GetChanges() for _, change := range changes { switch { - case change.Post != nil: - // Created and updated + case change.Pre == nil && change.Post != nil: + // Created + err := p.insertBatchBuilder.Add(p.ledgerEntryToRow(change.Post)) + if err != nil { + return errors.New("Error adding to OffersBatchInsertBuilder") + } + case change.Pre != nil && change.Post != nil: + // Updated row := p.ledgerEntryToRow(change.Post) batchUpsertOffers = append(batchUpsertOffers, row) case change.Pre != nil && change.Post == nil: @@ -86,6 +95,11 @@ func (p *OffersProcessor) flushCache(ctx context.Context) error { } } + err := p.insertBatchBuilder.Exec(ctx) + if err != nil { + return errors.New("Error executing OffersBatchInsertBuilder") + } + if len(batchUpsertOffers) > 0 { err := p.offersQ.UpsertOffers(ctx, batchUpsertOffers) if err != nil { diff --git a/services/horizon/internal/ingest/processors/offers_processor_test.go b/services/horizon/internal/ingest/processors/offers_processor_test.go index 21905dc1df..5cda515a28 100644 --- a/services/horizon/internal/ingest/processors/offers_processor_test.go +++ b/services/horizon/internal/ingest/processors/offers_processor_test.go @@ -21,16 +21,21 @@ func TestOffersProcessorTestSuiteState(t *testing.T) { type OffersProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ *history.MockQOffers + sequence uint32 + mockOffersBatchInsertBuilder *history.MockOffersBatchInsertBuilder } func (s *OffersProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} + s.mockOffersBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} + s.mockQ.On("NewOffersBatchInsertBuilder").Return(s.mockOffersBatchInsertBuilder).Twice() + s.mockOffersBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) } @@ -57,15 +62,13 @@ func (s *OffersProcessorTestSuiteState) TestCreateOffer() { LastModifiedLedgerSeq: lastModifiedLedgerSeq, } - s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ - { - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 1, - Pricen: int32(1), - Priced: int32(2), - Price: float64(0.5), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }, + s.mockOffersBatchInsertBuilder.On("Add", history.Offer{ + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 1, + Pricen: int32(1), + Priced: int32(2), + Price: float64(0.5), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), }).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ @@ -82,16 +85,21 @@ func TestOffersProcessorTestSuiteLedger(t *testing.T) { type OffersProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ *history.MockQOffers + sequence uint32 + mockOffersBatchInsertBuilder *history.MockOffersBatchInsertBuilder } func (s *OffersProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} + s.mockOffersBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} + s.mockQ.On("NewOffersBatchInsertBuilder").Return(s.mockOffersBatchInsertBuilder).Twice() + s.mockOffersBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) } @@ -166,15 +174,13 @@ func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ - { - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }, + s.mockOffersBatchInsertBuilder.On("Add", history.Offer{ + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), }).Return(nil).Once() } @@ -257,6 +263,15 @@ func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() { }) s.Assert().NoError(err) + s.mockOffersBatchInsertBuilder.On("Add", history.Offer{ + SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P", + OfferID: 3, + Pricen: int32(2), + Priced: int32(3), + Price: float64(2) / float64(3), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }).Return(nil).Once() + s.mockQ.On("UpsertOffers", s.ctx, mock.Anything).Run(func(args mock.Arguments) { // To fix order issue due to using ChangeCompactor offers := args.Get(1).([]history.Offer) @@ -271,14 +286,6 @@ func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() { Price: float64(1) / float64(6), LastModifiedLedger: uint32(lastModifiedLedgerSeq), }, - { - SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P", - OfferID: 3, - Pricen: int32(2), - Priced: int32(3), - Price: float64(2) / float64(3), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }, }, ) }).Return(nil).Once() @@ -367,15 +374,13 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ - { - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: 1, - Priced: 6, - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }, + s.mockOffersBatchInsertBuilder.On("Add", history.Offer{ + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: 1, + Priced: 6, + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), }).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once()