Skip to content

Commit

Permalink
services/horizon: Improve performance of claimable balances queries (#…
Browse files Browse the repository at this point in the history
…4690)

Add a new table `claimable_balance_claimants` which holds all claimants'
destinations for corresponding claimable balances IDs. Also, improve other
filters (by `sponsor` and `asset`) but adding better indexes for such queries.

We noticed that "claimable balances for claimants" query
(`/claimable_balances?claimant=...`) is very slow in Postgres 12. The reason,
apart from possible changes to gin index on `claimants` field, is that the
`claimable_balances` table size significantly increased in the last couple
months.
  • Loading branch information
bartekn authored Nov 18, 2022
1 parent 0666818 commit 5c83343
Show file tree
Hide file tree
Showing 15 changed files with 463 additions and 46 deletions.
43 changes: 43 additions & 0 deletions services/horizon/internal/actions/claimable_balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ func TestGetClaimableBalanceByID(t *testing.T) {
err = q.UpsertClaimableBalances(tt.Ctx, []history.ClaimableBalance{cBalance})
tt.Assert.NoError(err)

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
}

handler := GetClaimableBalanceByIDHandler{}
response, err := handler.GetResource(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -179,6 +190,23 @@ func TestGetClaimableBalances(t *testing.T) {
err := q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)

for _, cBalance := range hCBs {
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)

handler := GetClaimableBalancesHandler{}
response, err := handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
Expand Down Expand Up @@ -291,6 +319,21 @@ func TestGetClaimableBalances(t *testing.T) {
err = q.UpsertClaimableBalances(tt.Ctx, hCBs)
tt.Assert.NoError(err)

for _, cBalance := range hCBs {
for _, claimant := range cBalance.Claimants {
claimant := history.ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
}
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)

response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest(
t,
map[string]string{},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package history

import (
"context"

"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

// ClaimableBalanceClaimantBatchInsertBuilder is used to insert transactions into the
// history_transactions table
type ClaimableBalanceClaimantBatchInsertBuilder interface {
Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error
Exec(ctx context.Context) error
}

// ClaimableBalanceClaimantBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type claimableBalanceClaimantBatchInsertBuilder struct {
encodingBuffer *xdr.EncodingBuffer
builder db.BatchInsertBuilder
}

// NewClaimableBalanceClaimantBatchInsertBuilder constructs a new ClaimableBalanceClaimantBatchInsertBuilder instance
func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder {
return &claimableBalanceClaimantBatchInsertBuilder{
encodingBuffer: xdr.NewEncodingBuffer(),
builder: db.BatchInsertBuilder{
Table: q.GetTable("claimable_balance_claimants"),
MaxBatchSize: maxBatchSize,
Suffix: "ON CONFLICT (id, destination) DO UPDATE SET last_modified_ledger=EXCLUDED.last_modified_ledger",
},
}
}

// Add adds a new transaction to the batch
func (i *claimableBalanceClaimantBatchInsertBuilder) Add(ctx context.Context, claimableBalanceClaimant ClaimableBalanceClaimant) error {
return i.builder.RowStruct(ctx, claimableBalanceClaimant)
}

func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
}
103 changes: 73 additions & 30 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"strconv"
"strings"

Expand Down Expand Up @@ -56,39 +57,42 @@ func (cbq ClaimableBalancesQuery) Cursor() (int64, string, error) {
// ApplyCursor applies cursor to the given sql. For performance reason the limit
// is not applied here. This allows us to hint the planner later to use the right
// indexes.
func (cbq ClaimableBalancesQuery) ApplyCursor(sql sq.SelectBuilder) (sq.SelectBuilder, error) {
p := cbq.PageQuery
func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rCursor string, order string) (sq.SelectBuilder, error) {
hasPagedLimit := false
l, r, err := cbq.Cursor()
if err != nil {
return sql, err
}
if l > 0 && r != "" {
if lCursor > 0 && rCursor != "" {
hasPagedLimit = true
}

switch p.Order {
switch order {
case db2.OrderAscending:
if hasPagedLimit {
sql = sql.
Where(sq.Expr("(cb.last_modified_ledger, cb.id) > (?, ?)", l, r))
Where(sq.Expr("(last_modified_ledger, id) > (?, ?)", lCursor, rCursor))

}
sql = sql.OrderBy("cb.last_modified_ledger asc, cb.id asc")
sql = sql.OrderBy("last_modified_ledger asc, id asc")
case db2.OrderDescending:
if hasPagedLimit {
sql = sql.
Where(sq.Expr("(cb.last_modified_ledger, cb.id) < (?, ?)", l, r))
Where(sq.Expr("(last_modified_ledger, id) < (?, ?)", lCursor, rCursor))
}

sql = sql.OrderBy("cb.last_modified_ledger desc, cb.id desc")
sql = sql.OrderBy("last_modified_ledger desc, id desc")
default:
return sql, errors.Errorf("invalid order: %s", p.Order)
return sql, errors.Errorf("invalid order: %s", order)
}

return sql, nil
}

// ClaimableBalanceClaimant is a row of data from the `claimable_balances_claimants` table.
// This table exists to allow faster querying for claimable balances for a specific claimant.
type ClaimableBalanceClaimant struct {
BalanceID string `db:"id"`
Destination string `db:"destination"`
LastModifiedLedger uint32 `db:"last_modified_ledger"`
}

// ClaimableBalance is a row of data from the `claimable_balances` table.
type ClaimableBalance struct {
BalanceID string `db:"id"`
Expand Down Expand Up @@ -124,8 +128,11 @@ type Claimant struct {
type QClaimableBalances interface {
UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error
RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error)
RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error)
GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error)
CountClaimableBalances(ctx context.Context) (int, error)
NewClaimableBalanceClaimantBatchInsertBuilder(maxBatchSize int) ClaimableBalanceClaimantBatchInsertBuilder
GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error)
}

// CountClaimableBalances returns the total number of claimable balances in the DB
Expand All @@ -148,6 +155,22 @@ func (q *Q) GetClaimableBalancesByID(ctx context.Context, ids []string) ([]Claim
return cBalances, err
}

// GetClaimantsByClaimableBalances finds all claimants for ClaimableBalanceIds.
// The returned list is sorted by ids and then destination ids for each balance id.
func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (map[string][]ClaimableBalanceClaimant, error) {
var claimants []ClaimableBalanceClaimant
sql := sq.Select("*").From("claimable_balance_claimants cbc").
Where(map[string]interface{}{"cbc.id": ids}).
OrderBy("id asc, destination asc")
err := q.Select(ctx, &claimants, sql)

claimantsMap := make(map[string][]ClaimableBalanceClaimant)
for _, claimant := range claimants {
claimantsMap[claimant.BalanceID] = append(claimantsMap[claimant.BalanceID], claimant)
}
return claimantsMap, err
}

// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table.
// There's currently no limit of the number of offers this method can
// accept other than 2GB limit of the query string length what should be enough
Expand Down Expand Up @@ -191,6 +214,19 @@ func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, e
return result.RowsAffected()
}

// RemoveClaimableBalanceClaimants deletes claimable balance claimants.
// Returns number of rows affected and error.
func (q *Q) RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error) {
sql := sq.Delete("claimable_balance_claimants").
Where(sq.Eq{"id": ids})
result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
}

return result.RowsAffected()
}

