Skip to content

Commit

Permalink
stellar#1622: pass structs by value rather than by ptr on offer/liqui…
Browse files Browse the repository at this point in the history
…dity pool callback funcs
  • Loading branch information
sreuland committed Jan 3, 2022
1 parent df51071 commit 8972dc7
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 57 deletions.
8 changes: 4 additions & 4 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (lpar *LiquidityPoolAssetReserve) UnmarshalJSON(data []byte) error {
type QLiquidityPools interface {
UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error
GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]LiquidityPool, error)
StreamAllLiquidityPools(ctx context.Context, callback func(*LiquidityPool) error) error
StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error
CountLiquidityPools(ctx context.Context) (int, error)
FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error)
GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error)
Expand Down Expand Up @@ -187,7 +187,7 @@ func (q *Q) GetLiquidityPools(ctx context.Context, query LiquidityPoolsQuery) ([
return results, nil
}

func (q *Q) StreamAllLiquidityPools(ctx context.Context, callback func(*LiquidityPool) error) error {
func (q *Q) StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error {
var rows *sqlx.Rows
var err error

Expand All @@ -196,10 +196,10 @@ func (q *Q) StreamAllLiquidityPools(ctx context.Context, callback func(*Liquidit
}

defer rows.Close()
liquidityPool := &LiquidityPool{}
liquidityPool := LiquidityPool{}

for rows.Next() {
if err = rows.StructScan(liquidityPool); err != nil {
if err = rows.StructScan(&liquidityPool); err != nil {
return errors.Wrap(err, "could not scan row into liquidity pool struct")
}
if err = callback(liquidityPool); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/db2/history/liquidity_pools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func TestFindLiquidityPoolsByAssets(t *testing.T) {

pool := lps[0]
lps = nil
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool *LiquidityPool) error {
lps = append(lps, *liqudityPool)
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool LiquidityPool) error {
lps = append(lps, liqudityPool)
return nil
})
tt.Assert.NoError(err)
Expand Down Expand Up @@ -210,8 +210,8 @@ func TestLiquidityPoolCompaction(t *testing.T) {
tt.Assert.Len(lps, 0)

lps = nil
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool *LiquidityPool) error {
lps = append(lps, *liqudityPool)
err = q.StreamAllLiquidityPools(tt.Ctx, func(liqudityPool LiquidityPool) error {
lps = append(lps, liqudityPool)
return nil
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (m *MockQLiquidityPools) FindLiquidityPoolByID(ctx context.Context, liquidi
return a.Get(0).(LiquidityPool), a.Error(1)
}

func (m *MockQLiquidityPools) StreamAllLiquidityPools(ctx context.Context, callback func(*LiquidityPool) error) error {
func (m *MockQLiquidityPools) StreamAllLiquidityPools(ctx context.Context, callback func(LiquidityPool) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/mock_q_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type MockQOffers struct {
mock.Mock
}

func (m *MockQOffers) StreamAllOffers(ctx context.Context, callback func(*Offer) error) error {
func (m *MockQOffers) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// QOffers defines offer related queries.
type QOffers interface {
StreamAllOffers(ctx context.Context, callback func(*Offer) error) error
StreamAllOffers(ctx context.Context, callback func(Offer) error) error
GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error)
CountOffers(ctx context.Context) (int, error)
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
Expand Down Expand Up @@ -82,7 +82,7 @@ func (q *Q) GetOffers(ctx context.Context, query OffersQuery) ([]Offer, error) {
}

// StreamAllOffers loads all non deleted offers
func (q *Q) StreamAllOffers(ctx context.Context, callback func(*Offer) error) error {
func (q *Q) StreamAllOffers(ctx context.Context, callback func(Offer) error) error {
var rows *sqlx.Rows
var err error

Expand All @@ -91,10 +91,10 @@ func (q *Q) StreamAllOffers(ctx context.Context, callback func(*Offer) error) er
}

defer rows.Close()
offer := &Offer{}
offer := Offer{}

for rows.Next() {
if err = rows.StructScan(offer); err != nil {
if err = rows.StructScan(&offer); err != nil {
return errors.Wrap(err, "could not scan row into offer struct")
}

Expand Down
28 changes: 14 additions & 14 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func TestQueryEmptyOffers(t *testing.T) {
q := &Q{tt.HorizonSession()}

var offers []Offer
err := q.StreamAllOffers(tt.Ctx, func(offer *Offer) error {
offers = append(offers, *offer)
err := q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})

Expand Down Expand Up @@ -133,8 +133,8 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer *Offer) error {
offers = append(offers, *offer)
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
Expand Down Expand Up @@ -164,8 +164,8 @@ func TestInsertOffers(t *testing.T) {
tt.Assert.Equal(2, afterCompactionCount)

var afterCompactionOffers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer *Offer) error {
afterCompactionOffers = append(afterCompactionOffers, *offer)
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
afterCompactionOffers = append(afterCompactionOffers, offer)
return nil
})
tt.Assert.NoError(err)
Expand All @@ -182,8 +182,8 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer *Offer) error {
offers = append(offers, *offer)
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
Expand All @@ -210,8 +210,8 @@ func TestUpdateOffer(t *testing.T) {
tt.Assert.NoError(err)

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer *Offer) error {
offers = append(offers, *offer)
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
Expand All @@ -237,8 +237,8 @@ func TestRemoveOffer(t *testing.T) {
err := insertOffer(tt, q, eurOffer)
tt.Assert.NoError(err)
var offers []Offer
err = q.StreamAllOffers(tt.Ctx, func(offer *Offer) error {
offers = append(offers, *offer)
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
Expand All @@ -255,8 +255,8 @@ func TestRemoveOffer(t *testing.T) {
expectedUpdates[0].Deleted = true

offers = nil
err = q.StreamAllOffers(tt.Ctx, func(offer *Offer) error {
offers = append(offers, *offer)
err = q.StreamAllOffers(tt.Ctx, func(offer Offer) error {
offers = append(offers, offer)
return nil
})
tt.Assert.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (m *mockDBQ) GetExpStateInvalid(ctx context.Context) (bool, error) {
return args.Get(0).(bool), args.Error(1)
}

func (m *mockDBQ) StreamAllOffers(ctx context.Context, callback func(*history.Offer) error) error {
func (m *mockDBQ) StreamAllOffers(ctx context.Context, callback func(history.Offer) error) error {
a := m.Called(ctx, callback)
return a.Error(0)
}
Expand Down
22 changes: 11 additions & 11 deletions services/horizon/internal/ingest/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,18 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b

defer o.graph.Discard()

err := o.historyQ.StreamAllOffers(ctx, func(offer *history.Offer) error {
o.graph.AddOffers(offerToXDR(*offer))
err := o.historyQ.StreamAllOffers(ctx, func(offer history.Offer) error {
o.graph.AddOffers(offerToXDR(offer))
return nil
})

if err != nil {
return true, errors.Wrap(err, "Error loading offers into orderbook")
}

err = o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool *history.LiquidityPool) error {
if liquidityPoolXDR, liquidityPoolErr := liquidityPoolToXDR(*liquidityPool); liquidityPoolErr != nil {
return errors.Wrapf(liquidityPoolErr, "Invalid liquidity pool row %v, unable to marshal to xdr", *liquidityPool)
err = o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool history.LiquidityPool) error {
if liquidityPoolXDR, liquidityPoolErr := liquidityPoolToXDR(liquidityPool); liquidityPoolErr != nil {
return errors.Wrapf(liquidityPoolErr, "Invalid liquidity pool row %v, unable to marshal to xdr", liquidityPool)
} else {
o.graph.AddLiquidityPools(liquidityPoolXDR)
return nil
Expand Down Expand Up @@ -209,8 +209,8 @@ func (o *OrderBookStream) update(ctx context.Context, status ingestionStatus) (b
}

func (o *OrderBookStream) verifyAllOffers(ctx context.Context, offers []xdr.OfferEntry) (bool, error) {
var ingestionOffers []*history.Offer
err := o.historyQ.StreamAllOffers(ctx, func(offer *history.Offer) error {
var ingestionOffers []history.Offer
err := o.historyQ.StreamAllOffers(ctx, func(offer history.Offer) error {
ingestionOffers = append(ingestionOffers, offer)
return nil
})
Expand All @@ -231,7 +231,7 @@ func (o *OrderBookStream) verifyAllOffers(ctx context.Context, offers []xdr.Offe

for i, offerRow := range ingestionOffers {
offerEntry := offers[i]
offerRowXDR := offerToXDR(*offerRow)
offerRowXDR := offerToXDR(offerRow)
offerEntryBase64, err := o.encodingBuffer.MarshalBase64(&offerEntry)
if err != nil {
return false, errors.Wrap(err, "Error from marshalling offerEntry")
Expand All @@ -258,9 +258,9 @@ func (o *OrderBookStream) verifyAllOffers(ctx context.Context, offers []xdr.Offe
}

func (o *OrderBookStream) verifyAllLiquidityPools(ctx context.Context, liquidityPools []xdr.LiquidityPoolEntry) (bool, error) {
var ingestionLiquidityPools []*history.LiquidityPool
var ingestionLiquidityPools []history.LiquidityPool

err := o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool *history.LiquidityPool) error {
err := o.historyQ.StreamAllLiquidityPools(ctx, func(liquidityPool history.LiquidityPool) error {
ingestionLiquidityPools = append(ingestionLiquidityPools, liquidityPool)
return nil
})
Expand All @@ -282,7 +282,7 @@ func (o *OrderBookStream) verifyAllLiquidityPools(ctx context.Context, liquidity

for i, liquidityPoolRow := range ingestionLiquidityPools {
liquidityPoolEntry := liquidityPools[i]
liquidityPoolRowXDR, err := liquidityPoolToXDR(*liquidityPoolRow)
liquidityPoolRowXDR, err := liquidityPoolToXDR(liquidityPoolRow)
if err != nil {
return false, errors.Wrap(err, "Error from converting liquidity pool row to xdr")
}
Expand Down
34 changes: 17 additions & 17 deletions services/horizon/internal/ingest/orderbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,9 @@ func (t *UpdateOrderBookStreamTestSuite) TestResetApplyError() {
t.historyQ.On("StreamAllOffers", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(offer *history.Offer) error)
callback(&offer)
callback(&otherOffer)
callback := args.Get(1).(func(offer history.Offer) error)
callback(offer)
callback(otherOffer)
}).
Once()

Expand Down Expand Up @@ -326,9 +326,9 @@ func (t *UpdateOrderBookStreamTestSuite) mockReset(status ingestionStatus) {
t.historyQ.On("StreamAllOffers", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(offer *history.Offer) error)
callback := args.Get(1).(func(offer history.Offer) error)
for idx := range offers {
callback(&offers[idx])
callback(offers[idx])
}
}).
Once()
Expand Down Expand Up @@ -685,9 +685,9 @@ func (t *VerifyOffersStreamTestSuite) TestLengthMismatch() {
t.historyQ.On("StreamAllOffers", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(offer *history.Offer) error)
callback := args.Get(1).(func(offer history.Offer) error)
for idx := range offers {
callback(&offers[idx])
callback(offers[idx])
}
}).
Once()
Expand Down Expand Up @@ -730,9 +730,9 @@ func (t *VerifyOffersStreamTestSuite) TestContentMismatch() {
t.historyQ.On("StreamAllOffers", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(offer *history.Offer) error)
callback := args.Get(1).(func(offer history.Offer) error)
for idx := range offers {
callback(&offers[idx])
callback(offers[idx])
}
}).
Once()
Expand Down Expand Up @@ -775,9 +775,9 @@ func (t *VerifyOffersStreamTestSuite) TestSuccess() {
t.historyQ.On("StreamAllOffers", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(offer *history.Offer) error)
callback := args.Get(1).(func(offer history.Offer) error)
for idx := range offers {
callback(&offers[idx])
callback(offers[idx])
}
}).
Once()
Expand Down Expand Up @@ -894,9 +894,9 @@ func (t *VerifyLiquidityPoolsStreamTestSuite) TestLengthMismatch() {
t.historyQ.MockQLiquidityPools.On("StreamAllLiquidityPools", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(offer *history.LiquidityPool) error)
callback := args.Get(1).(func(offer history.LiquidityPool) error)
for idx := range liquidityPools {
callback(&liquidityPools[idx])
callback(liquidityPools[idx])
}
}).
Once()
Expand Down Expand Up @@ -950,9 +950,9 @@ func (t *VerifyLiquidityPoolsStreamTestSuite) TestContentMismatch() {
t.historyQ.MockQLiquidityPools.On("StreamAllLiquidityPools", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(offer *history.LiquidityPool) error)
callback := args.Get(1).(func(offer history.LiquidityPool) error)
for idx := range liquidityPools {
callback(&liquidityPools[idx])
callback(liquidityPools[idx])
}
}).
Once()
Expand Down Expand Up @@ -1006,9 +1006,9 @@ func (t *VerifyLiquidityPoolsStreamTestSuite) TestSuccess() {
t.historyQ.MockQLiquidityPools.On("StreamAllLiquidityPools", t.ctx, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
callback := args.Get(1).(func(*history.LiquidityPool) error)
callback := args.Get(1).(func(history.LiquidityPool) error)
for idx := range liquidityPools {
callback(&liquidityPools[idx])
callback(liquidityPools[idx])
}
}).
Once()
Expand Down

0 comments on commit 8972dc7

Please sign in to comment.