diff --git a/services/horizon/internal/action_offers_test.go b/services/horizon/internal/action_offers_test.go index 737afe2fac..13458db9fe 100644 --- a/services/horizon/internal/action_offers_test.go +++ b/services/horizon/internal/action_offers_test.go @@ -68,12 +68,8 @@ func TestOfferActions_Show(t *testing.T) { LastModifiedLedger: uint32(4), } - batch := q.NewOffersBatchInsertBuilder(3) - err = batch.Add(ctx, eurOffer) + err = q.UpsertOffers(ctx, []history.Offer{eurOffer, usdOffer}) ht.Assert.NoError(err) - err = batch.Add(ctx, usdOffer) - ht.Assert.NoError(err) - ht.Assert.NoError(batch.Exec(ctx)) w := ht.Get("/offers") if ht.Assert.Equal(200, w.Code) { diff --git a/services/horizon/internal/actions/offer_test.go b/services/horizon/internal/actions/offer_test.go index 781265aaf0..108af3236d 100644 --- a/services/horizon/internal/actions/offer_test.go +++ b/services/horizon/internal/actions/offer_test.go @@ -89,12 +89,8 @@ func TestGetOfferByIDHandler(t *testing.T) { }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) - batch := q.NewOffersBatchInsertBuilder(0) - err = batch.Add(tt.Ctx, eurOffer) + err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, usdOffer}) tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) - tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) for _, testCase := range []struct { name string @@ -200,14 +196,8 @@ func TestGetOffersHandler(t *testing.T) { }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) - batch := q.NewOffersBatchInsertBuilder(0) - err = batch.Add(tt.Ctx, eurOffer) - tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, twoEurOffer) + err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer}) tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) - tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) t.Run("No filter", func(t *testing.T) { records, err := handler.GetResourcePage( @@ -477,13 +467,8 @@ func TestGetAccountOffersHandler(t *testing.T) { q := &history.Q{tt.HorizonSession()} handler := GetAccountOffersHandler{} - batch := q.NewOffersBatchInsertBuilder(0) - err := batch.Add(tt.Ctx, eurOffer) - err = batch.Add(tt.Ctx, twoEurOffer) - tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) + err := q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer}) tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) records, err := handler.GetResourcePage( httptest.NewRecorder(), diff --git a/services/horizon/internal/actions/orderbook_test.go b/services/horizon/internal/actions/orderbook_test.go index eef810d606..b73191361a 100644 --- a/services/horizon/internal/actions/orderbook_test.go +++ b/services/horizon/internal/actions/orderbook_test.go @@ -575,12 +575,7 @@ func TestOrderbookGetResource(t *testing.T) { } assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"})) - - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 0ef6a958ae..4ec9aa36d9 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -8,12 +8,14 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "strings" "sync" "time" sq "github.com/Masterminds/squirrel" "github.com/guregu/null" "github.com/jmoiron/sqlx" + "github.com/lib/pq" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/support/db" @@ -573,6 +575,14 @@ type ManageOffer struct { OfferID int64 `json:"offer_id"` } +// upsertField is used in upsertRows function generating upsert query for +// different tables. +type upsertField struct { + name string + dbType string + objects []interface{} +} + // Offer is row of data from the `offers` table from horizon DB type Offer struct { SellerID string `db:"seller_id"` @@ -591,16 +601,6 @@ type Offer struct { Sponsor null.String `db:"sponsor"` } -type OffersBatchInsertBuilder interface { - Add(ctx context.Context, offer Offer) error - Exec(ctx context.Context) error -} - -// offersBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder -type offersBatchInsertBuilder struct { - builder db.BatchInsertBuilder -} - // OperationsQ is a helper struct to aid in configuring queries that loads // slices of Operation structs. type OperationsQ struct { @@ -765,15 +765,6 @@ func (q *Q) NewAccountDataBatchInsertBuilder(maxBatchSize int) AccountDataBatchI } } -func (q *Q) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder { - return &offersBatchInsertBuilder{ - builder: db.BatchInsertBuilder{ - Table: q.GetTable("offers"), - MaxBatchSize: maxBatchSize, - }, - } -} - func (q *Q) NewTrustLinesBatchInsertBuilder(maxBatchSize int) TrustLinesBatchInsertBuilder { return &trustLinesBatchInsertBuilder{ builder: db.BatchInsertBuilder{ @@ -853,3 +844,67 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error { } return nil } + +// upsertRows builds and executes an upsert query that allows very fast upserts +// to a given table. The final query is of form: +// +// WITH r AS +// (SELECT +// /* unnestPart */ +// unnest(?::type1[]), /* field1 */ +// unnest(?::type2[]), /* field2 */ +// ... +// ) +// INSERT INTO table ( +// /* insertFieldsPart */ +// field1, +// field2, +// ... +// ) +// SELECT * from r +// ON CONFLICT (conflictField) DO UPDATE SET +// /* onConflictPart */ +// field1 = excluded.field1, +// field2 = excluded.field2, +// ... +func (q *Q) upsertRows(ctx context.Context, table string, conflictField string, fields []upsertField) error { + unnestPart := make([]string, 0, len(fields)) + insertFieldsPart := make([]string, 0, len(fields)) + onConflictPart := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + insertFieldsPart = append( + insertFieldsPart, + field.name, + ) + onConflictPart = append( + onConflictPart, + fmt.Sprintf("%s = excluded.%s", field.name, field.name), + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + + sql := ` + WITH r AS + (SELECT ` + strings.Join(unnestPart, ",") + `) + INSERT INTO ` + table + ` + (` + strings.Join(insertFieldsPart, ",") + `) + SELECT * from r + ON CONFLICT (` + conflictField + `) DO UPDATE SET + ` + strings.Join(onConflictPart, ",") + + _, err := q.ExecRaw( + context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType), + sql, + pqArrays..., + ) + return err +} diff --git a/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go deleted file mode 100644 index 75e88f3263..0000000000 --- a/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go +++ /dev/null @@ -1,20 +0,0 @@ -package history - -import ( - "context" - "github.com/stretchr/testify/mock" -) - -type MockOffersBatchInsertBuilder struct { - mock.Mock -} - -func (m *MockOffersBatchInsertBuilder) Add(ctx context.Context, row Offer) error { - a := m.Called(ctx, row) - return a.Error(0) -} - -func (m *MockOffersBatchInsertBuilder) Exec(ctx context.Context) error { - a := m.Called(ctx) - return a.Error(0) -} diff --git a/services/horizon/internal/db2/history/mock_q_offers.go b/services/horizon/internal/db2/history/mock_q_offers.go index 0b23720b7f..9cbf673fc5 100644 --- a/services/horizon/internal/db2/history/mock_q_offers.go +++ b/services/horizon/internal/db2/history/mock_q_offers.go @@ -2,6 +2,7 @@ package history import ( "context" + "github.com/stretchr/testify/mock" ) @@ -30,14 +31,9 @@ func (m *MockQOffers) CountOffers(ctx context.Context) (int, error) { return a.Get(0).(int), a.Error(1) } -func (m *MockQOffers) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder { - a := m.Called(maxBatchSize) - return a.Get(0).(OffersBatchInsertBuilder) -} - -func (m *MockQOffers) UpdateOffer(ctx context.Context, row Offer) (int64, error) { - a := m.Called(ctx, row) - return a.Get(0).(int64), a.Error(1) +func (m *MockQOffers) UpsertOffers(ctx context.Context, rows []Offer) error { + a := m.Called(ctx, rows) + return a.Error(0) } func (m *MockQOffers) RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error) { diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index fe37ba91c9..5b875b72cf 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -14,8 +14,7 @@ type QOffers interface { GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error) CountOffers(ctx context.Context) (int, error) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error) - NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder - UpdateOffer(ctx context.Context, offer Offer) (int64, error) + UpsertOffers(ctx context.Context, offers []Offer) error RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error) CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error) } @@ -96,15 +95,43 @@ func (q *Q) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]O return offers, err } -// UpdateOffer updates a row in the offers table. -// Returns number of rows affected and error. -func (q *Q) UpdateOffer(ctx context.Context, offer Offer) (int64, error) { - updateBuilder := q.GetTable("offers").Update() - result, err := updateBuilder.SetStruct(offer, []string{}).Where("offer_id = ?", offer.OfferID).Exec(ctx) - if err != nil { - return 0, err +// UpsertOffers upserts a batch of offers in the offerss table. +// There's currently no limit of the number of offers this method can +// accept other than 2GB limit of the query string length what should be enough +// for each ledger with the current limits. +func (q *Q) UpsertOffers(ctx context.Context, offers []Offer) error { + var sellerID, sellingAsset, buyingAsset, offerID, amount, priceN, priceD, + price, flags, lastModifiedLedger, sponsor []interface{} + + for _, offer := range offers { + sellerID = append(sellerID, offer.SellerID) + offerID = append(offerID, offer.OfferID) + sellingAsset = append(sellingAsset, offer.SellingAsset) + buyingAsset = append(buyingAsset, offer.BuyingAsset) + amount = append(amount, offer.Amount) + priceN = append(priceN, offer.Pricen) + priceD = append(priceD, offer.Priced) + price = append(price, offer.Price) + flags = append(flags, offer.Flags) + lastModifiedLedger = append(lastModifiedLedger, offer.LastModifiedLedger) + sponsor = append(sponsor, offer.Sponsor) } - return result.RowsAffected() + + upsertFields := []upsertField{ + {"seller_id", "text", sellerID}, + {"offer_id", "bigint", offerID}, + {"selling_asset", "text", sellingAsset}, + {"buying_asset", "text", buyingAsset}, + {"amount", "bigint", amount}, + {"pricen", "integer", priceN}, + {"priced", "integer", priceD}, + {"price", "double precision", price}, + {"flags", "integer", flags}, + {"last_modified_ledger", "integer", lastModifiedLedger}, + {"sponsor", "text", sponsor}, + } + + return q.upsertRows(ctx, "offers", "offer_id", upsertFields) } // RemoveOffers marks rows in the offers table as deleted. diff --git a/services/horizon/internal/db2/history/offers_batch_insert_builder.go b/services/horizon/internal/db2/history/offers_batch_insert_builder.go deleted file mode 100644 index c1d3b628a9..0000000000 --- a/services/horizon/internal/db2/history/offers_batch_insert_builder.go +++ /dev/null @@ -1,14 +0,0 @@ -package history - -import ( - "context" -) - -// Add adds a new offer entry to the batch. -func (i *offersBatchInsertBuilder) Add(ctx context.Context, offer Offer) error { - return i.builder.RowStruct(ctx, offer) -} - -func (i *offersBatchInsertBuilder) Exec(ctx context.Context) error { - return i.builder.Exec(ctx) -} diff --git a/services/horizon/internal/db2/history/offers_test.go b/services/horizon/internal/db2/history/offers_test.go index 2218ffc71a..61f228e33a 100644 --- a/services/horizon/internal/db2/history/offers_test.go +++ b/services/horizon/internal/db2/history/offers_test.go @@ -64,12 +64,7 @@ var ( ) func insertOffer(tt *test.T, q *Q, offer Offer) error { - batch := q.NewOffersBatchInsertBuilder(0) - err := batch.Add(tt.Ctx, offer) - if err != nil { - return err - } - return batch.Exec(tt.Ctx) + return q.UpsertOffers(tt.Ctx, []Offer{offer}) } func TestGetOfferByID(t *testing.T) { @@ -194,9 +189,8 @@ func TestUpdateOffer(t *testing.T) { modifiedEurOffer := eurOffer modifiedEurOffer.Amount -= 10 - rowsAffected, err := q.UpdateOffer(tt.Ctx, modifiedEurOffer) + err = q.UpsertOffers(tt.Ctx, []Offer{modifiedEurOffer}) tt.Assert.NoError(err) - tt.Assert.Equal(int64(1), rowsAffected) offers, err = q.GetAllOffers(tt.Ctx) tt.Assert.NoError(err) diff --git a/services/horizon/internal/db2/history/orderbook_test.go b/services/horizon/internal/db2/history/orderbook_test.go index 30b27f4a64..84b3213b69 100644 --- a/services/horizon/internal/db2/history/orderbook_test.go +++ b/services/horizon/internal/db2/history/orderbook_test.go @@ -213,12 +213,7 @@ func TestGetOrderBookSummary(t *testing.T) { } { t.Run(testCase.name, func(t *testing.T) { assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"})) - - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range testCase.offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, testCase.offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, @@ -260,11 +255,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) { sellEurOffer, } - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 48ee414237..a2c4975bd4 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -24,12 +24,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { q := &mockDBQ{} // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -113,12 +107,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { ).Once() // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -185,11 +173,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t defer mock.AssertExpectationsForObjects(t, historyAdapter) // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). @@ -227,8 +210,6 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { defer mock.AssertExpectationsForObjects(t, q) // Twice = checking ledgerSource and historyArchiveSource - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(&history.MockOffersBatchInsertBuilder{}).Twice() q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). Return(&history.MockAccountDataBatchInsertBuilder{}).Twice() q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). @@ -330,12 +311,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -400,11 +375,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t } // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index 5a99d0a24c..d09610e025 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -18,7 +18,6 @@ type OffersProcessor struct { sequence uint32 cache *ingest.ChangeCompactor - insertBatch history.OffersBatchInsertBuilder removeBatch []int64 } @@ -30,7 +29,6 @@ func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcess func (p *OffersProcessor) reset() { p.cache = ingest.NewChangeCompactor() - p.insertBatch = p.offersQ.NewOffersBatchInsertBuilder(maxBatchSize) p.removeBatch = []int64{} } @@ -71,59 +69,43 @@ func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer } func (p *OffersProcessor) flushCache(ctx context.Context) error { + batchUpsertOffers := []history.Offer{} changes := p.cache.GetChanges() for _, change := range changes { - var rowsAffected int64 - var err error - var action string - var offerID xdr.Int64 - switch { - case change.Pre == nil && change.Post != nil: - // Created - action = "inserting" + case change.Post != nil: + // Created and updated row := p.ledgerEntryToRow(change.Post) - err = p.insertBatch.Add(ctx, row) - rowsAffected = 1 // We don't track this when batch inserting + batchUpsertOffers = append(batchUpsertOffers, row) case change.Pre != nil && change.Post == nil: // Removed - action = "removing" offer := change.Pre.Data.MustOffer() p.removeBatch = append(p.removeBatch, int64(offer.OfferId)) - rowsAffected = 1 // We don't track this when batch removing default: - // Updated - action = "updating" - offer := change.Post.Data.MustOffer() - offerID = offer.OfferId - row := p.ledgerEntryToRow(change.Post) - rowsAffected, err = p.offersQ.UpdateOffer(ctx, row) + return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") } + } + if len(batchUpsertOffers) > 0 { + err := p.offersQ.UpsertOffers(ctx, batchUpsertOffers) if err != nil { - return err - } - - if rowsAffected != 1 { - return ingest.NewStateError(errors.Errorf( - "%d rows affected when %s offer %d", - rowsAffected, - action, - offerID, - )) + return errors.Wrap(err, "errors in UpsertOffers") } } - err := p.insertBatch.Exec(ctx) - if err != nil { - return errors.Wrap(err, "error executing batch") - } - if len(p.removeBatch) > 0 { - _, err = p.offersQ.RemoveOffers(ctx, p.removeBatch, p.sequence) + rowsAffected, err := p.offersQ.RemoveOffers(ctx, p.removeBatch, p.sequence) if err != nil { return errors.Wrap(err, "error in RemoveOffers") } + + if rowsAffected != int64(len(p.removeBatch)) { + return ingest.NewStateError(errors.Errorf( + "%d rows affected when removing %d offers", + rowsAffected, + len(p.removeBatch), + )) + } } return nil diff --git a/services/horizon/internal/ingest/processors/offers_processor_test.go b/services/horizon/internal/ingest/processors/offers_processor_test.go index c49bbaf26a..86eb67941d 100644 --- a/services/horizon/internal/ingest/processors/offers_processor_test.go +++ b/services/horizon/internal/ingest/processors/offers_processor_test.go @@ -59,33 +59,25 @@ func TestOffersProcessorTestSuiteState(t *testing.T) { type OffersProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - mockBatchInsertBuilder *history.MockOffersBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ *history.MockQOffers + sequence uint32 } func (s *OffersProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} - s.mockBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} - - s.mockQ. - On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder).Once() s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) } func (s *OffersProcessorTestSuiteState) TearDownTest() { - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) s.mockQ.AssertExpectations(s.T()) - s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *OffersProcessorTestSuiteState) TestCreateOffer() { @@ -103,13 +95,15 @@ func (s *OffersProcessorTestSuiteState) TestCreateOffer() { LastModifiedLedgerSeq: lastModifiedLedgerSeq, } - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 1, - Pricen: int32(1), - Priced: int32(2), - Price: float64(0.5), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 1, + Pricen: int32(1), + Priced: int32(2), + Price: float64(0.5), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ @@ -126,21 +120,15 @@ func TestOffersProcessorTestSuiteLedger(t *testing.T) { type OffersProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - mockBatchInsertBuilder *history.MockOffersBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ *history.MockQOffers + sequence uint32 } func (s *OffersProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} - s.mockBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} - - s.mockQ. - On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder).Once() s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) @@ -148,7 +136,6 @@ func (s *OffersProcessorTestSuiteLedger) SetupTest() { func (s *OffersProcessorTestSuiteLedger) TearDownTest() { s.mockQ.AssertExpectations(s.T()) - s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() { @@ -217,16 +204,16 @@ func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() - - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() } func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { @@ -254,7 +241,7 @@ func (s *OffersProcessorTestSuiteLedger) TestCompactionError() { s.Assert().EqualError(s.processor.Commit(s.ctx), "could not compact offers: compaction error") } -func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { +func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() { lastModifiedLedgerSeq := xdr.Uint32(1234) offer := xdr.OfferEntry{ @@ -268,6 +255,12 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { Price: xdr.Price{1, 6}, } + anotherOffer := xdr.OfferEntry{ + SellerId: xdr.MustAddress("GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P"), + OfferId: xdr.Int64(3), + Price: xdr.Price{2, 3}, + } + updatedEntry := xdr.LedgerEntry{ LastModifiedLedgerSeq: lastModifiedLedgerSeq, Data: xdr.LedgerEntryData{ @@ -289,19 +282,46 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { }) s.Assert().NoError(err) - s.mockQ.On("UpdateOffer", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }).Return(int64(0), nil).Once() + err = s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &anotherOffer, + }, + }, + }) + s.Assert().NoError(err) - err = s.processor.Commit(s.ctx) - s.Assert().Error(err) - s.Assert().IsType(ingest.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "error flushing cache: 0 rows affected when updating offer 2") + s.mockQ.On("UpsertOffers", s.ctx, mock.Anything).Run(func(args mock.Arguments) { + // To fix order issue due to using ChangeCompactor + offers := args.Get(1).([]history.Offer) + s.Assert().ElementsMatch( + offers, + []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, + { + SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P", + OfferID: 3, + Pricen: int32(2), + Priced: int32(3), + Price: float64(2) / float64(3), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, + }, + ) + }).Return(nil).Once() + s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() + s.Assert().NoError(s.processor.Commit(s.ctx)) } func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { @@ -322,8 +342,6 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { s.Assert().NoError(err) s.mockQ.On("RemoveOffers", s.ctx, []int64{3}, s.sequence).Return(int64(1), nil).Once() - - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } @@ -377,16 +395,17 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } @@ -424,14 +443,57 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveMultipleOffers() { }) s.Assert().NoError(err) - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.mockQ.On("RemoveOffers", s.ctx, mock.Anything, s.sequence).Run(func(args mock.Arguments) { // To fix order issue due to using ChangeCompactor ids := args.Get(1).([]int64) s.Assert().ElementsMatch(ids, []int64{3, 4}) - }).Return(int64(0), nil).Once() + }).Return(int64(2), nil).Once() err = s.processor.Commit(s.ctx) s.Assert().NoError(err) } + +func (s *OffersProcessorTestSuiteLedger) TestRemoveMultipleOffersRowsAffectedCheck() { + err := s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), + Price: xdr.Price{3, 1}, + }, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + err = s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(4), + Price: xdr.Price{3, 1}, + }, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + s.mockQ.On("RemoveOffers", s.ctx, mock.Anything, s.sequence).Run(func(args mock.Arguments) { + // To fix order issue due to using ChangeCompactor + ids := args.Get(1).([]int64) + s.Assert().ElementsMatch(ids, []int64{3, 4}) + }).Return(int64(0), nil).Once() + + err = s.processor.Commit(s.ctx) + s.Assert().IsType(ingest.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "error flushing cache: 0 rows affected when removing 2 offers") +}