diff --git a/services/horizon/internal/db2/history/mock_q_offers.go b/services/horizon/internal/db2/history/mock_q_offers.go index 11eb9ff94f..84150e49e9 100644 --- a/services/horizon/internal/db2/history/mock_q_offers.go +++ b/services/horizon/internal/db2/history/mock_q_offers.go @@ -39,8 +39,8 @@ func (m *MockQOffers) UpdateOffer(row Offer) (int64, error) { return a.Get(0).(int64), a.Error(1) } -func (m *MockQOffers) RemoveOffer(offerID int64, lastModifiedLedger uint32) (int64, error) { - a := m.Called(offerID, lastModifiedLedger) +func (m *MockQOffers) RemoveOffers(offerIDs []int64, lastModifiedLedger uint32) (int64, error) { + a := m.Called(offerIDs, lastModifiedLedger) return a.Get(0).(int64), a.Error(1) } diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index 56d42a4211..515cf3928b 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -13,7 +13,7 @@ type QOffers interface { GetUpdatedOffers(newerThanSequence uint32) ([]Offer, error) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder UpdateOffer(offer Offer) (int64, error) - RemoveOffer(offerID int64, lastModifiedLedger uint32) (int64, error) + RemoveOffers(offerIDs []int64, lastModifiedLedger uint32) (int64, error) CompactOffers(cutOffSequence uint32) (int64, error) } @@ -104,13 +104,13 @@ func (q *Q) UpdateOffer(offer Offer) (int64, error) { return result.RowsAffected() } -// RemoveOffer marks a row in the offers table as deleted. +// RemoveOffers marks rows in the offers table as deleted. // Returns number of rows affected and error. -func (q *Q) RemoveOffer(offerID int64, lastModifiedLedger uint32) (int64, error) { +func (q *Q) RemoveOffers(offerIDs []int64, lastModifiedLedger uint32) (int64, error) { sql := sq.Update("offers"). Set("deleted", true). Set("last_modified_ledger", lastModifiedLedger). - Where("offer_id = ?", offerID) + Where(map[string]interface{}{"offer_id": offerIDs}) result, err := q.Exec(sql) if err != nil { diff --git a/services/horizon/internal/db2/history/offers_test.go b/services/horizon/internal/db2/history/offers_test.go index 64224b0792..79ee19c0eb 100644 --- a/services/horizon/internal/db2/history/offers_test.go +++ b/services/horizon/internal/db2/history/offers_test.go @@ -219,7 +219,7 @@ func TestRemoveNonExistantOffer(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - numAffected, err := q.RemoveOffer(12345, 1236) + numAffected, err := q.RemoveOffers([]int64{12345}, 1236) tt.Assert.NoError(err) tt.Assert.Equal(int64(0), numAffected) } @@ -238,7 +238,7 @@ func TestRemoveOffer(t *testing.T) { tt.Assert.Equal(offers[0], eurOffer) expectedUpdates := offers - rowsAffected, err := q.RemoveOffer(eurOffer.OfferID, 1236) + rowsAffected, err := q.RemoveOffers([]int64{eurOffer.OfferID}, 1236) tt.Assert.Equal(int64(1), rowsAffected) tt.Assert.NoError(err) expectedUpdates[0].LastModifiedLedger = 1236 @@ -293,7 +293,7 @@ func TestGetOffers(t *testing.T) { // check removed offers aren't included in GetOffer queries err = insertOffer(q, threeEurOffer) tt.Assert.NoError(err) - count, err := q.RemoveOffer(threeEurOffer.OfferID, 1235) + count, err := q.RemoveOffers([]int64{threeEurOffer.OfferID}, 1235) tt.Assert.NoError(err) tt.Assert.Equal(int64(1), count) diff --git a/services/horizon/internal/db2/history/orderbook_test.go b/services/horizon/internal/db2/history/orderbook_test.go index 678bcfc283..004e4d6a27 100644 --- a/services/horizon/internal/db2/history/orderbook_test.go +++ b/services/horizon/internal/db2/history/orderbook_test.go @@ -280,7 +280,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) { for i, offer := range offers { var count int64 - count, err = q.RemoveOffer(offer.OfferID, uint32(i+2)) + count, err = q.RemoveOffers([]int64{offer.OfferID}, uint32(i+2)) assert.NoError(t, err) assert.Equal(t, int64(1), count) } diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index f244090c15..56ffcdd77a 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -16,8 +16,9 @@ type OffersProcessor struct { offersQ history.QOffers sequence uint32 - cache *io.LedgerEntryChangeCache - batch history.OffersBatchInsertBuilder + cache *io.LedgerEntryChangeCache + insertBatch history.OffersBatchInsertBuilder + removeBatch []int64 } func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcessor { @@ -27,8 +28,9 @@ func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcess } func (p *OffersProcessor) reset() { - p.batch = p.offersQ.NewOffersBatchInsertBuilder(maxBatchSize) p.cache = io.NewLedgerEntryChangeCache() + p.insertBatch = p.offersQ.NewOffersBatchInsertBuilder(maxBatchSize) + p.removeBatch = []int64{} } func (p *OffersProcessor) ProcessChange(change io.Change) error { @@ -80,14 +82,14 @@ func (p *OffersProcessor) flushCache() error { // Created action = "inserting" row := p.ledgerEntryToRow(change.Post) - err = p.batch.Add(row) + err = p.insertBatch.Add(row) rowsAffected = 1 // We don't track this when batch inserting case change.Pre != nil && change.Post == nil: // Removed action = "removing" offer := change.Pre.Data.MustOffer() - offerID = offer.OfferId - rowsAffected, err = p.offersQ.RemoveOffer(int64(offer.OfferId), p.sequence) + p.removeBatch = append(p.removeBatch, int64(offer.OfferId)) + rowsAffected = 1 // We don't track this when batch removing default: // Updated action = "updating" @@ -111,10 +113,18 @@ func (p *OffersProcessor) flushCache() error { } } - err := p.batch.Exec() + err := p.insertBatch.Exec() if err != nil { return errors.Wrap(err, "error executing batch") } + + if len(p.removeBatch) > 0 { + _, err = p.offersQ.RemoveOffers(p.removeBatch, p.sequence) + if err != nil { + return errors.Wrap(err, "error in RemoveOffers") + } + } + return nil } diff --git a/services/horizon/internal/ingest/processors/offers_processor_test.go b/services/horizon/internal/ingest/processors/offers_processor_test.go index 24156109b4..46718f7d2b 100644 --- a/services/horizon/internal/ingest/processors/offers_processor_test.go +++ b/services/horizon/internal/ingest/processors/offers_processor_test.go @@ -9,6 +9,7 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -276,7 +277,7 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { }) s.Assert().NoError(err) - s.mockQ.On("RemoveOffer", int64(3), s.sequence).Return(int64(1), nil).Once() + s.mockQ.On("RemoveOffers", []int64{3}, s.sequence).Return(int64(1), nil).Once() s.mockBatchInsertBuilder.On("Exec").Return(nil).Once() s.mockQ.On("CompactOffers", s.sequence-100).Return(int64(0), nil).Once() @@ -346,7 +347,7 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.Assert().NoError(s.processor.Commit()) } -func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { +func (s *OffersProcessorTestSuiteLedger) TestRemoveMultipleOffers() { err := s.processor.ProcessChange(io.Change{ Type: xdr.LedgerEntryTypeOffer, Pre: &xdr.LedgerEntry{ @@ -363,10 +364,30 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { }) s.Assert().NoError(err) - s.mockQ.On("RemoveOffer", int64(3), s.sequence).Return(int64(0), nil).Once() + err = s.processor.ProcessChange(io.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(4), + Price: xdr.Price{3, 1}, + }, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + s.mockBatchInsertBuilder.On("Exec").Return(nil).Once() + s.mockQ.On("CompactOffers", s.sequence-100).Return(int64(0), nil).Once() + s.mockQ.On("RemoveOffers", mock.Anything, s.sequence).Run(func(args mock.Arguments) { + // To fix order issue due to using LedgerEntryChangeCache + ids := args.Get(0).([]int64) + s.Assert().ElementsMatch(ids, []int64{3, 4}) + }).Return(int64(0), nil).Once() err = s.processor.Commit() - s.Assert().Error(err) - s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "error flushing cache: 0 rows affected when removing offer 3") + s.Assert().NoError(err) }