From 669a37f1239b0b39ec16674aa99cbddd719561d4 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 18 Feb 2022 18:43:45 +0000 Subject: [PATCH] Fix StreamAllLiquidityPools and StreamAllOffers --- .../internal/db2/history/liquidity_pools.go | 16 ++++++++++- .../db2/history/liquidity_pools_test.go | 28 +++++++++++++++++++ .../horizon/internal/db2/history/offers.go | 2 +- .../internal/db2/history/offers_test.go | 27 +++++++++++++++--- 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/services/horizon/internal/db2/history/liquidity_pools.go b/services/horizon/internal/db2/history/liquidity_pools.go index 0c79dc890c..46e6ba59d3 100644 --- a/services/horizon/internal/db2/history/liquidity_pools.go +++ b/services/horizon/internal/db2/history/liquidity_pools.go @@ -196,9 +196,9 @@ func (q *Q) StreamAllLiquidityPools(ctx context.Context, callback func(Liquidity } defer rows.Close() - liquidityPool := LiquidityPool{} for rows.Next() { + liquidityPool := LiquidityPool{} if err = rows.StructScan(&liquidityPool); err != nil { return errors.Wrap(err, "could not scan row into liquidity pool struct") } @@ -246,9 +246,23 @@ var liquidityPoolsSelectStatement = "lp.id, " + var selectLiquidityPools = sq.Select(liquidityPoolsSelectStatement).From("liquidity_pools lp") +func cloneAsset(a xdr.Asset) xdr.Asset { + b64, err := xdr.MarshalBase64(a) + if err != nil { + panic(err) + } + var b xdr.Asset + if err = xdr.SafeUnmarshalBase64(b64, &b); err != nil { + panic(err) + } + return b +} + // MakeTestPool is a helper to make liquidity pools for testing purposes. It's // public because it's used in other test suites. func MakeTestPool(A xdr.Asset, a uint64, B xdr.Asset, b uint64) LiquidityPool { + A = cloneAsset(A) + B = cloneAsset(B) if !A.LessThan(B) { B, A = A, B b, a = a, b diff --git a/services/horizon/internal/db2/history/liquidity_pools_test.go b/services/horizon/internal/db2/history/liquidity_pools_test.go index eb95e35a3e..fd268d2518 100644 --- a/services/horizon/internal/db2/history/liquidity_pools_test.go +++ b/services/horizon/internal/db2/history/liquidity_pools_test.go @@ -1,6 +1,7 @@ package history import ( + "sort" "testing" "github.com/stellar/go/services/horizon/internal/db2" @@ -90,6 +91,33 @@ func TestRemoveLiquidityPool(t *testing.T) { tt.Assert.Equal(lp, lpObtained) } +func TestStreamAllLiquidity(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + lp := MakeTestPool(usdAsset, 450, xlmAsset, 450) + otherLP := MakeTestPool(usdAsset, 10, eurAsset, 20) + expected := []LiquidityPool{lp, otherLP} + sort.Slice(expected, func(i, j int) bool { + return expected[i].PoolID < expected[j].PoolID + }) + + err := q.UpsertLiquidityPools(tt.Ctx, expected) + tt.Assert.NoError(err) + + var pools []LiquidityPool + err = q.StreamAllLiquidityPools(tt.Ctx, func(pool LiquidityPool) error { + pools = append(pools, pool) + return nil + }) + sort.Slice(pools, func(i, j int) bool { + return pools[i].PoolID < pools[j].PoolID + }) + tt.Assert.Equal(expected, pools) +} + func TestFindLiquidityPoolsByAssets(t *testing.T) { tt := test.Start(t) defer tt.Finish() diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index 1d10b1bcde..83bf3f8e17 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -91,9 +91,9 @@ func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) err } defer rows.Close() - offer := Offer{} for rows.Next() { + offer := Offer{} if err = rows.StructScan(&offer); err != nil { return errors.Wrap(err, "could not scan row into offer struct") } diff --git a/services/horizon/internal/db2/history/offers_test.go b/services/horizon/internal/db2/history/offers_test.go index 85951bfa1c..0aac2522f7 100644 --- a/services/horizon/internal/db2/history/offers_test.go +++ b/services/horizon/internal/db2/history/offers_test.go @@ -18,6 +18,21 @@ var ( eurAsset = xdr.MustNewCreditAsset("EUR", issuer.Address()) usdAsset = xdr.MustNewCreditAsset("USD", issuer.Address()) + xlmOffer = Offer{ + SellerID: issuer.Address(), + OfferID: int64(100), + + BuyingAsset: xlmAsset, + SellingAsset: eurAsset, + + Amount: int64(100), + Pricen: int32(2), + Priced: int32(1), + Price: float64(2), + Flags: 1, + LastModifiedLedger: uint32(1234), + } + eurOffer = Offer{ SellerID: issuer.Address(), OfferID: int64(4), @@ -131,6 +146,8 @@ func TestInsertOffers(t *testing.T) { tt.Assert.NoError(err) err = insertOffer(tt, q, twoEurOffer) tt.Assert.NoError(err) + err = insertOffer(tt, q, xlmOffer) + tt.Assert.NoError(err) var offers []Offer err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { @@ -138,19 +155,21 @@ func TestInsertOffers(t *testing.T) { return nil }) tt.Assert.NoError(err) - tt.Assert.Len(offers, 2) + tt.Assert.Len(offers, 3) offersByID := map[int64]Offer{ offers[0].OfferID: offers[0], offers[1].OfferID: offers[1], + offers[2].OfferID: offers[2], } tt.Assert.Equal(offersByID[eurOffer.OfferID], eurOffer) tt.Assert.Equal(offersByID[twoEurOffer.OfferID], twoEurOffer) + tt.Assert.Equal(offersByID[xlmOffer.OfferID], xlmOffer) count, err := q.CountOffers(tt.Ctx) tt.Assert.NoError(err) - tt.Assert.Equal(2, count) + tt.Assert.Equal(3, count) numRemoved, err := q.CompactOffers(tt.Ctx, 12350) tt.Assert.NoError(err) @@ -161,7 +180,7 @@ func TestInsertOffers(t *testing.T) { afterCompactionCount, err := q.CountOffers(tt.Ctx) tt.Assert.NoError(err) - tt.Assert.Equal(2, afterCompactionCount) + tt.Assert.Equal(3, afterCompactionCount) var afterCompactionOffers []Offer err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error { @@ -169,7 +188,7 @@ func TestInsertOffers(t *testing.T) { return nil }) tt.Assert.NoError(err) - tt.Assert.Len(afterCompactionOffers, 2) + tt.Assert.Len(afterCompactionOffers, 3) } func TestUpdateOffer(t *testing.T) {