Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Precompute 1m Trade Aggregation buckets #3641

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
5b34a63
Add history_trades_60000 table for precomputed aggregations
May 28, 2021
8cd9b94
WIP getting trade aggregations endpoint using new _60000 table
May 28, 2021
1e56741
More comments on the migration
May 28, 2021
eadab59
./gogenerate.sh
May 28, 2021
c1fe1e5
Trying a formatting thing to fix unterminated function in postgres
May 28, 2021
464f363
Fix high/low calculation in trade aggs
Jun 10, 2021
0c002ac
Fixing some migration syntax issues
Jun 10, 2021
bd57c98
add rebuild sql function to rebuild given buckets
Jun 11, 2021
d81c1fb
rework the precomputation to avoid jsonb
Jun 11, 2021
43d5270
Rebuild first/last ledger buckets after reingestion
Jun 16, 2021
72c7e1d
Working on tests for trade aggregation triggers
Jun 16, 2021
e01919f
Fixing migration for postgres 9.6
Jun 16, 2021
b04b162
Fixing query to match expected fields
Jun 16, 2021
980edef
WIP tests
Jun 16, 2021
03daa32
Upgrade github.com/Masterminds/squirrel to v1.5.0
Jun 16, 2021
e5c3966
Try rebuilding buckets post-insert from go instead of a db trigger.
Jun 16, 2021
a8d1516
Fix rogue comma
Jun 17, 2021
cba804f
Need to quote order field
Jun 17, 2021
b68acfe
Remove defunct comment
Jun 17, 2021
5ad7102
Add index to make bucket rebuilds FAST
Jun 17, 2021
6bf9090
Getting integration tests passing
Jun 18, 2021
7be0f01
./gogenerate.sh
Jun 18, 2021
c49626d
update base and kahuna sql
Jun 18, 2021
ee3f5fd
Fix ordering in migration
Jun 22, 2021
3f82b68
Add migration to kahuna scenario:
Jun 22, 2021
e2c1a2b
update time estimate for backfilling
Jun 23, 2021
41ef524
Rebuild trade aggregations once per reingest
Jun 24, 2021
301cc29
WIP tests
Jun 24, 2021
071a50a
Make toLedger inclusive
Jun 24, 2021
e94b732
./gogenerate.sh
Jun 24, 2021
d1c2c97
fix govet
Jun 24, 2021
54d95c1
WIP tests
Jun 24, 2021
e75189e
WIP tests
Jun 24, 2021
6772e00
WIP tests
Jun 25, 2021
940a147
Fix go vet nonsense
Jun 25, 2021
6e35397
Need to use total-order-id for trade agg rebuilds
Jun 25, 2021
1cd9401
Fixing reverse bucket trades high/low/avg
Jun 28, 2021
4f55c13
Merge branch 'master' into paulb/trade_aggregation_precompute
Jun 28, 2021
25ac86c
Add an index on open_ledger so DeleteRangeAll is quicker
Jun 29, 2021
cfbbe32
Update services/horizon/internal/db2/history/trade_aggregation.go
paulbellamy Jul 2, 2021
1fa749e
Update services/horizon/internal/db2/history/trade_aggregation.go
paulbellamy Jul 2, 2021
01488e2
Feedback renames
Jul 2, 2021
07802dc
Add metric on ledger_ingestion_trade_aggregation_duration_seconds
Jul 2, 2021
41cf9fe
Fix a couple bugs in trade aggregation bucket rebuilding
Jul 2, 2021
401e413
There might not be any trades in the new ledgers
Jul 2, 2021
38492ad
Use the ledgers to get the start/end timestamps to rebuild
Jul 5, 2021
1aa55b8
Fix open/close ledger bug when rebuilding buckets for multiple assets
Jul 5, 2021
778f90a
Add RebuildTradeAggregationTimes to make testing easier
Jul 5, 2021
8797896
Merge branch 'master' into paulb/trade_aggregation_precompute
Jul 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.list
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/stellar/go
cloud.google.com/go v0.34.0
firebase.google.com/go v3.12.0+incompatible
github.com/BurntSushi/toml v0.3.1
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/Masterminds/squirrel v1.5.0
paulbellamy marked this conversation as resolved.
Show resolved Hide resolved
github.com/Microsoft/go-winio v0.4.14
github.com/Shopify/sarama v1.19.0
github.com/Shopify/toxiproxy v2.1.4+incompatible
Expand Down Expand Up @@ -67,7 +67,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515
github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89
github.com/kr/text v0.0.0-20150520163712-e373e137fafd
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0
github.com/lib/pq v1.2.0
github.com/magiconair/properties v1.5.4
github.com/manucorporat/sse v0.0.0-20160126180136-ee05b128a739
Expand All @@ -77,7 +78,6 @@ github.com/mattn/go-sqlite3 v1.9.0
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223
github.com/onsi/ginkgo v1.7.0
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.15
require (
firebase.google.com/go v3.12.0+incompatible
github.com/BurntSushi/toml v0.3.1
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/Masterminds/squirrel v1.5.0
github.com/Microsoft/go-winio v0.4.14
github.com/adjust/goautoneg v0.0.0-20150426214442-d788f35a0315
github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f // indirect
Expand Down Expand Up @@ -37,14 +37,12 @@ require (
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect
github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89 // indirect
github.com/kr/text v0.0.0-20150520163712-e373e137fafd // indirect
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f // indirect
github.com/lib/pq v1.2.0
github.com/magiconair/properties v1.5.4 // indirect
github.com/manucorporat/sse v0.0.0-20160126180136-ee05b128a739
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366 // indirect
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1 // indirect
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db // indirect
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ firebase.google.com/go v3.12.0+incompatible h1:q70KCp/J0oOL8kJ8oV2j3646kV4TB8Y5I
firebase.google.com/go v3.12.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 h1:PPfYWScYacO3Q6JMCLkyh6Ea2Q/REDTMgmiTAeiV8Jg=
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5/go.mod h1:xnKTFzjGUiZtiOagBsfnvomW+nJg2usB1ZpordQWqNM=
github.com/Masterminds/squirrel v1.5.0 h1:JukIZisrUXadA9pl3rMkjhiamxiB0cXiu+HGp/Y8cY8=
github.com/Masterminds/squirrel v1.5.0/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -118,8 +118,10 @@ github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89 h1:Smt4CPhAnATQEGlX/nyqG
github.com/kr/pretty v0.0.0-20150520163514-e6ac2fc51e89/go.mod h1:Bvhd+E3laJ0AVkG0c9rmtZcnhV0HQ3+c3YxxqTvc/gA=
github.com/kr/text v0.0.0-20150520163712-e373e137fafd h1:ohpc+F5FseC/PPrR1wz2WcZxOc4kplnJ39pGbFlj/eY=
github.com/kr/text v0.0.0-20150520163712-e373e137fafd/go.mod h1:sjUstKUATFIcff4qlB53Kml0wQPtJVc/3fWrmuUmcfA=
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f h1:GYBg1t6ujjhgyYsiO9i0qwbnUZzTiPVLCA/QUkD7ECQ=
github.com/lann/builder v0.0.0-20140829050551-c603884a2c1f/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand All @@ -139,8 +141,6 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366 h1:1ypTpKUfEOyX1YsJru6lLq7hrmK+QGECpJQ1PHUHuGo=
github.com/mitchellh/mapstructure v0.0.0-20150613213606-2caf8efc9366/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1 h1:kCroTjOY+wyp+iHA2lZOV5aJ6WfBVjGnW8bCYmXmLPo=
github.com/mndrix/ps v0.0.0-20131111202200-33ddf69629c1/go.mod h1:dHgTaDInzkAqJv67VaX1IkK449M2UoBY68CZeI/bNCU=
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db h1:eZgFHVkk9uOTaOQLC6tgjkzdp7Ays8eEVecBcfHZlJQ=
github.com/moul/http2curl v0.0.0-20161031194548-4e24498b31db/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
Expand Down
4 changes: 4 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
strtime "github.com/stellar/go/support/time"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -237,6 +238,8 @@ type IngestionQ interface {
QSigners
//QTrades
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32) error
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -838,6 +841,7 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
"history_operation_participants": "history_operation_id",
"history_operations": "id",
"history_trades": "history_operation_id",
"history_trades_60000": "open_ledger_toid",
"history_transaction_claimable_balances": "history_transaction_id",
"history_transaction_participants": "history_transaction_id",
"history_transactions": "id",
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/mock_q_trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (m *MockQTrades) NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchIns
return a.Get(0).(TradeBatchInsertBuilder)
}

func (m *MockQTrades) RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32) error {
a := m.Called(ctx, fromLedger, toLedger)
return a.Error(0)
}

