Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/db2/history: Improve /trades DB queries performance #2869

Merged
merged 3 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,19 @@ type Trade struct {
// TradesQ is a helper struct to aid in configuring queries that loads
// slices of trade structs.
type TradesQ struct {
Err error
parent *Q
sql sq.SelectBuilder
Err error
parent *Q
sql sq.SelectBuilder
pageCalled bool

// For queries for account and offer we construct UNION query. The alternative
// is to use (base = X OR counter = X) query but it's costly.
forAccountID int64
forOfferID int64

// rawSQL will be executed if present (instead of sql - sq.SelectBuilder).
rawSQL string
rawArgs []interface{}
}

// Transaction is a row of data from the `history_transactions` table
Expand Down
97 changes: 78 additions & 19 deletions services/horizon/internal/db2/history/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/toid"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ func (q *Q) TradesForAssetPair(baseAssetId int64, counterAssetId int64) *TradesQ

// ForOffer filters the query results by the offer id.
func (q *TradesQ) ForOffer(id int64) *TradesQ {
q.sql = q.sql.Where("(htrd.base_offer_id = ? OR htrd.counter_offer_id = ?)", id, id)
q.forOfferID = id
return q
}

Expand Down Expand Up @@ -99,7 +100,7 @@ func (q *TradesQ) ForAccount(aid string) *TradesQ {
return q
}

q.sql = q.sql.Where("(htrd.base_account_id = ? OR htrd.counter_account_id = ?)", account.ID, account.ID)
q.forAccountID = account.ID
return q
}

Expand All @@ -120,42 +121,100 @@ func (q *TradesQ) Page(page db2.PageQuery) *TradesQ {
idx = math.MaxInt32
}

q.pageCalled = true

if q.forAccountID != 0 || q.forOfferID != 0 {
// Construct UNION query
var firstSelect, secondSelect sq.SelectBuilder
switch {
case q.forAccountID != 0:
firstSelect = q.sql.Where("htrd.base_account_id = ?", q.forAccountID)
secondSelect = q.sql.Where("htrd.counter_account_id = ?", q.forAccountID)
case q.forOfferID != 0:
firstSelect = q.sql.Where("htrd.base_offer_id = ?", q.forOfferID)
secondSelect = q.sql.Where("htrd.counter_offer_id = ?", q.forOfferID)
}

firstSelect = q.appendOrdering(firstSelect, op, idx, page.Order)
secondSelect = q.appendOrdering(secondSelect, op, idx, page.Order)

firstSQL, firstArgs, err := firstSelect.ToSql()
if err != nil {
q.Err = errors.New("error building a firstSelect query")
return q
}
secondSQL, secondArgs, err := secondSelect.ToSql()
if err != nil {
q.Err = errors.New("error building a secondSelect query")
return q
}

q.rawSQL = fmt.Sprintf("(%s) UNION (%s) ", firstSQL, secondSQL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

take a look at TransactionByHash() to see an alternative way to construct the UNION query using squirrel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my first approach but it doesn't work when you want to add ORDER BY and/or LIMIT to UNION (requires brackets).

q.rawArgs = append(q.rawArgs, firstArgs...)
q.rawArgs = append(q.rawArgs, secondArgs...)
// Order the final UNION:
switch page.Order {
case "asc":
q.rawSQL = q.rawSQL + `ORDER BY history_operation_id asc, "order" asc `
case "desc":
q.rawSQL = q.rawSQL + `ORDER BY history_operation_id desc, "order" desc `
default:
panic("Invalid order")
}
q.rawSQL = q.rawSQL + fmt.Sprintf("LIMIT %d", page.Limit)
// Reset sql so it's not used accidentally
q.sql = sq.SelectBuilder{}
} else {
q.sql = q.appendOrdering(q.sql, op, idx, page.Order)
q.sql = q.sql.Limit(page.Limit)
}
return q
}

