Skip to content

Commit

Permalink
services/horizon: Fix ReapLookupTables query (#4525)
Browse files Browse the repository at this point in the history
This commit fixes the query added in ee063a7. The previous query was using
`limit ... offset ...` which becomes slow for larger `offset` values. This is
happening because (from docs[1]):

> The rows skipped by an OFFSET clause still have to be computed inside the
> server; therefore a large OFFSET might be inefficient.

I rewrote the query to use `id` field in tables in `where id > ...`. Before each
run, the new `id` offset is fetched from the table that is later returned to be
used in the next cycle.

I also realized that rewriting `count(*)` in subqueries to `1` with `limit 1`
further improves the query performance. This is because instead of counting all
rows in parent tables it just check if there are _any_ rows in tables (what we
need to determine if the row is orphaned or not). This allows running the reaper
also for `history_accounts` and `history_assets` tables.

Finally this commit adds logging and enable reaper only when
`--history-retention-count` is set.

[1] https://www.postgresql.org/docs/current/queries-limit.html
  • Loading branch information
bartekn authored Aug 11, 2022
1 parent 64b754a commit 69225cf
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 46 deletions.
86 changes: 60 additions & 26 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error)
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -839,18 +839,46 @@ type tableObjectFieldPair struct {
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) {
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
map[string]int64, // deleted rows count
map[string]int64, // new offsets
error,
) {
if q.GetTx() == nil {
return nil, errors.New("cannot be called outside of an ingestion transaction")
return nil, nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 10000
const batchSize = 1000

deletedCount := make(map[string]int64)

if offsets == nil {
offsets = make(map[string]int64)
}

for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
"history_claimable_balances": {
{
name: "history_operation_claimable_balances",
Expand All @@ -874,31 +902,37 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
} {
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, errors.Wrap(err, "error constructing a query")
return nil, nil, errors.Wrap(err, "error constructing a query")
}

// Find new offset before removing the rows
var newOffset int64
err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
if err != nil {
if q.NoRows(err) {
newOffset = 0
} else {
return nil, nil, err
}
}

_, err = q.ExecRaw(
res, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
query,
)
if err != nil {
return nil, errors.Wrapf(err, "error running query: %s", query)
return nil, nil, errors.Wrapf(err, "error running query: %s", query)
}

offsets[table] += batchSize

// Check if offset exceeds table size and then reset it
var count int64
err = q.GetRaw(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", table))
rows, err := res.RowsAffected()
if err != nil {
return nil, err
return nil, nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
}

if offsets[table] > count {
offsets[table] = 0
}
deletedCount[table] = rows
offsets[table] = newOffset
}
return offsets, nil
return deletedCount, offsets, nil
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
Expand All @@ -908,13 +942,13 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
//
// (select id from
// (select id,
// (select count(*) from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id) as c1,
// (select count(*) from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id) as c2,
// (select 1 from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c1,
// (select 1 from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c2,
// 1 as cx,
// from history_claimable_balances hcb order by id limit 100 offset 1000)
// as sub where c1 = 0 and c2 = 0 and 1=1);
// from history_claimable_balances hcb where id > 1000 order by id limit 100)
// as sub where c1 IS NULL and c2 IS NULL and 1=1);
//
// In short it checks the 100 rows omiting 1000 row of history_claimable_balances
// and counts occurences of each row in corresponding history tables.
Expand All @@ -937,7 +971,7 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie
for i, historyTable := range historyTables {
_, err = fmt.Fprintf(
&sb,
`(select count(*) from %s where %s = hcb.id) as c%d, `,
`(select 1 from %s where %s = hcb.id limit 1) as c%d, `,
historyTable.name,
historyTable.objectField,
i,
Expand All @@ -947,13 +981,13 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie
}
}

_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb order by id limit %d offset %d) as sub where ", table, batchSize, offset)
_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize)
if err != nil {
return "", err
}

for i := range historyTables {
_, err = fmt.Fprintf(&sb, "c%d = 0 and ", i)
_, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i)
if err != nil {
return "", err
}
Expand Down
14 changes: 7 additions & 7 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func TestConstructReapLookupTablesQuery(t *testing.T) {
assert.Equal(t,
"delete from history_accounts where id IN "+
"(select id from "+
"(select id, (select count(*) from history_effects where history_account_id = hcb.id) as c0, "+
"(select count(*) from history_operation_participants where history_account_id = hcb.id) as c1, "+
"(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+
"(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+
"(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+
"1 as cx from history_accounts hcb order by id limit 10 offset 0) as sub "+
"where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query)
"(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+
"(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+
"(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+
"(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+
"(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+
"1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+
"where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query)
}
18 changes: 16 additions & 2 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestReapLookupTables(t *testing.T) {

var (
prevLedgers, curLedgers int
prevAccounts, curAccounts int
prevClaimableBalances, curClaimableBalances int
prevLiquidityPools, curLiquidityPools int
)
Expand All @@ -29,6 +30,8 @@ func TestReapLookupTables(t *testing.T) {
{
err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
Expand All @@ -45,7 +48,7 @@ func TestReapLookupTables(t *testing.T) {
err = q.Begin()
tt.Require.NoError(err)

newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
deletedCount, newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
tt.Require.NoError(err)

err = q.Commit()
Expand All @@ -55,6 +58,8 @@ func TestReapLookupTables(t *testing.T) {
{
err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
Expand All @@ -63,12 +68,21 @@ func TestReapLookupTables(t *testing.T) {

tt.Assert.Equal(61, prevLedgers, "prevLedgers")
tt.Assert.Equal(1, curLedgers, "curLedgers")

tt.Assert.Equal(25, prevAccounts, "prevAccounts")
tt.Assert.Equal(1, curAccounts, "curAccounts")
tt.Assert.Equal(int64(24), deletedCount["history_accounts"], `deletedCount["history_accounts"]`)

tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
tt.Assert.Equal(int64(1), deletedCount["history_claimable_balances"], `deletedCount["history_claimable_balances"]`)

tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools")
tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")
tt.Assert.Equal(int64(1), deletedCount["history_liquidity_pools"], `deletedCount["history_liquidity_pools"]`)

tt.Assert.Len(newOffsets, 2)
tt.Assert.Len(newOffsets, 3)
tt.Assert.Equal(int64(0), newOffsets["history_accounts"])
tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"])
tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"])
}
23 changes: 23 additions & 0 deletions services/horizon/internal/db2/schema/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +migrate Up

ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_account_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_asset_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_account_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_asset_id_fkey;

-- +migrate Down

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_base_account_id_fkey FOREIGN KEY (base_account_id) REFERENCES history_accounts(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_base_asset_id_fkey FOREIGN KEY (base_asset_id) REFERENCES history_assets(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_counter_account_id_fkey FOREIGN KEY (counter_account_id) REFERENCES history_accounts(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_counter_asset_id_fkey FOREIGN KEY (counter_asset_id) REFERENCES history_assets(id);
18 changes: 17 additions & 1 deletion services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Config struct {
HistoryArchiveURL string

DisableStateVerification bool
EnableReapLookupTables bool
EnableExtendedLogLedgerStats bool

ReingestEnabled bool
Expand Down Expand Up @@ -680,6 +681,10 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) {
}

func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
if !s.config.EnableReapLookupTables {
return
}

// Check if lastIngestedLedger is the last one available in the backend
sequence, err := s.ledgerBackend.GetLatestLedgerSequence(s.ctx)
if err != nil {
Expand Down Expand Up @@ -712,7 +717,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
defer cancel()

reapStart := time.Now()
newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
deletedCount, newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
if err != nil {
log.WithField("err", err).Warn("Error reaping lookup tables")
return
Expand All @@ -724,6 +729,17 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
return
}

totalDeleted := int64(0)
reapLog := log
for table, c := range deletedCount {
totalDeleted += c
reapLog = reapLog.WithField(table, c)
}

if totalDeleted > 0 {
reapLog.Info("Reaper deleted rows from lookup tables")
}

s.reapOffsets = newOffsets
reapDuration := time.Since(reapStart).Seconds()
s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration))
Expand Down
12 changes: 8 additions & 4 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,16 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder(maxBatchSize int) history.TradeBatc
return args.Get(0).(history.TradeBatchInsertBuilder)
}

func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) {
func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) {
args := m.Called(ctx, offsets)
if args.Get(0) == nil {
return nil, args.Error(1)
var r1, r2 map[string]int64
if args.Get(0) != nil {
r1 = args.Get(0).(map[string]int64)
}
if args.Get(1) != nil {
r1 = args.Get(1).(map[string]int64)
}
return args.Get(0).(map[string]int64), args.Error(1)
return r1, r2, args.Error(2)
}

func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error {
Expand Down
Loading

0 comments on commit 69225cf

Please sign in to comment.