type MockTradeBatchInsertBuilder struct {
mock.Mock
}
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,5 +273,6 @@ func getCanonicalAssetOrder(assetId1 int64, assetId2 int64) (orderPreserved bool
type QTrades interface {
QCreateAccountsHistory
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationBuckets(ctx context.Context, fromledger, toLedger uint32) error
CreateAssets(ctx context.Context, assets []xdr.Asset, maxBatchSize int) (map[string]Asset, error)
}
187 changes: 149 additions & 38 deletions services/horizon/internal/db2/history/trade_aggregation.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package history

import (
"context"
"fmt"
"time"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/toid"
"github.com/stellar/go/support/errors"
strtime "github.com/stellar/go/support/time"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -149,37 +152,23 @@ func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
bucketSQL = reverseBucketTrades(q.resolution, q.offset)
}

bucketSQL = bucketSQL.From("history_trades").
bucketSQL = bucketSQL.From("history_trades_60000").
Where(sq.Eq{"base_asset_id": q.baseAssetID, "counter_asset_id": q.counterAssetID})

//adjust time range and apply time filters
bucketSQL = bucketSQL.Where(sq.GtOrEq{"ledger_closed_at": q.startTime.ToTime()})
bucketSQL = bucketSQL.Where(sq.GtOrEq{"timestamp": q.startTime})
if !q.endTime.IsNil() {
bucketSQL = bucketSQL.Where(sq.Lt{"ledger_closed_at": q.endTime.ToTime()})
bucketSQL = bucketSQL.Where(sq.Lt{"timestamp": q.endTime})
}

//ensure open/close order for cases when multiple trades occur in the same ledger
bucketSQL = bucketSQL.OrderBy("history_operation_id ", "\"order\"")
if q.resolution != 60000 {
//ensure open/close order for cases when multiple trades occur in the same ledger
bucketSQL = bucketSQL.OrderBy("timestamp ASC", "open_ledger_toid ASC")
// Do on-the-fly aggregation for higher resolutions.
bucketSQL = aggregate(bucketSQL)
}

return sq.Select(
"timestamp",
"count(*) as count",
"sum(base_amount) as base_volume",
"sum(counter_amount) as counter_volume",
"sum(counter_amount)/sum(base_amount) as avg",
// We fetch N, D here directly because of lib/pq bug (stellar/go#3345).
// (Note: [1] is the first array element)
"(max_price(price))[1] as high_n",
"(max_price(price))[2] as high_d",
"(min_price(price))[1] as low_n",
"(min_price(price))[2] as low_d",
"(first(price))[1] as open_n",
"(first(price))[2] as open_d",
"(last(price))[1] as close_n",
"(last(price))[2] as close_d",
).
FromSelect(bucketSQL, "htrd").
GroupBy("timestamp").
return bucketSQL.
Limit(q.pagingParams.Limit).
OrderBy("timestamp " + q.pagingParams.Order)
}
Expand All @@ -188,22 +177,26 @@ func (q *TradeAggregationsQ) GetSql() sq.SelectBuilder {
// and the offset. Given a time t, it gives it a timestamp defined by
// f(t) = ((t - offset)/resolution)*resolution + offset.
func formatBucketTimestampSelect(resolution int64, offset int64) string {
return fmt.Sprintf("div((cast((extract(epoch from ledger_closed_at) * 1000 ) as bigint) - %d), %d)*%d + %d as timestamp",
offset, resolution, resolution, offset)
return fmt.Sprintf("((timestamp - %d) / %d) * %d + %d as timestamp", offset, resolution, resolution, offset)
}

// bucketTrades generates a select statement to filter rows from the `history_trades` table in
// a compact form, with a timestamp rounded to resolution and reversed base/counter.
func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
"history_operation_id",
"\"order\"",
"base_asset_id",
"base_amount",
"counter_asset_id",
"counter_amount",
"ARRAY[price_n, price_d] as price",
"count",
"base_volume",
"counter_volume",
"avg",
"high_n",
"high_d",
"low_n",
"low_d",
"open_n",
"open_d",
"close_n",
"close_d",
)
}

