Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest/processors: Fix bug in claimable balance change processor #5246

Merged
merged 7 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type Claimant struct {

// QClaimableBalances defines claimable-balance-related related queries.
type QClaimableBalances interface {
UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error
RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error)
RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error)
GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error)
Expand Down Expand Up @@ -185,6 +186,66 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (
return claimantsMap, err
}

// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table.
// It also upserts the corresponding claimants in the claimable_balance_claimants table.
func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error {
if err := q.upsertCBs(ctx, cbs); err != nil {
return errors.Wrap(err, "could not upsert claimable balances")
}

if err := q.upsertCBClaimants(ctx, cbs); err != nil {
return errors.Wrap(err, "could not upsert claimable balance claimants")
}

return nil
}

func (q *Q) upsertCBClaimants(ctx context.Context, cbs []ClaimableBalance) error {
var id, lastModifiedLedger, destination []interface{}

for _, cb := range cbs {
for _, claimant := range cb.Claimants {
id = append(id, cb.BalanceID)
lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger)
destination = append(destination, claimant.Destination)
}
}

upsertFields := []upsertField{
{"id", "text", id},
{"destination", "text", destination},
{"last_modified_ledger", "integer", lastModifiedLedger},
}

return q.upsertRows(ctx, "claimable_balance_claimants", "id, destination", upsertFields)
}

func (q *Q) upsertCBs(ctx context.Context, cbs []ClaimableBalance) error {
var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{}

for _, cb := range cbs {
id = append(id, cb.BalanceID)
claimants = append(claimants, cb.Claimants)
asset = append(asset, cb.Asset)
amount = append(amount, cb.Amount)
sponsor = append(sponsor, cb.Sponsor)
lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger)
flags = append(flags, cb.Flags)
}

upsertFields := []upsertField{
{"id", "text", id},
{"claimants", "jsonb", claimants},
{"asset", "text", asset},
{"amount", "bigint", amount},
{"sponsor", "text", sponsor},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"flags", "int", flags},
}

return q.upsertRows(ctx, "claimable_balances", "id", upsertFields)
}

// RemoveClaimableBalances deletes claimable balances table.
// Returns number of rows affected and error.
func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) {
Expand Down
79 changes: 79 additions & 0 deletions services/horizon/internal/db2/history/claimable_balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,85 @@
})
}

func TestUpdateClaimableBalance(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
lastModifiedLedgerSeq := xdr.Uint32(123)
asset := xdr.MustNewCreditAsset("USD", accountID)
balanceID := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{1, 2, 3},
}
id, err := xdr.MarshalHex(balanceID)
tt.Assert.NoError(err)
cBalance := ClaimableBalance{
BalanceID: id,
Claimants: []Claimant{
{
Destination: accountID,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
},
Asset: asset,
LastModifiedLedger: 123,
Amount: 10,
}

err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance})
tt.Assert.NoError(err)

cBalancesClaimants, err := q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance.BalanceID})
tt.Assert.NoError(err)
tt.Assert.Len(cBalancesClaimants[cBalance.BalanceID], 1)
tt.Assert.Equal(ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: accountID,
LastModifiedLedger: cBalance.LastModifiedLedger,
}, cBalancesClaimants[cBalance.BalanceID][0])

// add sponsor
cBalance2 := ClaimableBalance{
BalanceID: id,
Claimants: []Claimant{
{
Destination: accountID,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
},
Asset: asset,
LastModifiedLedger: 123 + 1,
Amount: 10,
Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
}

err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance2})
tt.Assert.NoError(err)

cbs := []ClaimableBalance{}
err = q.Select(tt.Ctx, &cbs, selectClaimableBalances)

Check failure on line 654 in services/horizon/internal/db2/history/claimable_balances_test.go

View workflow job for this annotation

GitHub Actions / golangci

q.Select undefined (type *Q has no field or method Select) (typecheck)
tt.Assert.NoError(err)
tt.Assert.Len(cbs, 1)
tt.Assert.Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", cbs[0].Sponsor.String)
tt.Assert.Equal(uint32(lastModifiedLedgerSeq+1), cbs[0].LastModifiedLedger)

cBalancesClaimants, err = q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance2.BalanceID})
tt.Assert.NoError(err)
tt.Assert.Len(cBalancesClaimants[cBalance2.BalanceID], 1)
tt.Assert.Equal(ClaimableBalanceClaimant{
BalanceID: cBalance2.BalanceID,
Destination: accountID,
LastModifiedLedger: cBalance2.LastModifiedLedger,
}, cBalancesClaimants[cBalance2.BalanceID][0])
}

