From 69225cf8c6033fed20de507501e410287349dd7a Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 11 Aug 2022 15:04:41 +0200 Subject: [PATCH] services/horizon: Fix ReapLookupTables query (#4525) This commit fixes the query added in ee063a7. The previous query was using `limit ... offset ...` which becomes slow for larger `offset` values. This is happening because (from docs[1]): > The rows skipped by an OFFSET clause still have to be computed inside the > server; therefore a large OFFSET might be inefficient. I rewrote the query to use `id` field in tables in `where id > ...`. Before each run, the new `id` offset is fetched from the table that is later returned to be used in the next cycle. I also realized that rewriting `count(*)` in subqueries to `1` with `limit 1` further improves the query performance. This is because instead of counting all rows in parent tables it just check if there are _any_ rows in tables (what we need to determine if the row is orphaned or not). This allows running the reaper also for `history_accounts` and `history_assets` tables. Finally this commit adds logging and enable reaper only when `--history-retention-count` is set. [1] https://www.postgresql.org/docs/current/queries-limit.html --- services/horizon/internal/db2/history/main.go | 86 +++++++++++++------ .../horizon/internal/db2/history/main_test.go | 14 +-- .../horizon/internal/db2/history/reap_test.go | 18 +++- .../horizon/internal/db2/schema/bindata.go | 23 +++++ .../59_remove_foreign_key_constraints.sql | 20 +++++ services/horizon/internal/ingest/main.go | 18 +++- services/horizon/internal/ingest/main_test.go | 12 ++- .../internal/ingest/resume_state_test.go | 50 +++++++++-- services/horizon/internal/init.go | 4 +- 9 files changed, 199 insertions(+), 46 deletions(-) create mode 100644 services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 775f9bb14f..10eb423290 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -262,7 +262,7 @@ type IngestionQ interface { NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) + ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -839,18 +839,46 @@ type tableObjectFieldPair struct { // which aren't used (orphaned), i.e. history entries for them were reaped. // This method must be executed inside ingestion transaction. Otherwise it may // create invalid state in lookup and history tables. -func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) { +func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( + map[string]int64, // deleted rows count + map[string]int64, // new offsets + error, +) { if q.GetTx() == nil { - return nil, errors.New("cannot be called outside of an ingestion transaction") + return nil, nil, errors.New("cannot be called outside of an ingestion transaction") } - const batchSize = 10000 + const batchSize = 1000 + + deletedCount := make(map[string]int64) if offsets == nil { offsets = make(map[string]int64) } for table, historyTables := range map[string][]tableObjectFieldPair{ + "history_accounts": { + { + name: "history_effects", + objectField: "history_account_id", + }, + { + name: "history_operation_participants", + objectField: "history_account_id", + }, + { + name: "history_trades", + objectField: "base_account_id", + }, + { + name: "history_trades", + objectField: "counter_account_id", + }, + { + name: "history_transaction_participants", + objectField: "history_account_id", + }, + }, "history_claimable_balances": { { name: "history_operation_claimable_balances", @@ -874,31 +902,37 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ } { query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) if err != nil { - return nil, errors.Wrap(err, "error constructing a query") + return nil, nil, errors.Wrap(err, "error constructing a query") + } + + // Find new offset before removing the rows + var newOffset int64 + err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) + if err != nil { + if q.NoRows(err) { + newOffset = 0 + } else { + return nil, nil, err + } } - _, err = q.ExecRaw( + res, err := q.ExecRaw( context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), query, ) if err != nil { - return nil, errors.Wrapf(err, "error running query: %s", query) + return nil, nil, errors.Wrapf(err, "error running query: %s", query) } - offsets[table] += batchSize - - // Check if offset exceeds table size and then reset it - var count int64 - err = q.GetRaw(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) + rows, err := res.RowsAffected() if err != nil { - return nil, err + return nil, nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) } - if offsets[table] > count { - offsets[table] = 0 - } + deletedCount[table] = rows + offsets[table] = newOffset } - return offsets, nil + return deletedCount, offsets, nil } // constructReapLookupTablesQuery creates a query like (using history_claimable_balances @@ -908,13 +942,13 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[ // // (select id from // (select id, -// (select count(*) from history_operation_claimable_balances -// where history_claimable_balance_id = hcb.id) as c1, -// (select count(*) from history_transaction_claimable_balances -// where history_claimable_balance_id = hcb.id) as c2, +// (select 1 from history_operation_claimable_balances +// where history_claimable_balance_id = hcb.id limit 1) as c1, +// (select 1 from history_transaction_claimable_balances +// where history_claimable_balance_id = hcb.id limit 1) as c2, // 1 as cx, -// from history_claimable_balances hcb order by id limit 100 offset 1000) -// as sub where c1 = 0 and c2 = 0 and 1=1); +// from history_claimable_balances hcb where id > 1000 order by id limit 100) +// as sub where c1 IS NULL and c2 IS NULL and 1=1); // // In short it checks the 100 rows omiting 1000 row of history_claimable_balances // and counts occurences of each row in corresponding history tables. @@ -937,7 +971,7 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie for i, historyTable := range historyTables { _, err = fmt.Fprintf( &sb, - `(select count(*) from %s where %s = hcb.id) as c%d, `, + `(select 1 from %s where %s = hcb.id limit 1) as c%d, `, historyTable.name, historyTable.objectField, i, @@ -947,13 +981,13 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie } } - _, err = fmt.Fprintf(&sb, "1 as cx from %s hcb order by id limit %d offset %d) as sub where ", table, batchSize, offset) + _, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize) if err != nil { return "", err } for i := range historyTables { - _, err = fmt.Fprintf(&sb, "c%d = 0 and ", i) + _, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i) if err != nil { return "", err } diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 8123f4c786..792f9826aa 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -102,11 +102,11 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { assert.Equal(t, "delete from history_accounts where id IN "+ "(select id from "+ - "(select id, (select count(*) from history_effects where history_account_id = hcb.id) as c0, "+ - "(select count(*) from history_operation_participants where history_account_id = hcb.id) as c1, "+ - "(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+ - "(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+ - "(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+ - "1 as cx from history_accounts hcb order by id limit 10 offset 0) as sub "+ - "where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query) + "(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+ + "(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+ + "(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+ + "(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+ + "(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+ + "1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+ + "where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query) } diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index cdd16b5387..5e62dc606f 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -21,6 +21,7 @@ func TestReapLookupTables(t *testing.T) { var ( prevLedgers, curLedgers int + prevAccounts, curAccounts int prevClaimableBalances, curClaimableBalances int prevLiquidityPools, curLiquidityPools int ) @@ -29,6 +30,8 @@ func TestReapLookupTables(t *testing.T) { { err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) @@ -45,7 +48,7 @@ func TestReapLookupTables(t *testing.T) { err = q.Begin() tt.Require.NoError(err) - newOffsets, err := q.ReapLookupTables(tt.Ctx, nil) + deletedCount, newOffsets, err := q.ReapLookupTables(tt.Ctx, nil) tt.Require.NoError(err) err = q.Commit() @@ -55,6 +58,8 @@ func TestReapLookupTables(t *testing.T) { { err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) tt.Require.NoError(err) err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) @@ -63,12 +68,21 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(61, prevLedgers, "prevLedgers") tt.Assert.Equal(1, curLedgers, "curLedgers") + + tt.Assert.Equal(25, prevAccounts, "prevAccounts") + tt.Assert.Equal(1, curAccounts, "curAccounts") + tt.Assert.Equal(int64(24), deletedCount["history_accounts"], `deletedCount["history_accounts"]`) + tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") + tt.Assert.Equal(int64(1), deletedCount["history_claimable_balances"], `deletedCount["history_claimable_balances"]`) + tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") + tt.Assert.Equal(int64(1), deletedCount["history_liquidity_pools"], `deletedCount["history_liquidity_pools"]`) - tt.Assert.Len(newOffsets, 2) + tt.Assert.Len(newOffsets, 3) + tt.Assert.Equal(int64(0), newOffsets["history_accounts"]) tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"]) tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"]) } diff --git a/services/horizon/internal/db2/schema/bindata.go b/services/horizon/internal/db2/schema/bindata.go index 91b28f93b1..1d6c2b7bde 100644 --- a/services/horizon/internal/db2/schema/bindata.go +++ b/services/horizon/internal/db2/schema/bindata.go @@ -54,6 +54,7 @@ // migrations/56_txsub_read_only.sql (784B) // migrations/57_trade_aggregation_autovac.sql (282B) // migrations/58_add_index_by_id_optimization.sql (868B) +// migrations/59_remove_foreign_key_constraints.sql (981B) // migrations/5_create_trades_table.sql (1.1kB) // migrations/6_create_assets_table.sql (366B) // migrations/7_modify_trades_table.sql (2.303kB) @@ -1208,6 +1209,26 @@ func migrations58_add_index_by_id_optimizationSql() (*asset, error) { return a, nil } +var _migrations59_remove_foreign_key_constraintsSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x92\xd1\x4a\xc3\x30\x18\x85\xef\xf3\x14\xe7\x72\x43\xf6\x04\xbb\x8a\xcb\x3f\x19\x96\x44\xb2\x78\xb1\xab\x10\xd7\xa8\x41\x5c\x25\x89\xc8\xde\x5e\x94\x52\xda\xda\xa2\x42\xd8\xf5\x39\x1c\xbe\x7c\xf9\x57\x2b\x5c\xbd\x86\xa7\xe8\xb2\xc7\xfd\x1b\x63\xbc\x32\xa4\x61\xf8\x75\x45\x50\xb2\x3a\xe0\x39\xa4\xdc\xc4\xb3\xcd\xd1\xd5\x3e\x41\x68\x75\x87\x8d\x92\x7b\xa3\xf9\x4e\x9a\x51\x6c\x1f\x5c\xf2\xd6\x1d\x8f\xcd\xfb\x29\xdb\x50\xdb\xc7\x17\x7f\x5e\x97\x19\x4d\xc9\x97\x9b\xfc\x06\xf4\xb1\x38\x6a\xb7\x3b\xa4\x65\x7d\xcd\xa2\xf9\x38\xfd\x2a\x9a\x01\x00\x17\xe2\x9f\xae\xb1\x55\x9a\x76\x37\x12\xb7\x74\xc0\x62\xd4\x58\x42\xd3\x96\x34\xc9\x0d\xed\xbb\xb1\x36\x4e\x8b\x50\x2f\xd7\xe5\xb0\xfa\xef\x9f\x82\x6a\xf3\x69\xa4\xaf\xb0\x1c\xd0\xcc\x5f\x0f\xa9\x7e\x96\x2e\x63\x6b\xf2\x60\x66\xd0\xfe\xe8\x0c\xec\x33\x00\x00\xff\xff\xd7\x39\x62\x6b\xd5\x03\x00\x00") + +func migrations59_remove_foreign_key_constraintsSqlBytes() ([]byte, error) { + return bindataRead( + _migrations59_remove_foreign_key_constraintsSql, + "migrations/59_remove_foreign_key_constraints.sql", + ) +} + +func migrations59_remove_foreign_key_constraintsSql() (*asset, error) { + bytes, err := migrations59_remove_foreign_key_constraintsSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "migrations/59_remove_foreign_key_constraints.sql", size: 0, mode: os.FileMode(0), modTime: time.Unix(0, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8a, 0xef, 0x36, 0x64, 0x26, 0xdf, 0xec, 0x49, 0x62, 0x47, 0xca, 0x8c, 0x1e, 0x29, 0x37, 0x7c, 0x9f, 0xc8, 0x47, 0x94, 0x28, 0xc9, 0xdb, 0x3e, 0x95, 0xdb, 0x8d, 0xc8, 0xf0, 0x38, 0xf1, 0x8c}} + return a, nil +} + var _migrations5_create_trades_tableSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x94\x94\x51\x6f\xaa\x40\x10\x85\xdf\xf9\x15\x13\x9f\x30\x17\x93\x7b\x6f\x5a\x5f\x4c\x9a\x58\x25\xad\xa9\xc1\xd6\x4a\xd2\x37\xb2\xb0\x23\x6c\xa2\x2c\x99\x1d\xda\xf0\xef\x1b\x68\x69\x10\x57\xad\xaf\x9c\x39\x67\x38\xbb\x5f\x76\x34\x82\x3f\x7b\x95\x92\x60\x84\xb0\x70\x66\x6b\x7f\xba\xf1\x61\x33\xbd\x5f\xfa\x90\x29\xc3\x9a\xaa\x88\x49\x48\x34\xe0\x3a\x00\xf0\xf3\x51\x17\x48\x82\x95\xce\x23\x25\x21\x56\xa9\xca\x19\x82\xd5\x06\x82\x70\xb9\xf4\x9a\xc9\x81\x26\x89\x34\x00\x95\x33\xa6\x48\x1d\xb5\x91\xf5\x76\x8b\x64\x35\x37\xb2\xc1\xdd\xee\x84\x5e\xcb\x71\x59\x9d\x75\xeb\x9d\x8c\x84\x31\xc8\x11\x57\x05\x42\x92\x09\x12\x09\x23\xc1\xbb\xa0\x4a\xe5\xa9\x3b\xbe\x19\xf6\x22\x3b\x1e\x65\x4c\x89\x64\x71\xdd\x8e\xcf\xb8\x12\x2d\x6d\x9b\xfe\xfd\xb7\x7b\xf6\xba\xcc\xb9\xff\xff\x30\x7b\xf4\x67\x4f\xe0\x76\x47\xee\xe0\xef\xf0\xbb\x57\xac\xcb\x34\xe3\x6b\x9b\x1d\xb8\xae\xe8\x76\xe0\xfb\x75\xbb\xd6\x75\xb6\xdf\xe1\x50\xdd\xd0\x19\x4e\x9c\x96\xbf\x30\x58\xbc\x84\x3e\x2c\x82\xb9\xff\x06\x19\x93\x8c\x0a\x25\x61\x15\xf4\x91\x0c\x5f\x17\xc1\x03\xc4\x4c\x88\xe0\xda\xc8\xf4\x5a\x0a\x3b\xe1\x9d\xd4\xb8\x8a\x1a\x0c\x2f\x45\xb7\xac\xda\x52\xea\x90\xfa\xb6\x2e\x65\xf4\x90\xf4\xfa\xe4\x78\xc7\x00\x9e\x5a\xf7\x75\x78\x97\x16\x1e\xb1\xe2\x1d\x5f\xa8\x67\x63\xa3\x5e\xdb\x7d\x17\xe6\xfa\x23\x77\xe6\xeb\xd5\xb3\xfd\x5d\x48\x84\x49\x84\xc4\x89\xf3\x19\x00\x00\xff\xff\x79\x87\x24\x6b\x4c\x04\x00\x00") func migrations5_create_trades_tableSqlBytes() ([]byte, error) { @@ -1473,6 +1494,7 @@ var _bindata = map[string]func() (*asset, error){ "migrations/56_txsub_read_only.sql": migrations56_txsub_read_onlySql, "migrations/57_trade_aggregation_autovac.sql": migrations57_trade_aggregation_autovacSql, "migrations/58_add_index_by_id_optimization.sql": migrations58_add_index_by_id_optimizationSql, + "migrations/59_remove_foreign_key_constraints.sql": migrations59_remove_foreign_key_constraintsSql, "migrations/5_create_trades_table.sql": migrations5_create_trades_tableSql, "migrations/6_create_assets_table.sql": migrations6_create_assets_tableSql, "migrations/7_modify_trades_table.sql": migrations7_modify_trades_tableSql, @@ -1579,6 +1601,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "56_txsub_read_only.sql": &bintree{migrations56_txsub_read_onlySql, map[string]*bintree{}}, "57_trade_aggregation_autovac.sql": &bintree{migrations57_trade_aggregation_autovacSql, map[string]*bintree{}}, "58_add_index_by_id_optimization.sql": &bintree{migrations58_add_index_by_id_optimizationSql, map[string]*bintree{}}, + "59_remove_foreign_key_constraints.sql": &bintree{migrations59_remove_foreign_key_constraintsSql, map[string]*bintree{}}, "5_create_trades_table.sql": &bintree{migrations5_create_trades_tableSql, map[string]*bintree{}}, "6_create_assets_table.sql": &bintree{migrations6_create_assets_tableSql, map[string]*bintree{}}, "7_modify_trades_table.sql": &bintree{migrations7_modify_trades_tableSql, map[string]*bintree{}}, diff --git a/services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql b/services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql new file mode 100644 index 0000000000..7faebe98fe --- /dev/null +++ b/services/horizon/internal/db2/schema/migrations/59_remove_foreign_key_constraints.sql @@ -0,0 +1,20 @@ +-- +migrate Up + +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_account_id_fkey; +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_base_asset_id_fkey; +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_account_id_fkey; +ALTER TABLE ONLY history_trades DROP CONSTRAINT history_trades_counter_asset_id_fkey; + +-- +migrate Down + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_base_account_id_fkey FOREIGN KEY (base_account_id) REFERENCES history_accounts(id); + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_base_asset_id_fkey FOREIGN KEY (base_asset_id) REFERENCES history_assets(id); + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_counter_account_id_fkey FOREIGN KEY (counter_account_id) REFERENCES history_accounts(id); + +ALTER TABLE ONLY history_trades + ADD CONSTRAINT history_trades_counter_asset_id_fkey FOREIGN KEY (counter_asset_id) REFERENCES history_assets(id); diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index aaf054acd7..d12e7fa8bb 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -85,6 +85,7 @@ type Config struct { HistoryArchiveURL string DisableStateVerification bool + EnableReapLookupTables bool EnableExtendedLogLedgerStats bool ReingestEnabled bool @@ -680,6 +681,10 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) { } func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { + if !s.config.EnableReapLookupTables { + return + } + // Check if lastIngestedLedger is the last one available in the backend sequence, err := s.ledgerBackend.GetLatestLedgerSequence(s.ctx) if err != nil { @@ -712,7 +717,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { defer cancel() reapStart := time.Now() - newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets) + deletedCount, newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets) if err != nil { log.WithField("err", err).Warn("Error reaping lookup tables") return @@ -724,6 +729,17 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { return } + totalDeleted := int64(0) + reapLog := log + for table, c := range deletedCount { + totalDeleted += c + reapLog = reapLog.WithField(table, c) + } + + if totalDeleted > 0 { + reapLog.Info("Reaper deleted rows from lookup tables") + } + s.reapOffsets = newOffsets reapDuration := time.Since(reapStart).Seconds() s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration)) diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index b5d0dfb78c..7b930e80a2 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -374,12 +374,16 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder(maxBatchSize int) history.TradeBatc return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) { +func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) { args := m.Called(ctx, offsets) - if args.Get(0) == nil { - return nil, args.Error(1) + var r1, r2 map[string]int64 + if args.Get(0) != nil { + r1 = args.Get(0).(map[string]int64) + } + if args.Get(1) != nil { + r1 = args.Get(1).(map[string]int64) } - return args.Get(0).(map[string]int64), args.Error(1) + return r1, r2, args.Error(2) } func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error { diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index eadae11bc5..7f336a79ae 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -283,8 +283,6 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { ).Return(nil).Once() s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() - // Reap lookup tables - s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(0), nil) } func (s *ResumeTestTestSuite) TestBumpIngestLedger() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} @@ -368,8 +366,46 @@ func (s *ResumeTestTestSuite) TestErrorSettingCursorIgnored() { s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() - // Reap lookup tables - s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(0), nil) + + next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) + s.Assert().NoError(err) + s.Assert().Equal( + transition{ + node: resumeState{latestSuccessfullyProcessedLedger: 101}, + sleepDuration: 0, + }, + next, + ) +} + +func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() + s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() + s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil) + + s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")). + Run(func(args mock.Arguments) { + meta := args.Get(0).(xdr.LedgerCloseMeta) + s.Assert().Equal(uint32(101), meta.LedgerSequence()) + }). + Return( + ledgerStats{}, + nil, + ).Once() + s.historyQ.On("UpdateLastLedgerIngest", s.ctx, uint32(101)).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() + + s.stellarCoreClient.On( + "SetCursor", + mock.AnythingOfType("*context.timerCtx"), + defaultCoreCursorName, + int32(101), + ).Return(nil).Once() + + s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() + // Reap lookup tables not executed next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) s.Assert().NoError(err) @@ -383,6 +419,10 @@ func (s *ResumeTestTestSuite) TestErrorSettingCursorIgnored() { } func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { + s.system.config.EnableReapLookupTables = true + defer func() { + s.system.config.EnableReapLookupTables = false + }() s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() @@ -413,7 +453,7 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(101), nil) s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() - s.historyQ.On("ReapLookupTables", mock.AnythingOfType("*context.timerCtx"), mock.Anything).Return(nil, errors.New("error reaping objects")).Once() + s.historyQ.On("ReapLookupTables", mock.AnythingOfType("*context.timerCtx"), mock.Anything).Return(nil, nil, errors.New("error reaping objects")).Once() s.historyQ.On("Rollback").Return(nil).Once() next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 3c548691e5..4150a1e2a7 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -2,10 +2,11 @@ package horizon import ( "context" - "github.com/stellar/go/services/horizon/internal/paths" "net/http" "runtime" + "github.com/stellar/go/services/horizon/internal/paths" + "github.com/getsentry/raven-go" "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/exp/orderbook" @@ -116,6 +117,7 @@ func initIngester(app *App) { RemoteCaptiveCoreURL: app.config.RemoteCaptiveCoreURL, EnableCaptiveCore: app.config.EnableCaptiveCoreIngestion, DisableStateVerification: app.config.IngestDisableStateVerification, + EnableReapLookupTables: app.config.HistoryRetentionCount > 0, EnableExtendedLogLedgerStats: app.config.IngestEnableExtendedLogLedgerStats, RoundingSlippageFilter: app.config.RoundingSlippageFilter, EnableIngestionFiltering: app.config.EnableIngestionFiltering,