diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 5cf19852ec..78be859536 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -6,6 +6,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased - Querying claimable balances has been optimized ([4385](https://github.com/stellar/go/pull/4385)). +- Querying trade aggregations has been optimized ([4389](https://github.com/stellar/go/pull/4389)). ## V2.17.0 diff --git a/services/horizon/internal/db2/history/trade_aggregation.go b/services/horizon/internal/db2/history/trade_aggregation.go index c012b3ad0b..ac77cfaf40 100644 --- a/services/horizon/internal/db2/history/trade_aggregation.go +++ b/services/horizon/internal/db2/history/trade_aggregation.go @@ -45,6 +45,8 @@ type TradeAggregation struct { CloseD int64 `db:"close_d"` } +const HistoryTradesTableName = "history_trades_60000" + // TradeAggregationsQ is a helper struct to aid in configuring queries to // bucket and aggregate trades type TradeAggregationsQ struct { @@ -123,51 +125,100 @@ func (q *TradeAggregationsQ) WithEndTime(endTime strtime.Millis) (*TradeAggregat } } -// GetSql generates a sql statement to aggregate Trades based on given parameters -func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder { - var orderPreserved bool - orderPreserved, q.baseAssetID, q.counterAssetID = getCanonicalAssetOrder(q.baseAssetID, q.counterAssetID) - - var bucketSQL sq.SelectBuilder +func (q *TradeAggregationsQ) getRawTradesSql(orderPreserved bool) sq.SelectBuilder { + var rawTradesSQL sq.SelectBuilder if orderPreserved { - bucketSQL = bucketTrades(q.resolution, q.offset) + rawTradesSQL = bucketTrades(q.resolution, q.offset) } else { - bucketSQL = reverseBucketTrades(q.resolution, q.offset) + rawTradesSQL = reverseBucketTrades(q.resolution, q.offset) } - bucketSQL = bucketSQL.From("history_trades_60000"). + rawTradesSQL = rawTradesSQL. + Join("timestamp_range r ON 1=1"). + From(fmt.Sprintf("%s AS tr", HistoryTradesTableName)). Where(sq.Eq{"base_asset_id": q.baseAssetID, "counter_asset_id": q.counterAssetID}) //adjust time range and apply time filters - bucketSQL = bucketSQL.Where(sq.GtOrEq{"timestamp": q.startTime}) - if !q.endTime.IsNil() { - bucketSQL = bucketSQL.Where(sq.Lt{"timestamp": q.endTime}) - } + bucketTs := formatBucketTimestamp(q.resolution, q.offset, "tr") + rawTradesSQL = rawTradesSQL. + Where(fmt.Sprintf("r.max_ts >= %s", bucketTs)). + Where(fmt.Sprintf("r.min_ts <= %s", bucketTs)) if q.resolution != 60000 { //ensure open/close order for cases when multiple trades occur in the same ledger - bucketSQL = bucketSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC") + rawTradesSQL = rawTradesSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC") // Do on-the-fly aggregation for higher resolutions. - bucketSQL = aggregate(bucketSQL) } + return rawTradesSQL +} + +// GetSql generates a sql statement to aggregate Trades based on given parameters +func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder { + var orderPreserved bool + orderPreserved, q.baseAssetID, q.counterAssetID = getCanonicalAssetOrder(q.baseAssetID, q.counterAssetID) - return bucketSQL. + bucketSQL := aggregate("raw_trades"). Limit(q.pagingParams.Limit). - OrderBy("timestamp " + q.pagingParams.Order) + OrderBy("timestamp "+q.pagingParams.Order). + Prefix("WITH last_range_ts AS (?),", + lastRangeTs( + q.baseAssetID, q.counterAssetID, q.resolution, q.offset, q.startTime, q.endTime, + q.pagingParams.Order, q.pagingParams.Limit)). + Prefix("timestamp_range AS (?),", + timestampRange()). + Prefix("raw_trades AS (?)", + q.getRawTradesSql(orderPreserved)) + + return bucketSQL } -// formatBucketTimestampSelect formats a sql select clause for a bucketed timestamp, based on given resolution +// formatBucketTimestamp formats a sql select clause for a bucketed timestamp, based on given resolution // and the offset. Given a time t, it gives it a timestamp defined by // f(t) = ((t - offset)/resolution)*resolution + offset. -func formatBucketTimestampSelect(resolution int64, offset int64) string { - return fmt.Sprintf("((timestamp - %d) / %d) * %d + %d as timestamp", offset, resolution, resolution, offset) +func formatBucketTimestamp(resolution int64, offset int64, tsPrefix string) string { + prefix := "" + if len(tsPrefix) > 0 { + prefix = fmt.Sprintf("%s.", tsPrefix) + } + return fmt.Sprintf("((%stimestamp - %d) / %d) * %d + %d", prefix, offset, resolution, resolution, offset) +} + +func formatBucketTimestampSelect(resolution int64, offset int64, tsPrefix string) string { + return fmt.Sprintf("%s AS timestamp", formatBucketTimestamp(resolution, offset, tsPrefix)) +} + +func lastRangeTs(baseAssetID, counterAssetID, resolution, offset int64, startTime, endTime strtime.Millis, order string, limit uint64) sq.SelectBuilder { + s := sq.Select( + formatBucketTimestampSelect(resolution, offset, ""), + ).From( + HistoryTradesTableName, + ).Where( + sq.Eq{"base_asset_id": baseAssetID, "counter_asset_id": counterAssetID}, + ).Where(sq.GtOrEq{"timestamp": startTime}) + if !endTime.IsNil() { + s = s.Where(sq.Lt{"timestamp": endTime}) + } + return s.GroupBy( + formatBucketTimestamp(resolution, offset, ""), + ).OrderBy( + fmt.Sprintf("%s %s", formatBucketTimestamp(resolution, offset, ""), order), + ).Suffix( + fmt.Sprintf("FETCH FIRST %d ROWS ONLY", limit), + ) +} + +func timestampRange() sq.SelectBuilder { + return sq.Select( + "min(timestamp) as min_ts", + "max(timestamp) as max_ts", + ).From("last_range_ts") } // bucketTrades generates a select statement to filter rows from the `history_trades` table in // a compact form, with a timestamp rounded to resolution and reversed base/counter. func bucketTrades(resolution int64, offset int64) sq.SelectBuilder { return sq.Select( - formatBucketTimestampSelect(resolution, offset), + formatBucketTimestampSelect(resolution, offset, "tr"), "count", "base_volume", "counter_volume", @@ -187,7 +238,7 @@ func bucketTrades(resolution int64, offset int64) sq.SelectBuilder { // a compact form, with a timestamp rounded to resolution and reversed base/counter. func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder { return sq.Select( - formatBucketTimestampSelect(resolution, offset), + formatBucketTimestampSelect(resolution, offset, "tr"), "count", "base_volume as counter_volume", "counter_volume as base_volume", @@ -203,7 +254,7 @@ func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder { ) } -func aggregate(query sq.SelectBuilder) sq.SelectBuilder { +func aggregate(rawTradesTable string) sq.SelectBuilder { return sq.Select( "timestamp", "sum(\"count\") as count", @@ -218,7 +269,7 @@ func aggregate(query sq.SelectBuilder) sq.SelectBuilder { "(first(ARRAY[open_n, open_d]))[2] as open_d", "(last(ARRAY[close_n, close_d]))[1] as close_n", "(last(ARRAY[close_n, close_d]))[2] as close_d", - ).FromSelect(query, "htrd").GroupBy("timestamp") + ).From(rawTradesTable).GroupBy("timestamp") } // RebuildTradeAggregationTimes rebuilds a specific set of trade aggregation @@ -228,7 +279,7 @@ func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Mi from = from.RoundDown(60_000) to = to.RoundDown(60_000) // Clear out the old bucket values. - _, err := q.Exec(ctx, sq.Delete("history_trades_60000").Where( + _, err := q.Exec(ctx, sq.Delete(HistoryTradesTableName).Where( sq.GtOrEq{"timestamp": from}, ).Where( sq.LtOrEq{"timestamp": to}, @@ -278,7 +329,7 @@ func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Mi ).FromSelect(trades, "trades").GroupBy("base_asset_id", "counter_asset_id", "timestamp") // Insert the new bucket values. - _, err = q.Exec(ctx, sq.Insert("history_trades_60000").Select(rebuilt)) + _, err = q.Exec(ctx, sq.Insert(HistoryTradesTableName).Select(rebuilt)) if err != nil { return errors.Wrap(err, "could not rebuild trade aggregation bucket") }