Skip to content

Commit

Permalink
Merge pull request #3641 from paulbellamy/paulb/trade_aggregation_pre…
Browse files Browse the repository at this point in the history
…compute

services/horizon: Precompute 1m Trade Aggregation buckets
  • Loading branch information
Paul Bellamy authored Jul 6, 2021
2 parents ef7b236 + 8797896 commit d336609
Show file tree
Hide file tree
Showing 23 changed files with 703 additions and 71 deletions.
6 changes: 3 additions & 3 deletions go.list
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/stellar/go
cloud.google.com/go v0.34.0
firebase.google.com/go v3.12.0+incompatible
github.com/BurntSushi/toml v0.3.1
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/Masterminds/squirrel v1.5.0
github.com/Microsoft/go-winio v0.4.14
github.com/Shopify/sarama v1.19.0
github.com/Shopify/toxiproxy v2.1.4+incompatible
Expand Down Expand Up @@ -67,7 +67,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515
github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89
github.com/kr/text v0.0.0-20150520163712-e373e137fafd
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0
github.com/lib/pq v1.2.0
github.com/magiconair/properties v1.5.4
github.com/manucorporat/sse v0.0.0-20160126180136-ee05b128a739
Expand All @@ -77,7 +78,6 @@ github.com/mattn/go-sqlite3 v1.9.0
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223
github.com/onsi/ginkgo v1.7.0
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.15
require (
firebase.google.com/go v3.12.0+incompatible
github.com/BurntSushi/toml v0.3.1
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/Masterminds/squirrel v1.5.0
github.com/Microsoft/go-winio v0.4.14
github.com/adjust/goautoneg v0.0.0-20150426214442-d788f35a0315
github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f // indirect
Expand Down Expand Up @@ -37,14 +37,12 @@ require (
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect
github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89 // indirect
github.com/kr/text v0.0.0-20150520163712-e373e137fafd // indirect
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f // indirect
github.com/lib/pq v1.2.0
github.com/magiconair/properties v1.5.4 // indirect
github.com/manucorporat/sse v0.0.0-20160126180136-ee05b128a739
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366 // indirect
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1 // indirect
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db // indirect
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ firebase.google.com/go v3.12.0+incompatible h1:q70KCp/J0oOL8kJ8oV2j3646kV4TB8Y5I
firebase.google.com/go v3.12.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 h1:PPfYWScYacO3Q6JMCLkyh6Ea2Q/REDTMgmiTAeiV8Jg=
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5/go.mod h1:xnKTFzjGUiZtiOagBsfnvomW+nJg2usB1ZpordQWqNM=
github.com/Masterminds/squirrel v1.5.0 h1:JukIZisrUXadA9pl3rMkjhiamxiB0cXiu+HGp/Y8cY8=
github.com/Masterminds/squirrel v1.5.0/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -118,8 +118,10 @@ github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89 h1:Smt4CPhAnATQEGlX/nyqG
github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89/go.mod h1:Bvhd+E3laJ0AVkG0c9rmtZcnhV0HQ3+c3YxxqTvc/gA=
github.com/kr/text v0.0.0-20150520163712-e373e137fafd h1:ohpc+F5FseC/PPrR1wz2WcZxOc4kplnJ39pGbFlj/eY=
github.com/kr/text v0.0.0-20150520163712-e373e137fafd/go.mod h1:sjUstKUATFIcff4qlB53Kml0wQPtJVc/3fWrmuUmcfA=
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f h1:GYBg1t6ujjhgyYsiO9i0qwbnUZzTiPVLCA/QUkD7ECQ=
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand All @@ -139,8 +141,6 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366 h1:1ypTpKUfEOyX1YsJru6lLq7hrmK+QGECpJQ1PHUHuGo=
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1 h1:kCroTjOY+wyp+iHA2lZOV5aJ6WfBVjGnW8bCYmXmLPo=
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1/go.mod h1:dHgTaDInzkAqJv67VaX1IkK449M2UoBY68CZeI/bNCU=
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db h1:eZgFHVkk9uOTaOQLC6tgjkzdp7Ays8eEVecBcfHZlJQ=
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
Expand Down
4 changes: 4 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
strtime "github.com/stellar/go/support/time"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -237,6 +238,8 @@ type IngestionQ interface {
QSigners
//QTrades
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32) error
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -838,6 +841,7 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
"history_operation_participants": "history_operation_id",
"history_operations": "id",
"history_trades": "history_operation_id",
"history_trades_60000": "open_ledger_toid",
"history_transaction_claimable_balances": "history_transaction_id",
"history_transaction_participants": "history_transaction_id",
"history_transactions": "id",
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (m *MockQTrades) NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchIns
return a.Get(0).(TradeBatchInsertBuilder)
}

func (m *MockQTrades) RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32) error {
a := m.Called(ctx, fromLedger, toLedger)
return a.Error(0)
}

