Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Refactor trade aggregation query. #4389

Merged
merged 1 commit into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
## Unreleased

- Querying claimable balances has been optimized ([4385](https://github.com/stellar/go/pull/4385)).
- Querying trade aggregations has been optimized ([4389](https://github.com/stellar/go/pull/4389)).

## V2.17.0

Expand Down
103 changes: 77 additions & 26 deletions services/horizon/internal/db2/history/trade_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type TradeAggregation struct {
CloseD int64 `db:"close_d"`
}

const HistoryTradesTableName = "history_trades_60000"

// TradeAggregationsQ is a helper struct to aid in configuring queries to
// bucket and aggregate trades
type TradeAggregationsQ struct {
Expand Down Expand Up @@ -123,51 +125,100 @@ func (q *TradeAggregationsQ) WithEndTime(endTime strtime.Millis) (*TradeAggregat
}
}

// GetSql generates a sql statement to aggregate Trades based on given parameters
func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
var orderPreserved bool
orderPreserved, q.baseAssetID, q.counterAssetID = getCanonicalAssetOrder(q.baseAssetID, q.counterAssetID)

var bucketSQL sq.SelectBuilder
func (q *TradeAggregationsQ) getRawTradesSql(orderPreserved bool) sq.SelectBuilder {
var rawTradesSQL sq.SelectBuilder
if orderPreserved {
bucketSQL = bucketTrades(q.resolution, q.offset)
rawTradesSQL = bucketTrades(q.resolution, q.offset)
} else {
bucketSQL = reverseBucketTrades(q.resolution, q.offset)
rawTradesSQL = reverseBucketTrades(q.resolution, q.offset)
}

bucketSQL = bucketSQL.From("history_trades_60000").
rawTradesSQL = rawTradesSQL.
Join("timestamp_range r ON 1=1").
From(fmt.Sprintf("%s AS tr", HistoryTradesTableName)).
Where(sq.Eq{"base_asset_id": q.baseAssetID, "counter_asset_id": q.counterAssetID})

//adjust time range and apply time filters
bucketSQL = bucketSQL.Where(sq.GtOrEq{"timestamp": q.startTime})
if !q.endTime.IsNil() {
bucketSQL = bucketSQL.Where(sq.Lt{"timestamp": q.endTime})
}
bucketTs := formatBucketTimestamp(q.resolution, q.offset, "tr")
rawTradesSQL = rawTradesSQL.
Where(fmt.Sprintf("r.max_ts >= %s", bucketTs)).
Where(fmt.Sprintf("r.min_ts <= %s", bucketTs))

if q.resolution != 60000 {
//ensure open/close order for cases when multiple trades occur in the same ledger
bucketSQL = bucketSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC")
rawTradesSQL = rawTradesSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this timestamp be qualified? (ie tr.timestamp). Or does the open_ledger_toid preserve original raw timestamp order enough to guarantee that the ordering would not be incorrect ever?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paulbellamy do you have any insight here?

// Do on-the-fly aggregation for higher resolutions.
bucketSQL = aggregate(bucketSQL)
}
return rawTradesSQL
}

// GetSql generates a sql statement to aggregate Trades based on given parameters
func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
var orderPreserved bool
orderPreserved, q.baseAssetID, q.counterAssetID = getCanonicalAssetOrder(q.baseAssetID, q.counterAssetID)

return bucketSQL.
bucketSQL := aggregate("raw_trades").
Limit(q.pagingParams.Limit).
OrderBy("timestamp " + q.pagingParams.Order)
OrderBy("timestamp "+q.pagingParams.Order).
Prefix("WITH last_range_ts AS (?),",
lastRangeTs(
q.baseAssetID, q.counterAssetID, q.resolution, q.offset, q.startTime, q.endTime,
q.pagingParams.Order, q.pagingParams.Limit)).
Prefix("timestamp_range AS (?),",
timestampRange()).
Prefix("raw_trades AS (?)",
q.getRawTradesSql(orderPreserved))

return bucketSQL
}

// formatBucketTimestampSelect formats a sql select clause for a bucketed timestamp, based on given resolution
// formatBucketTimestamp formats a sql select clause for a bucketed timestamp, based on given resolution
// and the offset. Given a time t, it gives it a timestamp defined by
// f(t) = ((t - offset)/resolution)*resolution + offset.
func formatBucketTimestampSelect(resolution int64, offset int64) string {
return fmt.Sprintf("((timestamp - %d) / %d) * %d + %d as timestamp", offset, resolution, resolution, offset)
func formatBucketTimestamp(resolution int64, offset int64, tsPrefix string) string {
prefix := ""
if len(tsPrefix) > 0 {
prefix = fmt.Sprintf("%s.", tsPrefix)
}
return fmt.Sprintf("((%stimestamp - %d) / %d) * %d + %d", prefix, offset, resolution, resolution, offset)
}

func formatBucketTimestampSelect(resolution int64, offset int64, tsPrefix string) string {
return fmt.Sprintf("%s AS timestamp", formatBucketTimestamp(resolution, offset, tsPrefix))
}

func lastRangeTs(baseAssetID, counterAssetID, resolution, offset int64, startTime, endTime strtime.Millis, order string, limit uint64) sq.SelectBuilder {
s := sq.Select(
formatBucketTimestampSelect(resolution, offset, ""),
).From(
HistoryTradesTableName,
).Where(
sq.Eq{"base_asset_id": baseAssetID, "counter_asset_id": counterAssetID},
).Where(sq.GtOrEq{"timestamp": startTime})
if !endTime.IsNil() {
s = s.Where(sq.Lt{"timestamp": endTime})
}
return s.GroupBy(
formatBucketTimestamp(resolution, offset, ""),
).OrderBy(
fmt.Sprintf("%s %s", formatBucketTimestamp(resolution, offset, ""), order),
).Suffix(
fmt.Sprintf("FETCH FIRST %d ROWS ONLY", limit),
)
}

func timestampRange() sq.SelectBuilder {
return sq.Select(
"min(timestamp) as min_ts",
"max(timestamp) as max_ts",
).From("last_range_ts")
}

// bucketTrades generates a select statement to filter rows from the `history_trades` table in
// a compact form, with a timestamp rounded to resolution and reversed base/counter.
func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
formatBucketTimestampSelect(resolution, offset, "tr"),
"count",
"base_volume",
"counter_volume",
Expand All @@ -187,7 +238,7 @@ func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
// a compact form, with a timestamp rounded to resolution and reversed base/counter.
func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
formatBucketTimestampSelect(resolution, offset, "tr"),
"count",
"base_volume as counter_volume",
"counter_volume as base_volume",
Expand All @@ -203,7 +254,7 @@ func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder {
)
}

func aggregate(query sq.SelectBuilder) sq.SelectBuilder {
func aggregate(rawTradesTable string) sq.SelectBuilder {
return sq.Select(
"timestamp",
"sum(\"count\") as count",
Expand All @@ -218,7 +269,7 @@ func aggregate(query sq.SelectBuilder) sq.SelectBuilder {
"(first(ARRAY[open_n, open_d]))[2] as open_d",
"(last(ARRAY[close_n, close_d]))[1] as close_n",
"(last(ARRAY[close_n, close_d]))[2] as close_d",
).FromSelect(query, "htrd").GroupBy("timestamp")
).From(rawTradesTable).GroupBy("timestamp")
}

// RebuildTradeAggregationTimes rebuilds a specific set of trade aggregation
Expand All @@ -228,7 +279,7 @@ func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Mi
from = from.RoundDown(60_000)
to = to.RoundDown(60_000)
// Clear out the old bucket values.
_, err := q.Exec(ctx, sq.Delete("history_trades_60000").Where(
_, err := q.Exec(ctx, sq.Delete(HistoryTradesTableName).Where(
sq.GtOrEq{"timestamp": from},
).Where(
sq.LtOrEq{"timestamp": to},
Expand Down Expand Up @@ -278,7 +329,7 @@ func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Mi
).FromSelect(trades, "trades").GroupBy("base_asset_id", "counter_asset_id", "timestamp")

// Insert the new bucket values.
_, err = q.Exec(ctx, sq.Insert("history_trades_60000").Select(rebuilt))
_, err = q.Exec(ctx, sq.Insert(HistoryTradesTableName).Select(rebuilt))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}
Expand Down