Skip to content

Commit

Permalink
Fix StreamAllLiquidityPools and StreamAllOffers (#4236)
Browse files Browse the repository at this point in the history
Co-authored-by: Paul Bellamy <[email protected]>
  • Loading branch information
tamirms and Paul Bellamy authored Feb 24, 2022
1 parent 2bb9ab2 commit e125e7b
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 6 deletions.
16 changes: 15 additions & 1 deletion services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ func (q *Q) StreamAllLiquidityPools(ctx context.Context, callback func(Liquidity
}

defer rows.Close()
liquidityPool := LiquidityPool{}

for rows.Next() {
liquidityPool := LiquidityPool{}
if err = rows.StructScan(&liquidityPool); err != nil {
return errors.Wrap(err, "could not scan row into liquidity pool struct")
}
Expand Down Expand Up @@ -246,9 +246,23 @@ var liquidityPoolsSelectStatement = "lp.id, " +

var selectLiquidityPools = sq.Select(liquidityPoolsSelectStatement).From("liquidity_pools lp")

func cloneAsset(a xdr.Asset) xdr.Asset {
b64, err := xdr.MarshalBase64(a)
if err != nil {
panic(err)
}
var b xdr.Asset
if err = xdr.SafeUnmarshalBase64(b64, &b); err != nil {
panic(err)
}
return b
}

// MakeTestPool is a helper to make liquidity pools for testing purposes. It's
// public because it's used in other test suites.
func MakeTestPool(A xdr.Asset, a uint64, B xdr.Asset, b uint64) LiquidityPool {
A = cloneAsset(A)
B = cloneAsset(B)
if !A.LessThan(B) {
B, A = A, B
b, a = a, b
Expand Down
28 changes: 28 additions & 0 deletions services/horizon/internal/db2/history/liquidity_pools_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package history

import (
"sort"
"testing"

"github.com/stellar/go/services/horizon/internal/db2"
Expand Down Expand Up @@ -90,6 +91,33 @@ func TestRemoveLiquidityPool(t *testing.T) {
tt.Assert.Equal(lp, lpObtained)
}

func TestStreamAllLiquidity(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

lp := MakeTestPool(usdAsset, 450, xlmAsset, 450)
otherLP := MakeTestPool(usdAsset, 10, eurAsset, 20)
expected := []LiquidityPool{lp, otherLP}
sort.Slice(expected, func(i, j int) bool {
return expected[i].PoolID < expected[j].PoolID
})

err := q.UpsertLiquidityPools(tt.Ctx, expected)
tt.Assert.NoError(err)

var pools []LiquidityPool
err = q.StreamAllLiquidityPools(tt.Ctx, func(pool LiquidityPool) error {
pools = append(pools, pool)
return nil
})
sort.Slice(pools, func(i, j int) bool {
return pools[i].PoolID < pools[j].PoolID
})
tt.Assert.Equal(expected, pools)
}

func TestFindLiquidityPoolsByAssets(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) err
}

defer rows.Close()
offer := Offer{}

for rows.Next() {
offer := Offer{}
if err = rows.StructScan(&offer); err != nil {
return errors.Wrap(err, "could not scan row into offer struct")
}
Expand Down
27 changes: 23 additions & 4 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ var (
eurAsset = xdr.MustNewCreditAsset("EUR", issuer.Address())
usdAsset = xdr.MustNewCreditAsset("USD", issuer.Address())

xlmOffer = Offer{
SellerID: issuer.Address(),
OfferID: int64(100),

BuyingAsset: xlmAsset,
SellingAsset: eurAsset,

Amount: int64(100),
Pricen: int32(2),
Priced: int32(1),
Price: float64(2),
Flags: 1,
LastModifiedLedger: uint32(1234),
}

eurOffer = Offer{
SellerID: issuer.Address(),
OfferID: int64(4),
Expand Down Expand Up @@ -131,26 +146,30 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.NoError(err)
err = insertOffer(tt, q, twoEurOffer)
tt.Assert.NoError(err)
err = insertOffer(tt, q, xlmOffer)
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(offers, 2)
tt.Assert.Len(offers, 3)

offersByID := map[int64]Offer{
offers[0].OfferID: offers[0],
offers[1].OfferID: offers[1],
offers[2].OfferID: offers[2],
}

tt.Assert.Equal(offersByID[eurOffer.OfferID], eurOffer)
tt.Assert.Equal(offersByID[twoEurOffer.OfferID], twoEurOffer)
tt.Assert.Equal(offersByID[xlmOffer.OfferID], xlmOffer)

count, err := q.CountOffers(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.Equal(2, count)
tt.Assert.Equal(3, count)

numRemoved, err := q.CompactOffers(tt.Ctx, 12350)
tt.Assert.NoError(err)
Expand All @@ -161,15 +180,15 @@ func TestInsertOffers(t *testing.T) {

afterCompactionCount, err := q.CountOffers(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.Equal(2, afterCompactionCount)
tt.Assert.Equal(3, afterCompactionCount)

var afterCompactionOffers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
afterCompactionOffers = append(afterCompactionOffers, offer)
return nil
})
tt.Assert.NoError(err)
tt.Assert.Len(afterCompactionOffers, 2)
tt.Assert.Len(afterCompactionOffers, 3)
}

func TestUpdateOffer(t *testing.T) {
Expand Down

0 comments on commit e125e7b

Please sign in to comment.