Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal: Include liquidity pools in OrderBookStream #3909

Merged
merged 1 commit into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions exp/orderbook/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@ const (
_ = iota
// the operationType enum values start at 1 because when constructing a
// orderBookOperation struct, the operationType field should always be specified
// explicity. if the operationType enum values started at 0 then it would be
// explicitly. if the operationType enum values started at 0 then it would be
// possible to create a valid orderBookOperation struct without specifying
// the operationType field
addOfferOperationType = iota
removeOfferOperationType = iota
addOfferOperationType = iota
removeOfferOperationType = iota
addLiquidityPoolOperationType = iota
removeLiquidityPoolOperationType = iota
)

type orderBookOperation struct {
operationType int
offerID xdr.Int64
offer *xdr.OfferEntry
operationType int
offerID xdr.Int64
offer *xdr.OfferEntry
liquidityPoolAssets tradingPair
liquidityPool *xdr.LiquidityPoolEntry
}

type orderBookBatchedUpdates struct {
Expand All @@ -39,6 +43,21 @@ func (tx *orderBookBatchedUpdates) addOffer(offer xdr.OfferEntry) *orderBookBatc
return tx
}

// addLiquidityPool will queue an operation to add the given liquidity pool to the order book graph
func (tx *orderBookBatchedUpdates) addLiquidityPool(liquidityPool xdr.LiquidityPoolEntry) *orderBookBatchedUpdates {
params := liquidityPool.Body.MustConstantProduct().Params
tx.operations = append(tx.operations, orderBookOperation{
operationType: addLiquidityPoolOperationType,
liquidityPool: &liquidityPool,
liquidityPoolAssets: tradingPair{
buyingAsset: params.AssetA.String(),
sellingAsset: params.AssetB.String(),
},
})

return tx
}

