Skip to content

Commit

Permalink
services/horizon/internal/ingest/processors: Fix bug in claimable bal…
Browse files Browse the repository at this point in the history
…ance change processor (#5246)


Co-authored-by: Urvi <[email protected]>
  • Loading branch information
tamirms and urvisavla authored Mar 14, 2024
1 parent 4b0b078 commit 71fef3c
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 5 deletions.
61 changes: 61 additions & 0 deletions services/horizon/internal/db2/history/claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type Claimant struct {

// QClaimableBalances defines claimable-balance-related related queries.
type QClaimableBalances interface {
UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error
RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error)
RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error)
GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error)
Expand Down Expand Up @@ -185,6 +186,66 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) (
return claimantsMap, err
}

// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table.
// It also upserts the corresponding claimants in the claimable_balance_claimants table.
func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error {
if err := q.upsertCBs(ctx, cbs); err != nil {
return errors.Wrap(err, "could not upsert claimable balances")
}

if err := q.upsertCBClaimants(ctx, cbs); err != nil {
return errors.Wrap(err, "could not upsert claimable balance claimants")
}

return nil
}

func (q *Q) upsertCBClaimants(ctx context.Context, cbs []ClaimableBalance) error {
var id, lastModifiedLedger, destination []interface{}

for _, cb := range cbs {
for _, claimant := range cb.Claimants {
id = append(id, cb.BalanceID)
lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger)
destination = append(destination, claimant.Destination)
}
}

upsertFields := []upsertField{
{"id", "text", id},
{"destination", "text", destination},
{"last_modified_ledger", "integer", lastModifiedLedger},
}

return q.upsertRows(ctx, "claimable_balance_claimants", "id, destination", upsertFields)
}

func (q *Q) upsertCBs(ctx context.Context, cbs []ClaimableBalance) error {
var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{}

for _, cb := range cbs {
id = append(id, cb.BalanceID)
claimants = append(claimants, cb.Claimants)
asset = append(asset, cb.Asset)
amount = append(amount, cb.Amount)
sponsor = append(sponsor, cb.Sponsor)
lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger)
flags = append(flags, cb.Flags)
}

upsertFields := []upsertField{
{"id", "text", id},
{"claimants", "jsonb", claimants},
{"asset", "text", asset},
{"amount", "bigint", amount},
{"sponsor", "text", sponsor},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"flags", "int", flags},
}

return q.upsertRows(ctx, "claimable_balances", "id", upsertFields)
}

// RemoveClaimableBalances deletes claimable balances table.
// Returns number of rows affected and error.
func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) {
Expand Down
79 changes: 79 additions & 0 deletions services/horizon/internal/db2/history/claimable_balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,85 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) {
})
}

func TestUpdateClaimableBalance(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
lastModifiedLedgerSeq := xdr.Uint32(123)
asset := xdr.MustNewCreditAsset("USD", accountID)
balanceID := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{1, 2, 3},
}
id, err := xdr.MarshalHex(balanceID)
tt.Assert.NoError(err)
cBalance := ClaimableBalance{
BalanceID: id,
Claimants: []Claimant{
{
Destination: accountID,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
},
Asset: asset,
LastModifiedLedger: 123,
Amount: 10,
}

err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance})
tt.Assert.NoError(err)

cBalancesClaimants, err := q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance.BalanceID})
tt.Assert.NoError(err)
tt.Assert.Len(cBalancesClaimants[cBalance.BalanceID], 1)
tt.Assert.Equal(ClaimableBalanceClaimant{
BalanceID: cBalance.BalanceID,
Destination: accountID,
LastModifiedLedger: cBalance.LastModifiedLedger,
}, cBalancesClaimants[cBalance.BalanceID][0])

// add sponsor
cBalance2 := ClaimableBalance{
BalanceID: id,
Claimants: []Claimant{
{
Destination: accountID,
Predicate: xdr.ClaimPredicate{
Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
},
},
},
Asset: asset,
LastModifiedLedger: 123 + 1,
Amount: 10,
Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
}

err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance2})
tt.Assert.NoError(err)

cbs := []ClaimableBalance{}
err = q.Select(tt.Ctx, &cbs, selectClaimableBalances)
tt.Assert.NoError(err)
tt.Assert.Len(cbs, 1)
tt.Assert.Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", cbs[0].Sponsor.String)
tt.Assert.Equal(uint32(lastModifiedLedgerSeq+1), cbs[0].LastModifiedLedger)