func TestFindClaimableBalance(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
return a.Get(0).([]ClaimableBalance), a.Error(1)
}

func (m *MockQClaimableBalances) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error {
a := m.Called(ctx, cbs)

Check failure on line 25 in services/horizon/internal/db2/history/mock_q_claimable_balances.go

View workflow job for this annotation

GitHub Actions / golangci

m.Called undefined (type *MockQClaimableBalances has no field or method Called) (typecheck)
return a.Error(0)
}

func (m *MockQClaimableBalances) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) {
a := m.Called(ctx, ids)
return a.Get(0).(int64), a.Error(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processors

import (
"context"
"fmt"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -60,7 +59,8 @@ func (p *ClaimableBalancesChangeProcessor) ProcessChange(ctx context.Context, ch
func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
defer p.reset()
var (
cbIDsToDelete []string
cbIDsToDelete []string
updatedBalances []history.ClaimableBalance
)
changes := p.cache.GetChanges()
for _, change := range changes {
Expand Down Expand Up @@ -97,8 +97,13 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
}
cbIDsToDelete = append(cbIDsToDelete, id)
default:
// claimable balance can only be created or removed
return fmt.Errorf("invalid change entry for a claimable balance was detected")
// this case should only occur if the sponsor has changed in the claimable balance
// the other fields of a claimable balance are immutable
postCB, err := p.ledgerEntryToRow(change.Post)
if err != nil {
return err
}
updatedBalances = append(updatedBalances, postCB)
}
}

Expand All @@ -112,6 +117,12 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
return errors.Wrap(err, "error executing ClaimableBalanceBatchInsertBuilder")
}

if len(updatedBalances) > 0 {
if err = p.qClaimableBalances.UpsertClaimableBalances(ctx, updatedBalances); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to restore the BatchInsertBuilder to handle upserts for claimants? Like what happens if new claimants are added? Will we skip adding those to the claimants table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpsertClaimableBalances() will upsert to both the claimable_balances and the claimable_balance_claimants tables

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it seemed to me that UpsertClaimableBalances exclusively inserts to claimable_balances table

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the function in a later commit:

672d637

return errors.Wrap(err, "error updating claimable balances")
}
}

if len(cbIDsToDelete) > 0 {
count, err := p.qClaimableBalances.RemoveClaimableBalances(ctx, cbIDsToDelete)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@

"github.com/guregu/null"

"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/suite"
)

func TestClaimableBalancesChangeProcessorTestSuiteState(t *testing.T) {
Expand Down Expand Up @@ -249,3 +250,71 @@
[]string{id},
).Return(int64(1), nil).Once()
}

func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalanceAddSponsor() {
balanceID := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{1, 2, 3},
}
cBalance := xdr.ClaimableBalanceEntry{
BalanceId: balanceID,
Claimants: []xdr.Claimant{},
Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
Amount: 10,
}
lastModifiedLedgerSeq := xdr.Uint32(123)

pre := xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeClaimableBalance,
ClaimableBalance: &cBalance,
},
LastModifiedLedgerSeq: lastModifiedLedgerSeq - 1,
Ext: xdr.LedgerEntryExt{
V: 1,
V1: &xdr.LedgerEntryExtensionV1{
SponsoringId: nil,
},
},
}

// add sponsor
updated := xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeClaimableBalance,
ClaimableBalance: &cBalance,
},
LastModifiedLedgerSeq: lastModifiedLedgerSeq,
Ext: xdr.LedgerEntryExt{
V: 1,
V1: &xdr.LedgerEntryExtensionV1{
SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
}
s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeClaimableBalance,
Pre: &pre,
Post: &updated,
})
s.Assert().NoError(err)

Check failure on line 302 in services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Assert undefined (type *ClaimableBalancesChangeProcessorTestSuiteLedger has no field or method Assert) (typecheck)

id, err := xdr.MarshalHex(balanceID)
s.Assert().NoError(err)

Check failure on line 305 in services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go

View workflow job for this annotation

GitHub Actions / golangci

s.Assert undefined (type *ClaimableBalancesChangeProcessorTestSuiteLedger has no field or method Assert) (typecheck)
s.mockQ.On(
"UpsertClaimableBalances",
s.ctx,
[]history.ClaimableBalance{
{
BalanceID: id,
Claimants: []history.Claimant{},
Asset: cBalance.Asset,
Amount: cBalance.Amount,
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
).Return(nil).Once()
}
Loading