Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

horizon/ingest: refactor initial load of orderbook offers/pools to use functional streaming #4155

Merged
merged 3 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"
"github.com/jmoiron/sqlx"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (lpar *LiquidityPoolAssetReserve) UnmarshalJSON(data []byte) error {
type QLiquidityPools interface {
UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error
GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]LiquidityPool, error)
GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error)
StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error
CountLiquidityPools(ctx context.Context) (int, error)
FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error)
GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error)
Expand Down Expand Up @@ -186,13 +187,27 @@ func (q *Q) GetLiquidityPools(ctx context.Context, query LiquidityPoolsQuery) ([
return results, nil
}

func (q *Q) GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error) {
var results []LiquidityPool
if err := q.Select(ctx, &results, selectLiquidityPools.Where("deleted = ?", false)); err != nil {
return nil, errors.Wrap(err, "could not run select query")
func (q *Q) StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error {
var rows *sqlx.Rows
var err error

if rows, err = q.Query(ctx, selectLiquidityPools.Where("deleted = ?", false)); err != nil {
return errors.Wrap(err, "could not run all liquidity pools select query")
}

return results, nil
defer rows.Close()
liquidityPool := LiquidityPool{}

for rows.Next() {
if err = rows.StructScan(&liquidityPool); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember one issue in our XDR decoder: an object reused for decoding was not cleared so sometimes pointer fields contained the old data. I think this can be also the case here. Can we check? The fix would be to move liquidityPool variable inside the for loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the PRs for reference: stellar/go-xdr#13 stellar/go-xdr#14.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, every field in the struct is set to a column value in results(selected from liquidity_pools table) per row iteration during StructScan, so, shouldn't need init between iterations.

return errors.Wrap(err, "could not scan row into liquidity pool struct")
}
if err = callback(liquidityPool); err != nil {
return err
}
}

return rows.Err()
}

// GetUpdatedLiquidityPools returns all liquidity pools created, updated, or deleted after the given ledger sequence.
Expand Down
13 changes: 11 additions & 2 deletions services/horizon/internal/db2/history/liquidity_pools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ func TestFindLiquidityPoolsByAssets(t *testing.T) {
tt.Assert.Len(lps, 1)

pool := lps[0]
lps, err = q.GetAllLiquidityPools(tt.Ctx)
lps = nil
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool LiquidityPool) error {
lps = append(lps, liqudityPool)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(lps, 1)
tt.Assert.Equal(pool, lps[0])
Expand Down Expand Up @@ -205,7 +209,12 @@ func TestLiquidityPoolCompaction(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(lps, 0)

lps, err = q.GetAllLiquidityPools(tt.Ctx)
lps = nil
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool LiquidityPool) error {
lps = append(lps, liqudityPool)
return nil
})

tt.Assert.NoError(err)
tt.Assert.Len(lps, 0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func (m *MockQLiquidityPools) FindLiquidityPoolByID(ctx context.Context, liquidi
return a.Get(0).(LiquidityPool), a.Error(1)
}

func (m *MockQLiquidityPools) GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error) {
a := m.Called(ctx)
return a.Get(0).([]LiquidityPool), a.Error(1)
func (m *MockQLiquidityPools) StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}

func (m *MockQLiquidityPools) GetUpdatedLiquidityPools(ctx context.Context, sequence uint32) ([]LiquidityPool, error) {
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/db2/history/mock_q_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type MockQOffers struct {
mock.Mock
}

func (m *MockQOffers) GetAllOffers(ctx context.Context) ([]Offer, error) {
a := m.Called(ctx)
return a.Get(0).([]Offer), a.Error(1)
func (m *MockQOffers) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}

func (m *MockQOffers) GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error) {
Expand Down
32 changes: 26 additions & 6 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"

"github.com/stellar/go/support/errors"
)

// QOffers defines offer related queries.
type QOffers interface {
GetAllOffers(ctx context.Context) ([]Offer, error)
StreamAllOffers(ctx context.Context, callback func(Offer) error) error
GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error)
CountOffers(ctx context.Context) (int, error)
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
Expand Down Expand Up @@ -80,11 +81,30 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) {
return offers, nil
}

// GetAllOffers loads all non deleted offers
func (q *Q) GetAllOffers(ctx context.Context) ([]Offer, error) {
var offers []Offer
err := q.Select(ctx, &offers, selectOffers.Where("deleted = ?", false))
return offers, err
// StreamAllOffers loads all non deleted offers
func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
var rows *sqlx.Rows
var err error

if rows, err = q.Query(ctx, selectOffers.Where("deleted = ?", false)); err != nil {
return errors.Wrap(err, "could not run all offers select query")
}

defer rows.Close()
offer := Offer{}

for rows.Next() {
if err = rows.StructScan(&offer); err != nil {
return errors.Wrap(err, "could not scan row into offer struct")
}

if err = callback(offer); err != nil {
return err
}
}

return rows.Err()

}

// GetUpdatedOffers returns all offers created, updated, or deleted after the given ledger sequence.
Expand Down
43 changes: 36 additions & 7 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ func TestQueryEmptyOffers(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})

tt.Assert.NoError(err)
tt.Assert.Len(offers, 0)

Expand Down Expand Up @@ -127,7 +132,11 @@ func TestInsertOffers(t *testing.T) {
err = insertOffer(tt, q, twoEurOffer)
tt.Assert.NoError(err)

offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 2)

Expand All @@ -154,7 +163,11 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Equal(2, afterCompactionCount)