type MockTradeBatchInsertBuilder struct {
mock.Mock
}
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,5 +273,6 @@ func getCanonicalAssetOrder(assetId1 int64, assetId2 int64) (orderPreserved bool
type QTrades interface {
QCreateAccountsHistory
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationBuckets(ctx context.Context, fromledger, toLedger uint32) error
CreateAssets(ctx context.Context, assets []xdr.Asset, maxBatchSize int) (map[string]Asset, error)
}
187 changes: 149 additions & 38 deletions services/horizon/internal/db2/history/trade_aggregation.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package history

import (
"context"
"fmt"
"time"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/toid"
"github.com/stellar/go/support/errors"
strtime "github.com/stellar/go/support/time"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -149,37 +152,23 @@ func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
bucketSQL = reverseBucketTrades(q.resolution, q.offset)
}

bucketSQL = bucketSQL.From("history_trades").
bucketSQL = bucketSQL.From("history_trades_60000").
Where(sq.Eq{"base_asset_id": q.baseAssetID, "counter_asset_id": q.counterAssetID})

//adjust time range and apply time filters
bucketSQL = bucketSQL.Where(sq.GtOrEq{"ledger_closed_at": q.startTime.ToTime()})
bucketSQL = bucketSQL.Where(sq.GtOrEq{"timestamp": q.startTime})
if !q.endTime.IsNil() {
bucketSQL = bucketSQL.Where(sq.Lt{"ledger_closed_at": q.endTime.ToTime()})
bucketSQL = bucketSQL.Where(sq.Lt{"timestamp": q.endTime})
}

//ensure open/close order for cases when multiple trades occur in the same ledger
bucketSQL = bucketSQL.OrderBy("history_operation_id ", "\"order\"")
if q.resolution != 60000 {
//ensure open/close order for cases when multiple trades occur in the same ledger
bucketSQL = bucketSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC")
// Do on-the-fly aggregation for higher resolutions.
bucketSQL = aggregate(bucketSQL)
}

return sq.Select(
"timestamp",
"count(*) as count",
"sum(base_amount) as base_volume",
"sum(counter_amount) as counter_volume",
"sum(counter_amount)/sum(base_amount) as avg",
// We fetch N, D here directly because of lib/pq bug (stellar/go#3345).
// (Note: [1] is the first array element)
"(max_price(price))[1] as high_n",
"(max_price(price))[2] as high_d",
"(min_price(price))[1] as low_n",
"(min_price(price))[2] as low_d",
"(first(price))[1] as open_n",
"(first(price))[2] as open_d",
"(last(price))[1] as close_n",
"(last(price))[2] as close_d",
).
FromSelect(bucketSQL, "htrd").
GroupBy("timestamp").
return bucketSQL.
Limit(q.pagingParams.Limit).
OrderBy("timestamp " + q.pagingParams.Order)
}
Expand All @@ -188,22 +177,26 @@ func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
// and the offset. Given a time t, it gives it a timestamp defined by
// f(t) = ((t - offset)/resolution)*resolution + offset.
func formatBucketTimestampSelect(resolution int64, offset int64) string {
return fmt.Sprintf("div((cast((extract(epoch from ledger_closed_at) * 1000 ) as bigint) - %d), %d)*%d + %d as timestamp",
offset, resolution, resolution, offset)
return fmt.Sprintf("((timestamp - %d) / %d) * %d + %d as timestamp", offset, resolution, resolution, offset)
}

// bucketTrades generates a select statement to filter rows from the `history_trades` table in
// a compact form, with a timestamp rounded to resolution and reversed base/counter.
func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
"history_operation_id",
"\"order\"",
"base_asset_id",
"base_amount",
"counter_asset_id",
"counter_amount",
"ARRAY[price_n, price_d] as price",
"count",
"base_volume",
"counter_volume",
"avg",
"high_n",
"high_d",
"low_n",
"low_d",
"open_n",
"open_d",
"close_n",
"close_d",
)
}

Expand All @@ -212,12 +205,130 @@ func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
"count",
"base_volume as counter_volume",
"counter_volume as base_volume",
"(base_volume::numeric/counter_volume::numeric) as avg",
"low_n as high_d",
"low_d as high_n",
"high_n as low_d",
"high_d as low_n",
"open_n as open_d",
"open_d as open_n",
"close_n as close_d",
"close_d as close_n",
)
}