// FindClaimableBalanceByID returns a claimable balance.
func (q *Q) FindClaimableBalanceByID(ctx context.Context, balanceID string) (ClaimableBalance, error) {
var claimableBalance ClaimableBalance
Expand All @@ -201,12 +237,12 @@ func (q *Q) FindClaimableBalanceByID(ctx context.Context, balanceID string) (Cla

// GetClaimableBalances finds all claimable balances where accountID is one of the claimants
func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQuery) ([]ClaimableBalance, error) {
sql, err := query.ApplyCursor(selectClaimableBalances)
// we need to use WITH syntax and correct LIMIT placement to force the query planner to use the right
// indexes, otherwise when the limit is small, it will use an index scan
// which will be very slow once we have millions of records
limitClausePlacement := "LIMIT ?) select " + claimableBalancesSelectStatement + " from cb"
l, r, err := query.Cursor()
if err != nil {
return nil, errors.Wrap(err, "error getting cursor")
}

sql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalances, l, r, query.PageQuery.Order)
if err != nil {
return nil, errors.Wrap(err, "could not apply query to page")
}
Expand All @@ -221,19 +257,26 @@ func (q *Q) GetClaimableBalances(ctx context.Context, query ClaimableBalancesQue
}

if query.Claimant != nil {
var selectClaimableBalanceClaimants = sq.Select("id").From("claimable_balance_claimants").
Where("destination = ?", query.Claimant.Address()).
// Given that each destination can be a claimant for each balance maximum once
// we can LIMIT the subquery.
Limit(query.PageQuery.Limit)
subSql, err := applyClaimableBalancesQueriesCursor(selectClaimableBalanceClaimants, l, r, query.PageQuery.Order)
if err != nil {
return nil, errors.Wrap(err, "could not apply subquery to page")
}

subSqlString, subSqlArgs, err := subSql.ToSql()
if err != nil {
return nil, errors.Wrap(err, "could not build subquery")
}

sql = sql.
Where(`cb.claimants @> '[{"destination": "` + query.Claimant.Address() + `"}]'`)
// when search by claimant, profiling has shown the LIMIT should be on the outer query to
// hint appropriate indexes for best performance
limitClausePlacement = ") select " + claimableBalancesSelectStatement + " from cb LIMIT ?"
}

sql = sql.
Prefix("WITH cb AS (").
Suffix(
limitClausePlacement,
query.PageQuery.Limit,
)
Where(fmt.Sprintf("cb.id IN (%s)", subSqlString), subSqlArgs...)
}

