Skip to content

Commit

Permalink
horizon/ingest: refactor initial load of orderbook offers/pools to us…
Browse files Browse the repository at this point in the history
…e functional streaming (#4155)
  • Loading branch information
sreuland authored Jan 4, 2022
1 parent 38fca01 commit b784365
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 83 deletions.
27 changes: 21 additions & 6 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"
"github.com/jmoiron/sqlx"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (lpar *LiquidityPoolAssetReserve) UnmarshalJSON(data []byte) error {
type QLiquidityPools interface {
UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error
GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]LiquidityPool, error)
GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error)
StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error
CountLiquidityPools(ctx context.Context) (int, error)
FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error)
GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error)
Expand Down Expand Up @@ -186,13 +187,27 @@ func (q *Q) GetLiquidityPools(ctx context.Context, query LiquidityPoolsQuery) ([
return results, nil
}

func (q *Q) GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error) {
var results []LiquidityPool
if err := q.Select(ctx, &results, selectLiquidityPools.Where("deleted = ?", false)); err != nil {
return nil, errors.Wrap(err, "could not run select query")
func (q *Q) StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error {
var rows *sqlx.Rows
var err error

if rows, err = q.Query(ctx, selectLiquidityPools.Where("deleted = ?", false)); err != nil {
return errors.Wrap(err, "could not run all liquidity pools select query")
}

return results, nil
defer rows.Close()
liquidityPool := LiquidityPool{}

for rows.Next() {
if err = rows.StructScan(&liquidityPool); err != nil {
return errors.Wrap(err, "could not scan row into liquidity pool struct")
}
if err = callback(liquidityPool); err != nil {
return err
}
}

return rows.Err()
}

// GetUpdatedLiquidityPools returns all liquidity pools created, updated, or deleted after the given ledger sequence.
Expand Down
13 changes: 11 additions & 2 deletions services/horizon/internal/db2/history/liquidity_pools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ func TestFindLiquidityPoolsByAssets(t *testing.T) {
tt.Assert.Len(lps, 1)

pool := lps[0]
lps, err = q.GetAllLiquidityPools(tt.Ctx)
lps = nil
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool LiquidityPool) error {
lps = append(lps, liqudityPool)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(lps, 1)
tt.Assert.Equal(pool, lps[0])
Expand Down Expand Up @@ -205,7 +209,12 @@ func TestLiquidityPoolCompaction(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(lps, 0)

lps, err = q.GetAllLiquidityPools(tt.Ctx)
lps = nil
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool LiquidityPool) error {
lps = append(lps, liqudityPool)
return nil
})

tt.Assert.NoError(err)
tt.Assert.Len(lps, 0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func (m *MockQLiquidityPools) FindLiquidityPoolByID(ctx context.Context, liquidi
return a.Get(0).(LiquidityPool), a.Error(1)
}

func (m *MockQLiquidityPools) GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error) {
a := m.Called(ctx)
return a.Get(0).([]LiquidityPool), a.Error(1)
func (m *MockQLiquidityPools) StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}

func (m *MockQLiquidityPools) GetUpdatedLiquidityPools(ctx context.Context, sequence uint32) ([]LiquidityPool, error) {
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/db2/history/mock_q_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type MockQOffers struct {
mock.Mock
}

func (m *MockQOffers) GetAllOffers(ctx context.Context) ([]Offer, error) {
a := m.Called(ctx)
return a.Get(0).([]Offer), a.Error(1)
func (m *MockQOffers) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}

func (m *MockQOffers) GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error) {
Expand Down
32 changes: 26 additions & 6 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"

"github.com/stellar/go/support/errors"
)

// QOffers defines offer related queries.
type QOffers interface {
GetAllOffers(ctx context.Context) ([]Offer, error)
StreamAllOffers(ctx context.Context, callback func(Offer) error) error
GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error)
CountOffers(ctx context.Context) (int, error)
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
Expand Down Expand Up @@ -80,11 +81,30 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) {
return offers, nil
}

// GetAllOffers loads all non deleted offers
func (q *Q) GetAllOffers(ctx context.Context) ([]Offer, error) {
var offers []Offer
err := q.Select(ctx, &offers, selectOffers.Where("deleted = ?", false))
return offers, err
// StreamAllOffers loads all non deleted offers
func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
var rows *sqlx.Rows
var err error

if rows, err = q.Query(ctx, selectOffers.Where("deleted = ?", false)); err != nil {
return errors.Wrap(err, "could not run all offers select query")
}

defer rows.Close()
offer := Offer{}

for rows.Next() {
if err = rows.StructScan(&offer); err != nil {
return errors.Wrap(err, "could not scan row into offer struct")
}

if err = callback(offer); err != nil {
return err
}
}

return rows.Err()

}

// GetUpdatedOffers returns all offers created, updated, or deleted after the given ledger sequence.
Expand Down
43 changes: 36 additions & 7 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ func TestQueryEmptyOffers(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})

tt.Assert.NoError(err)
tt.Assert.Len(offers, 0)

Expand Down Expand Up @@ -127,7 +132,11 @@ func TestInsertOffers(t *testing.T) {
err = insertOffer(tt, q, twoEurOffer)
tt.Assert.NoError(err)

offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 2)

Expand All @@ -154,7 +163,11 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Equal(2, afterCompactionCount)