func (q *TradesQ) appendOrdering(sel sq.SelectBuilder, op, idx int64, order string) sq.SelectBuilder {
// NOTE: Remember to test the queries below with EXPLAIN / EXPLAIN ANALYZE
// before changing them.
// This condition is using multicolumn index and it's easy to write it in a way that
// DB will perform a full table scan.
switch page.Order {
switch order {
case "asc":
q.sql = q.sql.
return sel.
Where(`(
htrd.history_operation_id >= ?
AND (
htrd.history_operation_id > ? OR
(htrd.history_operation_id = ? AND htrd.order > ?)
))`, op, op, op, idx).
htrd.history_operation_id >= ?
AND (
htrd.history_operation_id > ? OR
(htrd.history_operation_id = ? AND htrd.order > ?)
))`, op, op, op, idx).
OrderBy("htrd.history_operation_id asc, htrd.order asc")
case "desc":
q.sql = q.sql.
return sel.
Where(`(
htrd.history_operation_id <= ?
AND (
htrd.history_operation_id < ? OR
(htrd.history_operation_id = ? AND htrd.order < ?)
))`, op, op, op, idx).
htrd.history_operation_id <= ?
AND (
htrd.history_operation_id < ? OR
(htrd.history_operation_id = ? AND htrd.order < ?)
))`, op, op, op, idx).
OrderBy("htrd.history_operation_id desc, htrd.order desc")
default:
panic("Invalid order")
}

q.sql = q.sql.Limit(page.Limit)
return q
}

// Select loads the results of the query specified by `q` into `dest`.
func (q *TradesQ) Select(dest interface{}) error {
if !q.pageCalled {
return errors.New("TradesQ.Page call is required before calling Select")
}

if q.Err != nil {
return q.Err
}

q.Err = q.parent.Select(dest, q.sql)
if q.rawSQL != "" {
q.Err = q.parent.SelectRaw(dest, q.rawSQL, q.rawArgs...)
} else {
q.Err = q.parent.Select(dest, q.sql)
}
return q.Err
}

