Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Fix ReapLookupTables query #4525

Merged
merged 15 commits into from
Aug 11, 2022
66 changes: 44 additions & 22 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,13 +843,35 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
return nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 10000
const batchSize = 1000

if offsets == nil {
offsets = make(map[string]int64)
}

for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
"history_claimable_balances": {
{
name: "history_operation_claimable_balances",
Expand All @@ -876,6 +898,17 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
return nil, errors.Wrap(err, "error constructing a query")
}

// Find new offset before removing the rows
var newOffset int64
err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
if err != nil {
if q.NoRows(err) {
newOffset = 0
} else {
return nil, err
}
}

_, err = q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
query,
Expand All @@ -884,18 +917,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
return nil, errors.Wrapf(err, "error running query: %s", query)
}

offsets[table] += batchSize

// Check if offset exceeds table size and then reset it
var count int64
err = q.GetRaw(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", table))
if err != nil {
return nil, err
}

if offsets[table] > count {
offsets[table] = 0
}
offsets[table] = newOffset
}
return offsets, nil
}
Expand All @@ -906,13 +928,13 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[
// delete from history_claimable_balances where id in
// (select id from
// (select id,
// (select count(*) from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id) as c1,
// (select count(*) from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id) as c2,
// (select 1 from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c1,
// (select 1 from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c2,
// 1 as cx,
// from history_claimable_balances hcb order by id limit 100 offset 1000)
// as sub where c1 = 0 and c2 = 0 and 1=1);
// from history_claimable_balances hcb where id > 1000 order by id limit 100)
// as sub where c1 IS NULL and c2 IS NULL and 1=1);
//
// In short it checks the 100 rows omiting 1000 row of history_claimable_balances
// and counts occurences of each row in corresponding history tables.
Expand All @@ -935,7 +957,7 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie
for i, historyTable := range historyTables {
_, err = fmt.Fprintf(
&sb,
`(select count(*) from %s where %s = hcb.id) as c%d, `,
`(select 1 from %s where %s = hcb.id limit 1) as c%d, `,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice optimization 👍

historyTable.name,
historyTable.objectField,
i,
Expand All @@ -945,13 +967,13 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie
}
}

_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb order by id limit %d offset %d) as sub where ", table, batchSize, offset)
_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize)
if err != nil {
return "", err
}

for i := range historyTables {
_, err = fmt.Fprintf(&sb, "c%d = 0 and ", i)
_, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i)
if err != nil {
return "", err
}
Expand Down
14 changes: 7 additions & 7 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func TestConstructReapLookupTablesQuery(t *testing.T) {
assert.Equal(t,
"delete from history_accounts where id IN "+
"(select id from "+
"(select id, (select count(*) from history_effects where history_account_id = hcb.id) as c0, "+
"(select count(*) from history_operation_participants where history_account_id = hcb.id) as c1, "+
"(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+
"(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+
"(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+
"1 as cx from history_accounts hcb order by id limit 10 offset 0) as sub "+
"where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query)
"(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+
"(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+
"(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+
"(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+
"(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+
"1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are negative ids possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the 0 in id >=0 condition is the offset param and it's there just to ensure we move forward when iterating over the tables in batches. So for next cycle it will be id >=1000 if the ID after 1000 rows (batch size) is equal 1000.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could probably clean this up with WITH clauses, and named sub-queries?

"where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query)
}
10 changes: 9 additions & 1 deletion services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestReapLookupTables(t *testing.T) {

var (
prevLedgers, curLedgers int
prevAccounts, curAccounts int
prevClaimableBalances, curClaimableBalances int
prevLiquidityPools, curLiquidityPools int
)
Expand All @@ -29,6 +30,8 @@ func TestReapLookupTables(t *testing.T) {
{
err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
Expand All @@ -55,6 +58,8 @@ func TestReapLookupTables(t *testing.T) {
{
err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
Expand All @@ -63,12 +68,15 @@ func TestReapLookupTables(t *testing.T) {

tt.Assert.Equal(61, prevLedgers, "prevLedgers")
tt.Assert.Equal(1, curLedgers, "curLedgers")
tt.Assert.Equal(25, prevAccounts, "prevAccounts")
tt.Assert.Equal(1, curAccounts, "curAccounts")
tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools")
tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")

tt.Assert.Len(newOffsets, 2)
tt.Assert.Len(newOffsets, 3)
tt.Assert.Equal(int64(0), newOffsets["history_accounts"])
tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"])
tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"])
}
23 changes: 23 additions & 0 deletions services/horizon/internal/db2/schema/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +migrate Up

ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_account_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_asset_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_account_id_fkey;
ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_asset_id_fkey;

-- +migrate Down

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_base_account_id_fkey FOREIGN KEY (base_account_id) REFERENCES history_accounts(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_base_asset_id_fkey FOREIGN KEY (base_asset_id) REFERENCES history_assets(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_counter_account_id_fkey FOREIGN KEY (counter_account_id) REFERENCES history_accounts(id);

ALTER TABLE ONLY history_trades
ADD CONSTRAINT history_trades_counter_asset_id_fkey FOREIGN KEY (counter_asset_id) REFERENCES history_assets(id);
2 changes: 2 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
}

s.reapOffsets = newOffsets
// Remove before merging
log.WithField("offsets", newOffsets).Info("New offsets")
bartekn marked this conversation as resolved.
Show resolved Hide resolved
reapDuration := time.Since(reapStart).Seconds()
s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration))
}
Expand Down