sql = sql.Limit(query.PageQuery.Limit)

var results []ClaimableBalance
if err := q.Select(ctx, &results, sql); err != nil {
Expand Down
73 changes: 73 additions & 0 deletions services/horizon/internal/db2/history/claimable_balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,55 @@ func TestRemoveClaimableBalance(t *testing.T) {
}
}

func TestRemoveClaimableBalanceClaimants(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
asset := xdr.MustNewCreditAsset("USD", accountID)
balanceID := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{1, 2, 3},
}
id, err := xdr.MarshalHex(balanceID)
tt.Assert.NoError(err)
cBalance := ClaimableBalance{
BalanceID: id,
Claimants: []Claimant{
{
Destination: accountID,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
},
Asset: asset,
LastModifiedLedger: 123,
Amount: 10,
}

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)

for _, claimant := range cBalance.Claimants {
claimant := ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)

removed, err := q.RemoveClaimableBalanceClaimants(tt.Ctx, []string{id})
tt.Assert.NoError(err)
tt.Assert.Equal(int64(1), removed)
}

func TestFindClaimableBalancesByDestination(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down Expand Up @@ -92,6 +141,17 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance})
tt.Assert.NoError(err)

claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder(10)
for _, claimant := range cBalance.Claimants {
claimant := ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
}

balanceID = xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{3, 2, 1},
Expand Down Expand Up @@ -122,6 +182,19 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance})
tt.Assert.NoError(err)

for _, claimant := range cBalance.Claimants {
claimant := ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: claimant.Destination,
LastModifiedLedger: cBalance.LastModifiedLedger,
}
err = claimantsInsertBuilder.Add(tt.Ctx, claimant)
tt.Assert.NoError(err)
}

err = claimantsInsertBuilder.Exec(tt.Ctx)
tt.Assert.NoError(err)

query := ClaimableBalancesQuery{
PageQuery: db2.MustPageQuery("", false, "", 10),
Claimant: xdr.MustAddressPtr(dest1),
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func (q *Q) TruncateIngestStateTables(ctx context.Context) error {
"accounts_data",
"accounts_signers",
"claimable_balances",
"claimable_balance_claimants",
"exp_asset_stats",
"liquidity_pools",
"offers",
Expand Down
Loading

0 comments on commit 5c83343

Please sign in to comment.