diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index 83bf3f8e17..c80d67c854 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -2,6 +2,7 @@ package history import ( "context" + "database/sql" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" @@ -9,6 +10,8 @@ import ( "github.com/stellar/go/support/errors" ) +const offersBatchSize = 50000 + // QOffers defines offer related queries. type QOffers interface { StreamAllOffers(ctx context.Context, callback func(Offer) error) error @@ -83,28 +86,52 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) { // StreamAllOffers loads all non deleted offers func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error { + if tx := q.GetTx(); tx == nil { + return errors.New("cannot be called outside of a transaction") + } + if opts := q.GetTxOptions(); opts == nil || !opts.ReadOnly || opts.Isolation != sql.LevelRepeatableRead { + return errors.New("should only be called in a repeatable read transaction") + } + + lastID := int64(0) + for { + nextID, err := q.streamAllOffersBatch(ctx, lastID, offersBatchSize, callback) + if err != nil { + return err + } + if lastID == nextID { + return nil + } + lastID = nextID + } +} + +func (q *Q) streamAllOffersBatch(ctx context.Context, lastId int64, limit uint64, callback func(Offer) error) (int64, error) { var rows *sqlx.Rows var err error - if rows, err = q.Query(ctx, selectOffers.Where("deleted = ?", false)); err != nil { - return errors.Wrap(err, "could not run all offers select query") + rows, err = q.Query(ctx, selectOffers. + Where("deleted = ?", false). + Where("offer_id > ? ", lastId). + OrderBy("offer_id asc").Limit(limit)) + if err != nil { + return 0, errors.Wrap(err, "could not run all offers select query") } defer rows.Close() - for rows.Next() { offer := Offer{} if err = rows.StructScan(&offer); err != nil { - return errors.Wrap(err, "could not scan row into offer struct") + return 0, errors.Wrap(err, "could not scan row into offer struct") } if err = callback(offer); err != nil { - return err + return 0, err } + lastId = offer.OfferID } - return rows.Err() - + return lastId, rows.Err() } // GetUpdatedOffers returns all offers created, updated, or deleted after the given ledger sequence. diff --git a/services/horizon/internal/db2/history/offers_test.go b/services/horizon/internal/db2/history/offers_test.go index 0aac2522f7..86d93ff958 100644 --- a/services/horizon/internal/db2/history/offers_test.go +++ b/services/horizon/internal/db2/history/offers_test.go @@ -1,6 +1,9 @@ package history import ( + "context" + "database/sql" + "github.com/stretchr/testify/assert" "strconv" "testing" @@ -105,6 +108,34 @@ func TestGetNonExistentOfferByID(t *testing.T) { tt.Assert.True(q.NoRows(err)) } +func streamAllOffersInTx(q *Q, ctx context.Context, f func(offer Offer) error) error { + err := q.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}) + if err != nil { + return err + } + defer q.Rollback() + return q.StreamAllOffers(ctx, f) +} + +func TestStreamAllOffersRequiresTx(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + return nil + }) + assert.EqualError(t, err, "cannot be called outside of a transaction") + + assert.NoError(t, q.Begin()) + defer q.Rollback() + err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + return nil + }) + assert.EqualError(t, err, "should only be called in a repeatable read transaction") +} + func TestQueryEmptyOffers(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -112,7 +143,7 @@ func TestQueryEmptyOffers(t *testing.T) { q := &Q{tt.HorizonSession()} var offers []Offer - err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err := streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -150,7 +181,7 @@ func TestInsertOffers(t *testing.T) { tt.Assert.NoError(err) var offers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -183,7 +214,7 @@ func TestInsertOffers(t *testing.T) { tt.Assert.Equal(3, afterCompactionCount) var afterCompactionOffers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { afterCompactionOffers = append(afterCompactionOffers, offer) return nil }) @@ -201,7 +232,7 @@ func TestUpdateOffer(t *testing.T) { tt.Assert.NoError(err) var offers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -229,7 +260,7 @@ func TestUpdateOffer(t *testing.T) { tt.Assert.NoError(err) offers = nil - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -256,7 +287,7 @@ func TestRemoveOffer(t *testing.T) { err := insertOffer(tt, q, eurOffer) tt.Assert.NoError(err) var offers []Offer - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) @@ -274,7 +305,7 @@ func TestRemoveOffer(t *testing.T) { expectedUpdates[0].Deleted = true offers = nil - err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { + err = streamAllOffersInTx(q, tt.Ctx, func(offer Offer) error { offers = append(offers, offer) return nil }) diff --git a/services/horizon/internal/ingest/orderbook.go b/services/horizon/internal/ingest/orderbook.go index 3fd421cde3..e62cc7a8b6 100644 --- a/services/horizon/internal/ingest/orderbook.go +++ b/services/horizon/internal/ingest/orderbook.go @@ -136,7 +136,6 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b o.graph.AddOffers(offerToXDR(offer)) return nil }) - if err != nil { return true, errors.Wrap(err, "Error loading offers into orderbook") }