From 28088892ef5f1c280cdaec1fd81ba72b8aec464d Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 7 Jul 2023 18:53:46 +0100 Subject: [PATCH] Use FastBatchInsertBuilder to insert ledgers into the history_ledgers table --- .../horizon/internal/action_offers_test.go | 6 ++- .../horizon/internal/actions/account_test.go | 19 +++++-- .../horizon/internal/actions/offer_test.go | 16 ++++-- .../horizon/internal/actions_account_test.go | 7 ++- .../horizon/internal/actions_data_test.go | 6 ++- .../horizon/internal/db2/history/ledger.go | 46 +++++++++++------ .../internal/db2/history/ledger_test.go | 14 ++++-- .../internal/db2/history/mock_q_ledgers.go | 27 +++++++--- .../internal/db2/history/transaction_test.go | 6 ++- services/horizon/internal/ingest/main.go | 1 + .../internal/ingest/processor_runner.go | 4 +- .../internal/ingest/processor_runner_test.go | 29 +++++++++-- .../ingest/processors/ledgers_processor.go | 27 +++------- .../processors/ledgers_processor_test.go | 49 +++++++++---------- services/horizon/internal/middleware_test.go | 8 ++- 15 files changed, 178 insertions(+), 87 deletions(-) diff --git a/services/horizon/internal/action_offers_test.go b/services/horizon/internal/action_offers_test.go index 13458db9fe..d10e720636 100644 --- a/services/horizon/internal/action_offers_test.go +++ b/services/horizon/internal/action_offers_test.go @@ -24,7 +24,9 @@ func TestOfferActions_Show(t *testing.T) { ht.Assert.NoError(err) ledgerCloseTime := time.Now().Unix() - _, err = q.InsertLedger(ctx, xdr.LedgerHeaderHistoryEntry{ + ht.Assert.NoError(q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 100, ScpValue: xdr.StellarValue{ @@ -33,6 +35,8 @@ func TestOfferActions_Show(t *testing.T) { }, }, 0, 0, 0, 0, 0) ht.Assert.NoError(err) + ht.Assert.NoError(ledgerBatch.Exec(ht.Ctx, q)) + ht.Assert.NoError(q.Commit()) issuer := xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") nativeAsset := xdr.MustNewNativeAsset() diff --git a/services/horizon/internal/actions/account_test.go b/services/horizon/internal/actions/account_test.go index ed3d529575..ff265721da 100644 --- a/services/horizon/internal/actions/account_test.go +++ b/services/horizon/internal/actions/account_test.go @@ -228,7 +228,9 @@ func TestAccountInfo(t *testing.T) { assert.NoError(t, err) ledgerFourCloseTime := time.Now().Unix() - _, err = q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{ + assert.NoError(t, q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 4, ScpValue: xdr.StellarValue{ @@ -237,6 +239,8 @@ func TestAccountInfo(t *testing.T) { }, }, 0, 0, 0, 0, 0) assert.NoError(t, err) + assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q)) + assert.NoError(t, q.Commit()) account, err := AccountInfo(tt.Ctx, &history.Q{tt.HorizonSession()}, accountID) tt.Assert.NoError(err) @@ -408,7 +412,9 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) { err := q.UpsertAccounts(tt.Ctx, []history.AccountEntry{account1, account2}) assert.NoError(t, err) ledgerCloseTime := time.Now().Unix() - _, err = q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{ + assert.NoError(t, q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 1234, ScpValue: xdr.StellarValue{ @@ -417,6 +423,8 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) { }, }, 0, 0, 0, 0, 0) assert.NoError(t, err) + assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q)) + assert.NoError(t, q.Commit()) for _, row := range accountSigners { _, err = q.CreateAccountSigner(tt.Ctx, row.Account, row.Signer, row.Weight, nil) @@ -511,7 +519,9 @@ func TestGetAccountsHandlerPageResultsByLiquidityPool(t *testing.T) { assert.NoError(t, err) ledgerCloseTime := time.Now().Unix() - _, err = q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{ + assert.NoError(t, q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 1234, ScpValue: xdr.StellarValue{ @@ -520,6 +530,9 @@ func TestGetAccountsHandlerPageResultsByLiquidityPool(t *testing.T) { }, }, 0, 0, 0, 0, 0) assert.NoError(t, err) + assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q)) + assert.NoError(t, q.Commit()) + var assetType, code, issuer string usd.MustExtract(&assetType, &code, &issuer) params := map[string]string{ diff --git a/services/horizon/internal/actions/offer_test.go b/services/horizon/internal/actions/offer_test.go index 41663c65d5..578e284bc6 100644 --- a/services/horizon/internal/actions/offer_test.go +++ b/services/horizon/internal/actions/offer_test.go @@ -79,7 +79,9 @@ func TestGetOfferByIDHandler(t *testing.T) { handler := GetOfferByID{} ledgerCloseTime := time.Now().Unix() - _, err := q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{ + assert.NoError(t, q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err := ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 3, ScpValue: xdr.StellarValue{ @@ -87,7 +89,9 @@ func TestGetOfferByIDHandler(t *testing.T) { }, }, }, 0, 0, 0, 0, 0) - tt.Assert.NoError(err) + assert.NoError(t, err) + assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q)) + assert.NoError(t, q.Commit()) err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, usdOffer}) tt.Assert.NoError(err) @@ -186,7 +190,9 @@ func TestGetOffersHandler(t *testing.T) { handler := GetOffersHandler{} ledgerCloseTime := time.Now().Unix() - _, err := q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{ + assert.NoError(t, q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err := ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 3, ScpValue: xdr.StellarValue{ @@ -194,7 +200,9 @@ func TestGetOffersHandler(t *testing.T) { }, }, }, 0, 0, 0, 0, 0) - tt.Assert.NoError(err) + assert.NoError(t, err) + assert.NoError(t, ledgerBatch.Exec(tt.Ctx, q)) + assert.NoError(t, q.Commit()) err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer}) tt.Assert.NoError(err) diff --git a/services/horizon/internal/actions_account_test.go b/services/horizon/internal/actions_account_test.go index 541300c3a6..e3e71e0b3a 100644 --- a/services/horizon/internal/actions_account_test.go +++ b/services/horizon/internal/actions_account_test.go @@ -18,12 +18,17 @@ func TestAccountActions_InvalidID(t *testing.T) { ht.Assert.NoError(err) err = q.UpdateIngestVersion(ht.Ctx, ingest.CurrentVersion) ht.Assert.NoError(err) - _, err = q.InsertLedger(ht.Ctx, xdr.LedgerHeaderHistoryEntry{ + + ht.Assert.NoError(q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 100, }, }, 0, 0, 0, 0, 0) ht.Assert.NoError(err) + ht.Assert.NoError(ledgerBatch.Exec(ht.Ctx, q)) + ht.Assert.NoError(q.Commit()) // existing account w := ht.Get( diff --git a/services/horizon/internal/actions_data_test.go b/services/horizon/internal/actions_data_test.go index 3de82a915d..8cdc4a07db 100644 --- a/services/horizon/internal/actions_data_test.go +++ b/services/horizon/internal/actions_data_test.go @@ -44,12 +44,16 @@ func TestDataActions_Show(t *testing.T) { ht.Assert.NoError(err) err = q.UpdateIngestVersion(ht.Ctx, ingest.CurrentVersion) ht.Assert.NoError(err) - _, err = q.InsertLedger(ht.Ctx, xdr.LedgerHeaderHistoryEntry{ + ht.Assert.NoError(q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: 100, }, }, 0, 0, 0, 0, 0) ht.Assert.NoError(err) + ht.Assert.NoError(ledgerBatch.Exec(ht.Ctx, q)) + ht.Assert.NoError(q.Commit()) err = q.UpsertAccountData(ht.Ctx, []history.Data{data1, data2}) assert.NoError(t, err) diff --git a/services/horizon/internal/db2/history/ledger.go b/services/horizon/internal/db2/history/ledger.go index 7d367a8464..ca89534702 100644 --- a/services/horizon/internal/db2/history/ledger.go +++ b/services/horizon/internal/db2/history/ledger.go @@ -10,6 +10,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/guregu/null" "github.com/stellar/go/services/horizon/internal/db2" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/ordered" "github.com/stellar/go/toid" @@ -90,27 +91,46 @@ func (q *LedgersQ) Select(ctx context.Context, dest interface{}) error { // QLedgers defines ingestion ledger related queries. type QLedgers interface { - InsertLedger( - ctx context.Context, + NewLedgerBatchInsertBuilder() LedgerBatchInsertBuilder +} + +// LedgerBatchInsertBuilder is used to insert ledgers into the +// history_ledgers table +type LedgerBatchInsertBuilder interface { + Add( ledger xdr.LedgerHeaderHistoryEntry, successTxsCount int, failedTxsCount int, opCount int, txSetOpCount int, ingestVersion int, - ) (int64, error) + ) error + Exec(ctx context.Context, session db.SessionInterface) error +} + +// ledgerBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder +type ledgerBatchInsertBuilder struct { + builder db.FastBatchInsertBuilder + table string +} + +// NewLedgerBatchInsertBuilder constructs a new EffectBatchInsertBuilder instance +func (q *Q) NewLedgerBatchInsertBuilder() LedgerBatchInsertBuilder { + return &ledgerBatchInsertBuilder{ + table: "history_ledgers", + builder: db.FastBatchInsertBuilder{}, + } } -// InsertLedger creates a row in the history_ledgers table. -// Returns number of rows affected and error. -func (q *Q) InsertLedger(ctx context.Context, +// Add adds a effect to the batch +func (i *ledgerBatchInsertBuilder) Add( ledger xdr.LedgerHeaderHistoryEntry, successTxsCount int, failedTxsCount int, opCount int, txSetOpCount int, ingestVersion int, -) (int64, error) { +) error { m, err := ledgerHeaderToMap( ledger, successTxsCount, @@ -120,16 +140,14 @@ func (q *Q) InsertLedger(ctx context.Context, ingestVersion, ) if err != nil { - return 0, err + return err } - sql := sq.Insert("history_ledgers").SetMap(m) - result, err := q.Exec(ctx, sql) - if err != nil { - return 0, err - } + return i.builder.Row(m) +} - return result.RowsAffected() +func (i *ledgerBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + return i.builder.Exec(ctx, session, i.table) } // GetLedgerGaps obtains ingestion gaps in the history_ledgers table. diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index 4bf6d7b058..c8526fddd6 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -119,7 +119,8 @@ func TestInsertLedger(t *testing.T) { tt.Assert.NoError(err) expectedLedger.LedgerHeaderXDR = null.NewString(ledgerHeaderBase64, true) - rowsAffected, err := q.InsertLedger(tt.Ctx, + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add( ledgerEntry, 12, 3, @@ -128,7 +129,9 @@ func TestInsertLedger(t *testing.T) { int(expectedLedger.ImporterVersion), ) tt.Assert.NoError(err) - tt.Assert.Equal(rowsAffected, int64(1)) + tt.Assert.NoError(q.Begin()) + tt.Assert.NoError(ledgerBatch.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(q.Commit()) err = q.LedgerBySequence(tt.Ctx, &ledgerFromDB, 69859) tt.Assert.NoError(err) @@ -204,7 +207,8 @@ func insertLedgerWithSequence(tt *test.T, q *Q, seq uint32) { ledgerHeaderBase64, err := xdr.MarshalBase64(ledgerEntry.Header) tt.Assert.NoError(err) expectedLedger.LedgerHeaderXDR = null.NewString(ledgerHeaderBase64, true) - rowsAffected, err := q.InsertLedger(tt.Ctx, + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add( ledgerEntry, 12, 3, @@ -213,7 +217,9 @@ func insertLedgerWithSequence(tt *test.T, q *Q, seq uint32) { int(expectedLedger.ImporterVersion), ) tt.Assert.NoError(err) - tt.Assert.Equal(rowsAffected, int64(1)) + tt.Assert.NoError(q.Begin()) + tt.Assert.NoError(ledgerBatch.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(q.Commit()) } func TestGetLedgerGaps(t *testing.T) { diff --git a/services/horizon/internal/db2/history/mock_q_ledgers.go b/services/horizon/internal/db2/history/mock_q_ledgers.go index 16d3ef5524..f02cd7517c 100644 --- a/services/horizon/internal/db2/history/mock_q_ledgers.go +++ b/services/horizon/internal/db2/history/mock_q_ledgers.go @@ -3,23 +3,38 @@ package history import ( "context" - "github.com/stretchr/testify/mock" - + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" + + "github.com/stretchr/testify/mock" ) type MockQLedgers struct { mock.Mock } -func (m *MockQLedgers) InsertLedger(ctx context.Context, +func (m *MockQLedgers) NewLedgerBatchInsertBuilder() LedgerBatchInsertBuilder { + a := m.Called() + return a.Get(0).(LedgerBatchInsertBuilder) +} + +type MockLedgersBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockLedgersBatchInsertBuilder) Add( ledger xdr.LedgerHeaderHistoryEntry, successTxsCount int, failedTxsCount int, opCount int, txSetOpCount int, ingestVersion int, -) (int64, error) { - a := m.Called(ctx, ledger, successTxsCount, failedTxsCount, opCount, txSetOpCount, ingestVersion) - return a.Get(0).(int64), a.Error(1) +) error { + a := m.Called(ledger, successTxsCount, failedTxsCount, opCount, txSetOpCount, ingestVersion) + return a.Error(0) +} + +func (m *MockLedgersBatchInsertBuilder) Exec(ctx context.Context, session db.SessionInterface) error { + a := m.Called(ctx, session) + return a.Error(0) } diff --git a/services/horizon/internal/db2/history/transaction_test.go b/services/horizon/internal/db2/history/transaction_test.go index 0f3592c439..576b93ffb9 100644 --- a/services/horizon/internal/db2/history/transaction_test.go +++ b/services/horizon/internal/db2/history/transaction_test.go @@ -45,7 +45,8 @@ func TestTransactionByLiquidityPool(t *testing.T) { // Insert a phony ledger ledgerCloseTime := time.Now().Unix() - _, err := q.InsertLedger(tt.Ctx, xdr.LedgerHeaderHistoryEntry{ + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err := ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ LedgerSeq: xdr.Uint32(sequence), ScpValue: xdr.StellarValue{ @@ -54,6 +55,9 @@ func TestTransactionByLiquidityPool(t *testing.T) { }, }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) + tt.Assert.NoError(q.Begin()) + tt.Assert.NoError(ledgerBatch.Exec(tt.Ctx, q.SessionInterface)) + tt.Assert.NoError(q.Commit()) // Insert a phony transaction transactionBuilder := q.NewTransactionBatchInsertBuilder(2) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index dc6dc8dd46..783cc29681 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -310,6 +310,7 @@ func NewSystem(config Config) (System, error) { ctx: ctx, config: config, historyQ: historyQ, + session: historyQ, historyAdapter: historyAdapter, filters: filters, }, diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 29635dce66..c6fcd75f2c 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -10,6 +10,7 @@ import ( "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest/filters" "github.com/stellar/go/services/horizon/internal/ingest/processors" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -88,6 +89,7 @@ type ProcessorRunner struct { ctx context.Context historyQ history.IngestionQ + session db.SessionInterface historyAdapter historyArchiveAdapterInterface logMemoryStats bool filters filters.Filters @@ -143,7 +145,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( return newGroupTransactionProcessors([]horizonTransactionProcessor{ statsLedgerTransactionProcessor, processors.NewEffectProcessor(s.historyQ, sequence), - processors.NewLedgerProcessor(s.historyQ, ledger, CurrentVersion), + processors.NewLedgerProcessor(s.session, s.historyQ, ledger, CurrentVersion), processors.NewOperationProcessor(s.historyQ, sequence), tradeProcessor, processors.NewParticipantsProcessor(s.historyQ, sequence), diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index f1fa7747fb..14c1daf0df 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -16,6 +16,7 @@ import ( "github.com/stellar/go/network" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest/processors" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -316,13 +317,23 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) - q.MockQLedgers.On("InsertLedger", ctx, ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion). - Return(int64(1), nil).Once() + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} + q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) + mockBatchInsertBuilder.On( + "Add", + ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil) + mockSession := &db.MockSession{} + mockBatchInsertBuilder.On( + "Exec", + ctx, + mockSession, + ).Return(nil) runner := ProcessorRunner{ ctx: ctx, config: config, historyQ: q, + session: mockSession, filters: &MockFilters{}, } @@ -372,13 +383,23 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() - q.MockQLedgers.On("InsertLedger", ctx, ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion). - Return(int64(1), nil).Once() + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} + q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) + mockBatchInsertBuilder.On( + "Add", + ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil) + mockSession := &db.MockSession{} + mockBatchInsertBuilder.On( + "Exec", + ctx, + mockSession, + ).Return(nil) runner := ProcessorRunner{ ctx: ctx, config: config, historyQ: q, + session: mockSession, filters: &MockFilters{}, } diff --git a/services/horizon/internal/ingest/processors/ledgers_processor.go b/services/horizon/internal/ingest/processors/ledgers_processor.go index 01c29b43d9..aee2f12709 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor.go @@ -5,11 +5,13 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) type LedgersProcessor struct { + session db.SessionInterface ledgersQ history.QLedgers ledger xdr.LedgerHeaderHistoryEntry ingestVersion int @@ -20,11 +22,13 @@ type LedgersProcessor struct { } func NewLedgerProcessor( + session db.SessionInterface, ledgerQ history.QLedgers, ledger xdr.LedgerHeaderHistoryEntry, ingestVersion int, ) *LedgersProcessor { return &LedgersProcessor{ + session: session, ledger: ledger, ledgersQ: ledgerQ, ingestVersion: ingestVersion, @@ -45,29 +49,14 @@ func (p *LedgersProcessor) ProcessTransaction(ctx context.Context, transaction i } func (p *LedgersProcessor) Commit(ctx context.Context) error { - rowsAffected, err := p.ledgersQ.InsertLedger(ctx, - p.ledger, - p.successTxCount, - p.failedTxCount, - p.opCount, - p.txSetOpCount, - p.ingestVersion, - ) - + batch := p.ledgersQ.NewLedgerBatchInsertBuilder() + err := batch.Add(p.ledger, p.successTxCount, p.failedTxCount, p.opCount, p.txSetOpCount, p.ingestVersion) if err != nil { return errors.Wrap(err, "Could not insert ledger") } - sequence := uint32(p.ledger.Header.LedgerSeq) - - if rowsAffected != 1 { - log.WithField("rowsAffected", rowsAffected). - WithField("sequence", sequence). - Error("Invalid number of rows affected when ingesting new ledger") - return errors.Errorf( - "0 rows affected when ingesting new ledger: %v", - sequence, - ) + if err = batch.Exec(ctx, p.session); err != nil { + return errors.Wrap(err, "Could not commit ledger") } return nil diff --git a/services/horizon/internal/ingest/processors/ledgers_processor_test.go b/services/horizon/internal/ingest/processors/ledgers_processor_test.go index 05bd2c3c3b..9cbd2c8643 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor_test.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor_test.go @@ -8,6 +8,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" "github.com/stretchr/testify/mock" @@ -17,6 +18,7 @@ import ( type LedgersProcessorTestSuiteLedger struct { suite.Suite processor *LedgersProcessor + mockSession *db.MockSession mockQ *history.MockQLedgers header xdr.LedgerHeaderHistoryEntry successCount int @@ -85,7 +87,9 @@ func (s *LedgersProcessorTestSuiteLedger) SetupTest() { LedgerSeq: xdr.Uint32(20), }, } + s.processor = NewLedgerProcessor( + s.mockSession, s.mockQ, s.header, s.ingestVersion, @@ -109,16 +113,23 @@ func (s *LedgersProcessorTestSuiteLedger) TearDownTest() { func (s *LedgersProcessorTestSuiteLedger) TestInsertLedgerSucceeds() { ctx := context.Background() - s.mockQ.On( - "InsertLedger", - ctx, + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} + s.mockQ.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) + + mockBatchInsertBuilder.On( + "Add", s.header, s.successCount, s.failedCount, s.opCount, s.txSetOpCount, s.ingestVersion, - ).Return(int64(1), nil) + ).Return(nil) + mockBatchInsertBuilder.On( + "Exec", + ctx, + s.mockSession, + ).Return(nil) for _, tx := range s.txs { err := s.processor.ProcessTransaction(ctx, tx) @@ -130,37 +141,21 @@ func (s *LedgersProcessorTestSuiteLedger) TestInsertLedgerSucceeds() { } func (s *LedgersProcessorTestSuiteLedger) TestInsertLedgerReturnsError() { - ctx := context.Background() - s.mockQ.On( - "InsertLedger", - ctx, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(int64(0), errors.New("transient error")) - - err := s.processor.Commit(ctx) - s.Assert().Error(err) - s.Assert().EqualError(err, "Could not insert ledger: transient error") -} + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} + s.mockQ.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) -func (s *LedgersProcessorTestSuiteLedger) TestInsertLedgerNoRowsAffected() { - ctx := context.Background() - s.mockQ.On( - "InsertLedger", - ctx, + mockBatchInsertBuilder.On( + "Add", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, - ).Return(int64(0), nil) + ).Return(errors.New("transient error")) + ctx := context.Background() err := s.processor.Commit(ctx) s.Assert().Error(err) - s.Assert().EqualError(err, "0 rows affected when ingesting new ledger: 20") + s.Assert().EqualError(err, "Could not insert ledger: transient error") } diff --git a/services/horizon/internal/middleware_test.go b/services/horizon/internal/middleware_test.go index 08b90465f3..6dd1a46681 100644 --- a/services/horizon/internal/middleware_test.go +++ b/services/horizon/internal/middleware_test.go @@ -284,7 +284,10 @@ func TestStateMiddleware(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { stateMiddleware.NoStateVerification = testCase.noStateVerification tt.Assert.NoError(q.UpdateExpStateInvalid(context.Background(), testCase.stateInvalid)) - _, err = q.InsertLedger(context.Background(), xdr.LedgerHeaderHistoryEntry{ + + tt.Assert.NoError(q.Begin()) + ledgerBatch := q.NewLedgerBatchInsertBuilder() + err = ledgerBatch.Add(xdr.LedgerHeaderHistoryEntry{ Hash: xdr.Hash{byte(i)}, Header: xdr.LedgerHeader{ LedgerSeq: testCase.latestHistoryLedger, @@ -292,6 +295,9 @@ func TestStateMiddleware(t *testing.T) { }, }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) + tt.Assert.NoError(ledgerBatch.Exec(tt.Ctx, q)) + tt.Assert.NoError(q.Commit()) + tt.Assert.NoError(q.UpdateLastLedgerIngest(context.Background(), testCase.lastIngestedLedger)) tt.Assert.NoError(q.UpdateIngestVersion(context.Background(), testCase.ingestionVersion))