Expand All @@ -212,12 +205,130 @@ func bucketTrades(resolution int64, offset int64) sq.SelectBuilder {
func reverseBucketTrades(resolution int64, offset int64) sq.SelectBuilder {
return sq.Select(
formatBucketTimestampSelect(resolution, offset),
"count",
"base_volume as counter_volume",
"counter_volume as base_volume",
"(base_volume::numeric/counter_volume::numeric) as avg",
"low_n as high_d",
"low_d as high_n",
"high_n as low_d",
"high_d as low_n",
"open_n as open_d",
"open_d as open_n",
"close_n as close_d",
"close_d as close_n",
)
}

func aggregate(query sq.SelectBuilder) sq.SelectBuilder {
return sq.Select(
"timestamp",
"sum(\"count\") as count",
"sum(base_volume) as base_volume",
"sum(counter_volume) as counter_volume",
"sum(counter_volume::numeric)/sum(base_volume::numeric) as avg",
"(max_price(ARRAY[high_n, high_d]))[1] as high_n",
"(max_price(ARRAY[high_n, high_d]))[2] as high_d",
"(min_price(ARRAY[low_n, low_d]))[1] as low_n",
"(min_price(ARRAY[low_n, low_d]))[2] as low_d",
"(first(ARRAY[open_n, open_d]))[1] as open_n",
"(first(ARRAY[open_n, open_d]))[2] as open_d",
"(last(ARRAY[close_n, close_d]))[1] as close_n",
"(last(ARRAY[close_n, close_d]))[2] as close_d",
).FromSelect(query, "htrd").GroupBy("timestamp")
}

// RebuildTradeAggregationTimes rebuilds a specific set of trade aggregation
// buckets, (specified by start and end times) to ensure complete data in case
// of partial reingestion.
func (q Q) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis) error {
from = from.RoundDown(60_000)
to = to.RoundDown(60_000)
// Clear out the old bucket values.
_, err := q.Exec(ctx, sq.Delete("history_trades_60000").Where(
sq.GtOrEq{"timestamp": from},
).Where(
sq.LtOrEq{"timestamp": to},
))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}

