From 73327c4cbd7b4274f085b1d3fb4e72fb9809b17c Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 17 Nov 2022 13:46:30 +0100 Subject: [PATCH] Add last_modified_ledger to claimable_balance_claimants and fix other queries --- .../actions/claimable_balance_test.go | 15 ++-- ...e_balance_claimant_batch_insert_builder.go | 2 +- .../db2/history/claimable_balances.go | 72 ++++++++++--------- .../db2/history/claimable_balances_test.go | 15 ++-- .../db2/history/mock_q_claimable_balances.go | 4 +- .../horizon/internal/db2/schema/bindata.go | 6 +- .../62_claimable_balance_claimants.sql | 20 +++++- .../claimable_balances_change_processor.go | 5 +- ...laimable_balances_change_processor_test.go | 5 +- services/horizon/internal/ingest/verify.go | 9 ++- .../ingest/verify_range_state_test.go | 10 ++- 11 files changed, 101 insertions(+), 62 deletions(-) diff --git a/services/horizon/internal/actions/claimable_balance_test.go b/services/horizon/internal/actions/claimable_balance_test.go index bf5aa74644..ff59b96134 100644 --- a/services/horizon/internal/actions/claimable_balance_test.go +++ b/services/horizon/internal/actions/claimable_balance_test.go @@ -49,8 +49,9 @@ func TestGetClaimableBalanceByID(t *testing.T) { claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10) for _, claimant := range cBalance.Claimants { claimant := history.ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, + BalanceID: cBalance.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cBalance.LastModifiedLedger, } err = claimantsInsertBuilder.Add(tt.Ctx, claimant) tt.Assert.NoError(err) @@ -194,8 +195,9 @@ func TestGetClaimableBalances(t *testing.T) { for _, cBalance := range hCBs { for _, claimant := range cBalance.Claimants { claimant := history.ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, + BalanceID: cBalance.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cBalance.LastModifiedLedger, } err = claimantsInsertBuilder.Add(tt.Ctx, claimant) tt.Assert.NoError(err) @@ -320,8 +322,9 @@ func TestGetClaimableBalances(t *testing.T) { for _, cBalance := range hCBs { for _, claimant := range cBalance.Claimants { claimant := history.ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, + BalanceID: cBalance.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cBalance.LastModifiedLedger, } err = claimantsInsertBuilder.Add(tt.Ctx, claimant) tt.Assert.NoError(err) diff --git a/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go b/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go index bb90d83041..a1a13d84db 100644 --- a/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go @@ -27,7 +27,7 @@ func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) Clai builder: db.BatchInsertBuilder{ Table: q.GetTable("claimable_balance_claimants"), MaxBatchSize: maxBatchSize, - Suffix: "ON CONFLICT (id, destination) DO NOTHING", + Suffix: "ON CONFLICT (id, destination) DO UPDATE SET last_modified_ledger=EXCLUDED.last_modified_ledger", }, } } diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go index 88c1180337..3fe4029dc3 100644 --- a/services/horizon/internal/db2/history/claimable_balances.go +++ b/services/horizon/internal/db2/history/claimable_balances.go @@ -4,6 +4,7 @@ import ( "context" "database/sql/driver" "encoding/json" + "fmt" "strconv" "strings" @@ -56,34 +57,29 @@ func (cbq ClaimableBalancesQuery) Cursor() (int64, string, error) { // ApplyCursor applies cursor to the given sql. For performance reason the limit // is not applied here. This allows us to hint the planner later to use the right // indexes. -func (cbq ClaimableBalancesQuery) ApplyCursor(sql sq.SelectBuilder) (sq.SelectBuilder, error) { - p := cbq.PageQuery +func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rCursor string, order string) (sq.SelectBuilder, error) { hasPagedLimit := false - l, r, err := cbq.Cursor() - if err != nil { - return sql, err - } - if l > 0 && r != "" { + if lCursor > 0 && rCursor != "" { hasPagedLimit = true } - switch p.Order { + switch order { case db2.OrderAscending: if hasPagedLimit { sql = sql. - Where(sq.Expr("(cb.last_modified_ledger, cb.id) > (?, ?)", l, r)) + Where(sq.Expr("(last_modified_ledger, id) > (?, ?)", lCursor, rCursor)) } - sql = sql.OrderBy("cb.last_modified_ledger asc, cb.id asc") + sql = sql.OrderBy("last_modified_ledger asc, id asc") case db2.OrderDescending: if hasPagedLimit { sql = sql. - Where(sq.Expr("(cb.last_modified_ledger, cb.id) < (?, ?)", l, r)) + Where(sq.Expr("(last_modified_ledger, id) < (?, ?)", lCursor, rCursor)) } - sql = sql.OrderBy("cb.last_modified_ledger desc, cb.id desc") + sql = sql.OrderBy("last_modified_ledger desc, id desc") default: - return sql, errors.Errorf("invalid order: %s", p.Order) + return sql, errors.Errorf("invalid order: %s", order) } return sql, nil @@ -92,8 +88,9 @@ func (cbq ClaimableBalancesQuery) ApplyCursor(sql sq.SelectBuilder) (sq.SelectBu // ClaimableBalanceClaimant is a row of data from the `claimable_balances_claimants` table. // This table exists to allow faster querying for claimable balances for a specific claimant. type ClaimableBalanceClaimant struct { - BalanceID string `db:"id"` - Destination string `db:"destination"` + BalanceID string `db:"id"` + Destination string `db:"destination"` + LastModifiedLedger uint32 `db:"last_modified_ledger"` } // ClaimableBalance is a row of data from the `claimable_balances` table. @@ -135,7 +132,7 @@ type QClaimableBalances interface { GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error) CountClaimableBalances(ctx context.Context) (int, error) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder - GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]string, error) + GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error) } // CountClaimableBalances returns the total number of claimable balances in the DB @@ -160,16 +157,16 @@ func (q *Q) GetClaimableBalancesByID(ctx context.Context, ids []string) ([]Claim // GetClaimantsByClaimableBalances finds all claimants for ClaimableBalanceIds. // The returned list is sorted by ids and then destination ids for each balance id. -func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]string, error) { +func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error) { var claimants []ClaimableBalanceClaimant sql := sq.Select("*").From("claimable_balance_claimants cbc"). Where(map[string]interface{}{"cbc.id": ids}). OrderBy("id asc, destination asc") err := q.Select(ctx, &claimants, sql) - claimantsMap := make(map[string][]string) + claimantsMap := make(map[string][]ClaimableBalanceClaimant) for _, claimant := range claimants { - claimantsMap[claimant.BalanceID] = append(claimantsMap[claimant.BalanceID], claimant.Destination) + claimantsMap[claimant.BalanceID] = append(claimantsMap[claimant.BalanceID], claimant) } return claimantsMap, err } @@ -240,12 +237,12 @@ func (q *Q) FindClaimableBalanceByID(ctx context.Context, balanceID string) (Cla // GetClaimableBalances finds all claimable balances where accountID is one of the claimants func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQuery) ([]ClaimableBalance, error) { - sql, err := query.ApplyCursor(selectClaimableBalances) - // we need to use WITH syntax and correct LIMIT placement to force the query planner to use the right - // indexes, otherwise when the limit is small, it will use an index scan - // which will be very slow once we have millions of records - limitClausePlacement := "LIMIT ?) select " + claimableBalancesSelectStatement + " from cb" + l, r, err := query.Cursor() + if err != nil { + return nil, errors.Wrap(err, "error getting cursor") + } + sql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalances, l, r, query.PageQuery.Order) if err != nil { return nil, errors.Wrap(err, "could not apply query to page") } @@ -260,19 +257,26 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue } if query.Claimant != nil { + var selectClaimableBalanceClaimants = sq.Select("id").From("claimable_balance_claimants"). + Where("destination = ?", query.Claimant.Address()). + // Given that each destination can be a claimant for each balance maximum once + // we can LIMIT the subquery. + Limit(query.PageQuery.Limit) + subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, l, r, query.PageQuery.Order) + if err != nil { + return nil, errors.Wrap(err, "could not apply subquery to page") + } + + subSqlString, subSqlArgs, err := subSql.ToSql() + if err != nil { + return nil, errors.Wrap(err, "could not build subquery") + } + sql = sql. - Where(`cb.id IN (select id from claimable_balance_claimants where destination = '` + query.Claimant.Address() + `')`) - // when search by claimant, profiling has shown the LIMIT should be on the outer query to - // hint appropriate indexes for best performance - limitClausePlacement = ") select " + claimableBalancesSelectStatement + " from cb LIMIT ?" + Where(fmt.Sprintf("cb.id IN (%s)", subSqlString), subSqlArgs...) } - sql = sql. - Prefix("WITH cb AS ("). - Suffix( - limitClausePlacement, - query.PageQuery.Limit, - ) + sql = sql.Limit(query.PageQuery.Limit) var results []ClaimableBalance if err := q.Select(ctx, &results, sql); err != nil { diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go index 4175cec819..49cc722f57 100644 --- a/services/horizon/internal/db2/history/claimable_balances_test.go +++ b/services/horizon/internal/db2/history/claimable_balances_test.go @@ -91,8 +91,9 @@ func TestRemoveClaimableBalanceClaimants(t *testing.T) { for _, claimant := range cBalance.Claimants { claimant := ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, + BalanceID: cBalance.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cBalance.LastModifiedLedger, } err = claimantsInsertBuilder.Add(tt.Ctx, claimant) tt.Assert.NoError(err) @@ -143,8 +144,9 @@ func TestFindClaimableBalancesByDestination(t *testing.T) { claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10) for _, claimant := range cBalance.Claimants { claimant := ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, + BalanceID: cBalance.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cBalance.LastModifiedLedger, } err = claimantsInsertBuilder.Add(tt.Ctx, claimant) tt.Assert.NoError(err) @@ -182,8 +184,9 @@ func TestFindClaimableBalancesByDestination(t *testing.T) { for _, claimant := range cBalance.Claimants { claimant := ClaimableBalanceClaimant{ - BalanceID: cBalance.BalanceID, - Destination: claimant.Destination, + BalanceID: cBalance.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cBalance.LastModifiedLedger, } err = claimantsInsertBuilder.Add(tt.Ctx, claimant) tt.Assert.NoError(err) diff --git a/services/horizon/internal/db2/history/mock_q_claimable_balances.go b/services/horizon/internal/db2/history/mock_q_claimable_balances.go index cae819106e..2493d9ebea 100644 --- a/services/horizon/internal/db2/history/mock_q_claimable_balances.go +++ b/services/horizon/internal/db2/history/mock_q_claimable_balances.go @@ -41,7 +41,7 @@ func (m *MockQClaimableBalances) NewClaimableBalanceClaimantBatchInsertBuilder(m return a.Get(0).(ClaimableBalanceClaimantBatchInsertBuilder) } -func (m *MockQClaimableBalances) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]string, error) { +func (m *MockQClaimableBalances) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error) { a := m.Called(ctx, ids) - return a.Get(0).(map[string][]string), a.Error(1) + return a.Get(0).(map[string][]ClaimableBalanceClaimant), a.Error(1) } diff --git a/services/horizon/internal/db2/schema/bindata.go b/services/horizon/internal/db2/schema/bindata.go index 33e23349cb..e7131a62a6 100644 --- a/services/horizon/internal/db2/schema/bindata.go +++ b/services/horizon/internal/db2/schema/bindata.go @@ -58,7 +58,7 @@ // migrations/5_create_trades_table.sql (1.1kB) // migrations/60_add_asset_id_indexes.sql (289B) // migrations/61_trust_lines_by_account_type_code_issuer.sql (383B) -// migrations/62_claimable_balance_claimants.sql (382B) +// migrations/62_claimable_balance_claimants.sql (1.428kB) // migrations/6_create_assets_table.sql (366B) // migrations/7_modify_trades_table.sql (2.303kB) // migrations/8_add_aggregators.sql (907B) @@ -1292,7 +1292,7 @@ func migrations61_trust_lines_by_account_type_code_issuerSql() (*asset, error) { return a, nil } -var _migrations62_claimable_balance_claimantsSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x90\x31\x6b\xc3\x30\x10\x85\x77\xfd\x8a\x47\x26\x9b\xc6\x5b\x9b\xc5\x93\x13\x8b\x62\xea\xca\xc1\x95\x21\x99\xcc\x49\x3e\x52\x81\xa3\x14\x59\xb4\xe4\xdf\x17\x12\xda\x7a\x32\x5d\x8f\xfb\xde\xbd\xfb\xb2\x0c\x0f\x67\x77\x0a\x14\x19\xdd\x87\x10\xbb\x56\x16\x5a\x42\x17\xdb\x5a\xc2\x8e\xe4\xce\x64\x46\xee\x0d\x8d\xe4\x2d\xf7\xf7\x89\x8f\x13\x12\x01\x00\x6e\x80\x96\x07\x0d\xd5\x68\xa8\xae\xae\xd7\xc8\x32\xec\x7e\xb0\xed\x9d\xaa\x4a\x38\x0f\x43\x13\x6f\x1e\x6f\xd4\xc0\x53\x74\x9e\xa2\xbb\x78\xd8\x77\x0a\x64\x23\x07\x7c\x52\xb8\x3a\x7f\x4a\x9e\x36\xe9\x5f\xde\x6d\x7f\xdf\x56\xaf\x45\x7b\xc4\x8b\x3c\x22\x71\xc3\x7a\x1e\x90\x8a\x34\xff\xad\x5d\xa9\x52\x1e\xb0\x5a\xe8\xdd\x9b\x6b\x3f\xa3\x57\x68\xd4\xe2\x9b\xdd\x5b\xa5\x9e\x61\x62\x60\x46\x32\x3f\x9b\x0b\x31\x77\x57\x5e\xbe\xbc\x10\x65\xdb\xec\xff\xe1\xce\xd2\x64\x69\xe0\x5c\x7c\x07\x00\x00\xff\xff\xe2\xd4\xcb\xfe\x7e\x01\x00\x00") +var _migrations62_claimable_balance_claimantsSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x94\xc1\x6f\xd3\x30\x18\xc5\xef\xf9\x2b\x9e\x72\x6a\x43\xab\x5e\x60\x97\x0a\xa1\x6e\x8d\x50\x45\x49\xa7\x2c\x95\xb6\x53\xf4\xc5\xfe\xd6\x59\x4a\xec\xc8\x36\x1b\xfd\xef\x51\xd2\x0e\x02\x6c\x4d\x0a\x88\x53\x24\xcb\xef\x7d\xef\xfd\xec\x78\x3a\xc5\x9b\x4a\xed\x2c\x79\xc6\xb6\x0e\x82\xab\x34\x5e\x64\x31\xb2\xc5\xe5\x3a\x86\x28\x49\x55\x54\x94\x9c\x17\x54\x92\x16\x9c\x1f\x56\xb4\x77\x18\x05\x00\xa0\x24\xb2\xf8\x36\x43\xb2\xc9\x90\x6c\xd7\xeb\x09\xa6\x53\x5c\x3d\xcb\x2e\x0f\xaa\xd5\x12\x4a\xa3\x20\xc7\x17\x6f\x5b\x95\x64\xe7\x95\x26\xaf\x8c\x86\x78\x20\x4b\xc2\xb3\xc5\x23\xd9\xbd\xd2\xbb\xd1\xbb\x8b\xf1\x0f\xbf\x76\x7f\x49\xce\xe7\x95\x91\xea\x5e\xb1\xcc\x4b\x96\x3b\xb6\x50\xda\x73\xf3\xfd\x79\xeb\x75\xba\xfa\xbc\x48\xef\xf0\x29\xbe\xc3\x48\xc9\x49\x77\xd6\x38\x18\xcf\xbf\x37\x5c\x25\xcb\xf8\x16\xe1\x89\x8a\x79\xb1\xcf\x3b\xea\xfc\xa5\x14\xb9\x92\x21\x36\xc9\x49\x52\xdb\x9b\x55\xf2\x11\x85\xb7\xcc\x18\x75\x0c\x27\x2f\xf6\x9a\x40\xc9\x26\xe6\x2c\xc2\xcd\x97\xba\x36\xd6\x3b\x84\x8e\x4b\x16\x1e\x11\xee\xad\xa9\x7e\x1f\xe6\xf0\xf4\xc0\x96\x41\xce\xb1\xc7\x7b\x7c\x80\xb1\x92\x2d\x8a\xfd\xab\x23\x42\x44\xb3\x3e\x14\x2d\x81\xd6\xf3\xbc\xee\xbf\x54\x6e\x1d\x4e\x96\x9d\x45\x48\xb9\x32\x8f\x0c\xa5\x25\x7f\x85\xe5\xba\x24\xc1\xb2\x29\x40\x45\xb3\x1e\xcd\x82\x65\xba\xb9\x1e\x10\x35\xfc\x43\x76\xae\x36\xda\x19\xfb\x8f\xe9\x1d\x5d\xff\x8a\xdf\xd1\xe3\x7f\x10\x3c\x8e\x6a\x18\x76\x1f\x86\xa5\x79\xd2\xc1\x41\xde\xff\x30\x08\x72\x82\x24\xf7\xff\x69\x9d\x33\x1b\x7c\x8d\xc6\xf3\xa1\xf7\xe0\x55\xe4\x03\x83\x3d\xa3\x38\xe3\x84\x06\x85\xeb\xbb\x11\xf3\x20\xf8\x16\x00\x00\xff\xff\x84\x8f\x50\xb2\x94\x05\x00\x00") func migrations62_claimable_balance_claimantsSqlBytes() ([]byte, error) { return bindataRead( @@ -1308,7 +1308,7 @@ func migrations62_claimable_balance_claimantsSql() (*asset, error) { } info := bindataFileInfo{name: "migrations/62_claimable_balance_claimants.sql", size: 0, mode: os.FileMode(0), modTime: time.Unix(0, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb3, 0xe8, 0x65, 0x32, 0x38, 0x3d, 0x6f, 0x3d, 0xa, 0x68, 0x52, 0xd2, 0x37, 0xed, 0x38, 0xd, 0xeb, 0xce, 0xcd, 0x0, 0xbb, 0xb8, 0x26, 0x27, 0x99, 0x95, 0xf, 0xbe, 0xad, 0x3, 0xe6, 0x4f}} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x88, 0x4b, 0xc2, 0xa2, 0x4c, 0x82, 0x68, 0xe8, 0xa8, 0x71, 0x70, 0x49, 0x44, 0x59, 0x8f, 0xd7, 0xa1, 0xc8, 0x31, 0x86, 0xf1, 0x5b, 0x30, 0x67, 0xdd, 0x15, 0x2c, 0x54, 0xfe, 0xe4, 0xe7, 0x3e}} return a, nil } diff --git a/services/horizon/internal/db2/schema/migrations/62_claimable_balance_claimants.sql b/services/horizon/internal/db2/schema/migrations/62_claimable_balance_claimants.sql index e63d5f6085..140a721575 100644 --- a/services/horizon/internal/db2/schema/migrations/62_claimable_balance_claimants.sql +++ b/services/horizon/internal/db2/schema/migrations/62_claimable_balance_claimants.sql @@ -3,11 +3,29 @@ CREATE TABLE claimable_balance_claimants ( id TEXT NOT NULL, -- ClaimableBalanceID in base64 destination character varying(56) NOT NULL, + last_modified_ledger integer NOT NULL, PRIMARY KEY (id, destination) ); -CREATE INDEX "claimable_balance_claimants_by_destination" ON claimable_balance_claimants USING btree (destination); +CREATE INDEX "claimable_balance_claimants_by_destination_last_modified_ledger_id" ON claimable_balance_claimants USING btree (destination, last_modified_ledger, id); + +/* Supports "select * from claimable_balances where asset = ? order by last_modified_ledger, id" */ +CREATE INDEX "claimable_balances_by_asset_last_modified_ledger_id" ON claimable_balances USING btree (asset, last_modified_ledger, id); +/* Remove index replaced by above */ +DROP INDEX "claimable_balances_by_asset"; + +/* Supports "select * from claimable_balances where sponsor = ? order by last_modified_ledger, id" */ +CREATE INDEX "claimable_balances_by_sponsor_last_modified_ledger_id" ON claimable_balances USING btree (sponsor, last_modified_ledger, id); +/* Remove index replaced by above */ +DROP INDEX "claimable_balances_by_sponsor"; -- +migrate Down DROP TABLE claimable_balance_claimants cascade; + +CREATE INDEX "claimable_balances_by_asset" ON claimable_balances USING btree (asset); +DROP INDEX "claimable_balances_by_asset_last_modified_ledger_id"; + +CREATE INDEX "claimable_balances_by_sponsor" ON claimable_balances USING btree (sponsor); +DROP INDEX "claimable_balances_by_sponsor_last_modified_ledger_id"; + diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go index b5edb2277b..a929927323 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go @@ -93,8 +93,9 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { for _, cb := range cbsToUpsert { for _, claimant := range cb.Claimants { claimant := history.ClaimableBalanceClaimant{ - BalanceID: cb.BalanceID, - Destination: claimant.Destination, + BalanceID: cb.BalanceID, + Destination: claimant.Destination, + LastModifiedLedger: cb.LastModifiedLedger, } if err := p.claimantsInsertBuilder.Add(ctx, claimant); err != nil { return errors.Wrap(err, "error adding to claimantsInsertBuilder") diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go index e6ba3a639f..70cd43bba5 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go @@ -82,8 +82,9 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteState) TestCreatesClaimableBal }).Return(nil).Once() s.mockBatchInsertBuilder.On("Add", s.ctx, history.ClaimableBalanceClaimant{ - BalanceID: id, - Destination: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + BalanceID: id, + Destination: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + LastModifiedLedger: uint32(lastModifiedLedgerSeq), }).Return(nil).Once() s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index 7c4817530e..43a80da0bd 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -701,12 +701,15 @@ func addClaimableBalanceToStateVerifier( } for i, claimant := range claimants { - if claimant.MustV0().Destination.Address() != cBalancesClaimants[row.BalanceID][i] { + if claimant.MustV0().Destination.Address() != cBalancesClaimants[row.BalanceID][i].Destination || + row.LastModifiedLedger != cBalancesClaimants[row.BalanceID][i].LastModifiedLedger { return fmt.Errorf( - "claimable_balance_claimants table for balance %s does not match. expected=%s actual=%s", + "claimable_balance_claimants table for balance %s does not match. expectedDestination=%s actualDestination=%s, expectedLastModifiedLedger=%d actualLastModifiedLedger=%d", row.BalanceID, claimant.MustV0().Destination.Address(), - cBalancesClaimants[row.BalanceID][i], + cBalancesClaimants[row.BalanceID][i].Destination, + row.LastModifiedLedger, + cBalancesClaimants[row.BalanceID][i].LastModifiedLedger, ) } } diff --git a/services/horizon/internal/ingest/verify_range_state_test.go b/services/horizon/internal/ingest/verify_range_state_test.go index a07be09275..fb6170a4b2 100644 --- a/services/horizon/internal/ingest/verify_range_state_test.go +++ b/services/horizon/internal/ingest/verify_range_state_test.go @@ -575,8 +575,14 @@ func (s *VerifyRangeStateTestSuite) TestSuccessWithVerify() { clonedQ.MockQClaimableBalances. On("GetClaimantsByClaimableBalances", s.ctx, []string{balanceIDStr}). - Return(map[string][]string{ - claimableBalance.BalanceID: {claimableBalance.Claimants[0].Destination}, + Return(map[string][]history.ClaimableBalanceClaimant{ + claimableBalance.BalanceID: { + { + BalanceID: claimableBalance.BalanceID, + Destination: claimableBalance.Claimants[0].Destination, + LastModifiedLedger: claimableBalance.LastModifiedLedger, + }, + }, }, nil).Once() clonedQ.MockQLiquidityPools.On("CountLiquidityPools", s.ctx).Return(1, nil).Once()