afterCompactionOffers, err := q.GetAllOffers(tt.Ctx)
var afterCompactionOffers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
afterCompactionOffers = append(afterCompactionOffers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(afterCompactionOffers, 2)
}
Expand All @@ -168,7 +181,11 @@ func TestUpdateOffer(t *testing.T) {
err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)

offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 1)

Expand All @@ -192,7 +209,11 @@ func TestUpdateOffer(t *testing.T) {
err = q.UpsertOffers(tt.Ctx, []Offer{modifiedEurOffer})
tt.Assert.NoError(err)

offers, err = q.GetAllOffers(tt.Ctx)
offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 1)

Expand All @@ -215,7 +236,11 @@ func TestRemoveOffer(t *testing.T) {

err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)
offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 1)
tt.Assert.Equal(offers[0], eurOffer)
Expand All @@ -229,7 +254,11 @@ func TestRemoveOffer(t *testing.T) {
expectedUpdates[0].LastModifiedLedger = 1236
expectedUpdates[0].Deleted = true

offers, err = q.GetAllOffers(tt.Ctx)
offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 0)

Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ func (m *mockDBQ) GetExpStateInvalid(ctx context.Context) (bool, error) {
return args.Get(0).(bool), args.Error(1)
}

func (m *mockDBQ) GetAllOffers(ctx context.Context) ([]history.Offer, error) {
args := m.Called(ctx)
return args.Get(0).([]history.Offer), args.Error(1)
func (m *mockDBQ) StreamAllOffers(ctx context.Context, callback func(history.Offer) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}

func (m *mockDBQ) GetLatestHistoryLedger(ctx context.Context) (uint32, error) {
Expand Down
51 changes: 31 additions & 20 deletions services/horizon/internal/ingest/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,26 +132,26 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b

defer o.graph.Discard()

offers, err := o.historyQ.GetAllOffers(ctx)
if err != nil {
return true, errors.Wrap(err, "Error from GetAllOffers")
}
err := o.historyQ.StreamAllOffers(ctx, func(offer history.Offer) error {
o.graph.AddOffers(offerToXDR(offer))
return nil
})

liquidityPools, err := o.historyQ.GetAllLiquidityPools(ctx)
if err != nil {
return true, errors.Wrap(err, "Error from GetAllLiquidityPools")
return true, errors.Wrap(err, "Error loading offers into orderbook")
}

for _, offer := range offers {
o.graph.AddOffers(offerToXDR(offer))
}

for _, liquidityPool := range liquidityPools {
liquidityPoolXDR, err := liquidityPoolToXDR(liquidityPool)
if err != nil {
return true, errors.Wrap(err, "Invalid liquidity pool row")
err = o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool history.LiquidityPool) error {
if liquidityPoolXDR, liquidityPoolErr := liquidityPoolToXDR(liquidityPool); liquidityPoolErr != nil {
return errors.Wrapf(liquidityPoolErr, "Invalid liquidity pool row %v, unable to marshal to xdr", liquidityPool)
} else {
o.graph.AddLiquidityPools(liquidityPoolXDR)
return nil
}
o.graph.AddLiquidityPools(liquidityPoolXDR)
})

if err != nil {
return true, errors.Wrap(err, "Error loading liquidity pools into orderbook")
}

if err := o.graph.Apply(status.LastIngestedLedger); err != nil {
Expand Down Expand Up @@ -209,9 +209,14 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b
}

func (o *OrderBookStream) verifyAllOffers(ctx context.Context, offers []xdr.OfferEntry) (bool, error) {
ingestionOffers, err := o.historyQ.GetAllOffers(ctx)
var ingestionOffers []history.Offer
err := o.historyQ.StreamAllOffers(ctx, func(offer history.Offer) error {
ingestionOffers = append(ingestionOffers, offer)
return nil
})

if err != nil {
return false, errors.Wrap(err, "Error from GetAllOffers")
return false, errors.Wrap(err, "Error loading all offers for orderbook verification")
}

mismatch := len(offers) != len(ingestionOffers)
Expand Down Expand Up @@ -253,9 +258,15 @@ func (o *OrderBookStream) verifyAllOffers(ctx context.Context, offers []xdr.Offe
}

func (o *OrderBookStream) verifyAllLiquidityPools(ctx context.Context, liquidityPools []xdr.LiquidityPoolEntry) (bool, error) {
ingestionLiquidityPools, err := o.historyQ.GetAllLiquidityPools(ctx)
var ingestionLiquidityPools []history.LiquidityPool

err := o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool history.LiquidityPool) error {
ingestionLiquidityPools = append(ingestionLiquidityPools, liquidityPool)
return nil
})

if err != nil {
return false, errors.Wrap(err, "Error from GetAllLiquidityPools")
return false, errors.Wrap(err, "Error loading all liquidity pools for orderbook verification")
}

mismatch := len(liquidityPools) != len(ingestionLiquidityPools)
Expand Down Expand Up @@ -322,7 +333,7 @@ func (o *OrderBookStream) Update(ctx context.Context) error {
}

// add 15 minute jitter so that not all horizon nodes are calling
// historyQ.GetAllOffers at the same time
// historyQ.StreamAllOffers at the same time
jitter := time.Duration(rand.Int63n(int64(15 * time.Minute)))
requiresVerification := o.lastLedger > 0 &&
time.Since(o.lastVerification) >= verificationFrequency+jitter
Expand Down
Loading

0 comments on commit b784365

Please sign in to comment.