// find all related trades
trades := sq.Select(
"to_millis(ledger_closed_at, 60000) as timestamp",
"history_operation_id",
"\"order\"",
"counter_asset_id as base_asset_id",
"counter_amount as base_amount",
"base_asset_id as counter_asset_id",
"base_amount as counter_amount",
"ARRAY[price_d, price_n] as price",
"base_asset_id",
"base_amount",
"counter_asset_id",
"counter_amount",
"ARRAY[price_n, price_d] as price",
).From("history_trades").Where(
sq.GtOrEq{"to_millis(ledger_closed_at, 60000)": from},
).Where(
sq.LtOrEq{"to_millis(ledger_closed_at, 60000)": to},
).OrderBy("base_asset_id", "counter_asset_id", "history_operation_id", "\"order\"")

// figure out the new bucket values
rebuilt := sq.Select(
"timestamp",
"base_asset_id",
"counter_asset_id",
"count(*) as count",
"sum(base_amount) as base_volume",
"sum(counter_amount) as counter_volume",
"sum(counter_amount::numeric)/sum(base_amount::numeric) as avg",
"(max_price(price))[1] as high_n",
"(max_price(price))[2] as high_d",
"(min_price(price))[1] as low_n",
"(min_price(price))[2] as low_d",
"first(history_operation_id) as open_ledger_toid",
"(first(price))[1] as open_n",
"(first(price))[2] as open_d",
"last(history_operation_id) as close_ledger_toid",
"(last(price))[1] as close_n",
"(last(price))[2] as close_d",
).FromSelect(trades, "trades").GroupBy("base_asset_id", "counter_asset_id", "timestamp")

// Insert the new bucket values.
_, err = q.Exec(ctx, sq.Insert("history_trades_60000").Select(rebuilt))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}
return nil
}

// RebuildTradeAggregationBuckets rebuilds a specific set of trade aggregation
// buckets, (specified by start and end ledger seq) to ensure complete data in
// case of partial reingestion.
func (q Q) RebuildTradeAggregationBuckets(ctx context.Context, fromSeq, toSeq uint32) error {
fromLedgerToid := toid.New(int32(fromSeq), 0, 0).ToInt64()
// toLedger should be inclusive here.
toLedgerToid := toid.New(int32(toSeq+1), 0, 0).ToInt64()

// Get the affected timestamp buckets
timestamps := sq.Select(
"to_millis(closed_at, 60000)",
).From("history_ledgers").Where(
sq.GtOrEq{"id": fromLedgerToid},
).Where(
sq.Lt{"id": toLedgerToid},
)

// Get first bucket timestamp in the ledger range
var from strtime.Millis
err := q.Get(ctx, &from, timestamps.OrderBy("id").Limit(1))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}

// Get last bucket timestamp in the ledger range
var to strtime.Millis
err = q.Get(ctx, &to, timestamps.OrderBy("id DESC").Limit(1))
if err != nil {
return errors.Wrap(err, "could not rebuild trade aggregation bucket")
}

return q.RebuildTradeAggregationTimes(ctx, from, to)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type TradeBatchInsertBuilder interface {
// tradeBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type tradeBatchInsertBuilder struct {
builder db.BatchInsertBuilder
q *Q
}

// NewTradeBatchInsertBuilder constructs a new TradeBatchInsertBuilder instance
Expand All @@ -44,6 +45,7 @@ func (q *Q) NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
Table: q.GetTable("history_trades"),
MaxBatchSize: maxBatchSize,
},
q: q,
}
}

Expand Down
Loading