From 54794bfd913b96df2e1f4aaedaab04a97a076e2a Mon Sep 17 00:00:00 2001 From: urvisavla Date: Fri, 17 Nov 2023 13:05:57 -0800 Subject: [PATCH 1/4] services/horizon: Use COPY for inserting into trust_lines table (#5113) --- services/horizon/internal/db2/history/main.go | 1 + .../db2/history/mock_q_trust_lines.go | 5 + .../mock_trust_lines_batch_insert_builder.go | 21 ++ .../trust_lines_batch_insert_builder.go | 39 +++ .../internal/ingest/processor_runner_test.go | 8 + .../processors/trust_lines_processor.go | 27 ++- .../processors/trust_lines_processor_test.go | 228 ++++++++---------- 7 files changed, 202 insertions(+), 127 deletions(-) create mode 100644 services/horizon/internal/db2/history/mock_trust_lines_batch_insert_builder.go create mode 100644 services/horizon/internal/db2/history/trust_lines_batch_insert_builder.go diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 850b8bef3c..7c05f2dfa6 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -846,6 +846,7 @@ type QTrustLines interface { GetTrustLinesByKeys(ctx context.Context, ledgerKeys []string) ([]TrustLine, error) UpsertTrustLines(ctx context.Context, trustlines []TrustLine) error RemoveTrustLines(ctx context.Context, ledgerKeys []string) (int64, error) + NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder } func (q *Q) NewAccountSignersBatchInsertBuilder() AccountSignersBatchInsertBuilder { diff --git a/services/horizon/internal/db2/history/mock_q_trust_lines.go b/services/horizon/internal/db2/history/mock_q_trust_lines.go index f6b5b53017..a074b0e7ef 100644 --- a/services/horizon/internal/db2/history/mock_q_trust_lines.go +++ b/services/horizon/internal/db2/history/mock_q_trust_lines.go @@ -25,3 +25,8 @@ func (m *MockQTrustLines) RemoveTrustLines(ctx context.Context, ledgerKeys []str a := m.Called(ctx, ledgerKeys) return a.Get(0).(int64), a.Error(1) } + +func (m *MockQTrustLines) NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder { + a := m.Called() + return a.Get(0).(TrustLinesBatchInsertBuilder) +} diff --git a/services/horizon/internal/db2/history/mock_trust_lines_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_trust_lines_batch_insert_builder.go new file mode 100644 index 0000000000..38e1b41db0 --- /dev/null +++ b/services/horizon/internal/db2/history/mock_trust_lines_batch_insert_builder.go @@ -0,0 +1,21 @@ +package history + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockTrustLinesBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockTrustLinesBatchInsertBuilder) Add(line TrustLine) error { + a := m.Called(line) + return a.Error(0) +} + +func (m *MockTrustLinesBatchInsertBuilder) Exec(ctx context.Context) error { + a := m.Called(ctx) + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/trust_lines_batch_insert_builder.go b/services/horizon/internal/db2/history/trust_lines_batch_insert_builder.go new file mode 100644 index 0000000000..2c77469775 --- /dev/null +++ b/services/horizon/internal/db2/history/trust_lines_batch_insert_builder.go @@ -0,0 +1,39 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" +) + +// TrustLinesBatchInsertBuilder is used to insert trustlines into the trust_lines table +type TrustLinesBatchInsertBuilder interface { + Add(line TrustLine) error + Exec(ctx context.Context) error +} + +// trustLinesBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder +type trustLinesBatchInsertBuilder struct { + session db.SessionInterface + builder db.FastBatchInsertBuilder + table string +} + +// NewTrustLinesBatchInsertBuilder constructs a new TrustLinesBatchInsertBuilder instance +func (q *Q) NewTrustLinesBatchInsertBuilder() TrustLinesBatchInsertBuilder { + return &trustLinesBatchInsertBuilder{ + session: q, + builder: db.FastBatchInsertBuilder{}, + table: "trust_lines", + } +} + +// Add adds a new trustline to the batch +func (i *trustLinesBatchInsertBuilder) Add(line TrustLine) error { + return i.builder.RowStruct(line) +} + +// Exec writes the batch of trust lines to the database. +func (i *trustLinesBatchInsertBuilder) Exec(ctx context.Context) error { + return i.builder.Exec(ctx, i.session, i.table) +} diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index b932a05129..7dfea07193 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -523,10 +523,18 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec q.MockQOffers.On("NewOffersBatchInsertBuilder"). Return(mockOfferBatchInsertBuilder).Twice() + mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{} + if mockExec { + mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + } + q.MockQTrustLines.On("NewTrustLinesBatchInsertBuilder"). + Return(mockTrustLinesBatchInsertBuilder) + return []interface{}{mockAccountSignersBatchInsertBuilder, mockClaimableBalanceBatchInsertBuilder, mockClaimableBalanceClaimantBatchInsertBuilder, mockLiquidityPoolBatchInsertBuilder, mockOfferBatchInsertBuilder, + mockTrustLinesBatchInsertBuilder, } } diff --git a/services/horizon/internal/ingest/processors/trust_lines_processor.go b/services/horizon/internal/ingest/processors/trust_lines_processor.go index f8c9ca9245..3152ef0db7 100644 --- a/services/horizon/internal/ingest/processors/trust_lines_processor.go +++ b/services/horizon/internal/ingest/processors/trust_lines_processor.go @@ -12,7 +12,8 @@ import ( type TrustLinesProcessor struct { trustLinesQ history.QTrustLines - cache *ingest.ChangeCompactor + cache *ingest.ChangeCompactor + batchInsertBuilder history.TrustLinesBatchInsertBuilder } func NewTrustLinesProcessor(trustLinesQ history.QTrustLines) *TrustLinesProcessor { @@ -23,6 +24,7 @@ func NewTrustLinesProcessor(trustLinesQ history.QTrustLines) *TrustLinesProcesso func (p *TrustLinesProcessor) reset() { p.cache = ingest.NewChangeCompactor() + p.batchInsertBuilder = p.trustLinesQ.NewTrustLinesBatchInsertBuilder() } func (p *TrustLinesProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -40,7 +42,6 @@ func (p *TrustLinesProcessor) ProcessChange(ctx context.Context, change ingest.C if err != nil { return errors.Wrap(err, "error in Commit") } - p.reset() } return nil @@ -97,18 +98,31 @@ func xdrToTrustline(ledgerEntry xdr.LedgerEntry) (history.TrustLine, error) { } func (p *TrustLinesProcessor) Commit(ctx context.Context) error { + defer p.reset() + var batchUpsertTrustLines []history.TrustLine var batchRemoveTrustLineKeys []string changes := p.cache.GetChanges() for _, change := range changes { switch { - case change.Post != nil: - tl, err := xdrToTrustline(*change.Post) + case change.Pre == nil && change.Post != nil: + // Created + line, err := xdrToTrustline(*change.Post) if err != nil { return errors.Wrap(err, "Error extracting trustline") } + err = p.batchInsertBuilder.Add(line) + if err != nil { + return errors.Wrap(err, "Error adding to TrustLinesBatchInsertBuilder") + } + case change.Pre != nil && change.Post != nil: + // Updated + tl, err := xdrToTrustline(*change.Post) + if err != nil { + return errors.Wrap(err, "Error extracting trustline") + } batchUpsertTrustLines = append(batchUpsertTrustLines, tl) case change.Pre != nil && change.Post == nil: // Removed @@ -124,6 +138,11 @@ func (p *TrustLinesProcessor) Commit(ctx context.Context) error { } } + err := p.batchInsertBuilder.Exec(ctx) + if err != nil { + return errors.Wrap(err, "Error executing TrustLinesBatchInsertBuilder") + } + if len(batchUpsertTrustLines) > 0 { err := p.trustLinesQ.UpsertTrustLines(ctx, batchUpsertTrustLines) if err != nil { diff --git a/services/horizon/internal/ingest/processors/trust_lines_processor_test.go b/services/horizon/internal/ingest/processors/trust_lines_processor_test.go index 2e3d092f5a..07990a0658 100644 --- a/services/horizon/internal/ingest/processors/trust_lines_processor_test.go +++ b/services/horizon/internal/ingest/processors/trust_lines_processor_test.go @@ -24,14 +24,20 @@ func TestTrustLinesProcessorTestSuiteState(t *testing.T) { type TrustLinesProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *TrustLinesProcessor - mockQ *history.MockQTrustLines + ctx context.Context + processor *TrustLinesProcessor + mockQ *history.MockQTrustLines + mockTrustLinesBatchInsertBuilder *history.MockTrustLinesBatchInsertBuilder } func (s *TrustLinesProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQTrustLines{} + + s.mockTrustLinesBatchInsertBuilder = &history.MockTrustLinesBatchInsertBuilder{} + s.mockQ.On("NewTrustLinesBatchInsertBuilder").Return(s.mockTrustLinesBatchInsertBuilder).Twice() + s.mockTrustLinesBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.processor = NewTrustLinesProcessor(s.mockQ) } @@ -85,42 +91,33 @@ func (s *TrustLinesProcessorTestSuiteState) TestCreateTrustLine() { }) s.Assert().NoError(err) - s.mockQ.On( - "UpsertTrustLines", - s.ctx, - mock.AnythingOfType("[]history.TrustLine"), - ).Run(func(args mock.Arguments) { - arg := args.Get(1).([]history.TrustLine) - s.Assert().ElementsMatch( - []history.TrustLine{ - { - AccountID: trustLine.AccountId.Address(), - AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, - AssetIssuer: trustLineIssuer.Address(), - AssetCode: "EUR", - Balance: int64(trustLine.Balance), - LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", - Limit: int64(trustLine.Limit), - LiquidityPoolID: "", - BuyingLiabilities: int64(trustLine.Liabilities().Buying), - SellingLiabilities: int64(trustLine.Liabilities().Selling), - Flags: uint32(trustLine.Flags), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.String{}, - }, - { - AccountID: poolShareTrustLine.AccountId.Address(), - AssetType: xdr.AssetTypeAssetTypePoolShare, - Balance: int64(poolShareTrustLine.Balance), - LedgerKey: "AAAAAQAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWgAAAAMBAgMEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", - Limit: int64(poolShareTrustLine.Limit), - LiquidityPoolID: "0102030400000000000000000000000000000000000000000000000000000000", - Flags: uint32(poolShareTrustLine.Flags), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }, - }, - arg) + s.mockTrustLinesBatchInsertBuilder.On("Add", history.TrustLine{ + AccountID: trustLine.AccountId.Address(), + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "EUR", + Balance: int64(trustLine.Balance), + LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", + Limit: int64(trustLine.Limit), + LiquidityPoolID: "", + BuyingLiabilities: int64(trustLine.Liabilities().Buying), + SellingLiabilities: int64(trustLine.Liabilities().Selling), + Flags: uint32(trustLine.Flags), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.String{}, }).Return(nil).Once() + + s.mockTrustLinesBatchInsertBuilder.On("Add", history.TrustLine{ + AccountID: poolShareTrustLine.AccountId.Address(), + AssetType: xdr.AssetTypeAssetTypePoolShare, + Balance: int64(poolShareTrustLine.Balance), + LedgerKey: "AAAAAQAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWgAAAAMBAgMEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", + Limit: int64(poolShareTrustLine.Limit), + LiquidityPoolID: "0102030400000000000000000000000000000000000000000000000000000000", + Flags: uint32(poolShareTrustLine.Flags), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }).Return(nil).Once() + } func (s *TrustLinesProcessorTestSuiteState) TestCreateTrustLineUnauthorized() { @@ -142,27 +139,21 @@ func (s *TrustLinesProcessorTestSuiteState) TestCreateTrustLineUnauthorized() { }) s.Assert().NoError(err) - s.mockQ.On( - "UpsertTrustLines", - s.ctx, - []history.TrustLine{ - { - AccountID: trustLine.AccountId.Address(), - AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, - AssetIssuer: trustLineIssuer.Address(), - AssetCode: "EUR", - Balance: int64(trustLine.Balance), - LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", - Limit: int64(trustLine.Limit), - LiquidityPoolID: "", - BuyingLiabilities: int64(trustLine.Liabilities().Buying), - SellingLiabilities: int64(trustLine.Liabilities().Selling), - Flags: uint32(trustLine.Flags), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.String{}, - }, - }, - ).Return(nil).Once() + s.mockTrustLinesBatchInsertBuilder.On("Add", history.TrustLine{ + AccountID: trustLine.AccountId.Address(), + AssetType: xdr.AssetTypeAssetTypeCreditAlphanum4, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "EUR", + Balance: int64(trustLine.Balance), + LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", + Limit: int64(trustLine.Limit), + LiquidityPoolID: "", + BuyingLiabilities: int64(trustLine.Liabilities().Buying), + SellingLiabilities: int64(trustLine.Liabilities().Selling), + Flags: uint32(trustLine.Flags), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.String{}, + }).Return(nil).Once() } func TestTrustLinesProcessorTestSuiteLedger(t *testing.T) { @@ -171,14 +162,20 @@ func TestTrustLinesProcessorTestSuiteLedger(t *testing.T) { type TrustLinesProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *TrustLinesProcessor - mockQ *history.MockQTrustLines + ctx context.Context + processor *TrustLinesProcessor + mockQ *history.MockQTrustLines + mockTrustLinesBatchInsertBuilder *history.MockTrustLinesBatchInsertBuilder } func (s *TrustLinesProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQTrustLines{} + + s.mockTrustLinesBatchInsertBuilder = &history.MockTrustLinesBatchInsertBuilder{} + s.mockQ.On("NewTrustLinesBatchInsertBuilder").Return(s.mockTrustLinesBatchInsertBuilder).Twice() + s.mockTrustLinesBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.processor = NewTrustLinesProcessor(s.mockQ) } @@ -298,47 +295,36 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestInsertTrustLine() { }) s.Assert().NoError(err) - s.mockQ.On( - "UpsertTrustLines", - s.ctx, - mock.AnythingOfType("[]history.TrustLine"), - ).Run(func(args mock.Arguments) { - arg := args.Get(1).([]history.TrustLine) - s.Assert().ElementsMatch( - []history.TrustLine{ - { - AccountID: updatedTrustLine.AccountId.Address(), - AssetType: updatedTrustLine.Asset.Type, - AssetIssuer: trustLineIssuer.Address(), - AssetCode: "EUR", - Balance: int64(updatedTrustLine.Balance), - LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", - Limit: int64(updatedTrustLine.Limit), - LiquidityPoolID: "", - BuyingLiabilities: int64(updatedTrustLine.Liabilities().Buying), - SellingLiabilities: int64(updatedTrustLine.Liabilities().Selling), - Flags: uint32(updatedTrustLine.Flags), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.String{}, - }, - { - AccountID: updatedUnauthorizedTrustline.AccountId.Address(), - AssetType: updatedUnauthorizedTrustline.Asset.Type, - AssetIssuer: trustLineIssuer.Address(), - AssetCode: "USD", - Balance: int64(updatedUnauthorizedTrustline.Balance), - LedgerKey: "AAAAAQAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWgAAAAFVU0QAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", - Limit: int64(updatedUnauthorizedTrustline.Limit), - LiquidityPoolID: "", - BuyingLiabilities: int64(updatedUnauthorizedTrustline.Liabilities().Buying), - SellingLiabilities: int64(updatedUnauthorizedTrustline.Liabilities().Selling), - Flags: uint32(updatedUnauthorizedTrustline.Flags), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.String{}, - }, - }, - arg, - ) + s.mockTrustLinesBatchInsertBuilder.On("Add", history.TrustLine{ + AccountID: updatedTrustLine.AccountId.Address(), + AssetType: updatedTrustLine.Asset.Type, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "EUR", + Balance: int64(updatedTrustLine.Balance), + LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", + Limit: int64(updatedTrustLine.Limit), + LiquidityPoolID: "", + BuyingLiabilities: int64(updatedTrustLine.Liabilities().Buying), + SellingLiabilities: int64(updatedTrustLine.Liabilities().Selling), + Flags: uint32(updatedTrustLine.Flags), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.String{}, + }).Return(nil).Once() + + s.mockTrustLinesBatchInsertBuilder.On("Add", history.TrustLine{ + AccountID: updatedUnauthorizedTrustline.AccountId.Address(), + AssetType: updatedUnauthorizedTrustline.Asset.Type, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "USD", + Balance: int64(updatedUnauthorizedTrustline.Balance), + LedgerKey: "AAAAAQAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWgAAAAFVU0QAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", + Limit: int64(updatedUnauthorizedTrustline.Limit), + LiquidityPoolID: "", + BuyingLiabilities: int64(updatedUnauthorizedTrustline.Liabilities().Buying), + SellingLiabilities: int64(updatedUnauthorizedTrustline.Liabilities().Selling), + Flags: uint32(updatedUnauthorizedTrustline.Flags), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.String{}, }).Return(nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } @@ -626,26 +612,22 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestProcessUpgradeChange() { }) s.Assert().NoError(err) - s.mockQ.On( - "UpsertTrustLines", s.ctx, - []history.TrustLine{ - { - AccountID: updatedTrustLine.AccountId.Address(), - AssetType: updatedTrustLine.Asset.Type, - AssetIssuer: trustLineIssuer.Address(), - AssetCode: "EUR", - Balance: int64(updatedTrustLine.Balance), - LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", - Limit: int64(updatedTrustLine.Limit), - LiquidityPoolID: "", - BuyingLiabilities: int64(updatedTrustLine.Liabilities().Buying), - SellingLiabilities: int64(updatedTrustLine.Liabilities().Selling), - Flags: uint32(updatedTrustLine.Flags), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - Sponsor: null.String{}, - }, - }, - ).Return(nil).Once() + s.mockTrustLinesBatchInsertBuilder.On("Add", history.TrustLine{ + AccountID: updatedTrustLine.AccountId.Address(), + AssetType: updatedTrustLine.Asset.Type, + AssetIssuer: trustLineIssuer.Address(), + AssetCode: "EUR", + Balance: int64(updatedTrustLine.Balance), + LedgerKey: "AAAAAQAAAAAdBJqAD9qPq+j2nRDdjdp5KVoUh8riPkNO9ato7BNs8wAAAAFFVVIAAAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3", + Limit: int64(updatedTrustLine.Limit), + LiquidityPoolID: "", + BuyingLiabilities: int64(updatedTrustLine.Liabilities().Buying), + SellingLiabilities: int64(updatedTrustLine.Liabilities().Selling), + Flags: uint32(updatedTrustLine.Flags), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + Sponsor: null.String{}, + }).Return(nil).Once() + s.Assert().NoError(s.processor.Commit(s.ctx)) } From 89f7ea09df641a8340b4b9075512bca75dd9da0d Mon Sep 17 00:00:00 2001 From: urvisavla Date: Sun, 19 Nov 2023 20:21:46 -0800 Subject: [PATCH 2/4] services/horizon: Use COPY for inserting into accounts table (#5115) --- .../history/accounts_batch_insert_builder.go | 39 ++++++ services/horizon/internal/db2/history/main.go | 1 + .../mock_accounts_batch_insert_builder.go | 21 +++ .../internal/db2/history/mock_q_accounts.go | 5 + .../internal/ingest/processor_runner_test.go | 48 ++++--- .../ingest/processors/accounts_processor.go | 23 +++- .../processors/accounts_processor_test.go | 123 ++++++++---------- 7 files changed, 167 insertions(+), 93 deletions(-) create mode 100644 services/horizon/internal/db2/history/accounts_batch_insert_builder.go create mode 100644 services/horizon/internal/db2/history/mock_accounts_batch_insert_builder.go diff --git a/services/horizon/internal/db2/history/accounts_batch_insert_builder.go b/services/horizon/internal/db2/history/accounts_batch_insert_builder.go new file mode 100644 index 0000000000..5e68468094 --- /dev/null +++ b/services/horizon/internal/db2/history/accounts_batch_insert_builder.go @@ -0,0 +1,39 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" +) + +// AccountsBatchInsertBuilder is used to insert accounts into the accounts table +type AccountsBatchInsertBuilder interface { + Add(account AccountEntry) error + Exec(ctx context.Context) error +} + +// AccountsBatchInsertBuilder is a simple wrapper around db.FastBatchInsertBuilder +type accountsBatchInsertBuilder struct { + session db.SessionInterface + builder db.FastBatchInsertBuilder + table string +} + +// NewAccountsBatchInsertBuilder constructs a new AccountsBatchInsertBuilder instance +func (q *Q) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder { + return &accountsBatchInsertBuilder{ + session: q, + builder: db.FastBatchInsertBuilder{}, + table: "accounts", + } +} + +// Add adds a new account to the batch +func (i *accountsBatchInsertBuilder) Add(account AccountEntry) error { + return i.builder.RowStruct(account) +} + +// Exec writes the batch of accounts to the database. +func (i *accountsBatchInsertBuilder) Exec(ctx context.Context) error { + return i.builder.Exec(ctx, i.session, i.table) +} diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 7c05f2dfa6..ab2f69aa3c 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -312,6 +312,7 @@ type QAccounts interface { GetAccountsByIDs(ctx context.Context, ids []string) ([]AccountEntry, error) UpsertAccounts(ctx context.Context, accounts []AccountEntry) error RemoveAccounts(ctx context.Context, accountIDs []string) (int64, error) + NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder } // AccountSigner is a row of data from the `accounts_signers` table diff --git a/services/horizon/internal/db2/history/mock_accounts_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_accounts_batch_insert_builder.go new file mode 100644 index 0000000000..a200a15e15 --- /dev/null +++ b/services/horizon/internal/db2/history/mock_accounts_batch_insert_builder.go @@ -0,0 +1,21 @@ +package history + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockAccountsBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockAccountsBatchInsertBuilder) Add(account AccountEntry) error { + a := m.Called(account) + return a.Error(0) +} + +func (m *MockAccountsBatchInsertBuilder) Exec(ctx context.Context) error { + a := m.Called(ctx) + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/mock_q_accounts.go b/services/horizon/internal/db2/history/mock_q_accounts.go index 99f793d147..6d60802d17 100644 --- a/services/horizon/internal/db2/history/mock_q_accounts.go +++ b/services/horizon/internal/db2/history/mock_q_accounts.go @@ -25,3 +25,8 @@ func (m *MockQAccounts) RemoveAccounts(ctx context.Context, accountIDs []string) a := m.Called(ctx, accountIDs) return a.Get(0).(int64), a.Error(1) } + +func (m *MockQAccounts) NewAccountsBatchInsertBuilder() AccountsBatchInsertBuilder { + a := m.Called() + return a.Get(0).(AccountsBatchInsertBuilder) +} diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 7dfea07193..7e7838cf43 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -27,17 +27,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { q := &mockDBQ{} - q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{ - { - LastModifiedLedger: 1, - AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", - Balance: int64(1000000000000000000), - SequenceNumber: 0, - SequenceTime: zero.IntFrom(0), - MasterWeight: 1, - }, - }).Return(nil).Once() - batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true) defer mock.AssertExpectationsForObjects(t, batchBuilders...) @@ -49,6 +38,16 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { Sponsor: null.String{}, }).Return(nil).Once() + assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1]) + batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{ + LastModifiedLedger: 1, + AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + Balance: int64(1000000000000000000), + SequenceNumber: 0, + SequenceTime: zero.IntFrom(0), + MasterWeight: 1, + }).Return(nil).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -94,16 +93,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { nil, ).Once() - q.MockQAccounts.On("UpsertAccounts", ctx, []history.AccountEntry{ - { - LastModifiedLedger: 1, - AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", - Balance: int64(1000000000000000000), - SequenceNumber: 0, - MasterWeight: 1, - }, - }).Return(nil).Once() - batchBuilders := mockChangeProcessorBatchBuilders(q, ctx, true) defer mock.AssertExpectationsForObjects(t, batchBuilders...) @@ -114,6 +103,15 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { Weight: 1, }).Return(nil).Once() + assert.IsType(t, &history.MockAccountsBatchInsertBuilder{}, batchBuilders[1]) + batchBuilders[1].(*history.MockAccountsBatchInsertBuilder).On("Add", history.AccountEntry{ + LastModifiedLedger: 1, + AccountID: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + Balance: int64(1000000000000000000), + SequenceNumber: 0, + MasterWeight: 1, + }).Return(nil).Once() + q.MockQAssetStats.On("InsertAssetStats", ctx, []history.ExpAssetStat{}, 100000). Return(nil) @@ -494,6 +492,13 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec q.MockQSigners.On("NewAccountSignersBatchInsertBuilder"). Return(mockAccountSignersBatchInsertBuilder).Twice() + mockAccountsBatchInsertBuilder := &history.MockAccountsBatchInsertBuilder{} + if mockExec { + mockAccountsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + } + q.MockQAccounts.On("NewAccountsBatchInsertBuilder"). + Return(mockAccountsBatchInsertBuilder).Twice() + mockClaimableBalanceClaimantBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{} if mockExec { mockClaimableBalanceClaimantBatchInsertBuilder.On("Exec", ctx). @@ -531,6 +536,7 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec Return(mockTrustLinesBatchInsertBuilder) return []interface{}{mockAccountSignersBatchInsertBuilder, + mockAccountsBatchInsertBuilder, mockClaimableBalanceBatchInsertBuilder, mockClaimableBalanceClaimantBatchInsertBuilder, mockLiquidityPoolBatchInsertBuilder, diff --git a/services/horizon/internal/ingest/processors/accounts_processor.go b/services/horizon/internal/ingest/processors/accounts_processor.go index 8130175ab6..681b7d3847 100644 --- a/services/horizon/internal/ingest/processors/accounts_processor.go +++ b/services/horizon/internal/ingest/processors/accounts_processor.go @@ -13,7 +13,8 @@ import ( type AccountsProcessor struct { accountsQ history.QAccounts - cache *ingest.ChangeCompactor + cache *ingest.ChangeCompactor + batchInsertBuilder history.AccountsBatchInsertBuilder } func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor { @@ -24,6 +25,7 @@ func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor { func (p *AccountsProcessor) reset() { p.cache = ingest.NewChangeCompactor() + p.batchInsertBuilder = p.accountsQ.NewAccountsBatchInsertBuilder() } func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -41,13 +43,14 @@ func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Cha if err != nil { return errors.Wrap(err, "error in Commit") } - p.reset() } return nil } func (p *AccountsProcessor) Commit(ctx context.Context) error { + defer p.reset() + batchUpsertAccounts := []history.AccountEntry{} removeBatch := []string{} @@ -63,8 +66,15 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error { } switch { - case change.Post != nil: - // Created and updated + case change.Pre == nil && change.Post != nil: + // Created + row := p.ledgerEntryToRow(*change.Post) + err := p.batchInsertBuilder.Add(row) + if err != nil { + return errors.Wrap(err, "Error adding to AccountsBatchInsertBuilder") + } + case change.Pre != nil && change.Post != nil: + // Updated row := p.ledgerEntryToRow(*change.Post) batchUpsertAccounts = append(batchUpsertAccounts, row) case change.Pre != nil && change.Post == nil: @@ -77,6 +87,11 @@ func (p *AccountsProcessor) Commit(ctx context.Context) error { } } + err := p.batchInsertBuilder.Exec(ctx) + if err != nil { + return errors.Wrap(err, "Error executing AccountsBatchInsertBuilder") + } + // Upsert accounts if len(batchUpsertAccounts) > 0 { err := p.accountsQ.UpsertAccounts(ctx, batchUpsertAccounts) diff --git a/services/horizon/internal/ingest/processors/accounts_processor_test.go b/services/horizon/internal/ingest/processors/accounts_processor_test.go index 9b9c98d7b5..26f6af3487 100644 --- a/services/horizon/internal/ingest/processors/accounts_processor_test.go +++ b/services/horizon/internal/ingest/processors/accounts_processor_test.go @@ -19,15 +19,20 @@ func TestAccountsProcessorTestSuiteState(t *testing.T) { type AccountsProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *AccountsProcessor - mockQ *history.MockQAccounts + ctx context.Context + processor *AccountsProcessor + mockQ *history.MockQAccounts + mockAccountsBatchInsertBuilder *history.MockAccountsBatchInsertBuilder } func (s *AccountsProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQAccounts{} + s.mockAccountsBatchInsertBuilder = &history.MockAccountsBatchInsertBuilder{} + s.mockQ.On("NewAccountsBatchInsertBuilder").Return(s.mockAccountsBatchInsertBuilder).Twice() + s.mockAccountsBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.processor = NewAccountsProcessor(s.mockQ) } @@ -42,19 +47,14 @@ func (s *AccountsProcessorTestSuiteState) TestNoEntries() { func (s *AccountsProcessorTestSuiteState) TestCreatesAccounts() { // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On( - "UpsertAccounts", s.ctx, - []history.AccountEntry{ - { - LastModifiedLedger: 123, - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - MasterWeight: 1, - ThresholdLow: 1, - ThresholdMedium: 1, - ThresholdHigh: 1, - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + LastModifiedLedger: 123, + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + MasterWeight: 1, + ThresholdLow: 1, + ThresholdMedium: 1, + ThresholdHigh: 1, + }).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeAccount, @@ -79,15 +79,20 @@ func TestAccountsProcessorTestSuiteLedger(t *testing.T) { type AccountsProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *AccountsProcessor - mockQ *history.MockQAccounts + ctx context.Context + processor *AccountsProcessor + mockQ *history.MockQAccounts + mockAccountsBatchInsertBuilder *history.MockAccountsBatchInsertBuilder } func (s *AccountsProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQAccounts{} + s.mockAccountsBatchInsertBuilder = &history.MockAccountsBatchInsertBuilder{} + s.mockQ.On("NewAccountsBatchInsertBuilder").Return(s.mockAccountsBatchInsertBuilder).Twice() + s.mockAccountsBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() + s.processor = NewAccountsProcessor(s.mockQ) } @@ -146,21 +151,15 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccount() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On( - "UpsertAccounts", - s.ctx, - []history.AccountEntry{ - { - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - MasterWeight: 0, - ThresholdLow: 1, - ThresholdMedium: 2, - ThresholdHigh: 3, - HomeDomain: "stellar.org", - LastModifiedLedger: uint32(123), - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + MasterWeight: 0, + ThresholdLow: 1, + ThresholdMedium: 2, + ThresholdHigh: 3, + HomeDomain: "stellar.org", + LastModifiedLedger: uint32(123), + }).Return(nil).Once() } func (s *AccountsProcessorTestSuiteLedger) TestNewAccountUpgrade() { @@ -235,23 +234,17 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccountUpgrade() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockQ.On( - "UpsertAccounts", - s.ctx, - []history.AccountEntry{ - { - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - SequenceLedger: zero.IntFrom(2346), - SequenceTime: zero.IntFrom(1647265534), - MasterWeight: 0, - ThresholdLow: 1, - ThresholdMedium: 2, - ThresholdHigh: 3, - HomeDomain: "stellar.org", - LastModifiedLedger: uint32(123), - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SequenceLedger: zero.IntFrom(2346), + SequenceTime: zero.IntFrom(1647265534), + MasterWeight: 0, + ThresholdLow: 1, + ThresholdMedium: 2, + ThresholdHigh: 3, + HomeDomain: "stellar.org", + LastModifiedLedger: uint32(123), + }).Return(nil).Once() } func (s *AccountsProcessorTestSuiteLedger) TestRemoveAccount() { @@ -322,23 +315,17 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { }) s.Assert().NoError(err) - s.mockQ.On( - "UpsertAccounts", - s.ctx, - []history.AccountEntry{ - { - LastModifiedLedger: uint32(lastModifiedLedgerSeq) + 1, - AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - SequenceTime: zero.IntFrom(0), - SequenceLedger: zero.IntFrom(0), - MasterWeight: 0, - ThresholdLow: 1, - ThresholdMedium: 2, - ThresholdHigh: 3, - HomeDomain: "stellar.org", - }, - }, - ).Return(nil).Once() + s.mockAccountsBatchInsertBuilder.On("Add", history.AccountEntry{ + LastModifiedLedger: uint32(lastModifiedLedgerSeq) + 1, + AccountID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + SequenceTime: zero.IntFrom(0), + SequenceLedger: zero.IntFrom(0), + MasterWeight: 0, + ThresholdLow: 1, + ThresholdMedium: 2, + ThresholdHigh: 3, + HomeDomain: "stellar.org", + }).Return(nil).Once() } func (s *AccountsProcessorTestSuiteLedger) TestFeeProcessedBeforeEverythingElse() { From 7d0b9a1f28223c767ef5aaa31b410edb72245ee8 Mon Sep 17 00:00:00 2001 From: urvisavla Date: Sun, 19 Nov 2023 22:43:32 -0800 Subject: [PATCH 3/4] services/horizon: use COPY for inserting into accounts_data table (#5118) --- .../account_data_batch_insert_builder.go | 50 +++++++++++++++++++ .../db2/history/account_data_value.go | 4 +- services/horizon/internal/db2/history/main.go | 1 + .../mock_account_data_batch_insert_builder.go | 21 ++++++++ .../internal/db2/history/mock_q_data.go | 5 ++ .../internal/ingest/processor_runner_test.go | 8 +++ .../processors/account_data_processor.go | 16 ++++-- .../accounts_data_processor_test.go | 29 ++++++++--- 8 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 services/horizon/internal/db2/history/account_data_batch_insert_builder.go create mode 100644 services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go diff --git a/services/horizon/internal/db2/history/account_data_batch_insert_builder.go b/services/horizon/internal/db2/history/account_data_batch_insert_builder.go new file mode 100644 index 0000000000..75c8ff6124 --- /dev/null +++ b/services/horizon/internal/db2/history/account_data_batch_insert_builder.go @@ -0,0 +1,50 @@ +package history + +import ( + "context" + + "github.com/stellar/go/support/db" +) + +type AccountDataBatchInsertBuilder interface { + Add(data Data) error + Exec(ctx context.Context) error +} + +type accountDataBatchInsertBuilder struct { + session db.SessionInterface + builder db.FastBatchInsertBuilder + table string +} + +func (q *Q) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder { + return &accountDataBatchInsertBuilder{ + session: q, + builder: db.FastBatchInsertBuilder{}, + table: "accounts_data", + } +} + +// Add adds a new account data to the batch +func (i *accountDataBatchInsertBuilder) Add(data Data) error { + ledgerKey, err := accountDataKeyToString(AccountDataKey{ + AccountID: data.AccountID, + DataName: data.Name, + }) + if err != nil { + return err + } + return i.builder.Row(map[string]interface{}{ + "ledger_key": ledgerKey, + "account_id": data.AccountID, + "name": data.Name, + "value": data.Value, + "last_modified_ledger": data.LastModifiedLedger, + "sponsor": data.Sponsor, + }) +} + +// Exec writes the batch of account data to the database. +func (i *accountDataBatchInsertBuilder) Exec(ctx context.Context) error { + return i.builder.Exec(ctx, i.session, i.table) +} diff --git a/services/horizon/internal/db2/history/account_data_value.go b/services/horizon/internal/db2/history/account_data_value.go index efcd8d319b..693420e034 100644 --- a/services/horizon/internal/db2/history/account_data_value.go +++ b/services/horizon/internal/db2/history/account_data_value.go @@ -22,7 +22,9 @@ func (t *AccountDataValue) Scan(src interface{}) error { // Value implements driver.Valuer func (value AccountDataValue) Value() (driver.Value, error) { - return driver.Value([]uint8(base64.StdEncoding.EncodeToString(value))), nil + // Return string to bypass buggy encoding in pq driver for []byte. + // More info https://github.com/stellar/go/issues/5086#issuecomment-1773215436) + return driver.Value(base64.StdEncoding.EncodeToString(value)), nil } func (value AccountDataValue) Base64() string { diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index ab2f69aa3c..0acae7cb18 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -357,6 +357,7 @@ type QData interface { GetAccountDataByKeys(ctx context.Context, keys []AccountDataKey) ([]Data, error) UpsertAccountData(ctx context.Context, data []Data) error RemoveAccountData(ctx context.Context, keys []AccountDataKey) (int64, error) + NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder } // Asset is a row of data from the `history_assets` table diff --git a/services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go new file mode 100644 index 0000000000..aa5af10730 --- /dev/null +++ b/services/horizon/internal/db2/history/mock_account_data_batch_insert_builder.go @@ -0,0 +1,21 @@ +package history + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +type MockAccountDataBatchInsertBuilder struct { + mock.Mock +} + +func (m *MockAccountDataBatchInsertBuilder) Add(data Data) error { + a := m.Called(data) + return a.Error(0) +} + +func (m *MockAccountDataBatchInsertBuilder) Exec(ctx context.Context) error { + a := m.Called(ctx) + return a.Error(0) +} diff --git a/services/horizon/internal/db2/history/mock_q_data.go b/services/horizon/internal/db2/history/mock_q_data.go index 3316aaa51b..8d418e896e 100644 --- a/services/horizon/internal/db2/history/mock_q_data.go +++ b/services/horizon/internal/db2/history/mock_q_data.go @@ -30,3 +30,8 @@ func (m *MockQData) RemoveAccountData(ctx context.Context, keys []AccountDataKey a := m.Called(ctx, keys) return a.Get(0).(int64), a.Error(1) } + +func (m *MockQData) NewAccountDataBatchInsertBuilder() AccountDataBatchInsertBuilder { + a := m.Called() + return a.Get(0).(AccountDataBatchInsertBuilder) +} diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 7e7838cf43..fc82ed3cfe 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -528,6 +528,13 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec q.MockQOffers.On("NewOffersBatchInsertBuilder"). Return(mockOfferBatchInsertBuilder).Twice() + mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} + if mockExec { + mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + } + q.MockQData.On("NewAccountDataBatchInsertBuilder"). + Return(mockAccountDataBatchInsertBuilder).Twice() + mockTrustLinesBatchInsertBuilder := &history.MockTrustLinesBatchInsertBuilder{} if mockExec { mockTrustLinesBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -541,6 +548,7 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec mockClaimableBalanceClaimantBatchInsertBuilder, mockLiquidityPoolBatchInsertBuilder, mockOfferBatchInsertBuilder, + mockAccountDataBatchInsertBuilder, mockTrustLinesBatchInsertBuilder, } } diff --git a/services/horizon/internal/ingest/processors/account_data_processor.go b/services/horizon/internal/ingest/processors/account_data_processor.go index dfdbecb43e..f774700fbd 100644 --- a/services/horizon/internal/ingest/processors/account_data_processor.go +++ b/services/horizon/internal/ingest/processors/account_data_processor.go @@ -12,7 +12,8 @@ import ( type AccountDataProcessor struct { dataQ history.QData - cache *ingest.ChangeCompactor + cache *ingest.ChangeCompactor + batchInsertBuilder history.AccountDataBatchInsertBuilder } func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor { @@ -23,6 +24,7 @@ func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor { func (p *AccountDataProcessor) reset() { p.cache = ingest.NewChangeCompactor() + p.batchInsertBuilder = p.dataQ.NewAccountDataBatchInsertBuilder() } func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error { @@ -41,13 +43,13 @@ func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest. if err != nil { return errors.Wrap(err, "error in Commit") } - p.reset() } return nil } func (p *AccountDataProcessor) Commit(ctx context.Context) error { + defer p.reset() var ( datasToUpsert []history.Data datasToDelete []history.AccountDataKey @@ -57,7 +59,10 @@ func (p *AccountDataProcessor) Commit(ctx context.Context) error { switch { case change.Pre == nil && change.Post != nil: // Created - datasToUpsert = append(datasToUpsert, p.ledgerEntryToRow(change.Post)) + err := p.batchInsertBuilder.Add(p.ledgerEntryToRow(change.Post)) + if err != nil { + return errors.Wrap(err, "Error adding to AccountDataBatchInsertBuilder") + } case change.Pre != nil && change.Post == nil: // Removed data := change.Pre.Data.MustData() @@ -72,6 +77,11 @@ func (p *AccountDataProcessor) Commit(ctx context.Context) error { } } + err := p.batchInsertBuilder.Exec(ctx) + if err != nil { + return errors.Wrap(err, "Error executing AccountDataBatchInsertBuilder") + } + if len(datasToUpsert) > 0 { if err := p.dataQ.UpsertAccountData(ctx, datasToUpsert); err != nil { return errors.Wrap(err, "error executing upsert") diff --git a/services/horizon/internal/ingest/processors/accounts_data_processor_test.go b/services/horizon/internal/ingest/processors/accounts_data_processor_test.go index 86ff1e13de..273cfcca4b 100644 --- a/services/horizon/internal/ingest/processors/accounts_data_processor_test.go +++ b/services/horizon/internal/ingest/processors/accounts_data_processor_test.go @@ -18,15 +18,21 @@ func TestAccountsDataProcessorTestSuiteState(t *testing.T) { type AccountsDataProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *AccountDataProcessor - mockQ *history.MockQData + ctx context.Context + processor *AccountDataProcessor + mockQ *history.MockQData + mockAccountDataBatchInsertBuilder *history.MockAccountDataBatchInsertBuilder } func (s *AccountsDataProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQData{} + s.mockAccountDataBatchInsertBuilder = &history.MockAccountDataBatchInsertBuilder{} + s.mockQ.On("NewAccountDataBatchInsertBuilder"). + Return(s.mockAccountDataBatchInsertBuilder) + s.mockAccountDataBatchInsertBuilder.On("Exec", s.ctx).Return(nil) + s.processor = NewAccountDataProcessor(s.mockQ) } @@ -60,7 +66,7 @@ func (s *AccountsDataProcessorTestSuiteState) TestCreatesAccounts() { Value: history.AccountDataValue(data.DataValue), LastModifiedLedger: uint32(entry.LastModifiedLedgerSeq), } - s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once() + s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ Type: xdr.LedgerEntryTypeData, @@ -76,15 +82,21 @@ func TestAccountsDataProcessorTestSuiteLedger(t *testing.T) { type AccountsDataProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *AccountDataProcessor - mockQ *history.MockQData + ctx context.Context + processor *AccountDataProcessor + mockQ *history.MockQData + mockAccountDataBatchInsertBuilder *history.MockAccountDataBatchInsertBuilder } func (s *AccountsDataProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQData{} + s.mockAccountDataBatchInsertBuilder = &history.MockAccountDataBatchInsertBuilder{} + s.mockQ.On("NewAccountDataBatchInsertBuilder"). + Return(s.mockAccountDataBatchInsertBuilder) + s.mockAccountDataBatchInsertBuilder.On("Exec", s.ctx).Return(nil) + s.processor = NewAccountDataProcessor(s.mockQ) } @@ -152,7 +164,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestNewAccountData() { Value: history.AccountDataValue(updatedData.DataValue), LastModifiedLedger: uint32(updatedEntry.LastModifiedLedgerSeq), } - s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once() + s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once() } func (s *AccountsDataProcessorTestSuiteLedger) TestUpdateAccountData() { @@ -196,6 +208,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestUpdateAccountData() { Value: history.AccountDataValue(updatedData.DataValue), LastModifiedLedger: uint32(updatedEntry.LastModifiedLedgerSeq), } + s.mockAccountDataBatchInsertBuilder.On("Add", historyData).Return(nil).Once() s.mockQ.On("UpsertAccountData", s.ctx, []history.Data{historyData}).Return(nil).Once() } From 69266de4154fdf4dd5e8f39ada37f642cb421ca1 Mon Sep 17 00:00:00 2001 From: urvisavla Date: Tue, 21 Nov 2023 09:23:27 -0800 Subject: [PATCH 4/4] services/horizon: Fix docker compose Files for Horizon "testnet" and Integration Tests (#5121) --- services/horizon/docker/docker-compose.integration-tests.yml | 3 ++- services/horizon/docker/docker-compose.yml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/services/horizon/docker/docker-compose.integration-tests.yml b/services/horizon/docker/docker-compose.integration-tests.yml index efe27ca802..630e8b6b60 100644 --- a/services/horizon/docker/docker-compose.integration-tests.yml +++ b/services/horizon/docker/docker-compose.integration-tests.yml @@ -14,7 +14,8 @@ services: # Note: Please keep the image pinned to an immutable tag matching the Captive Core version. # This avoid implicit updates which break compatibility between # the Core container and captive core. - image: ${CORE_IMAGE:-chowbao/stellar-core:19.11.1-1357.e38ee728d.focal-sorobanP10} + image: ${CORE_IMAGE:-stellar/stellar-core:19.13.1-1481.3acf6dd26.focal} + depends_on: - core-postgres restart: on-failure diff --git a/services/horizon/docker/docker-compose.yml b/services/horizon/docker/docker-compose.yml index dbaa8994fe..309df946a1 100644 --- a/services/horizon/docker/docker-compose.yml +++ b/services/horizon/docker/docker-compose.yml @@ -2,7 +2,7 @@ version: '3' services: horizon-postgres: platform: linux/amd64 - image: postgres:postgres:12-bullseye + image: postgres:12-bullseye restart: on-failure environment: - POSTGRES_HOST_AUTH_METHOD=trust