From 0ed9eea506408bf938901521dd08f6a11a13e321 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 14 Mar 2024 07:37:08 +0000 Subject: [PATCH 1/6] Fix update case in cb change processor --- .../db2/history/claimable_balances.go | 31 +++++++ .../db2/history/claimable_balances_test.go | 61 +++++++++++++ .../db2/history/mock_q_claimable_balances.go | 5 ++ .../claimable_balances_change_processor.go | 89 ++++++++++++++++++- 4 files changed, 183 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go index c198ee162d..74994493ef 100644 --- a/services/horizon/internal/db2/history/claimable_balances.go +++ b/services/horizon/internal/db2/history/claimable_balances.go @@ -140,6 +140,7 @@ type Claimant struct { // QClaimableBalances defines claimable-balance-related related queries. type QClaimableBalances interface { + UpsertClaimableBalances(ctx context.Context, cb []ClaimableBalance) error RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) RemoveClaimableBalanceClaimants(ctx context.Context, ids []string) (int64, error) GetClaimableBalancesByID(ctx context.Context, ids []string) ([]ClaimableBalance, error) @@ -185,6 +186,36 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) ( return claimantsMap, err } +// UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table. +// There's currently no limit of the number of offers this method can +// accept other than 2GB limit of the query string length what should be enough +// for each ledger with the current limits. +func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { + var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{} + + for _, cb := range cbs { + id = append(id, cb.BalanceID) + claimants = append(claimants, cb.Claimants) + asset = append(asset, cb.Asset) + amount = append(amount, cb.Amount) + sponsor = append(sponsor, cb.Sponsor) + lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger) + flags = append(flags, cb.Flags) + } + + upsertFields := []upsertField{ + {"id", "text", id}, + {"claimants", "jsonb", claimants}, + {"asset", "text", asset}, + {"amount", "bigint", amount}, + {"sponsor", "text", sponsor}, + {"last_modified_ledger", "integer", lastModifiedLedger}, + {"flags", "int", flags}, + } + + return q.upsertRows(ctx, "claimable_balances", "id", upsertFields) +} + // RemoveClaimableBalances deletes claimable balances table. // Returns number of rows affected and error. func (q *Q) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) { diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go index 2e6d621945..3b3aa376b1 100644 --- a/services/horizon/internal/db2/history/claimable_balances_test.go +++ b/services/horizon/internal/db2/history/claimable_balances_test.go @@ -588,6 +588,67 @@ func TestFindClaimableBalancesByDestinationWithLimit(t *testing.T) { }) } +func TestUpdateClaimableBalance(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + accountID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" + lastModifiedLedgerSeq := xdr.Uint32(123) + asset := xdr.MustNewCreditAsset("USD", accountID) + balanceID := xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{1, 2, 3}, + } + id, err := xdr.MarshalHex(balanceID) + tt.Assert.NoError(err) + cBalance := ClaimableBalance{ + BalanceID: id, + Claimants: []Claimant{ + { + Destination: accountID, + Predicate: xdr.ClaimPredicate{ + Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional, + }, + }, + }, + Asset: asset, + LastModifiedLedger: 123, + Amount: 10, + } + + err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) + tt.Assert.NoError(err) + + // add sponsor + cBalance2 := ClaimableBalance{ + BalanceID: id, + Claimants: []Claimant{ + { + Destination: accountID, + Predicate: xdr.ClaimPredicate{ + Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional, + }, + }, + }, + Asset: asset, + LastModifiedLedger: 123 + 1, + Amount: 10, + Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + } + + err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance2}) + tt.Assert.NoError(err) + + cbs := []ClaimableBalance{} + err = q.Select(tt.Ctx, &cbs, selectClaimableBalances) + tt.Assert.NoError(err) + tt.Assert.Len(cbs, 1) + tt.Assert.Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", cbs[0].Sponsor.String) + tt.Assert.Equal(uint32(lastModifiedLedgerSeq+1), cbs[0].LastModifiedLedger) +} + func TestFindClaimableBalance(t *testing.T) { tt := test.Start(t) defer tt.Finish() diff --git a/services/horizon/internal/db2/history/mock_q_claimable_balances.go b/services/horizon/internal/db2/history/mock_q_claimable_balances.go index 64b65cf1a3..6a3adffac1 100644 --- a/services/horizon/internal/db2/history/mock_q_claimable_balances.go +++ b/services/horizon/internal/db2/history/mock_q_claimable_balances.go @@ -21,6 +21,11 @@ func (m *MockQClaimableBalances) GetClaimableBalancesByID(ctx context.Context, i return a.Get(0).([]ClaimableBalance), a.Error(1) } +func (m *MockQClaimableBalances) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { + a := m.Called(ctx, cbs) + return a.Error(0) +} + func (m *MockQClaimableBalances) RemoveClaimableBalances(ctx context.Context, ids []string) (int64, error) { a := m.Called(ctx, ids) return a.Get(0).(int64), a.Error(1) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go index de729f9605..58bdb8d15a 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go @@ -1,8 +1,10 @@ package processors import ( + "bytes" "context" "fmt" + "sort" "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -60,7 +62,8 @@ func (p *ClaimableBalancesChangeProcessor) ProcessChange(ctx context.Context, ch func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { defer p.reset() var ( - cbIDsToDelete []string + cbIDsToDelete []string + updatedBalances []history.ClaimableBalance ) changes := p.cache.GetChanges() for _, change := range changes { @@ -97,8 +100,24 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { } cbIDsToDelete = append(cbIDsToDelete, id) default: - // claimable balance can only be created or removed - return fmt.Errorf("invalid change entry for a claimable balance was detected") + // this case should only occur if the sponsor has changed in the claimable balance + // the other fields of a claimable balance are immutable + postCB, err := p.ledgerEntryToRow(change.Post) + if err != nil { + return err + } + preCB, err := p.ledgerEntryToRow(change.Pre) + if err != nil { + return err + } + equal, err := claimantsAreEqual(preCB.Claimants, postCB.Claimants) + if err != nil { + return errors.Wrap(err, "error comparing claimants") + } + if !equal { + return fmt.Errorf("invalid change entry for a claimable balance was detected: claimants have changed") + } + updatedBalances = append(updatedBalances, postCB) } } @@ -112,6 +131,12 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { return errors.Wrap(err, "error executing ClaimableBalanceBatchInsertBuilder") } + if len(updatedBalances) > 0 { + if err = p.qClaimableBalances.UpsertClaimableBalances(ctx, updatedBalances); err != nil { + return errors.Wrap(err, "error updating claimable balances") + } + } + if len(cbIDsToDelete) > 0 { count, err := p.qClaimableBalances.RemoveClaimableBalances(ctx, cbIDsToDelete) if err != nil { @@ -135,6 +160,64 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { return nil } +type comparableClaimant struct { + destination string + predicate []byte +} + +func (c comparableClaimant) equal(o comparableClaimant) bool { + return c.destination == o.destination && bytes.Equal(c.predicate, o.predicate) +} + +func sortClaimants(claimants []comparableClaimant) { + sort.Slice(claimants, func(i, j int) bool { + if claimants[i].destination != claimants[j].destination { + return claimants[i].destination < claimants[j].destination + } + return bytes.Compare(claimants[i].predicate, claimants[j].predicate) < 0 + }) +} + +func makeComparableClaimants(claimants history.Claimants) ([]comparableClaimant, error) { + result := make([]comparableClaimant, len(claimants)) + for i, claimant := range claimants { + predicate, err := claimant.Predicate.MarshalBinary() + if err != nil { + return nil, err + } + result[i] = comparableClaimant{ + destination: claimant.Destination, + predicate: predicate, + } + } + return result, nil +} + +func claimantsAreEqual(a, b history.Claimants) (bool, error) { + if len(a) != len(b) { + return false, nil + } + + compA, err := makeComparableClaimants(a) + if err != nil { + return false, nil + } + compB, err := makeComparableClaimants(a) + if err != nil { + return false, nil + } + sortClaimants(compA) + sortClaimants(compB) + + for i := range compA { + if !compA[i].equal(compB[i]) { + return false, nil + } + } + + return true, nil +} + func buildClaimants(claimants []xdr.Claimant) history.Claimants { hClaimants := history.Claimants{} for _, c := range claimants { From 29c6a4a8031bca2087402cffe956ed6f1761758c Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 14 Mar 2024 01:21:35 -0700 Subject: [PATCH 2/6] Add unit tests for claimable balanace update case --- ...laimable_balances_change_processor_test.go | 137 +++++++++++++++++- 1 file changed, 136 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go index 524de095f7..f6e2c421a1 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go @@ -144,7 +144,7 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) SetupTest() { } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TearDownTest() { - s.Assert().NoError(s.processor.Commit(s.ctx)) + s.processor.reset() s.mockQ.AssertExpectations(s.T()) } @@ -201,6 +201,7 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc Post: &entry, }) s.Assert().NoError(err) + s.Assert().NoError(s.processor.Commit(s.ctx)) } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBalance() { @@ -248,4 +249,138 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBal s.ctx, []string{id}, ).Return(int64(1), nil).Once() + s.Assert().NoError(s.processor.Commit(s.ctx)) +} + +func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalanceAddSponsor() { + balanceID := xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{1, 2, 3}, + } + cBalance := xdr.ClaimableBalanceEntry{ + BalanceId: balanceID, + Claimants: []xdr.Claimant{}, + Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 10, + } + lastModifiedLedgerSeq := xdr.Uint32(123) + + pre := xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &cBalance, + }, + LastModifiedLedgerSeq: lastModifiedLedgerSeq - 1, + Ext: xdr.LedgerEntryExt{ + V: 1, + V1: &xdr.LedgerEntryExtensionV1{ + SponsoringId: nil, + }, + }, + } + + // add sponsor + updated := xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &cBalance, + }, + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Ext: xdr.LedgerEntryExt{ + V: 1, + V1: &xdr.LedgerEntryExtensionV1{ + SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + } + s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + + err := s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &pre, + Post: &updated, + }) + s.Assert().NoError(err) + + id, err := xdr.MarshalHex(balanceID) + s.Assert().NoError(err) + s.mockQ.On( + "UpsertClaimableBalances", + s.ctx, + []history.ClaimableBalance{ + { + BalanceID: id, + Claimants: []history.Claimant{}, + Asset: cBalance.Asset, + Amount: cBalance.Amount, + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + ).Return(nil).Once() + s.Assert().NoError(s.processor.Commit(s.ctx)) +} + +func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalanceAddClaimantInvalid() { + balanceID := xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{1, 2, 3}, + } + cBalance := xdr.ClaimableBalanceEntry{ + BalanceId: balanceID, + Claimants: []xdr.Claimant{}, + Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 10, + } + lastModifiedLedgerSeq := xdr.Uint32(123) + + pre := xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &cBalance, + }, + LastModifiedLedgerSeq: lastModifiedLedgerSeq - 1, + Ext: xdr.LedgerEntryExt{ + V: 1, + V1: &xdr.LedgerEntryExtensionV1{ + SponsoringId: nil, + }, + }, + } + + cBalanceUpdated := xdr.ClaimableBalanceEntry{ + BalanceId: balanceID, + Claimants: []xdr.Claimant{ + { + V0: &xdr.ClaimantV0{ + Destination: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Amount: 10, + } + + // add sponsor, claimant + updated := xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &cBalanceUpdated, + }, + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Ext: xdr.LedgerEntryExt{ + V: 1, + V1: &xdr.LedgerEntryExtensionV1{ + SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + } + s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + + err := s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeClaimableBalance, + Pre: &pre, + Post: &updated, + }) + s.Assert().NoError(err) + s.Assert().EqualError(s.processor.Commit(s.ctx), "invalid change entry for a claimable balance was detected: claimants have changed") } From 672d637f6c13979c12da6b63446a2a2aa77515ed Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 14 Mar 2024 08:47:28 +0000 Subject: [PATCH 3/6] relax immutable restrictions on updates --- .../db2/history/claimable_balances.go | 35 ++++++++- .../db2/history/claimable_balances_test.go | 18 +++++ .../claimable_balances_change_processor.go | 72 ------------------- 3 files changed, 50 insertions(+), 75 deletions(-) diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go index 74994493ef..77654fc1cd 100644 --- a/services/horizon/internal/db2/history/claimable_balances.go +++ b/services/horizon/internal/db2/history/claimable_balances.go @@ -187,10 +187,39 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) ( } // UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table. -// There's currently no limit of the number of offers this method can -// accept other than 2GB limit of the query string length what should be enough -// for each ledger with the current limits. func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { + if err := q.upsertCBs(ctx, cbs); err != nil { + return errors.Wrap(err, "could not upsert claimable balances") + } + + if err := q.upsertCBClaimants(ctx, cbs); err != nil { + return errors.Wrap(err, "could not upsert claimable balance claimants") + } + + return nil +} + +func (q *Q) upsertCBClaimants(ctx context.Context, cbs []ClaimableBalance) error { + var id, lastModifiedLedger, destination []interface{} + + for _, cb := range cbs { + for _, claimant := range cb.Claimants { + id = append(id, cb.BalanceID) + lastModifiedLedger = append(lastModifiedLedger, cb.LastModifiedLedger) + destination = append(destination, claimant.Destination) + } + } + + upsertFields := []upsertField{ + {"id", "text", id}, + {"destination", "text", destination}, + {"last_modified_ledger", "integer", lastModifiedLedger}, + } + + return q.upsertRows(ctx, "claimable_balance_claimants", "id, destination", upsertFields) +} + +func (q *Q) upsertCBs(ctx context.Context, cbs []ClaimableBalance) error { var id, claimants, asset, amount, sponsor, lastModifiedLedger, flags []interface{} for _, cb := range cbs { diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go index 3b3aa376b1..1ffe442244 100644 --- a/services/horizon/internal/db2/history/claimable_balances_test.go +++ b/services/horizon/internal/db2/history/claimable_balances_test.go @@ -621,6 +621,15 @@ func TestUpdateClaimableBalance(t *testing.T) { err = q.UpsertClaimableBalances(tt.Ctx, []ClaimableBalance{cBalance}) tt.Assert.NoError(err) + cBalancesClaimants, err := q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance.BalanceID}) + tt.Assert.NoError(err) + tt.Assert.Len(cBalancesClaimants[cBalance.BalanceID], 1) + tt.Assert.Equal(ClaimableBalanceClaimant{ + BalanceID: cBalance.BalanceID, + Destination: accountID, + LastModifiedLedger: cBalance.LastModifiedLedger, + }, cBalancesClaimants[cBalance.BalanceID][0]) + // add sponsor cBalance2 := ClaimableBalance{ BalanceID: id, @@ -647,6 +656,15 @@ func TestUpdateClaimableBalance(t *testing.T) { tt.Assert.Len(cbs, 1) tt.Assert.Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", cbs[0].Sponsor.String) tt.Assert.Equal(uint32(lastModifiedLedgerSeq+1), cbs[0].LastModifiedLedger) + + cBalancesClaimants, err = q.GetClaimantsByClaimableBalances(tt.Ctx, []string{cBalance2.BalanceID}) + tt.Assert.NoError(err) + tt.Assert.Len(cBalancesClaimants[cBalance2.BalanceID], 1) + tt.Assert.Equal(ClaimableBalanceClaimant{ + BalanceID: cBalance2.BalanceID, + Destination: accountID, + LastModifiedLedger: cBalance2.LastModifiedLedger, + }, cBalancesClaimants[cBalance2.BalanceID][0]) } func TestFindClaimableBalance(t *testing.T) { diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go index 58bdb8d15a..fce002881c 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go @@ -1,10 +1,7 @@ package processors import ( - "bytes" "context" - "fmt" - "sort" "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -106,17 +103,6 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { if err != nil { return err } - preCB, err := p.ledgerEntryToRow(change.Pre) - if err != nil { - return err - } - equal, err := claimantsAreEqual(preCB.Claimants, postCB.Claimants) - if err != nil { - return errors.Wrap(err, "error comparing claimants") - } - if !equal { - return fmt.Errorf("invalid change entry for a claimable balance was detected: claimants have changed") - } updatedBalances = append(updatedBalances, postCB) } } @@ -160,64 +146,6 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { return nil } -type comparableClaimant struct { - destination string - predicate []byte -} - -func (c comparableClaimant) equal(o comparableClaimant) bool { - return c.destination == o.destination && bytes.Equal(c.predicate, o.predicate) -} - -func sortClaimants(claimants []comparableClaimant) { - sort.Slice(claimants, func(i, j int) bool { - if claimants[i].destination != claimants[j].destination { - return claimants[i].destination < claimants[j].destination - } - return bytes.Compare(claimants[i].predicate, claimants[j].predicate) < 0 - }) -} - -func makeComparableClaimants(claimants history.Claimants) ([]comparableClaimant, error) { - result := make([]comparableClaimant, len(claimants)) - for i, claimant := range claimants { - predicate, err := claimant.Predicate.MarshalBinary() - if err != nil { - return nil, err - } - result[i] = comparableClaimant{ - destination: claimant.Destination, - predicate: predicate, - } - } - return result, nil -} - -func claimantsAreEqual(a, b history.Claimants) (bool, error) { - if len(a) != len(b) { - return false, nil - } - - compA, err := makeComparableClaimants(a) - if err != nil { - return false, nil - } - compB, err := makeComparableClaimants(a) - if err != nil { - return false, nil - } - sortClaimants(compA) - sortClaimants(compB) - - for i := range compA { - if !compA[i].equal(compB[i]) { - return false, nil - } - } - - return true, nil -} - func buildClaimants(claimants []xdr.Claimant) history.Claimants { hClaimants := history.Claimants{} for _, c := range claimants { From d74d3351f42bed04b630d4e71ab2904cbd29bba9 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 14 Mar 2024 08:51:17 +0000 Subject: [PATCH 4/6] remove TestUpdateClaimableBalanceAddClaimantInvalid --- ...laimable_balances_change_processor_test.go | 67 +------------------ 1 file changed, 2 insertions(+), 65 deletions(-) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go index f6e2c421a1..0b892d66a6 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go @@ -8,10 +8,11 @@ import ( "github.com/guregu/null" + "github.com/stretchr/testify/suite" + "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/suite" ) func TestClaimableBalancesChangeProcessorTestSuiteState(t *testing.T) { @@ -320,67 +321,3 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBal ).Return(nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } - -func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalanceAddClaimantInvalid() { - balanceID := xdr.ClaimableBalanceId{ - Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, - V0: &xdr.Hash{1, 2, 3}, - } - cBalance := xdr.ClaimableBalanceEntry{ - BalanceId: balanceID, - Claimants: []xdr.Claimant{}, - Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Amount: 10, - } - lastModifiedLedgerSeq := xdr.Uint32(123) - - pre := xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeClaimableBalance, - ClaimableBalance: &cBalance, - }, - LastModifiedLedgerSeq: lastModifiedLedgerSeq - 1, - Ext: xdr.LedgerEntryExt{ - V: 1, - V1: &xdr.LedgerEntryExtensionV1{ - SponsoringId: nil, - }, - }, - } - - cBalanceUpdated := xdr.ClaimableBalanceEntry{ - BalanceId: balanceID, - Claimants: []xdr.Claimant{ - { - V0: &xdr.ClaimantV0{ - Destination: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - }, - }, - }, Asset: xdr.MustNewCreditAsset("USD", "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - Amount: 10, - } - - // add sponsor, claimant - updated := xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeClaimableBalance, - ClaimableBalance: &cBalanceUpdated, - }, - LastModifiedLedgerSeq: lastModifiedLedgerSeq, - Ext: xdr.LedgerEntryExt{ - V: 1, - V1: &xdr.LedgerEntryExtensionV1{ - SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - }, - }, - } - s.mockClaimableBalanceBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() - - err := s.processor.ProcessChange(s.ctx, ingest.Change{ - Type: xdr.LedgerEntryTypeClaimableBalance, - Pre: &pre, - Post: &updated, - }) - s.Assert().NoError(err) - s.Assert().EqualError(s.processor.Commit(s.ctx), "invalid change entry for a claimable balance was detected: claimants have changed") -} From 301ce2dc182450a0cbed4e427b7c3d51a13fbeff Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 14 Mar 2024 08:56:09 +0000 Subject: [PATCH 5/6] update comment --- services/horizon/internal/db2/history/claimable_balances.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go index 77654fc1cd..abdf4ed758 100644 --- a/services/horizon/internal/db2/history/claimable_balances.go +++ b/services/horizon/internal/db2/history/claimable_balances.go @@ -187,6 +187,7 @@ func (q *Q) GetClaimantsByClaimableBalances(ctx context.Context, ids []string) ( } // UpsertClaimableBalances upserts a batch of claimable balances in the claimable_balances table. +// It also upserts the corresponding claimants in the claimable_balance_claimants table. func (q *Q) UpsertClaimableBalances(ctx context.Context, cbs []ClaimableBalance) error { if err := q.upsertCBs(ctx, cbs); err != nil { return errors.Wrap(err, "could not upsert claimable balances") From 822c0b180a2c64526bf570c774c08ddb4e4eaa34 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 14 Mar 2024 08:58:54 +0000 Subject: [PATCH 6/6] fix TearDownTest() --- .../processors/claimable_balances_change_processor_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go index 0b892d66a6..a10cc9db7d 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go @@ -145,7 +145,7 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) SetupTest() { } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TearDownTest() { - + s.Assert().NoError(s.processor.Commit(s.ctx)) s.processor.reset() s.mockQ.AssertExpectations(s.T()) } @@ -202,7 +202,6 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc Post: &entry, }) s.Assert().NoError(err) - s.Assert().NoError(s.processor.Commit(s.ctx)) } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBalance() { @@ -250,7 +249,6 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBal s.ctx, []string{id}, ).Return(int64(1), nil).Once() - s.Assert().NoError(s.processor.Commit(s.ctx)) } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBalanceAddSponsor() { @@ -319,5 +317,4 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestUpdateClaimableBal }, }, ).Return(nil).Once() - s.Assert().NoError(s.processor.Commit(s.ctx)) }