Expand Down
97 changes: 93 additions & 4 deletions services/horizon/internal/db2/history/trade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func TestTradeQueries(t *testing.T) {
assetEUR, err := q.GetAssetID(xdr.MustNewCreditAsset("EUR", "GAXMF43TGZHW3QN3REOUA2U5PW5BTARXGGYJ3JIFHW3YT6QRKRL3CPPU"))
tt.Require.NoError(err)

err = q.TradesForAssetPair(assetUSD, assetEUR).Select(&trades)
err = q.TradesForAssetPair(assetUSD, assetEUR).Page(db2.MustPageQuery("", false, "asc", 100)).Select(&trades)
tt.Require.NoError(err)
tt.Assert.Len(trades, 0)

assetUSD, err = q.GetAssetID(xdr.MustNewCreditAsset("USD", "GAXMF43TGZHW3QN3REOUA2U5PW5BTARXGGYJ3JIFHW3YT6QRKRL3CPPU"))
tt.Require.NoError(err)

err = q.TradesForAssetPair(lumen, assetUSD).Select(&trades)
err = q.TradesForAssetPair(lumen, assetUSD).Page(db2.MustPageQuery("", false, "asc", 100)).Select(&trades)
tt.Require.NoError(err)
tt.Assert.Len(trades, 1)

Expand All @@ -64,7 +64,7 @@ func TestTradeQueries(t *testing.T) {
tt.Assert.Equal(true, trades[0].BaseIsSeller)

// reverse assets
err = q.TradesForAssetPair(assetUSD, lumen).Select(&trades)
err = q.TradesForAssetPair(assetUSD, lumen).Page(db2.MustPageQuery("", false, "asc", 100)).Select(&trades)
tt.Require.NoError(err)
tt.Assert.Len(trades, 1)

Expand Down Expand Up @@ -199,7 +199,7 @@ func TestBatchInsertTrade(t *testing.T) {
tt.Assert.NoError(builder.Exec())

var rows []Trade
tt.Assert.NoError(q.Trades().Select(&rows))
tt.Assert.NoError(q.Trades().Page(db2.MustPageQuery("", false, "asc", 100)).Select(&rows))

idToAccount := buildIDtoAccountMapping(addresses, accountIDs)
idToAsset := buildIDtoAssetMapping(assets, assetIDs)
Expand Down Expand Up @@ -313,3 +313,92 @@ func TestBatchInsertTrade(t *testing.T) {
)
}
}

func TestTradesQueryForAccount(t *testing.T) {
tt := test.Start(t).Scenario("kahuna")
defer tt.Finish()
q := &Q{tt.HorizonSession()}
tradesQ := q.Trades()
var trades []Trade

account := "GAXMF43TGZHW3QN3REOUA2U5PW5BTARXGGYJ3JIFHW3YT6QRKRL3CPPU"
tradesQ.ForAccount(account)
tt.Assert.Equal(int64(15), tradesQ.forAccountID)
tt.Assert.Equal(int64(0), tradesQ.forOfferID)

tradesQ.Page(db2.MustPageQuery("", false, "desc", 100))
_, _, err := tradesQ.sql.ToSql()
// q.sql was reset in Page so should return error
tt.Assert.EqualError(err, "select statements must have at least one result column")

expectedRawSQL := `(SELECT history_operation_id, htrd."order", htrd.ledger_closed_at, htrd.offer_id, htrd.base_offer_id, base_accounts.address as base_account, base_assets.asset_type as base_asset_type, base_assets.asset_code as base_asset_code, base_assets.asset_issuer as base_asset_issuer, htrd.base_amount, htrd.counter_offer_id, counter_accounts.address as counter_account, counter_assets.asset_type as counter_asset_type, counter_assets.asset_code as counter_asset_code, counter_assets.asset_issuer as counter_asset_issuer, htrd.counter_amount, htrd.base_is_seller, htrd.price_n, htrd.price_d FROM history_trades htrd JOIN history_accounts base_accounts ON base_account_id = base_accounts.id JOIN history_accounts counter_accounts ON counter_account_id = counter_accounts.id JOIN history_assets base_assets ON base_asset_id = base_assets.id JOIN history_assets counter_assets ON counter_asset_id = counter_assets.id WHERE htrd.base_account_id = ? AND (
htrd.history_operation_id <= ?
AND (
htrd.history_operation_id < ? OR
(htrd.history_operation_id = ? AND htrd.order < ?)
)) ORDER BY htrd.history_operation_id desc, htrd.order desc) UNION (SELECT history_operation_id, htrd."order", htrd.ledger_closed_at, htrd.offer_id, htrd.base_offer_id, base_accounts.address as base_account, base_assets.asset_type as base_asset_type, base_assets.asset_code as base_asset_code, base_assets.asset_issuer as base_asset_issuer, htrd.base_amount, htrd.counter_offer_id, counter_accounts.address as counter_account, counter_assets.asset_type as counter_asset_type, counter_assets.asset_code as counter_asset_code, counter_assets.asset_issuer as counter_asset_issuer, htrd.counter_amount, htrd.base_is_seller, htrd.price_n, htrd.price_d FROM history_trades htrd JOIN history_accounts base_accounts ON base_account_id = base_accounts.id JOIN history_accounts counter_accounts ON counter_account_id = counter_accounts.id JOIN history_assets base_assets ON base_asset_id = base_assets.id JOIN history_assets counter_assets ON counter_asset_id = counter_assets.id WHERE htrd.counter_account_id = ? AND (
htrd.history_operation_id <= ?
AND (
htrd.history_operation_id < ? OR
(htrd.history_operation_id = ? AND htrd.order < ?)
)) ORDER BY htrd.history_operation_id desc, htrd.order desc) ORDER BY history_operation_id desc, "order" desc LIMIT 100`
tt.Assert.Equal(expectedRawSQL, tradesQ.rawSQL)

err = tradesQ.Select(&trades)
tt.Assert.NoError(err)
tt.Assert.Len(trades, 3)

// Ensure "desc" order and account present
tt.Assert.Equal(int64(85899350017), trades[0].HistoryOperationID)
tt.Assert.Equal(account, trades[0].BaseAccount)

tt.Assert.Equal(int64(81604382721), trades[1].HistoryOperationID)
tt.Assert.Equal(int32(1), trades[1].Order)
tt.Assert.Equal(account, trades[1].BaseAccount)

tt.Assert.Equal(int64(81604382721), trades[2].HistoryOperationID)
tt.Assert.Equal(int32(0), trades[2].Order)
tt.Assert.Equal(account, trades[2].CounterAccount)
}

func TestTradesQueryForOffer(t *testing.T) {
tt := test.Start(t).Scenario("kahuna")
defer tt.Finish()
q := &Q{tt.HorizonSession()}
tradesQ := q.Trades()
var trades []Trade

offerID := int64(2)
tradesQ.ForOffer(offerID)
tt.Assert.Equal(int64(0), tradesQ.forAccountID)
tt.Assert.Equal(int64(2), tradesQ.forOfferID)

tradesQ.Page(db2.MustPageQuery("", false, "asc", 100))
_, _, err := tradesQ.sql.ToSql()
// q.sql was reset in Page so should return error
tt.Assert.EqualError(err, "select statements must have at least one result column")

expectedRawSQL := `(SELECT history_operation_id, htrd."order", htrd.ledger_closed_at, htrd.offer_id, htrd.base_offer_id, base_accounts.address as base_account, base_assets.asset_type as base_asset_type, base_assets.asset_code as base_asset_code, base_assets.asset_issuer as base_asset_issuer, htrd.base_amount, htrd.counter_offer_id, counter_accounts.address as counter_account, counter_assets.asset_type as counter_asset_type, counter_assets.asset_code as counter_asset_code, counter_assets.asset_issuer as counter_asset_issuer, htrd.counter_amount, htrd.base_is_seller, htrd.price_n, htrd.price_d FROM history_trades htrd JOIN history_accounts base_accounts ON base_account_id = base_accounts.id JOIN history_accounts counter_accounts ON counter_account_id = counter_accounts.id JOIN history_assets base_assets ON base_asset_id = base_assets.id JOIN history_assets counter_assets ON counter_asset_id = counter_assets.id WHERE htrd.base_offer_id = ? AND (
htrd.history_operation_id >= ?
AND (
htrd.history_operation_id > ? OR
(htrd.history_operation_id = ? AND htrd.order > ?)
)) ORDER BY htrd.history_operation_id asc, htrd.order asc) UNION (SELECT history_operation_id, htrd."order", htrd.ledger_closed_at, htrd.offer_id, htrd.base_offer_id, base_accounts.address as base_account, base_assets.asset_type as base_asset_type, base_assets.asset_code as base_asset_code, base_assets.asset_issuer as base_asset_issuer, htrd.base_amount, htrd.counter_offer_id, counter_accounts.address as counter_account, counter_assets.asset_type as counter_asset_type, counter_assets.asset_code as counter_asset_code, counter_assets.asset_issuer as counter_asset_issuer, htrd.counter_amount, htrd.base_is_seller, htrd.price_n, htrd.price_d FROM history_trades htrd JOIN history_accounts base_accounts ON base_account_id = base_accounts.id JOIN history_accounts counter_accounts ON counter_account_id = counter_accounts.id JOIN history_assets base_assets ON base_asset_id = base_assets.id JOIN history_assets counter_assets ON counter_asset_id = counter_assets.id WHERE htrd.counter_offer_id = ? AND (
htrd.history_operation_id >= ?
AND (
htrd.history_operation_id > ? OR
(htrd.history_operation_id = ? AND htrd.order > ?)
)) ORDER BY htrd.history_operation_id asc, htrd.order asc) ORDER BY history_operation_id asc, "order" asc LIMIT 100`
tt.Assert.Equal(expectedRawSQL, tradesQ.rawSQL)

err = tradesQ.Select(&trades)
tt.Assert.NoError(err)
tt.Assert.Len(trades, 2)

// Ensure "asc" order and offer present
tt.Assert.Equal(int64(81604382721), trades[0].HistoryOperationID)
tt.Assert.Equal(offerID, trades[0].OfferID)

tt.Assert.Equal(int64(85899350017), trades[1].HistoryOperationID)
tt.Assert.Equal(offerID, trades[1].OfferID)
}
23 changes: 23 additions & 0 deletions services/horizon/internal/db2/schema/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up

CREATE INDEX htrd_pair_pid ON history_trades USING BTREE(base_asset_id, counter_asset_id, history_operation_id, "order");

-- +migrate Down

DROP INDEX htrd_pair_pid;