// removeOffer will queue an operation to remove the given offer from the order book
func (tx *orderBookBatchedUpdates) removeOffer(offerID xdr.Int64) *orderBookBatchedUpdates {
tx.operations = append(tx.operations, orderBookOperation{
Expand All @@ -49,6 +68,16 @@ func (tx *orderBookBatchedUpdates) removeOffer(offerID xdr.Int64) *orderBookBatc
return tx
}

// removeLiquidityPool will queue an operation to remove the given liquidity pool from the order book
func (tx *orderBookBatchedUpdates) removeLiquidityPool(liquidityPoolAssets tradingPair) *orderBookBatchedUpdates {
tx.operations = append(tx.operations, orderBookOperation{
operationType: removeLiquidityPoolOperationType,
liquidityPoolAssets: liquidityPoolAssets,
})

return tx
}

// apply will attempt to apply all the updates in the batch to the order book
func (tx *orderBookBatchedUpdates) apply(ledger uint32) error {
tx.orderbook.lock.Lock()
Expand Down Expand Up @@ -77,6 +106,13 @@ func (tx *orderBookBatchedUpdates) apply(ledger uint32) error {
if err := tx.orderbook.remove(operation.offerID); err != nil {
panic(errors.Wrap(err, "could not apply update in batch"))
}
case addLiquidityPoolOperationType:
tx.orderbook.liquidityPools[operation.liquidityPoolAssets] = *operation.liquidityPool
case removeLiquidityPoolOperationType:
if _, ok := tx.orderbook.liquidityPools[operation.liquidityPoolAssets]; !ok {
panic(errors.New("liquidity pool not present in orderbook graph"))
}
delete(tx.orderbook.liquidityPools, operation.liquidityPoolAssets)
default:
panic(errors.New("invalid operation type"))
}
Expand Down
38 changes: 37 additions & 1 deletion exp/orderbook/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ type tradingPair struct {
// OBGraph is an interface for orderbook graphs
type OBGraph interface {
AddOffer(offer xdr.OfferEntry)
AddLiquidityPool(liquidityPool xdr.LiquidityPoolEntry)
Apply(ledger uint32) error
Discard()
Offers() []xdr.OfferEntry
LiquidityPools() []xdr.LiquidityPoolEntry
OffersMap() map[xdr.Int64]xdr.OfferEntry
RemoveOffer(xdr.Int64) OBGraph
RemoveLiquidityPool(params xdr.LiquidityPoolConstantProductParameters) OBGraph
Pending() ([]xdr.OfferEntry, []xdr.Int64)
Clear()
}
Expand All @@ -58,6 +61,9 @@ type OrderBookGraph struct {
// tradingPairForOffer maps an offer id to the assets which are being exchanged
// in the given offer
tradingPairForOffer map[xdr.Int64]tradingPair
// liquidityPools maps a trading pair to the liquidity pool which contains those
// assets in its reserves
liquidityPools map[tradingPair]xdr.LiquidityPoolEntry
// batchedUpdates is internal batch of updates to this graph. Users can
// create multiple batches using `Batch()` method but sometimes only one
// batch is enough.
Expand All @@ -75,6 +81,7 @@ func NewOrderBookGraph() *OrderBookGraph {
edgesForSellingAsset: map[string]edgeSet{},
edgesForBuyingAsset: map[string]edgeSet{},
tradingPairForOffer: map[xdr.Int64]tradingPair{},
liquidityPools: map[tradingPair]xdr.LiquidityPoolEntry{},
}

graph.batchedUpdates = graph.batch()
Expand All @@ -88,6 +95,13 @@ func (graph *OrderBookGraph) AddOffer(offer xdr.OfferEntry) {
graph.batchedUpdates.addOffer(offer)
}

// AddLiquidityPool will queue an operation to add the given liquidity pool to
// the order book graph in the internal batch.
// You need to run Apply() to apply all enqueued operations.
func (graph *OrderBookGraph) AddLiquidityPool(liquidityPool xdr.LiquidityPoolEntry) {
graph.batchedUpdates.addLiquidityPool(liquidityPool)
}

// RemoveOffer will queue an operation to remove the given offer from the order book in
// the internal batch.
// You need to run Apply() to apply all enqueued operations.
Expand All @@ -96,6 +110,14 @@ func (graph *OrderBookGraph) RemoveOffer(offerID xdr.Int64) OBGraph {
return graph
}

func (graph *OrderBookGraph) RemoveLiquidityPool(params xdr.LiquidityPoolConstantProductParameters) OBGraph {
graph.batchedUpdates.removeLiquidityPool(tradingPair{
buyingAsset: params.AssetA.String(),
sellingAsset: params.AssetB.String(),
})
return graph
}

// Pending returns a list of queued offers which will be added to the order book and
// a list of queued offers which will be removed from the order book.
func (graph *OrderBookGraph) Pending() ([]xdr.OfferEntry, []xdr.Int64) {
Expand Down Expand Up @@ -132,7 +154,7 @@ func (graph *OrderBookGraph) Offers() []xdr.OfferEntry {
graph.lock.RLock()
defer graph.lock.RUnlock()

offers := []xdr.OfferEntry{}
var offers []xdr.OfferEntry
for _, edges := range graph.edgesForSellingAsset {
for _, offersForEdge := range edges {
offers = append(offers, offersForEdge...)
Expand All @@ -142,6 +164,19 @@ func (graph *OrderBookGraph) Offers() []xdr.OfferEntry {
return offers
}

// LiquidityPools returns a list of liquidity pools contained in the order book graph
func (graph *OrderBookGraph) LiquidityPools() []xdr.LiquidityPoolEntry {
graph.lock.RLock()
defer graph.lock.RUnlock()

var liquidityPools []xdr.LiquidityPoolEntry
for _, liquidityPool := range graph.liquidityPools {
liquidityPools = append(liquidityPools, liquidityPool)
}

return liquidityPools
}

// Clear removes all offers from the graph.
func (graph *OrderBookGraph) Clear() {
graph.lock.Lock()
Expand All @@ -150,6 +185,7 @@ func (graph *OrderBookGraph) Clear() {
graph.edgesForSellingAsset = map[string]edgeSet{}
graph.edgesForBuyingAsset = map[string]edgeSet{}
graph.tradingPairForOffer = map[xdr.Int64]tradingPair{}
graph.liquidityPools = map[tradingPair]xdr.LiquidityPoolEntry{}
graph.batchedUpdates = graph.batch()
graph.lastLedger = 0
}
Expand Down
102 changes: 102 additions & 0 deletions exp/orderbook/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding"
"math"
"sort"
"testing"

"github.com/stellar/go/keypair"
Expand Down Expand Up @@ -493,6 +494,107 @@ func TestAddOfferOrderBook(t *testing.T) {
assertGraphEquals(t, graph, expectedGraph)
}

func setupGraphWithLiquidityPools(t *testing.T) (*OrderBookGraph, []xdr.LiquidityPoolEntry) {
graph := NewOrderBookGraph()
nativeEURPool := xdr.LiquidityPoolEntry{
LiquidityPoolId: xdr.PoolId{1, 2, 3},
Body: xdr.LiquidityPoolEntryBody{
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
ConstantProduct: &xdr.LiquidityPoolEntryConstantProduct{
Params: xdr.LiquidityPoolConstantProductParameters{
AssetA: xdr.MustNewNativeAsset(),
AssetB: eurAsset,
Fee: xdr.LiquidityPoolFeeV18,
},
ReserveA: 23,
ReserveB: 15,
TotalPoolShares: 123,
PoolSharesTrustLineCount: 22,
},
},
}
nativeUSDPool := xdr.LiquidityPoolEntry{
LiquidityPoolId: xdr.PoolId{9, 23, 5},
Body: xdr.LiquidityPoolEntryBody{
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
ConstantProduct: &xdr.LiquidityPoolEntryConstantProduct{
Params: xdr.LiquidityPoolConstantProductParameters{
AssetA: xdr.MustNewNativeAsset(),
AssetB: usdAsset,
Fee: xdr.LiquidityPoolFeeV18,
},
ReserveA: 3,
ReserveB: 8,
TotalPoolShares: 12,
PoolSharesTrustLineCount: 100,
},
},
}
graph.AddLiquidityPool(nativeEURPool)
graph.AddLiquidityPool(nativeUSDPool)
if err := graph.Apply(1); err != nil {
t.Fatalf("unexpected apply error %v", err)
}

expectedLiquidityPools := []xdr.LiquidityPoolEntry{nativeEURPool, nativeUSDPool}
return graph, expectedLiquidityPools
}

func assertLiquidityPoolsEqual(t *testing.T, expectedLiquidityPools, liquidityPools []xdr.LiquidityPoolEntry) {
sort.Slice(liquidityPools, func(i, j int) bool {
return liquidityPools[i].Body.MustConstantProduct().Params.AssetB.String() <
liquidityPools[j].Body.MustConstantProduct().Params.AssetB.String()
})

for i, expected := range expectedLiquidityPools {
liquidityPool := liquidityPools[i]
liquidityPoolBase64, err := xdr.MarshalBase64(liquidityPool)
if err != nil {
t.Fatalf("unexpected marshall error %v", err)
}
expectedBase64, err := xdr.MarshalBase64(expected)
if err != nil {
t.Fatalf("unexpected marshall error %v", err)
}
if expectedBase64 != liquidityPoolBase64 {
t.Fatalf("pool mismatch: %v != %v", expected, liquidityPoolBase64)
}
}
}

func TestAddLiquidityPool(t *testing.T) {
graph, expectedLiquidityPools := setupGraphWithLiquidityPools(t)
assertLiquidityPoolsEqual(t, expectedLiquidityPools, graph.LiquidityPools())
}

func TestUpdateLiquidityPools(t *testing.T) {
graph, expectedLiquidityPools := setupGraphWithLiquidityPools(t)
expectedLiquidityPools[0].Body.ConstantProduct.ReserveA += 100
expectedLiquidityPools[1].Body.ConstantProduct.ReserveB -= 2

graph.AddLiquidityPool(expectedLiquidityPools[0])
graph.AddLiquidityPool(expectedLiquidityPools[1])
if err := graph.Apply(2); err != nil {
t.Fatalf("unexpected error %v", err)
}

assertLiquidityPoolsEqual(t, expectedLiquidityPools, graph.LiquidityPools())
}

func TestRemoveLiquidityPools(t *testing.T) {
graph, expectedLiquidityPools := setupGraphWithLiquidityPools(t)
expectedLiquidityPools[0].Body.ConstantProduct.ReserveA += 100

graph.AddLiquidityPool(expectedLiquidityPools[0])
graph.RemoveLiquidityPool(expectedLiquidityPools[1].Body.MustConstantProduct().Params)

if err := graph.Apply(2); err != nil {
t.Fatalf("unexpected error %v", err)
}

assertLiquidityPoolsEqual(t, expectedLiquidityPools[:1], graph.LiquidityPools())
}

func TestUpdateOfferOrderBook(t *testing.T) {
graph := NewOrderBookGraph()

Expand Down
10 changes: 10 additions & 0 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type QLiquidityPools interface {
UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error)
RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, lastModifiedLedger uint32) (int64, error)
GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]LiquidityPool, error)
GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error)
CountLiquidityPools(ctx context.Context) (int, error)
FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error)
GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error)
Expand Down Expand Up @@ -199,6 +200,15 @@ func (q *Q) GetLiquidityPools(ctx context.Context, query LiquidityPoolsQuery) ([
return results, nil
}

func (q *Q) GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error) {
var results []LiquidityPool
if err := q.Select(ctx, &results, selectLiquidityPools.Where("deleted = ?", false)); err != nil {
return nil, errors.Wrap(err, "could not run select query")
}

return results, nil
}

// GetUpdatedLiquidityPools returns all liquidity pools created, updated, or deleted after the given ledger sequence.
func (q *Q) GetUpdatedLiquidityPools(ctx context.Context, newerThanSequence uint32) ([]LiquidityPool, error) {
var pools []LiquidityPool
Expand Down
10 changes: 10 additions & 0 deletions services/horizon/internal/db2/history/liquidity_pools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ func TestFindLiquidityPoolsByAssets(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(lps, 1)

pool := lps[0]
lps, err = q.GetAllLiquidityPools(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.Len(lps, 1)
tt.Assert.Equal(pool, lps[0])

// query by one asset
query = LiquidityPoolsQuery{
PageQuery: db2.MustPageQuery("", false, "", 10),
Expand Down Expand Up @@ -279,6 +285,10 @@ func TestLiquidityPoolCompaction(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(lps, 0)

lps, err = q.GetAllLiquidityPools(tt.Ctx)
tt.Assert.NoError(err)
tt.Assert.Len(lps, 0)

err = q.Select(tt.Ctx, &lps, selectLiquidityPools)
tt.Assert.NoError(err)
tt.Assert.Len(lps, 1)
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ type IngestionQ interface {
GetExpStateInvalid(context.Context) (bool, error)
GetLatestHistoryLedger(context.Context) (uint32, error)
GetOfferCompactionSequence(context.Context) (uint32, error)
GetLiquidityPoolCompactionSequence(context.Context) (uint32, error)
TruncateIngestStateTables(context.Context) error
DeleteRangeAll(ctx context.Context, start, end int64) error
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (m *MockQLiquidityPools) FindLiquidityPoolByID(ctx context.Context, liquidi
return a.Get(0).(LiquidityPool), a.Error(1)
}

func (m *MockQLiquidityPools) GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error) {
a := m.Called(ctx)
return a.Get(0).([]LiquidityPool), a.Error(1)
}

func (m *MockQLiquidityPools) GetUpdatedLiquidityPools(ctx context.Context, sequence uint32) ([]LiquidityPool, error) {
a := m.Called(ctx, sequence)
return a.Get(0).([]LiquidityPool), a.Error(1)
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ func (m *mockDBQ) GetOfferCompactionSequence(ctx context.Context) (uint32, error
return args.Get(0).(uint32), args.Error(1)
}

func (m *mockDBQ) GetLiquidityPoolCompactionSequence(ctx context.Context) (uint32, error) {
args := m.Called(ctx)
return args.Get(0).(uint32), args.Error(1)
}

func (m *mockDBQ) GetLastLedgerIngestNonBlocking(ctx context.Context) (uint32, error) {
args := m.Called(ctx)
return args.Get(0).(uint32), args.Error(1)
Expand Down
Loading