func aggregate(query sq.SelectBuilder) sq.SelectBuilder {
return sq.Select(
"timestamp",
"sum(\"count\") as count",
"sum(base_volume) as base_volume",
"sum(counter_volume) as counter_volume",
"sum(counter_volume::numeric)/sum(base_volume::numeric) as avg",
"(max_price(ARRAY[high_n, high_d]))[1] as high_n",
"(max_price(ARRAY[high_n, high_d]))[2] as high_d",
"(min_price(ARRAY[low_n, low_d]))[1] as low_n",
"(min_price(ARRAY[low_n, low_d]))[2] as low_d",
"(first(ARRAY[open_n, open_d]))[1] as open_n",
"(first(ARRAY[open_n, open_d]))[2] as open_d",
"(last(ARRAY[close_n, close_d]))[1] as close_n",
"(last(ARRAY[close_n, close_d]))[2] as close_d",
).FromSelect(query, "htrd").GroupBy("timestamp")
}

// RebuildTradeAggregationTimes rebuilds a specific set of trade aggregation
// buckets, (specified by start and end times) to ensure complete data in case
// of partial reingestion.
func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis) error {
from = from.RoundDown(60_000)
to = to.RoundDown(60_000)
// Clear out the old bucket values.
_, err := q.Exec(ctx, sq.Delete("history_trades_60000").Where(
sq.GtOrEq{"timestamp": from},
).Where(
sq.LtOrEq{"timestamp": to},
))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}

// find all related trades
trades := sq.Select(
"to_millis(ledger_closed_at, 60000) as timestamp",
"history_operation_id",
"\"order\"",
"counter_asset_id as base_asset_id",
"counter_amount as base_amount",
"base_asset_id as counter_asset_id",
"base_amount as counter_amount",
"ARRAY[price_d, price_n] as price",
"base_asset_id",
"base_amount",
"counter_asset_id",
"counter_amount",
"ARRAY[price_n, price_d] as price",
).From("history_trades").Where(
sq.GtOrEq{"to_millis(ledger_closed_at, 60000)": from},
).Where(
sq.LtOrEq{"to_millis(ledger_closed_at, 60000)": to},
).OrderBy("base_asset_id", "counter_asset_id", "history_operation_id", "\"order\"")

// figure out the new bucket values
rebuilt := sq.Select(
"timestamp",
"base_asset_id",
"counter_asset_id",
"count(*) as count",
"sum(base_amount) as base_volume",
"sum(counter_amount) as counter_volume",
"sum(counter_amount::numeric)/sum(base_amount::numeric) as avg",
"(max_price(price))[1] as high_n",
"(max_price(price))[2] as high_d",
"(min_price(price))[1] as low_n",
"(min_price(price))[2] as low_d",
"first(history_operation_id) as open_ledger_toid",
"(first(price))[1] as open_n",
"(first(price))[2] as open_d",
"last(history_operation_id) as close_ledger_toid",
"(last(price))[1] as close_n",
"(last(price))[2] as close_d",
).FromSelect(trades, "trades").GroupBy("base_asset_id", "counter_asset_id", "timestamp")

// Insert the new bucket values.
_, err = q.Exec(ctx, sq.Insert("history_trades_60000").Select(rebuilt))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}
return nil
}

// RebuildTradeAggregationBuckets rebuilds a specific set of trade aggregation
// buckets, (specified by start and end ledger seq) to ensure complete data in
// case of partial reingestion.
func (q Q) RebuildTradeAggregationBuckets(ctx context.Context, fromSeq, toSeq uint32) error {
fromLedgerToid := toid.New(int32(fromSeq), 0, 0).ToInt64()
// toLedger should be inclusive here.
toLedgerToid := toid.New(int32(toSeq+1), 0, 0).ToInt64()

// Get the affected timestamp buckets
timestamps := sq.Select(
"to_millis(closed_at, 60000)",
).From("history_ledgers").Where(
sq.GtOrEq{"id": fromLedgerToid},
).Where(
sq.Lt{"id": toLedgerToid},
)

// Get first bucket timestamp in the ledger range
var from strtime.Millis
err := q.Get(ctx, &from, timestamps.OrderBy("id").Limit(1))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}

// Get last bucket timestamp in the ledger range
var to strtime.Millis
err = q.Get(ctx, &to, timestamps.OrderBy("id DESC").Limit(1))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}

return q.RebuildTradeAggregationTimes(ctx, from, to)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type TradeBatchInsertBuilder interface {
// tradeBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type tradeBatchInsertBuilder struct {
builder db.BatchInsertBuilder
q *Q
}

// NewTradeBatchInsertBuilder constructs a new TradeBatchInsertBuilder instance
Expand All @@ -44,6 +45,7 @@ func (q *Q) NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
Table: q.GetTable("history_trades"),
MaxBatchSize: maxBatchSize,
},
q: q,
}
}

Expand Down
Loading

0 comments on commit d336609

Please sign in to comment.