afterCompactionOffers, err := q.GetAllOffers(tt.Ctx)
var afterCompactionOffers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
afterCompactionOffers = append(afterCompactionOffers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(afterCompactionOffers, 2)
}
Expand All @@ -168,7 +181,11 @@ func TestUpdateOffer(t *testing.T) {
err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)

offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 1)

Expand All @@ -192,7 +209,11 @@ func TestUpdateOffer(t *testing.T) {
err = q.UpsertOffers(tt.Ctx, []Offer{modifiedEurOffer})
tt.Assert.NoError(err)

offers, err = q.GetAllOffers(tt.Ctx)
offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 1)

Expand All @@ -215,7 +236,11 @@ func TestRemoveOffer(t *testing.T) {

err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)
offers, err := q.GetAllOffers(tt.Ctx)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 1)
tt.Assert.Equal(offers[0], eurOffer)
Expand All @@ -229,7 +254,11 @@ func TestRemoveOffer(t *testing.T) {
expectedUpdates[0].LastModifiedLedger = 1236
expectedUpdates[0].Deleted = true

offers, err = q.GetAllOffers(tt.Ctx)
offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 0)

Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ func (m *mockDBQ) GetExpStateInvalid(ctx context.Context) (bool, error) {
return args.Get(0).(bool), args.Error(1)
}

func (m *mockDBQ) GetAllOffers(ctx context.Context) ([]history.Offer, error) {
args := m.Called(ctx)
return args.Get(0).([]history.Offer), args.Error(1)
func (m *mockDBQ) StreamAllOffers(ctx context.Context, callback func(history.Offer) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}

func (m *mockDBQ) GetLatestHistoryLedger(ctx context.Context) (uint32, error) {
Expand Down
51 changes: 31 additions & 20 deletions services/horizon/internal/ingest/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,26 +132,26 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b

defer o.graph.Discard()

offers, err := o.historyQ.GetAllOffers(ctx)
if err != nil {
return true, errors.Wrap(err, "Error from GetAllOffers")
}
err := o.historyQ.StreamAllOffers(ctx, func(offer history.Offer) error {
o.graph.AddOffers(offerToXDR(offer))
return nil
})

liquidityPools, err := o.historyQ.GetAllLiquidityPools(ctx)
if err != nil {
return true, errors.Wrap(err, "Error from GetAllLiquidityPools")
return true, errors.Wrap(err, "Error loading offers into orderbook")
}

for _, offer := range offers {
o.graph.AddOffers(offerToXDR(offer))
}

for _, liquidityPool := range liquidityPools {
liquidityPoolXDR, err := liquidityPoolToXDR(liquidityPool)
if err != nil {
return true, errors.Wrap(err, "Invalid liquidity pool row")
err = o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool history.LiquidityPool) error {
if liquidityPoolXDR, liquidityPoolErr := liquidityPoolToXDR(liquidityPool); liquidityPoolErr != nil {
return errors.Wrapf(liquidityPoolErr, "Invalid liquidity pool row %v, unable to marshal to xdr", liquidityPool)
} else {
o.graph.AddLiquidityPools(liquidityPoolXDR)
return nil
}
o.graph.AddLiquidityPools(liquidityPoolXDR)
})

if err != nil {
return true, errors.Wrap(err, "Error loading liquidity pools into orderbook")
}

if err := o.graph.Apply(status.LastIngestedLedger); err != nil {
Expand Down Expand Up @@ -209,9 +209,14 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b
}

func (o *OrderBookStream) verifyAllOffers(ctx context.Context, offers []xdr.OfferEntry) (bool, error) {
ingestionOffers, err := o.historyQ.GetAllOffers(ctx)
var ingestionOffers []history.Offer
err := o.historyQ.StreamAllOffers(ctx, func(offer history.Offer) error {
ingestionOffers = append(ingestionOffers, offer)
return nil
})

if err != nil {
return false, errors.Wrap(err, "Error from GetAllOffers")
return false, errors.Wrap(err, "Error loading all offers for orderbook verification")
}

mismatch := len(offers) != len(ingestionOffers)
Expand Down Expand Up @@ -253,9 +258,15 @@ func (o *OrderBookStream) verifyAllOffers(ctx context.Context, offers []xdr.Offe
}

func (o *OrderBookStream) verifyAllLiquidityPools(ctx context.Context, liquidityPools []xdr.LiquidityPoolEntry) (bool, error) {
ingestionLiquidityPools, err := o.historyQ.GetAllLiquidityPools(ctx)
var ingestionLiquidityPools []history.LiquidityPool

err := o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool history.LiquidityPool) error {
ingestionLiquidityPools = append(ingestionLiquidityPools, liquidityPool)
return nil
})

if err != nil {
return false, errors.Wrap(err, "Error from GetAllLiquidityPools")
return false, errors.Wrap(err, "Error loading all liquidity pools for orderbook verification")
}

mismatch := len(liquidityPools) != len(ingestionLiquidityPools)
Expand Down Expand Up @@ -322,7 +333,7 @@ func (o *OrderBookStream) Update(ctx context.Context) error {
}

// add 15 minute jitter so that not all horizon nodes are calling
// historyQ.GetAllOffers at the same time
// historyQ.StreamAllOffers at the same time
jitter := time.Duration(rand.Int63n(int64(15 * time.Minute)))
requiresVerification := o.lastLedger > 0 &&
time.Since(o.lastVerification) >= verificationFrequency+jitter
Expand Down
Loading