diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go index c198ee162d..abdf4ed758 100644 --- a/services/horizon/internal/db2/history/claimable_balances.go +++ b/services/horizon/internal/db2/history/claimable_balances.go @@ -140,6 +140,7 @@ type Claimant struct { // QClaimableBalances defines claimable-balance-related related queries. type QClaimableBalances interface { + UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error) GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error) @@ -185,6 +186,66 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) ( return claimantsMap, err } +// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table. +// It also upserts the corresponding claimants in the claimable_balance_claimants table. +func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { + if err := q.upsertCBs(ctx, cbs); err != nil { + return errors.Wrap(err, "could not upsert claimable balances") + } + + if err := q.upsertCBClaimants(ctx, cbs); err != nil { + return errors.Wrap(err, "could not upsert claimable balance claimants") + } + + return nil +} + +func (q *Q) upsertCBClaimants(ctx context.Context, cbs []ClaimableBalance) error { + var id, lastModifiedLedger, destination []interface{} + + for _, cb := range cbs { + for _, claimant := range cb.Claimants { + id = append(id, cb.BalanceID) + lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger) + destination = append(destination, claimant.Destination) + } + } + + upsertFields := []upsertField{ + {"id", "text", id}, + {"destination", "text", destination}, + {"last_modified_ledger", "integer", lastModifiedLedger}, + } + + return q.upsertRows(ctx, "claimable_balance_claimants", "id, destination", upsertFields) +} + +func (q *Q) upsertCBs(ctx context.Context, cbs []ClaimableBalance) error { + var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{} + + for _, cb := range cbs { + id = append(id, cb.BalanceID) + claimants = append(claimants, cb.Claimants) + asset = append(asset, cb.Asset) + amount = append(amount, cb.Amount) + sponsor = append(sponsor, cb.Sponsor) + lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger) + flags = append(flags, cb.Flags) + } + + upsertFields := []upsertField{ + {"id", "text", id}, + {"claimants", "jsonb", claimants}, + {"asset", "text", asset}, + {"amount", "bigint", amount}, + {"sponsor", "text", sponsor}, + {"last_modified_ledger", "integer", lastModifiedLedger}, + {"flags", "int", flags}, + } + + return q.upsertRows(ctx, "claimable_balances", "id", upsertFields) +} + // RemoveClaimableBalances deletes claimable balances table. // Returns number of rows affected and error. func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) { diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go index 2e6d621945..1ffe442244 100644 --- a/services/horizon/internal/db2/history/claimable_balances_test.go +++ b/services/horizon/internal/db2/history/claimable_balances_test.go @@ -588,6 +588,85 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) { }) } +func TestUpdateClaimableBalance(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" + lastModifiedLedgerSeq := xdr.Uint32(123) + asset := xdr.MustNewCreditAsset("USD", accountID) + balanceID := xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{1, 2, 3}, + } + id, err := xdr.MarshalHex(balanceID) + tt.Assert.NoError(err) + cBalance := ClaimableBalance{ + BalanceID: id, + Claimants: []Claimant{ + { + Destination: accountID, + Predicate: xdr.ClaimPredicate{ + Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional, + }, + }, + }, + Asset: asset, + LastModifiedLedger: 123, + Amount: 10, + } + + err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) + tt.Assert.NoError(err) + + cBalancesClaimants, err := q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance.BalanceID}) + tt.Assert.NoError(err) + tt.Assert.Len(cBalancesClaimants[cBalance.BalanceID], 1) + tt.Assert.Equal(ClaimableBalanceClaimant{ + BalanceID: cBalance.BalanceID, + Destination: accountID, + LastModifiedLedger: cBalance.LastModifiedLedger, + }, cBalancesClaimants[cBalance.BalanceID][0]) + + // add sponsor + cBalance2 := ClaimableBalance{ + BalanceID: id, + Claimants: []Claimant{ + { + Destination: accountID, + Predicate: xdr.ClaimPredicate{ + Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional, + }, + }, + }, + Asset: asset, + LastModifiedLedger: 123 + 1, + Amount: 10, + Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + } + + err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance2}) + tt.Assert.NoError(err) + + cbs := []ClaimableBalance{} + err = q.Select(tt.Ctx, &cbs, selectClaimableBalances) + tt.Assert.NoError(err) + tt.Assert.Len(cbs, 1) + tt.Assert.Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", cbs[0].Sponsor.String) + tt.Assert.Equal(uint32(lastModifiedLedgerSeq+1), cbs[0].LastModifiedLedger) + + cBalancesClaimants, err = q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance2.BalanceID}) + tt.Assert.NoError(err) + tt.Assert.Len(cBalancesClaimants[cBalance2.BalanceID], 1) + tt.Assert.Equal(ClaimableBalanceClaimant{ + BalanceID: cBalance2.BalanceID, + Destination: accountID, + LastModifiedLedger: cBalance2.LastModifiedLedger, + }, cBalancesClaimants[cBalance2.BalanceID][0]) +} + func TestFindClaimableBalance(t *testing.T) { tt := test.Start(t) defer tt.Finish() diff --git a/services/horizon/internal/db2/history/mock_q_claimable_balances.go b/services/horizon/internal/db2/history/mock_q_claimable_balances.go index 64b65cf1a3..6a3adffac1 100644 --- a/services/horizon/internal/db2/history/mock_q_claimable_balances.go +++ b/services/horizon/internal/db2/history/mock_q_claimable_balances.go @@ -21,6 +21,11 @@ func (m *MockQClaimableBalances) GetClaimableBalancesByID(ctx context.Context, i return a.Get(0).([]ClaimableBalance), a.Error(1) } +func (m *MockQClaimableBalances) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { + a := m.Called(ctx, cbs) + return a.Error(0) +} + func (m *MockQClaimableBalances) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) { a := m.Called(ctx, ids) return a.Get(0).(int64), a.Error(1) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go index de729f9605..fce002881c 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go @@ -2,7 +2,6 @@ package processors import ( "context" - "fmt" "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -60,7 +59,8 @@ func (p *ClaimableBalancesChangeProcessor) ProcessChange(ctx context.Context, ch func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { defer p.reset() var ( - cbIDsToDelete []string + cbIDsToDelete []string + updatedBalances []history.ClaimableBalance ) changes := p.cache.GetChanges() for _, change := range changes { @@ -97,8 +97,13 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { } cbIDsToDelete = append(cbIDsToDelete, id) default: - // claimable balance can only be created or removed - return fmt.Errorf("invalid change entry for a claimable balance was detected") + // this case should only occur if the sponsor has changed in the claimable balance + // the other fields of a claimable balance are immutable + postCB, err := p.ledgerEntryToRow(change.Post) + if err != nil { + return err + } + updatedBalances = append(updatedBalances, postCB) } } @@ -112,6 +117,12 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { return errors.Wrap(err, "error executing ClaimableBalanceBatchInsertBuilder") } + if len(updatedBalances) > 0 { + if err = p.qClaimableBalances.UpsertClaimableBalances(ctx, updatedBalances); err != nil { + return errors.Wrap(err, "error updating claimable balances") + } + } + if len(cbIDsToDelete) > 0 { count, err := p.qClaimableBalances.RemoveClaimableBalances(ctx, cbIDsToDelete) if err != nil { diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go index 524de095f7..a10cc9db7d 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go @@ -8,10 +8,11 @@ import ( "github.com/guregu/null" + "github.com/stretchr/testify/suite" + "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/suite" ) func TestClaimableBalancesChangeProcessorTestSuiteState(t *testing.T) { @@ -249,3 +250,71 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBal []string{id}, ).Return(int64(1), nil).Once() } + +func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalanceAddSponsor() { + balanceID := xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{1, 2, 3}, + } + cBalance := xdr.ClaimableBalanceEntry{ + BalanceId: balanceID, + Claimants: []xdr.Claimant{}, + Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 10, + } + lastModifiedLedgerSeq := xdr.Uint32(123) + + pre := xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &cBalance, + }, + LastModifiedLedgerSeq: lastModifiedLedgerSeq - 1, + Ext: xdr.LedgerEntryExt{ + V: 1, + V1: &xdr.LedgerEntryExtensionV1{ + SponsoringId: nil, + }, + }, + } + + // add sponsor + updated := xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &cBalance, + }, + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Ext: xdr.LedgerEntryExt{ + V: 1, + V1: &xdr.LedgerEntryExtensionV1{ + SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + } + s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + + err := s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &pre, + Post: &updated, + }) + s.Assert().NoError(err) + + id, err := xdr.MarshalHex(balanceID) + s.Assert().NoError(err) + s.mockQ.On( + "UpsertClaimableBalances", + s.ctx, + []history.ClaimableBalance{ + { + BalanceID: id, + Claimants: []history.Claimant{}, + Asset: cBalance.Asset, + Amount: cBalance.Amount, + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + ).Return(nil).Once() +}