diff --git a/services/horizon/internal/actions/claimable_balance_test.go b/services/horizon/internal/actions/claimable_balance_test.go index 5a3ce4c7ba..add2dac150 100644 --- a/services/horizon/internal/actions/claimable_balance_test.go +++ b/services/horizon/internal/actions/claimable_balance_test.go @@ -21,7 +21,7 @@ func TestGetClaimableBalanceByID(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{tt.HorizonSession()} - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() @@ -156,15 +156,10 @@ func TestGetClaimableBalances(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{tt.HorizonSession()} -<<<<<<< HEAD - q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) - defer q.SessionInterface.Rollback() -======= - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() ->>>>>>> c7bd7ac1 (Fix linter errors) entriesMeta := []struct { id xdr.Hash @@ -206,6 +201,7 @@ func TestGetClaimableBalances(t *testing.T) { } balanceInsertbuilder := q.NewClaimableBalanceBatchInsertBuilder() + claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder() for _, cBalance := range hCBs { @@ -301,6 +297,7 @@ func TestGetClaimableBalances(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(response, 0) + // new claimable balances are ingested, they should appear in the next pages balanceInsertbuilder = q.NewClaimableBalanceBatchInsertBuilder() claimantsInsertBuilder = q.NewClaimableBalanceClaimantBatchInsertBuilder() @@ -324,13 +321,12 @@ func TestGetClaimableBalances(t *testing.T) { }, } - hCBs = nil for _, e := range entriesMeta { entry := buildClaimableBalance(tt, e.id, e.accountID, e.ledger, e.asset) hCBs = append(hCBs, entry) } - for _, cBalance := range hCBs { + for _, cBalance := range hCBs[4:] { tt.Assert.NoError(balanceInsertbuilder.Add(cBalance)) for _, claimant := range cBalance.Claimants { @@ -370,7 +366,7 @@ func TestGetClaimableBalances(t *testing.T) { tt.Assert.Len(response, 2) // response should be the first 2 elements of entries - for i, entry := range hCBs { + for i, entry := range hCBs[4:] { tt.Assert.Equal(entry.BalanceID, response[i].(protocol.ClaimableBalance).BalanceID) } @@ -401,7 +397,9 @@ func TestGetClaimableBalances(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(response, 2) - tt.Assert.Equal(hCBs[1].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID) + tt.Assert.Equal(hCBs[5].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID) + + tt.Assert.Equal(hCBs[4].BalanceID, response[1].(protocol.ClaimableBalance).BalanceID) response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest( t, @@ -417,6 +415,8 @@ func TestGetClaimableBalances(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Len(response, 1) + tt.Assert.Equal(hCBs[0].BalanceID, response[0].(protocol.ClaimableBalance).BalanceID) + // filter by asset response, err = handler.GetResourcePage(httptest.NewRecorder(), makeRequest( t, diff --git a/services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go b/services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go index 0b1621014f..93a6abd9de 100644 --- a/services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/claimable_balance_batch_insert_builder.go @@ -36,7 +36,7 @@ func (i *claimableBalanceBatchInsertBuilder) Add(claimableBalance ClaimableBalan return i.builder.RowStruct(claimableBalance) } -// Exec inserts claimable balance rows to the database +// Exec writes the batch of claimable balances to the database. func (i *claimableBalanceBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { return i.builder.Exec(ctx, session, i.table) } diff --git a/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go b/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go index 458ade97e4..37eaa67e35 100644 --- a/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/claimable_balance_claimant_batch_insert_builder.go @@ -7,8 +7,8 @@ import ( "github.com/stellar/go/xdr" ) -// ClaimableBalanceClaimantBatchInsertBuilder is used to insert transactions into the -// history_transactions table +// ClaimableBalanceClaimantBatchInsertBuilder is used to insert claimants into the +// claimable_balance_claimants table type ClaimableBalanceClaimantBatchInsertBuilder interface { Add(claimableBalanceClaimant ClaimableBalanceClaimant) error Exec(ctx context.Context, session db.SessionInterface) error @@ -31,12 +31,12 @@ func (q *Q) NewClaimableBalanceClaimantBatchInsertBuilder() ClaimableBalanceClai } } -// Add adds a new claimant for a claimable Balance to the batch +// Add adds a new claimant to the batch func (i *claimableBalanceClaimantBatchInsertBuilder) Add(claimableBalanceClaimant ClaimableBalanceClaimant) error { return i.builder.RowStruct(claimableBalanceClaimant) } -// Exec flushes the entire batch into the database +// Exec writes the batch of claimants to the database. func (i *claimableBalanceClaimantBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { return i.builder.Exec(ctx, session, i.table) } diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go index 1b9254991d..5490bef11c 100644 --- a/services/horizon/internal/db2/history/claimable_balances.go +++ b/services/horizon/internal/db2/history/claimable_balances.go @@ -110,8 +110,6 @@ func (c Claimants) Value() (driver.Value, error) { // Convert the byte array into a string as a workaround to bypass buggy encoding in the pq driver // (More info about this bug here https://github.com/stellar/go/issues/5086#issuecomment-1773215436). // By doing so, the data will be written as a string rather than hex encoded bytes. - // According to this https://www.postgresql.org/docs/current/datatype-json.html - // this may even improve write performance due to reduced conversion overhead. val, err := json.Marshal(c) return string(val), err } diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go index cc03867e91..f38674866d 100644 --- a/services/horizon/internal/db2/history/claimable_balances_test.go +++ b/services/horizon/internal/db2/history/claimable_balances_test.go @@ -3,9 +3,9 @@ package history import ( "database/sql" "fmt" + "github.com/guregu/null" "testing" - "github.com/guregu/null" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/xdr" @@ -16,7 +16,7 @@ func TestRemoveClaimableBalance(t *testing.T) { defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() @@ -69,7 +69,7 @@ func TestRemoveClaimableBalanceClaimants(t *testing.T) { defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() @@ -112,7 +112,7 @@ func TestFindClaimableBalancesByDestination(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() @@ -409,9 +409,7 @@ func TestFindClaimableBalance(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) - defer q.SessionInterface.Rollback() - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() @@ -460,9 +458,7 @@ func TestGetClaimableBalancesByID(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{}) - defer q.SessionInterface.Rollback() - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 2bc5657e21..764ff9cb5c 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/stellar/go/support/db" "time" "github.com/stellar/go/ingest" @@ -407,10 +406,7 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( groupChangeProcessors := buildChangeProcessor( s.historyQ, -<<<<<<< HEAD -======= s.session, ->>>>>>> 8f1836b1 (Use FastBatchInsertBuilder to insert to insert into claimable_balances and claimable_balance_claimants tables) &changeStatsProcessor, ledgerSource, ledger.LedgerSequence(), diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index dfbb9aed6a..6ca0f7319e 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -200,7 +200,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { } stats := &ingest.StatsChangeProcessor{} - processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "") + processor := buildChangeProcessor(runner.historyQ, &db.MockSession{}, stats, ledgerSource, 123, "") assert.IsType(t, &groupChangeProcessors{}, processor) assert.IsType(t, &statsChangeProcessor{}, processor.processors[0]) @@ -221,11 +221,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { filters: &MockFilters{}, } -<<<<<<< HEAD - processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "") -======= processor = buildChangeProcessor(runner.historyQ, &db.MockSession{}, stats, historyArchiveSource, 456, "") ->>>>>>> 8f1836b1 (Use FastBatchInsertBuilder to insert to insert into claimable_balances and claimable_balance_claimants tables) assert.IsType(t, &groupChangeProcessors{}, processor) assert.IsType(t, &statsChangeProcessor{}, processor.processors[0]) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go index 699694c28e..5806ffa85a 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor.go @@ -81,8 +81,10 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { } } - if err := p.InsertClaimableBalanceAndClaimants(ctx, cbsToInsert); err != nil { - return errors.Wrap(err, "error inserting claimable balance") + if len(cbsToInsert) > 0 { + if err := p.insertClaimableBalancesAndClaimants(ctx, cbsToInsert); err != nil { + return errors.Wrap(err, "error inserting claimable balance") + } } if len(cbIDsToDelete) > 0 { @@ -108,19 +110,18 @@ func (p *ClaimableBalancesChangeProcessor) Commit(ctx context.Context) error { return nil } -func (p *ClaimableBalancesChangeProcessor) InsertClaimableBalanceAndClaimants(ctx context.Context, claimableBalances []history.ClaimableBalance) error { - if len(claimableBalances) == 0 { - return nil - } - +func (p *ClaimableBalancesChangeProcessor) insertClaimableBalancesAndClaimants(ctx context.Context, + claimableBalances []history.ClaimableBalance) error { defer p.claimantsInsertBuilder.Reset() defer p.claimableBalanceInsertBuilder.Reset() for _, cb := range claimableBalances { + // Add claimable balance if err := p.claimableBalanceInsertBuilder.Add(cb); err != nil { - return errors.Wrap(err, "error executing insert") + return errors.Wrap(err, "error adding to ClaimableBalanceBatchInsertBuilder") } + // Add claimants for _, claimant := range cb.Claimants { claimant := history.ClaimableBalanceClaimant{ @@ -130,19 +131,19 @@ func (p *ClaimableBalancesChangeProcessor) InsertClaimableBalanceAndClaimants(ct } if err := p.claimantsInsertBuilder.Add(claimant); err != nil { - return errors.Wrap(err, "error adding to claimantsInsertBuilder") + return errors.Wrap(err, "error adding to ClaimableBalanceClaimantBatchInsertBuilder") } } } err := p.claimantsInsertBuilder.Exec(ctx, p.session) if err != nil { - return errors.Wrap(err, "error executing claimableBalanceInsertBuilder") + return errors.Wrap(err, "error executing ClaimableBalanceClaimantBatchInsertBuilder") } err = p.claimableBalanceInsertBuilder.Exec(ctx, p.session) if err != nil { - return errors.Wrap(err, "error executing claimantsInsertBuilder") + return errors.Wrap(err, "error executing ClaimableBalanceBatchInsertBuilder") } return nil } diff --git a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go index f7ca1c7b11..5a952d1dc3 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_change_processor_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/guregu/null" - "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -183,7 +182,7 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc Ext: xdr.LedgerEntryExt{ V: 1, V1: &xdr.LedgerEntryExtensionV1{ - SponsoringId: nil, + SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), }, }, } @@ -200,6 +199,7 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc Asset: cBalance.Asset, Amount: cBalance.Amount, LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), }, ).Return(nil).Once() @@ -209,42 +209,6 @@ func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestNewClaimableBalanc Post: &entry, }) s.Assert().NoError(err) - - // add sponsor - updated := xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeClaimableBalance, - ClaimableBalance: &cBalance, - }, - LastModifiedLedgerSeq: lastModifiedLedgerSeq, - Ext: xdr.LedgerEntryExt{ - V: 1, - V1: &xdr.LedgerEntryExtensionV1{ - SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - }, - }, - } - - entry.LastModifiedLedgerSeq = entry.LastModifiedLedgerSeq - 1 - err = s.processor.ProcessChange(s.ctx, ingest.Change{ - Type: xdr.LedgerEntryTypeClaimableBalance, - Pre: &entry, - Post: &updated, - }) - s.Assert().NoError(err) - - // We use LedgerEntryChangesCache so all changes are squashed - s.mockClaimableBalanceBatchInsertBuilder.On( - "Add", - history.ClaimableBalance{ - BalanceID: id, - Claimants: []history.Claimant{}, - Asset: cBalance.Asset, - Amount: cBalance.Amount, - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), - }, - ).Return(nil).Once() } func (s *ClaimableBalancesChangeProcessorTestSuiteLedger) TestRemoveClaimableBalance() { diff --git a/services/horizon/internal/ingest/verify_test.go b/services/horizon/internal/ingest/verify_test.go index 53441bf00d..11b510fe54 100644 --- a/services/horizon/internal/ingest/verify_test.go +++ b/services/horizon/internal/ingest/verify_test.go @@ -271,13 +271,13 @@ func TestStateVerifierLockBusy(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{&db.Session{DB: tt.HorizonDB}} - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }() checkpointLedger := uint32(63) - changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") + changeProcessor := buildChangeProcessor(q, q.SessionInterface, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") gen := randxdr.NewGenerator() var changes []xdr.LedgerEntryChange @@ -296,6 +296,8 @@ func TestStateVerifierLockBusy(t *testing.T) { } tt.Assert.NoError(changeProcessor.Commit(tt.Ctx)) + tt.Assert.NoError(q.SessionInterface.Commit()) + q.UpdateLastLedgerIngest(tt.Ctx, checkpointLedger) mockHistoryAdapter := &mockHistoryArchiveAdapter{} @@ -329,7 +331,7 @@ func TestStateVerifier(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &history.Q{&db.Session{DB: tt.HorizonDB}} - tt.Assert.NoError(q.SessionInterface.BeginTx(&sql.TxOptions{})) + tt.Assert.NoError(q.SessionInterface.BeginTx(tt.Ctx, &sql.TxOptions{})) defer func() { _ = q.SessionInterface.Rollback() }()