cBalancesClaimants, err = q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance2.BalanceID})
tt.Assert.NoError(err)
tt.Assert.Len(cBalancesClaimants[cBalance2.BalanceID], 1)
tt.Assert.Equal(ClaimableBalanceClaimant{
BalanceID: cBalance2.BalanceID,
Destination: accountID,
LastModifiedLedger: cBalance2.LastModifiedLedger,
}, cBalancesClaimants[cBalance2.BalanceID][0])
}

func TestFindClaimableBalance(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func (m *MockQClaimableBalances) GetClaimableBalancesByID(ctx context.Context, i
return a.Get(0).([]ClaimableBalance), a.Error(1)
}

func (m *MockQClaimableBalances) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error {
a := m.Called(ctx, cbs)
return a.Error(0)
}

func (m *MockQClaimableBalances) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) {
a := m.Called(ctx, ids)
return a.Get(0).(int64), a.Error(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processors

import (
"context"
"fmt"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -60,7 +59,8 @@ func (p *ClaimableBalancesChangeProcessor) ProcessChange(ctx context.Context, ch
func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
defer p.reset()
var (
cbIDsToDelete []string
cbIDsToDelete []string
updatedBalances []history.ClaimableBalance
)
changes := p.cache.GetChanges()
for _, change := range changes {
Expand Down Expand Up @@ -97,8 +97,13 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
}
cbIDsToDelete = append(cbIDsToDelete, id)
default:
// claimable balance can only be created or removed
return fmt.Errorf("invalid change entry for a claimable balance was detected")
// this case should only occur if the sponsor has changed in the claimable balance
// the other fields of a claimable balance are immutable
postCB, err := p.ledgerEntryToRow(change.Post)
if err != nil {
return err
}
updatedBalances = append(updatedBalances, postCB)
}
}

Expand All @@ -112,6 +117,12 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error {
return errors.Wrap(err, "error executing ClaimableBalanceBatchInsertBuilder")
}

if len(updatedBalances) > 0 {
if err = p.qClaimableBalances.UpsertClaimableBalances(ctx, updatedBalances); err != nil {
return errors.Wrap(err, "error updating claimable balances")
}
}

if len(cbIDsToDelete) > 0 {
count, err := p.qClaimableBalances.RemoveClaimableBalances(ctx, cbIDsToDelete)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (

"github.com/guregu/null"

"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/suite"
)

func TestClaimableBalancesChangeProcessorTestSuiteState(t *testing.T) {
Expand Down Expand Up @@ -249,3 +250,71 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBal
[]string{id},
).Return(int64(1), nil).Once()
}

func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalanceAddSponsor() {
balanceID := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
V0: &xdr.Hash{1, 2, 3},
}
cBalance := xdr.ClaimableBalanceEntry{
BalanceId: balanceID,
Claimants: []xdr.Claimant{},
Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
Amount: 10,
}
lastModifiedLedgerSeq := xdr.Uint32(123)

pre := xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeClaimableBalance,
ClaimableBalance: &cBalance,
},
LastModifiedLedgerSeq: lastModifiedLedgerSeq - 1,
Ext: xdr.LedgerEntryExt{
V: 1,
V1: &xdr.LedgerEntryExtensionV1{
SponsoringId: nil,
},
},
}

// add sponsor
updated := xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeClaimableBalance,
ClaimableBalance: &cBalance,
},
LastModifiedLedgerSeq: lastModifiedLedgerSeq,
Ext: xdr.LedgerEntryExt{
V: 1,
V1: &xdr.LedgerEntryExtensionV1{
SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
}
s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once()

err := s.processor.ProcessChange(s.ctx, ingest.Change{
Type: xdr.LedgerEntryTypeClaimableBalance,
Pre: &pre,
Post: &updated,
})
s.Assert().NoError(err)

id, err := xdr.MarshalHex(balanceID)
s.Assert().NoError(err)
s.mockQ.On(
"UpsertClaimableBalances",
s.ctx,
[]history.ClaimableBalance{
{
BalanceID: id,
Claimants: []history.Claimant{},
Asset: cBalance.Asset,
Amount: cBalance.Amount,
LastModifiedLedger: uint32(lastModifiedLedgerSeq),
Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
).Return(nil).Once()
}

0 comments on commit 71fef3c

Please sign in to comment.