From fd46a145d21bb7cdf0240c8a31c2482ffe3fc6b4 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 15 Nov 2023 15:04:10 -0800 Subject: [PATCH 1/9] #5099: changed historyRange and reingestHistoryRange states to send batches of ledgers to processors --- services/horizon/internal/ingest/fsm.go | 283 ------------ .../ingest/fsm_history_range_state.go | 107 +++++ .../fsm_reingest_history_range_state.go | 187 ++++++++ .../internal/ingest/group_processors.go | 41 +- .../internal/ingest/group_processors_test.go | 11 +- .../ingest/ingest_history_range_state_test.go | 402 +++++++++--------- services/horizon/internal/ingest/main.go | 5 +- services/horizon/internal/ingest/main_test.go | 5 + .../internal/ingest/processor_runner.go | 196 ++++++--- .../internal/ingest/processor_runner_test.go | 124 +++++- .../ingest/processors/ledgers_processor.go | 2 +- .../processors/ledgers_processor_test.go | 2 +- .../stats_ledger_transaction_processor.go | 8 + ...stats_ledger_transaction_processor_test.go | 32 +- .../ingest/processors/trades_processor.go | 5 + .../processors/trades_processor_test.go | 3 + 16 files changed, 854 insertions(+), 559 deletions(-) create mode 100644 services/horizon/internal/ingest/fsm_history_range_state.go create mode 100644 services/horizon/internal/ingest/fsm_reingest_history_range_state.go diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 2f9a40783c..4d7640932b 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -12,7 +12,6 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" - "github.com/stellar/go/toid" "github.com/stellar/go/xdr" ) @@ -585,288 +584,6 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string] } } -type historyRangeState struct { - fromLedger uint32 - toLedger uint32 -} - -func (h historyRangeState) String() string { - return fmt.Sprintf( - "historyRange(fromLedger=%d, toLedger=%d)", - h.fromLedger, - h.toLedger, - ) -} - -func (historyRangeState) GetState() State { - return HistoryRange -} - -// historyRangeState is used when catching up history data -func (h historyRangeState) run(s *system) (transition, error) { - if h.fromLedger == 0 || h.toLedger == 0 || - h.fromLedger > h.toLedger { - return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) - } - - err := s.maybePrepareRange(s.ctx, h.fromLedger) - if err != nil { - return start(), err - } - - if err = s.historyQ.Begin(s.ctx); err != nil { - return start(), errors.Wrap(err, "Error starting a transaction") - } - defer s.historyQ.Rollback() - - // acquire distributed lock so no one else can perform ingestion operations. - if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil { - return start(), errors.Wrap(err, getLastIngestedErrMsg) - } - - lastHistoryLedger, err := s.historyQ.GetLatestHistoryLedger(s.ctx) - if err != nil { - return start(), errors.Wrap(err, "could not get latest history ledger") - } - - // We should be ingesting the ledger which occurs after - // lastHistoryLedger. Otherwise, some other horizon node has - // already completed the ingest history range operation and - // we should go back to the init state - if lastHistoryLedger != h.fromLedger-1 { - return start(), nil - } - - for cur := h.fromLedger; cur <= h.toLedger; cur++ { - var ledgerCloseMeta xdr.LedgerCloseMeta - - log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...") - startTime := time.Now() - - ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) - if err != nil { - // Commit finished work in case of ledger backend error. - commitErr := s.historyQ.Commit() - if commitErr != nil { - log.WithError(commitErr).Error("Error committing partial range results") - } else { - log.Info("Committed partial range results") - } - return start(), errors.Wrap(err, "error getting ledger") - } - - log.WithFields(logpkg.F{ - "sequence": cur, - "duration": time.Since(startTime).Seconds(), - }).Info("Ledger returned from the backend") - - if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil { - return start(), err - } - } - - if err = s.historyQ.Commit(); err != nil { - return start(), errors.Wrap(err, commitErrMsg) - } - - return start(), nil -} - -func runTransactionProcessorsOnLedger(s *system, ledger xdr.LedgerCloseMeta) error { - log.WithFields(logpkg.F{ - "sequence": ledger.LedgerSequence(), - "state": false, - "ledger": true, - "commit": false, - }).Info("Processing ledger") - startTime := time.Now() - - ledgerTransactionStats, _, tradeStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger.LedgerSequence())) - } - - log. - WithFields(ledgerTransactionStats.Map()). - WithFields(tradeStats.Map()). - WithFields(logpkg.F{ - "sequence": ledger.LedgerSequence(), - "duration": time.Since(startTime).Seconds(), - "state": false, - "ledger": true, - "commit": false, - }). - Info("Processed ledger") - return nil -} - -type reingestHistoryRangeState struct { - fromLedger uint32 - toLedger uint32 - force bool -} - -func (h reingestHistoryRangeState) String() string { - return fmt.Sprintf( - "reingestHistoryRange(fromLedger=%d, toLedger=%d, force=%t)", - h.fromLedger, - h.toLedger, - h.force, - ) -} - -func (reingestHistoryRangeState) GetState() State { - return ReingestHistoryRange -} - -func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error { - if s.historyQ.GetTx() == nil { - return errors.New("expected transaction to be present") - } - - // Clear history data before ingesting - used in `reingest range` command. - start, end, err := toid.LedgerRangeInclusive( - int32(fromLedger), - int32(toLedger), - ) - if err != nil { - return errors.Wrap(err, "Invalid range") - } - - err = s.historyQ.DeleteRangeAll(s.ctx, start, end) - if err != nil { - return errors.Wrap(err, "error in DeleteRangeAll") - } - - for cur := fromLedger; cur <= toLedger; cur++ { - var ledgerCloseMeta xdr.LedgerCloseMeta - ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) - if err != nil { - return errors.Wrap(err, "error getting ledger") - } - - if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil { - return err - } - } - - return nil -} - -func (h reingestHistoryRangeState) prepareRange(s *system) (transition, error) { - log.WithFields(logpkg.F{ - "from": h.fromLedger, - "to": h.toLedger, - }).Info("Preparing ledger backend to retrieve range") - startTime := time.Now() - - err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger)) - if err != nil { - return stop(), errors.Wrap(err, "error preparing range") - } - - log.WithFields(logpkg.F{ - "from": h.fromLedger, - "to": h.toLedger, - "duration": time.Since(startTime).Seconds(), - }).Info("Range ready") - - return transition{}, nil -} - -// reingestHistoryRangeState is used as a command to reingest historical data -func (h reingestHistoryRangeState) run(s *system) (transition, error) { - if h.fromLedger == 0 || h.toLedger == 0 || - h.fromLedger > h.toLedger { - return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) - } - - if h.fromLedger == 1 { - log.Warn("Ledger 1 is pregenerated and not available, starting from ledger 2.") - h.fromLedger = 2 - } - - var startTime time.Time - - if h.force { - if t, err := h.prepareRange(s); err != nil { - return t, err - } - startTime = time.Now() - - if err := s.historyQ.Begin(s.ctx); err != nil { - return stop(), errors.Wrap(err, "Error starting a transaction") - } - defer s.historyQ.Rollback() - - // acquire distributed lock so no one else can perform ingestion operations. - if _, err := s.historyQ.GetLastLedgerIngest(s.ctx); err != nil { - return stop(), errors.Wrap(err, getLastIngestedErrMsg) - } - - if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil { - return stop(), err - } - - if err := s.historyQ.Commit(); err != nil { - return stop(), errors.Wrap(err, commitErrMsg) - } - } else { - lastIngestedLedger, err := s.historyQ.GetLastLedgerIngestNonBlocking(s.ctx) - if err != nil { - return stop(), errors.Wrap(err, getLastIngestedErrMsg) - } - - if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger { - return stop(), ErrReingestRangeConflict{lastIngestedLedger} - } - - // Only prepare the range after checking the bounds to enable an early error return - var t transition - if t, err = h.prepareRange(s); err != nil { - return t, err - } - startTime = time.Now() - - for cur := h.fromLedger; cur <= h.toLedger; cur++ { - err = func(ledger uint32) error { - if e := s.historyQ.Begin(s.ctx); e != nil { - return errors.Wrap(e, "Error starting a transaction") - } - defer s.historyQ.Rollback() - - // ingest each ledger in a separate transaction to prevent deadlocks - // when acquiring ShareLocks from multiple parallel reingest range processes - if e := h.ingestRange(s, ledger, ledger); e != nil { - return e - } - - if e := s.historyQ.Commit(); e != nil { - return errors.Wrap(e, commitErrMsg) - } - - return nil - }(cur) - if err != nil { - return stop(), err - } - } - } - - err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter) - if err != nil { - return stop(), errors.Wrap(err, "Error rebuilding trade aggregations") - } - - log.WithFields(logpkg.F{ - "from": h.fromLedger, - "to": h.toLedger, - "duration": time.Since(startTime).Seconds(), - }).Info("Reingestion done") - - return stop(), nil -} - type waitForCheckpointState struct{} func (waitForCheckpointState) String() string { diff --git a/services/horizon/internal/ingest/fsm_history_range_state.go b/services/horizon/internal/ingest/fsm_history_range_state.go new file mode 100644 index 0000000000..321da34c2c --- /dev/null +++ b/services/horizon/internal/ingest/fsm_history_range_state.go @@ -0,0 +1,107 @@ +package ingest + +import ( + "fmt" + "time" + + "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" +) + +type historyRangeState struct { + fromLedger uint32 + toLedger uint32 +} + +func (h historyRangeState) String() string { + return fmt.Sprintf( + "historyRange(fromLedger=%d, toLedger=%d)", + h.fromLedger, + h.toLedger, + ) +} + +func (historyRangeState) GetState() State { + return HistoryRange +} + +// historyRangeState is used when catching up history data +func (h historyRangeState) run(s *system) (transition, error) { + if h.fromLedger == 0 || h.toLedger == 0 || + h.fromLedger > h.toLedger { + return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) + } + + err := s.maybePrepareRange(s.ctx, h.fromLedger) + if err != nil { + return start(), err + } + + if err = s.historyQ.Begin(s.ctx); err != nil { + return start(), errors.Wrap(err, "Error starting a transaction") + } + defer s.historyQ.Rollback() + + // acquire distributed lock so no one else can perform ingestion operations. + if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil { + return start(), errors.Wrap(err, getLastIngestedErrMsg) + } + + lastHistoryLedger, err := s.historyQ.GetLatestHistoryLedger(s.ctx) + if err != nil { + return start(), errors.Wrap(err, "could not get latest history ledger") + } + + // We should be ingesting the ledger which occurs after + // lastHistoryLedger. Otherwise, some other horizon node has + // already completed the ingest history range operation and + // we should go back to the init state + if lastHistoryLedger != h.fromLedger-1 { + return start(), nil + } + + ledgers := []xdr.LedgerCloseMeta{} + for cur := h.fromLedger; cur <= h.toLedger; cur++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...") + startTime := time.Now() + + ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) + if err != nil { + // Commit finished work in case of ledger backend error. + commitErr := s.historyQ.Commit() + if commitErr != nil { + log.WithError(commitErr).Error("Error committing partial range results") + } else { + log.Info("Committed partial range results") + } + return start(), errors.Wrap(err, "error getting ledger") + } + + log.WithFields(logpkg.F{ + "sequence": cur, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + ledgers = append(ledgers, ledgerCloseMeta) + + if s.maxLedgerPerFlush > 0 && len(ledgers)%int(s.maxLedgerPerFlush) == 0 { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + ledgers = []xdr.LedgerCloseMeta{} + } + } + + if len(ledgers) > 0 { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + } + if err = s.historyQ.Commit(); err != nil { + return start(), errors.Wrap(err, commitErrMsg) + } + + return start(), nil +} diff --git a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go new file mode 100644 index 0000000000..eaa4402687 --- /dev/null +++ b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go @@ -0,0 +1,187 @@ +package ingest + +import ( + "fmt" + "time" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" + "github.com/stellar/go/toid" + "github.com/stellar/go/xdr" +) + +type reingestHistoryRangeState struct { + fromLedger uint32 + toLedger uint32 + force bool +} + +func (h reingestHistoryRangeState) String() string { + return fmt.Sprintf( + "reingestHistoryRange(fromLedger=%d, toLedger=%d, force=%t)", + h.fromLedger, + h.toLedger, + h.force, + ) +} + +func (reingestHistoryRangeState) GetState() State { + return ReingestHistoryRange +} + +func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error { + if s.historyQ.GetTx() == nil { + return errors.New("expected transaction to be present") + } + + // Clear history data before ingesting - used in `reingest range` command. + start, end, err := toid.LedgerRangeInclusive( + int32(fromLedger), + int32(toLedger), + ) + if err != nil { + return errors.Wrap(err, "Invalid range") + } + + err = s.historyQ.DeleteRangeAll(s.ctx, start, end) + if err != nil { + return errors.Wrap(err, "error in DeleteRangeAll") + } + + ledgers := []xdr.LedgerCloseMeta{} + for cur := fromLedger; cur <= toLedger; cur++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...") + startTime := time.Now() + ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) + + if err != nil { + return errors.Wrap(err, "error getting ledger") + } + + log.WithFields(logpkg.F{ + "sequence": cur, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + + ledgers = append(ledgers, ledgerCloseMeta) + + if s.maxLedgerPerFlush > 0 && len(ledgers)%int(s.maxLedgerPerFlush) == 0 { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + ledgers = []xdr.LedgerCloseMeta{} + } + } + + if len(ledgers) > 0 { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + } + + return nil +} + +func (h reingestHistoryRangeState) prepareRange(s *system) (transition, error) { + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + }).Info("Preparing ledger backend to retrieve range") + startTime := time.Now() + + err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger)) + if err != nil { + return stop(), errors.Wrap(err, "error preparing range") + } + + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + "duration": time.Since(startTime).Seconds(), + }).Info("Range ready") + + return transition{}, nil +} + +// reingestHistoryRangeState is used as a command to reingest historical data +func (h reingestHistoryRangeState) run(s *system) (transition, error) { + if h.fromLedger == 0 || h.toLedger == 0 || + h.fromLedger > h.toLedger { + return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) + } + + if h.fromLedger == 1 { + log.Warn("Ledger 1 is pregenerated and not available, starting from ledger 2.") + h.fromLedger = 2 + } + + startTime := time.Now() + + if h.force { + if t, err := h.prepareRange(s); err != nil { + return t, err + } + + if err := s.historyQ.Begin(s.ctx); err != nil { + return stop(), errors.Wrap(err, "Error starting a transaction") + } + defer s.historyQ.Rollback() + + // acquire distributed lock so no one else can perform ingestion operations. + if _, err := s.historyQ.GetLastLedgerIngest(s.ctx); err != nil { + return stop(), errors.Wrap(err, getLastIngestedErrMsg) + } + + if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil { + return stop(), err + } + + if err := s.historyQ.Commit(); err != nil { + return stop(), errors.Wrap(err, commitErrMsg) + } + } else { + lastIngestedLedger, err := s.historyQ.GetLastLedgerIngestNonBlocking(s.ctx) + if err != nil { + return stop(), errors.Wrap(err, getLastIngestedErrMsg) + } + + if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger { + return stop(), ErrReingestRangeConflict{lastIngestedLedger} + } + + // Only prepare the range after checking the bounds to enable an early error return + var t transition + if t, err = h.prepareRange(s); err != nil { + return t, err + } + + if err := s.historyQ.Begin(s.ctx); err != nil { + return stop(), errors.Wrap(err, "Error starting a transaction") + } + defer s.historyQ.Rollback() + + if e := h.ingestRange(s, h.fromLedger, h.toLedger); e != nil { + return stop(), e + } + + if e := s.historyQ.Commit(); e != nil { + return stop(), errors.Wrap(e, commitErrMsg) + } + } + + err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter) + if err != nil { + return stop(), errors.Wrap(err, "Error rebuilding trade aggregations") + } + + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + "duration": time.Since(startTime).Seconds(), + }).Info("Reingestion done") + + return stop(), nil +} diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index af486a35cf..5af3d024ef 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -56,13 +56,31 @@ type groupTransactionProcessors struct { processors []horizonTransactionProcessor lazyLoaders []horizonLazyLoader processorsRunDurations + transactionStatsProcessor *processors.StatsLedgerTransactionProcessor + tradeProcessor *processors.TradeProcessor } -func newGroupTransactionProcessors(processors []horizonTransactionProcessor, lazyLoaders []horizonLazyLoader) *groupTransactionProcessors { +// build the group processor for all tx processors +// processors - list of processors this should include StatsLedgerTransactionProcessor and TradeProcessor +// transactionStatsProcessor - provide a direct reference to the stats processor that is in processors or nil, +// +// group processing will reset stats as needed +// +// tradeProcessor - provide a direct reference to the trades processor in processors or nil, +// +// so group processing will reset stats as needed +func newGroupTransactionProcessors(processors []horizonTransactionProcessor, + lazyLoaders []horizonLazyLoader, + transactionStatsProcessor *processors.StatsLedgerTransactionProcessor, + tradeProcessor *processors.TradeProcessor, +) *groupTransactionProcessors { + return &groupTransactionProcessors{ - processors: processors, - processorsRunDurations: make(map[string]time.Duration), - lazyLoaders: lazyLoaders, + processors: processors, + processorsRunDurations: make(map[string]time.Duration), + lazyLoaders: lazyLoaders, + transactionStatsProcessor: transactionStatsProcessor, + tradeProcessor: tradeProcessor, } } @@ -98,6 +116,16 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio return nil } +func (g *groupTransactionProcessors) ResetStats() { + g.processorsRunDurations = make(map[string]time.Duration) + if g.tradeProcessor != nil { + g.tradeProcessor.ResetStats() + } + if g.transactionStatsProcessor != nil { + g.transactionStatsProcessor.ResetStats() + } +} + type groupTransactionFilterers struct { filterers []processors.LedgerTransactionFilterer processorsRunDurations @@ -127,3 +155,8 @@ func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx in } return true, nil } + +func (g *groupTransactionFilterers) ResetStats() { + g.droppedTransactions = 0 + g.processorsRunDurations = make(map[string]time.Duration) +} diff --git a/services/horizon/internal/ingest/group_processors_test.go b/services/horizon/internal/ingest/group_processors_test.go index 73d4f56f3f..71e50d3911 100644 --- a/services/horizon/internal/ingest/group_processors_test.go +++ b/services/horizon/internal/ingest/group_processors_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest" + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -135,12 +137,19 @@ func TestGroupTransactionProcessorsTestSuiteLedger(t *testing.T) { func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() { s.ctx = context.Background() + statsProcessor := processors.NewStatsLedgerTransactionProcessor() + + tradesProcessor := processors.NewTradeProcessor(history.NewAccountLoaderStub().Loader, + history.NewLiquidityPoolLoaderStub().Loader, + history.NewAssetLoaderStub().Loader, + &history.MockTradeBatchInsertBuilder{}) + s.processorA = &mockHorizonTransactionProcessor{} s.processorB = &mockHorizonTransactionProcessor{} s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{ s.processorA, s.processorB, - }, nil) + }, nil, statsProcessor, tradesProcessor) s.session = &db.MockSession{} } diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index c757703822..d1382b9bd9 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -11,7 +11,6 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/db2/history" - "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/errors" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" @@ -45,9 +44,6 @@ func (s *IngestHistoryRangeStateTestSuite) SetupTest() { runner: s.runner, } s.system.initMetrics() - - s.historyQ.On("Rollback").Return(nil).Once() - s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(nil).Once() } @@ -60,9 +56,7 @@ func (s *IngestHistoryRangeStateTestSuite) TearDownTest() { s.runner.AssertExpectations(t) } -func (s *IngestHistoryRangeStateTestSuite) TestInvalidRange() { - // Recreate mock in this single test to remove Rollback assertion. - *s.historyQ = mockDBQ{} +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidRange() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} next, err := historyRangeState{fromLedger: 0, toLedger: 0}.run(s.system) @@ -86,11 +80,8 @@ func (s *IngestHistoryRangeStateTestSuite) TestInvalidRange() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestRangeNotPreparedFailPrepare() { - // Recreate mock in this single test to remove assertions. - *s.historyQ = mockDBQ{} +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeFailPrepare() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} - s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(errors.New("my error")).Once() @@ -100,10 +91,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestRangeNotPreparedFailPrepare() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestBeginReturnsError() { - // Recreate mock in this single test to remove Rollback assertion. - *s.historyQ = mockDBQ{} - +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeBeginReturnsError() { s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -112,8 +100,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestBeginReturnsError() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestGetLastLedgerIngestReturnsError() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeGetLastLedgerIngestReturnsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -122,8 +111,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestGetLastLedgerIngestReturnsError() s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestGetLatestLedgerReturnsError() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeGetLatestLedgerReturnsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(0), errors.New("my error")).Once() @@ -135,8 +125,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestGetLatestLedgerReturnsError() { // TestAnotherNodeIngested tests the case when another node has ingested the range. // In such case we go back to `init` state without processing. -func (s *IngestHistoryRangeStateTestSuite) TestAnotherNodeIngested() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeAnotherNodeIngested() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(200), nil).Once() @@ -145,40 +136,41 @@ func (s *IngestHistoryRangeStateTestSuite) TestAnotherNodeIngested() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerReturnsError() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeRunTransactionProcessorsReturnsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: 100, + ledgers := []xdr.LedgerCloseMeta{} + for i := 100; i <= 200; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, }, }, - }, + } + ledgers = append(ledgers, meta) + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - errors.New("my error"), - ).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) - s.Assert().EqualError(err, "error processing ledger sequence=100: my error") + s.Assert().EqualError(err, "error processing ledger range 100 - 200: my error") s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestSuccess() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessNoFlushMax() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() + ledgers := []xdr.LedgerCloseMeta{} for i := 100; i <= 200; i++ { meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -189,16 +181,10 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccess() { }, }, } + ledgers = append(ledgers, meta) s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() } - + s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() s.historyQ.On("Commit").Return(nil).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -206,40 +192,47 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccess() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestSuccessOneLedger() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessWithFlushMax() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(100), + firstLedgersBatch := []xdr.LedgerCloseMeta{} + secondLedgersBatch := []xdr.LedgerCloseMeta{} + for i := 100; i <= 200; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, }, }, - }, + } + if i-60 < 100 { + firstLedgersBatch = append(firstLedgersBatch, meta) + } else { + secondLedgersBatch = append(secondLedgersBatch, meta) + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() - - s.historyQ.On("Commit").Return(nil).Once() - - next, err := historyRangeState{fromLedger: 100, toLedger: 100}.run(s.system) + s.system.maxLedgerPerFlush = 60 + next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().NoError(err) s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestCommitsWorkOnLedgerBackendFailure() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeCommitsWorkOnLedgerBackendFailure() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -253,17 +246,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestCommitsWorkOnLedgerBackendFailure s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)). Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() + s.runner.AssertNotCalled(s.T(), "RunTransactionProcessorsOnLedgers") - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() - - s.historyQ.On("Commit").Return(nil).Once() - - next, err := historyRangeState{fromLedger: 100, toLedger: 102}.run(s.system) + next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) s.Assert().EqualError(err, "error getting ledger: my error") s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) @@ -296,11 +281,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { ledgerBackend: s.ledgerBackend, runner: s.runner, } - s.historyQ.On("GetTx").Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() } @@ -311,10 +292,9 @@ func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() { s.runner.AssertExpectations(t) } -func (s *ReingestHistoryRangeStateTestSuite) TestInvalidRange() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidRange() { // Recreate mock in this single test to remove Rollback assertion. - *s.historyQ = mockDBQ{} - + s.historyQ = &mockDBQ{} err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false) s.Assert().EqualError(err, "Invalid range: {0 0} genesis ledger starts at 1") @@ -328,157 +308,132 @@ func (s *ReingestHistoryRangeStateTestSuite) TestInvalidRange() { s.Assert().EqualError(err, "Invalid range: {100 99} from > to") } -func (s *ReingestHistoryRangeStateTestSuite) TestBeginReturnsError() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateBeginReturnsError() { // Recreate mock in this single test to remove Rollback assertion. - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil) s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().EqualError(err, "Error starting a transaction: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestGetLastLedgerIngestNonBlockingError() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() - +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateGetLastLedgerIngestNonBlockingError() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().EqualError(err, "Error getting last ingested ledger: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlaps() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() - +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRangeOverlaps() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(190), nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().Equal(ErrReingestRangeConflict{190}, err) } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlapsAtEnd() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() - +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStatRangeOverlapsAtEnd() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(200), nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().Equal(ErrReingestRangeConflict{200}, err) } -func (s *ReingestHistoryRangeStateTestSuite) TestClearHistoryFails() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateClearHistoryFails() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(101, 0, 0) + // the state deletes range once, calc'd by toid.LedgerRangeInclusive(), which adjusts to = to + 1 + toidTo := toid.New(201, 0, 0) s.historyQ.On( "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(errors.New("my error")).Once() - s.historyQ.On("Rollback").Return(nil).Once() - err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().EqualError(err, "error in DeleteRangeAll: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerReturnsError() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTransactionProcessorsReturnsError() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(101, 0, 0) + toidTo := toid.New(201, 0, 0) s.historyQ.On( "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(100), + ledgers := []xdr.LedgerCloseMeta{} + for i := uint32(100); i <= uint32(200); i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, }, }, - }, + } + ledgers = append(ledgers, meta) + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - - s.runner.On("RunTransactionProcessorsOnLedger", meta). - Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - errors.New("my error"), - ).Once() - s.historyQ.On("Rollback").Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) - s.Assert().EqualError(err, "error processing ledger sequence=100: my error") + s.Assert().EqualError(err, "error processing ledger range 100 - 200: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestCommitFails() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommitFails() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("Commit").Return(errors.New("my error")).Once() + toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(101, 0, 0) + toidTo := toid.New(201, 0, 0) s.historyQ.On( "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(100), + ledgers := []xdr.LedgerCloseMeta{} + for i := uint32(100); i <= uint32(200); i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, }, }, - }, + } + ledgers = append(ledgers, meta) + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() - - s.historyQ.On("Commit").Return(errors.New("my error")).Once() - s.historyQ.On("Rollback").Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().EqualError(err, "Error committing db transaction: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessNoFlushMax() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() + ledgers := []xdr.LedgerCloseMeta{} for i := uint32(100); i <= uint32(200); i++ { - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() - - toidFrom := toid.New(int32(i), 0, 0) - toidTo := toid.New(int32(i+1), 0, 0) - s.historyQ.On( - "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), - ).Return(nil).Once() - meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ @@ -488,27 +443,65 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { }, }, } + ledgers = append(ledgers, meta) s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + } - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) + s.Assert().NoError(err) +} - s.historyQ.On("Commit").Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - } +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessWithFlushMax() { + s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() + firstLedgersBatch := []xdr.LedgerCloseMeta{} + secondLedgersBatch := []xdr.LedgerCloseMeta{} + for i := uint32(100); i <= uint32(200); i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + // triggers two flushes, one for max during loop and one for remainder + if i-60 < 100 { + firstLedgersBatch = append(firstLedgersBatch, meta) + } else { + secondLedgersBatch = append(secondLedgersBatch, meta) + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + } + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() + s.system.maxLedgerPerFlush = 60 err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().NoError(err) } -func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessOneLedger() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("Commit").Return(nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(100), 0).Return(nil).Once() + // Recreate mock in this single ledger test to remove setup assertion on ledger range. + *s.ledgerBackend = mockLedgerBackend{} + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 100)).Return(nil).Once() toidFrom := toid.New(100, 0, 0) toidTo := toid.New(101, 0, 0) @@ -526,35 +519,29 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { }, } - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() - s.historyQ.On("Commit").Return(nil).Once() - s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(100), 0).Return(nil).Once() - - // Recreate mock in this single test to remove previous assertion. - *s.ledgerBackend = mockLedgerBackend{} - s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 100)).Return(nil).Once() s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false) s.Assert().NoError(err) } -func (s *ReingestHistoryRangeStateTestSuite) TestGetLastLedgerIngestError() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceGetLastLedgerIngestError() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) s.Assert().EqualError(err, "Error getting last ingested ledger: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { - s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() - +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceWithoutFlushMax() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() toidFrom := toid.New(100, 0, 0) toidTo := toid.New(201, 0, 0) @@ -562,6 +549,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() + ledgers := []xdr.LedgerCloseMeta{} for i := 100; i <= 200; i++ { meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -572,20 +560,52 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { }, }, } + ledgers = append(ledgers, meta) s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() } + s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) + s.Assert().NoError(err) +} +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceWithFlushMax() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() s.historyQ.On("Commit").Return(nil).Once() - s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + + firstLedgersBatch := []xdr.LedgerCloseMeta{} + secondLedgersBatch := []xdr.LedgerCloseMeta{} + for i := 100; i <= 200; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + // triggers two flushes, one for max during loop and one for remainder + if i-60 < 100 { + firstLedgersBatch = append(firstLedgersBatch, meta) + } else { + secondLedgersBatch = append(secondLedgersBatch, meta) + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + } + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() + + s.system.maxLedgerPerFlush = 60 err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) s.Assert().NoError(err) } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index b9ade405de..680cd3011a 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -103,6 +103,7 @@ type Config struct { RoundingSlippageFilter int EnableIngestionFiltering bool + MaxLedgerPerFlush uint32 } // LocalCaptiveCoreEnabled returns true if configured to run @@ -217,7 +218,8 @@ type system struct { runStateVerificationOnLedger func(uint32) bool - reapOffsets map[string]int64 + reapOffsets map[string]int64 + maxLedgerPerFlush uint32 currentStateMutex sync.Mutex currentState State @@ -307,6 +309,7 @@ func NewSystem(config Config) (System, error) { config.CheckpointFrequency, config.StateVerificationCheckpointFrequency, ), + maxLedgerPerFlush: config.MaxLedgerPerFlush, } system.initMetrics() diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index be2687b8e7..569307b26b 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -542,6 +542,11 @@ func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(ledger xdr.Ledge args.Error(3) } +func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error { + args := m.Called(ledgers) + return args.Error(0) +} + var _ ProcessorRunnerInterface = (*mockProcessorsRunner)(nil) type mockStellarCoreClient struct { diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index cff8960c1d..fac7f92f99 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -46,10 +47,6 @@ func (statsChangeProcessor) Commit(ctx context.Context) error { return nil } -type statsLedgerTransactionProcessor struct { - *processors.StatsLedgerTransactionProcessor -} - type ledgerStats struct { changeStats ingest.StatsChangeProcessorResults changeDurations processorsRunDurations @@ -75,6 +72,7 @@ type ProcessorRunnerInterface interface { tradeStats processors.TradeStats, err error, ) + RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats ledgerStats, err error, @@ -132,22 +130,16 @@ func buildChangeProcessor( }) } -func (s *ProcessorRunner) buildTransactionProcessor( - ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, - tradeProcessor *processors.TradeProcessor, - ledgersProcessor *processors.LedgersProcessor, -) *groupTransactionProcessors { +func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) *groupTransactionProcessors { accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() lpLoader := history.NewLiquidityPoolLoader() cbLoader := history.NewClaimableBalanceLoader() lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader} + statsLedgerTransactionProcessor := processors.NewStatsLedgerTransactionProcessor() - statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{ - StatsLedgerTransactionProcessor: ledgerTransactionStats, - } - *tradeProcessor = *processors.NewTradeProcessor(accountLoader, + tradeProcessor := processors.NewTradeProcessor(accountLoader, lpLoader, assetLoader, s.historyQ.NewTradeBatchInsertBuilder()) processors := []horizonTransactionProcessor{ @@ -164,7 +156,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( processors.NewLiquidityPoolsTransactionProcessor(lpLoader, s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())} - return newGroupTransactionProcessors(processors, lazyLoaders) + return newGroupTransactionProcessors(processors, lazyLoaders, statsLedgerTransactionProcessor, tradeProcessor) } func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers { @@ -184,7 +176,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessor p = append(p, txSubProc) } - return newGroupTransactionProcessors(p, nil) + return newGroupTransactionProcessors(p, nil, nil, nil) } // checkIfProtocolVersionSupported checks if this Horizon version supports the @@ -317,49 +309,158 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( return nil } -func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( - transactionStats processors.StatsLedgerTransactionProcessorResults, - transactionDurations processorsRunDurations, - tradeStats processors.TradeStats, - err error, -) { +func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta, + groupFilterers *groupTransactionFilterers, + groupFilteredOutProcessors *groupTransactionProcessors, + groupProcessors *groupTransactionProcessors) error { var ( - ledgerTransactionStats processors.StatsLedgerTransactionProcessor - tradeProcessor processors.TradeProcessor - transactionReader *ingest.LedgerTransactionReader + transactionReader *ingest.LedgerTransactionReader ) - if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { + if err := s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { err = errors.Wrap(err, "Error while checking for supported protocol version") - return + return err } - // ensure capture of the ledger to history regardless of whether it has transactions. - ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) - ledgersProcessor.ProcessLedger(ledger) - - transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) + startTime := time.Now() + transactionReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) if err != nil { err = errors.Wrap(err, "Error creating ledger reader") - return + return err } + err = processors.StreamLedgerTransactions(s.ctx, + groupFilterers, + groupFilteredOutProcessors, + groupProcessors, + transactionReader, + ledger, + ) + if err != nil { + return errors.Wrap(err, "Error streaming changes from ledger") + } + + transactionStats := groupProcessors.transactionStatsProcessor.GetResults() + transactionStats.TransactionsFiltered = groupFilterers.droppedTransactions + + tradeStats := groupProcessors.tradeProcessor.GetStats() + + curHeap, sysHeap := getMemStats() + log.WithFields(transactionStats.Map()). + WithFields(tradeStats.Map()). + WithFields(logpkg.F{ + "currentHeapSizeMB": curHeap, + "systemHeapSizeMB": sysHeap, + "sequence": ledger.LedgerSequence(), + "state": false, + "ledger": true, + "commit": false, + "duration": time.Since(startTime).Seconds(), + }).Info("Processed ledger") + + return nil +} + +func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( + transactionStats processors.StatsLedgerTransactionProcessorResults, + transactionDurations processorsRunDurations, + tradeStats processors.TradeStats, + err error, +) { + // ensure capture of the ledger to history regardless of whether it has transactions. + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) + ledgersProcessor.ProcessLedger(ledger) + groupTransactionFilterers := s.buildTransactionFilterer() groupFilteredOutProcessors := s.buildFilteredOutProcessor() - groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, ledgersProcessor) - err = processors.StreamLedgerTransactions(s.ctx, + groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + + err = s.streamLedger(ledger, groupTransactionFilterers, groupFilteredOutProcessors, groupTransactionProcessors, - transactionReader, - ledger, ) if err != nil { err = errors.Wrap(err, "Error streaming changes from ledger") return } + err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors) + if err != nil { + return + } + + transactionStats = groupTransactionProcessors.transactionStatsProcessor.GetResults() + transactionStats.TransactionsFiltered = groupTransactionFilterers.droppedTransactions + + transactionDurations = groupTransactionProcessors.processorsRunDurations + for key, duration := range groupFilteredOutProcessors.processorsRunDurations { + transactionDurations[key] = duration + } + for key, duration := range groupTransactionFilterers.processorsRunDurations { + transactionDurations[key] = duration + } + + tradeStats = groupTransactionProcessors.tradeProcessor.GetStats() + + return +} + +func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) (err error) { + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) + + groupTransactionFilterers := s.buildTransactionFilterer() + groupFilteredOutProcessors := s.buildFilteredOutProcessor() + groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + + startTime := time.Now() + curHeap, sysHeap := getMemStats() + log.WithFields(logpkg.F{ + "currentHeapSizeMB": curHeap, + "systemHeapSizeMB": sysHeap, + "ledgerBatchSize": len(ledgers), + "state": false, + "ledger": true, + "commit": false, + }).Info("Running processors for batch of ledgers") + + for _, ledger := range ledgers { + // ensure capture of the ledger to history regardless of whether it has transactions. + ledgersProcessor.ProcessLedger(ledger) + + err = s.streamLedger(ledger, + groupTransactionFilterers, + groupFilteredOutProcessors, + groupTransactionProcessors, + ) + if err != nil { + err = errors.Wrap(err, "Error streaming changes during ledger batch") + return + } + groupTransactionProcessors.ResetStats() + groupFilteredOutProcessors.ResetStats() + groupTransactionFilterers.ResetStats() + } + + err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors) + if err != nil { + return + } + curHeap, sysHeap = getMemStats() + log.WithFields(logpkg.F{ + "currentHeapSizeMB": curHeap, + "systemHeapSizeMB": sysHeap, + "ledgers": len(ledgers), + "state": false, + "ledger": true, + "commit": false, + "duration": time.Since(startTime).Seconds(), + }).Info("Flushed processors for batch of ledgers") + + return nil +} + +func (s *ProcessorRunner) flushProcessors(groupFilteredOutProcessors *groupTransactionProcessors, groupTransactionProcessors *groupTransactionProcessors) (err error) { if s.config.EnableIngestionFiltering { err = groupFilteredOutProcessors.Flush(s.ctx, s.session) if err != nil { @@ -374,20 +475,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos err = groupTransactionProcessors.Flush(s.ctx, s.session) if err != nil { err = errors.Wrap(err, "Error flushing changes from processor") - return } - - transactionStats = ledgerTransactionStats.GetResults() - transactionStats.TransactionsFiltered = groupTransactionFilterers.droppedTransactions - transactionDurations = groupTransactionProcessors.processorsRunDurations - for key, duration := range groupFilteredOutProcessors.processorsRunDurations { - transactionDurations[key] = duration - } - for key, duration := range groupTransactionFilterers.processorsRunDurations { - transactionDurations[key] = duration - } - - tradeStats = tradeProcessor.GetStats() return } @@ -414,11 +502,13 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( return } + transactionStats, transactionDurations, tradeStats, err := s.RunTransactionProcessorsOnLedger(ledger) + stats.changeStats = changeStatsProcessor.GetResults() stats.changeDurations = groupChangeProcessors.processorsRunDurations - - stats.transactionStats, stats.transactionDurations, stats.tradeStats, err = - s.RunTransactionProcessorsOnLedger(ledger) + stats.transactionStats = transactionStats + stats.transactionDurations = transactionDurations + stats.tradeStats = tradeStats return } diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 4af1a5be11..4023987b8f 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -266,14 +266,11 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { historyQ: q, } - stats := &processors.StatsLedgerTransactionProcessor{} - trades := &processors.TradeProcessor{} - ledgersProcessor := &processors.LedgersProcessor{} - processor := runner.buildTransactionProcessor(stats, trades, ledgersProcessor) + processor := runner.buildTransactionProcessor(ledgersProcessor) assert.IsType(t, &groupTransactionProcessors{}, processor) - assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) + assert.IsType(t, &processors.StatsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) assert.IsType(t, &processors.LedgersProcessor{}, processor.processors[2]) assert.IsType(t, &processors.OperationProcessor{}, processor.processors[3]) @@ -370,12 +367,14 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) mockBatchInsertBuilder.On( "Add", - ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil) + ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() mockBatchInsertBuilder.On( "Exec", ctx, mockSession, - ).Return(nil) + ).Return(nil).Once() + + defer mock.AssertExpectationsForObjects(t, mockBatchInsertBuilder) runner := ProcessorRunner{ ctx: ctx, @@ -389,6 +388,85 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { assert.NoError(t, err) } +func TestProcessorRunnerRunTransactionsProcessorsOnLedgers(t *testing.T) { + ctx := context.Background() + maxBatchSize := 100000 + + config := Config{ + NetworkPassphrase: network.PublicNetworkPassphrase, + } + + mockSession := &db.MockSession{} + q := &mockDBQ{} + defer mock.AssertExpectationsForObjects(t, q) + + ledgers := []xdr.LedgerCloseMeta{{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + BucketListHash: xdr.Hash([32]byte{0, 1, 2}), + LedgerSeq: xdr.Uint32(1), + }, + }, + }, + }, + { + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + BucketListHash: xdr.Hash([32]byte{3, 4, 5}), + LedgerSeq: xdr.Uint32(2), + }, + }, + }, + }, + { + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + BucketListHash: xdr.Hash([32]byte{6, 7, 8}), + LedgerSeq: xdr.Uint32(3), + }, + }, + }, + }, + } + + // Batches + defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) + + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} + q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) + mockBatchInsertBuilder.On( + "Add", + ledgers[0].V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() + mockBatchInsertBuilder.On( + "Add", + ledgers[1].V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() + mockBatchInsertBuilder.On( + "Add", + ledgers[2].V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() + + mockBatchInsertBuilder.On( + "Exec", + ctx, + mockSession, + ).Return(nil).Once() + + defer mock.AssertExpectationsForObjects(t, mockBatchInsertBuilder) + + runner := ProcessorRunner{ + ctx: ctx, + config: config, + historyQ: q, + session: mockSession, + filters: &MockFilters{}, + } + + err := runner.RunTransactionProcessorsOnLedgers(ledgers) + assert.NoError(t, err) +} + func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *testing.T) { ctx := context.Background() maxBatchSize := 100000 @@ -447,7 +525,7 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) + Return(mockTransactionsBatchInsertBuilder).Once() mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). @@ -456,47 +534,47 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() + Return(mockOperationsBatchInsertBuilder).Once() mockEffectBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} mockEffectBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQEffects.On("NewEffectBatchInsertBuilder"). - Return(mockEffectBatchInsertBuilder) + Return(mockEffectBatchInsertBuilder).Once() mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} - mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.On("NewTransactionParticipantsBatchInsertBuilder"). - Return(mockTransactionsParticipantsBatchInsertBuilder) + Return(mockTransactionsParticipantsBatchInsertBuilder).Once() mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} - mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.On("NewOperationParticipantBatchInsertBuilder"). - Return(mockOperationParticipantBatchInsertBuilder) + Return(mockOperationParticipantBatchInsertBuilder).Once() mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} - mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). - Return(mockTransactionClaimableBalanceBatchInsertBuilder) + Return(mockTransactionClaimableBalanceBatchInsertBuilder).Once() mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} - mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). - Return(mockOperationClaimableBalanceBatchInsertBuilder) + Return(mockOperationClaimableBalanceBatchInsertBuilder).Once() mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} - mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). - Return(mockTransactionLiquidityPoolBatchInsertBuilder) + Return(mockTransactionLiquidityPoolBatchInsertBuilder).Once() mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} - mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). - Return(mockOperationLiquidityPoolBatchInsertBuilder) + Return(mockOperationLiquidityPoolBatchInsertBuilder).Once() q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() - q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}).Once() return []interface{}{mockAccountSignersBatchInsertBuilder, mockOperationsBatchInsertBuilder, diff --git a/services/horizon/internal/ingest/processors/ledgers_processor.go b/services/horizon/internal/ingest/processors/ledgers_processor.go index 942a5f8522..5ecfb2ad7f 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor.go @@ -82,7 +82,7 @@ func (p *LedgersProcessor) Flush(ctx context.Context, session db.SessionInterfac } if err := p.batch.Exec(ctx, session); err != nil { - return errors.Wrapf(err, "error committing ledgers %d - %d", min, max) + return errors.Wrapf(err, "error flushing ledgers %d - %d", min, max) } return nil diff --git a/services/horizon/internal/ingest/processors/ledgers_processor_test.go b/services/horizon/internal/ingest/processors/ledgers_processor_test.go index 308bb6995d..1df6640266 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor_test.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor_test.go @@ -220,6 +220,6 @@ func (s *LedgersProcessorTestSuiteLedger) TestExecFails() { s.Assert().EqualError(s.processor.Flush( context.Background(), s.mockSession), - "error committing ledgers 20 - 20: transient exec error", + "error flushing ledgers 20 - 20: transient exec error", ) } diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go index 1937331d6a..8fa886ab52 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go @@ -15,6 +15,10 @@ type StatsLedgerTransactionProcessor struct { results StatsLedgerTransactionProcessorResults } +func NewStatsLedgerTransactionProcessor() *StatsLedgerTransactionProcessor { + return &StatsLedgerTransactionProcessor{} +} + // StatsLedgerTransactionProcessorResults contains results after running StatsLedgerTransactionProcessor. type StatsLedgerTransactionProcessorResults struct { Transactions int64 @@ -179,5 +183,9 @@ func (stats *StatsLedgerTransactionProcessorResults) Map() map[string]interface{ } } +func (p *StatsLedgerTransactionProcessor) ResetStats() { + p.results = StatsLedgerTransactionProcessorResults{} +} + // Ensure the StatsChangeProcessor conforms to the ChangeProcessor interface. var _ LedgerTransactionProcessor = (*StatsLedgerTransactionProcessor)(nil) diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go index c7fc6d7967..1f2f8c370e 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go @@ -44,8 +44,38 @@ func TestStatsLedgerTransactionProcessoAllOpTypesCovered(t *testing.T) { assert.Panics(t, f) } +func TestStatsLedgerTransactionProcessorReset(t *testing.T) { + processor := NewStatsLedgerTransactionProcessor() + lcm := xdr.LedgerCloseMeta{} + + assert.NoError(t, processor.ProcessTransaction(lcm, ingest.LedgerTransaction{ + Result: xdr.TransactionResultPair{ + Result: xdr.TransactionResult{ + Result: xdr.TransactionResultResult{ + Code: xdr.TransactionResultCodeTxSuccess, + }, + }, + }, + Envelope: xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + {Body: xdr.OperationBody{Type: xdr.OperationTypeCreateAccount}}, + {Body: xdr.OperationBody{Type: xdr.OperationTypePayment}}, + }, + }, + }, + }, + })) + + assert.Equal(t, processor.GetResults().Operations, int64(2)) + processor.ResetStats() + assert.Equal(t, processor.GetResults().Operations, int64(0)) +} + func TestStatsLedgerTransactionProcessor(t *testing.T) { - processor := &StatsLedgerTransactionProcessor{} + processor := NewStatsLedgerTransactionProcessor() lcm := xdr.LedgerCloseMeta{} // Successful diff --git a/services/horizon/internal/ingest/processors/trades_processor.go b/services/horizon/internal/ingest/processors/trades_processor.go index 2cb702e14b..66c6aeb8d3 100644 --- a/services/horizon/internal/ingest/processors/trades_processor.go +++ b/services/horizon/internal/ingest/processors/trades_processor.go @@ -48,6 +48,7 @@ type TradeStats struct { func (p *TradeProcessor) GetStats() TradeStats { return p.stats } + func (stats *TradeStats) Map() map[string]interface{} { return map[string]interface{}{ "stats_count": stats.count, @@ -460,3 +461,7 @@ func (p *TradeProcessor) extractTrades( return result, nil } + +func (p *TradeProcessor) ResetStats() { + p.stats = TradeStats{} +} diff --git a/services/horizon/internal/ingest/processors/trades_processor_test.go b/services/horizon/internal/ingest/processors/trades_processor_test.go index 5b2a2f20e3..8a7733f4d1 100644 --- a/services/horizon/internal/ingest/processors/trades_processor_test.go +++ b/services/horizon/internal/ingest/processors/trades_processor_test.go @@ -757,6 +757,9 @@ func (s *TradeProcessorTestSuiteLedger) TestIngestTradesSucceeds() { err := s.processor.Flush(ctx, s.mockSession) s.Assert().NoError(err) + s.Assert().Equal(s.processor.GetStats().count, int64(8)) + s.processor.ResetStats() + s.Assert().Equal(s.processor.GetStats().count, int64(0)) } func (s *TradeProcessorTestSuiteLedger) TestBatchAddError() { From e43e291d569f13fcfaad8c74d5bf1a6cfe11fc1a Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 15 Nov 2023 16:07:20 -0800 Subject: [PATCH 2/9] #5099: added 'ledgers-per-flush' reingest flag, default to 0. --- services/horizon/cmd/db.go | 10 +++ .../ingest/fsm_history_range_state.go | 2 +- .../fsm_reingest_history_range_state.go | 2 +- .../ingest/ingest_history_range_state_test.go | 67 ++++++++----------- 4 files changed, 39 insertions(+), 42 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index e23c224012..17df7d6f95 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -243,6 +243,7 @@ var dbReingestCmd = &cobra.Command{ var ( reingestForce bool parallelWorkers uint + maxLedgersPerFlush uint32 parallelJobSize uint32 retries uint retryBackoffSeconds uint @@ -275,6 +276,14 @@ func ingestRangeCmdOpts() support.ConfigOptions { FlagDefault: uint32(100000), Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", }, + { + Name: "ledgers-per-flush", + ConfigKey: &maxLedgersPerFlush, + OptType: types.Uint32, + Required: false, + FlagDefault: uint32(0), + Usage: "[optional] size of ledgers batch for tx processors to retain in memory first, before flushing once to the workers ongoing db transaction, default is 0, which disables batching, effectively flush to db tx per each ledger.", + }, { Name: "retries", ConfigKey: &retries, @@ -416,6 +425,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, StellarCoreURL: config.StellarCoreURL, RoundingSlippageFilter: config.RoundingSlippageFilter, EnableIngestionFiltering: config.EnableIngestionFiltering, + MaxLedgerPerFlush: maxLedgersPerFlush, } if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { diff --git a/services/horizon/internal/ingest/fsm_history_range_state.go b/services/horizon/internal/ingest/fsm_history_range_state.go index 321da34c2c..b967203dfb 100644 --- a/services/horizon/internal/ingest/fsm_history_range_state.go +++ b/services/horizon/internal/ingest/fsm_history_range_state.go @@ -86,7 +86,7 @@ func (h historyRangeState) run(s *system) (transition, error) { }).Info("Ledger returned from the backend") ledgers = append(ledgers, ledgerCloseMeta) - if s.maxLedgerPerFlush > 0 && len(ledgers)%int(s.maxLedgerPerFlush) == 0 { + if s.maxLedgerPerFlush < 1 || len(ledgers)%int(s.maxLedgerPerFlush) == 0 { if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } diff --git a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go index eaa4402687..cbc1bbf715 100644 --- a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go +++ b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go @@ -68,7 +68,7 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u ledgers = append(ledgers, ledgerCloseMeta) - if s.maxLedgerPerFlush > 0 && len(ledgers)%int(s.maxLedgerPerFlush) == 0 { + if s.maxLedgerPerFlush < 1 || len(ledgers)%int(s.maxLedgerPerFlush) == 0 { if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index d1382b9bd9..a0c32bcc7d 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -142,25 +142,22 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeRunTransactionProcess s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() - ledgers := []xdr.LedgerCloseMeta{} - for i := 100; i <= 200; i++ { - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(i), - }, + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(100), }, }, - } - ledgers = append(ledgers, meta) - s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + }, } - s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(errors.New("my error")).Once() + + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) - s.Assert().EqualError(err, "error processing ledger range 100 - 200: my error") + s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error") s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } @@ -170,7 +167,6 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessNoFlushMax() { s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() - ledgers := []xdr.LedgerCloseMeta{} for i := 100; i <= 200; i++ { meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -181,10 +177,10 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessNoFlushMax() { }, }, } - ledgers = append(ledgers, meta) s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } - s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -244,9 +240,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeCommitsWorkOnLedgerBa }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)). Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() - s.runner.AssertNotCalled(s.T(), "RunTransactionProcessorsOnLedgers") next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) @@ -365,24 +361,21 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTra "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - ledgers := []xdr.LedgerCloseMeta{} - for i := uint32(100); i <= uint32(200); i++ { - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(i), - }, + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(100), }, }, - } - ledgers = append(ledgers, meta) - s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + }, } - s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(errors.New("my error")).Once() + + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) - s.Assert().EqualError(err, "error processing ledger range 100 - 200: my error") + s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error") } func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommitFails() { @@ -398,7 +391,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - ledgers := []xdr.LedgerCloseMeta{} for i := uint32(100); i <= uint32(200); i++ { meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -409,12 +401,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit }, }, } - ledgers = append(ledgers, meta) s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } - s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() - err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().EqualError(err, "Error committing db transaction: my error") } @@ -432,7 +422,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() - ledgers := []xdr.LedgerCloseMeta{} for i := uint32(100); i <= uint32(200); i++ { meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -443,11 +432,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces }, }, } - ledgers = append(ledgers, meta) s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } - s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().NoError(err) } @@ -549,7 +537,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - ledgers := []xdr.LedgerCloseMeta{} for i := 100; i <= 200; i++ { meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -560,10 +547,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW }, }, } - ledgers = append(ledgers, meta) s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } - s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once() + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) s.Assert().NoError(err) } From 4fb6f87589f0db17c21ff16c01266ebd978d0078 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 15 Nov 2023 20:46:53 -0800 Subject: [PATCH 3/9] 5099: set default ledgers-per-flush=10 --- services/horizon/cmd/db.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index a72eacadd9..f889f09841 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -281,8 +281,8 @@ func ingestRangeCmdOpts() support.ConfigOptions { ConfigKey: &maxLedgersPerFlush, OptType: types.Uint32, Required: false, - FlagDefault: uint32(0), - Usage: "[optional] size of ledgers batch for tx processors to retain in memory first, before flushing once to the workers ongoing db transaction, default is 0, which disables batching, effectively flush to db tx per each ledger.", + FlagDefault: uint32(10), + Usage: "[optional] size of ledgers batch for tx processors to retain in memory first, before flushing once to the workers ongoing db transaction, default is 10, 0 disables batching, effectively flush to db tx per each ledger.", }, { Name: "retries", From 882443e23a184fd045eb445e2784571b9b49b45e Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 16 Nov 2023 21:37:27 -0800 Subject: [PATCH 4/9] #5099: fixed process runner unit tests --- .../internal/ingest/processor_runner_test.go | 106 +++++++++--------- 1 file changed, 51 insertions(+), 55 deletions(-) diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 874afd42a7..6acd225846 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -357,7 +357,6 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { func TestProcessorRunnerWithFilterEnabled(t *testing.T) { ctx := context.Background() - maxBatchSize := 100000 config := Config{ NetworkPassphrase: network.PublicNetworkPassphrase, @@ -378,34 +377,16 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { }, } - // Batches mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockTransactionsFilteredTmpBatchInsertBuilder) mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). Return(mockTransactionsFilteredTmpBatchInsertBuilder) - - mockClaimableBalanceBatchInsertBuilder := &history.MockClaimableBalanceBatchInsertBuilder{} - q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). - Return(mockClaimableBalanceBatchInsertBuilder).Once() - - mockClaimantsBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{} - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). - Return(mockClaimantsBatchInsertBuilder).Once() - - mockClaimantsBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - - mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} - q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). - Return(mockLiquidityPoolBatchInsertBuilder).Twice() - - mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) - defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) + defer mock.AssertExpectationsForObjects(t, mockTXBatchBuilders(q, mockSession, ctx, 1)...) + defer mock.AssertExpectationsForObjects(t, mockChangeBatchBuilders(q, mockSession, ctx, 1)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -432,7 +413,6 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { ctx := context.Background() - maxBatchSize := 100000 config := Config{ NetworkPassphrase: network.PublicNetworkPassphrase, @@ -453,7 +433,8 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) + defer mock.AssertExpectationsForObjects(t, mockTXBatchBuilders(q, mockSession, ctx, 1)...) + defer mock.AssertExpectationsForObjects(t, mockChangeBatchBuilders(q, mockSession, ctx, 1)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -482,7 +463,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { func TestProcessorRunnerRunTransactionsProcessorsOnLedgers(t *testing.T) { ctx := context.Background() - maxBatchSize := 100000 config := Config{ NetworkPassphrase: network.PublicNetworkPassphrase, @@ -525,7 +505,7 @@ func TestProcessorRunnerRunTransactionsProcessorsOnLedgers(t *testing.T) { } // Batches - defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) + defer mock.AssertExpectationsForObjects(t, mockTXBatchBuilders(q, mockSession, ctx, 3)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -613,17 +593,48 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t ) } -func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context, maxBatchSize int) []interface{} { - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder).Once() +func mockChangeBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context, numOfLedgers int) []interface{} { + // change processors + mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} + mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil).Times(numOfLedgers) + q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder).Twice() mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - mockAccountSignersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() + mockAccountSignersBatchInsertBuilder.On("Exec", ctx).Return(nil).Times(numOfLedgers) q.MockQSigners.On("NewAccountSignersBatchInsertBuilder"). Return(mockAccountSignersBatchInsertBuilder).Twice() + mockClaimableBalanceClaimantBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{} + mockClaimableBalanceClaimantBatchInsertBuilder.On("Exec", ctx).Return(nil).Times(numOfLedgers) + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). + Return(mockClaimableBalanceClaimantBatchInsertBuilder).Twice() + + mockClaimableBalanceBatchInsertBuilder := &history.MockClaimableBalanceBatchInsertBuilder{} + q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). + Return(mockClaimableBalanceBatchInsertBuilder).Twice() + mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil).Times(numOfLedgers) + + mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} + q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). + Return(mockLiquidityPoolBatchInsertBuilder).Twice() + mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Times(numOfLedgers) + + return []interface{}{mockOfferBatchInsertBuilder, + mockAccountSignersBatchInsertBuilder, + mockClaimableBalanceClaimantBatchInsertBuilder, + mockClaimableBalanceBatchInsertBuilder, + mockLiquidityPoolBatchInsertBuilder} +} + +func mockTXBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context, numOfLedgers int) []interface{} { + mockTradeBatchInsertBuilder := &history.MockTradeBatchInsertBuilder{} + q.On("NewTradeBatchInsertBuilder").Return(mockTradeBatchInsertBuilder).Once() + + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). + Return(mockTransactionsBatchInsertBuilder).Once() + mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQOperations.On("NewOperationBatchInsertBuilder"). @@ -664,29 +675,14 @@ func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Cont q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). Return(mockOperationLiquidityPoolBatchInsertBuilder).Once() - mockClaimableBalanceClaimantBatchInsertBuilder := &history.MockClaimableBalanceClaimantBatchInsertBuilder{} - mockClaimableBalanceClaimantBatchInsertBuilder.On("Exec", ctx).Return(nil) - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder"). - Return(mockClaimableBalanceClaimantBatchInsertBuilder) - - mockClaimableBalanceBatchInsertBuilder := &history.MockClaimableBalanceBatchInsertBuilder{} - q.MockQClaimableBalances.On("NewClaimableBalanceBatchInsertBuilder"). - Return(mockClaimableBalanceBatchInsertBuilder) - mockClaimableBalanceBatchInsertBuilder.On("Exec", ctx).Return(nil) - - mockLiquidityPoolBatchInsertBuilder := &history.MockLiquidityPoolBatchInsertBuilder{} - q.MockQLiquidityPools.On("NewLiquidityPoolBatchInsertBuilder"). - Return(mockLiquidityPoolBatchInsertBuilder).Twice() - - mockLiquidityPoolBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - - mockOfferBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - mockOfferBatchInsertBuilder.On("Exec", ctx).Return(nil) - q.MockQOffers.On("NewOffersBatchInsertBuilder").Return(mockOfferBatchInsertBuilder) - - q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}).Once() - - return []interface{}{mockAccountSignersBatchInsertBuilder, + return []interface{}{mockTradeBatchInsertBuilder, + mockTransactionsBatchInsertBuilder, mockOperationsBatchInsertBuilder, - mockTransactionsBatchInsertBuilder} + mockEffectBatchInsertBuilder, + mockTransactionsParticipantsBatchInsertBuilder, + mockOperationParticipantBatchInsertBuilder, + mockTransactionClaimableBalanceBatchInsertBuilder, + mockOperationClaimableBalanceBatchInsertBuilder, + mockTransactionLiquidityPoolBatchInsertBuilder, + mockOperationLiquidityPoolBatchInsertBuilder} } From fd713f8269513ff3516e778763ec26c744da29a6 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 16 Nov 2023 22:42:33 -0800 Subject: [PATCH 5/9] #5099: fixed fmt warnings --- services/horizon/internal/ingest/processor_runner_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 95d5001a1b..550788746f 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -299,7 +299,7 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { defer mock.AssertExpectationsForObjects(t, mockTxProcessorBatchBuilders(q, mockSession, ctx)...) defer mock.AssertExpectationsForObjects(t, mockChangeProcessorBatchBuilders(q, ctx, true)...) - + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) mockBatchInsertBuilder.On( From c2aea6ab64612b13b934c446ed0b92fc7f5afa30 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Sun, 26 Nov 2023 22:49:43 -0800 Subject: [PATCH 6/9] #5099: review feedback, remove maxflush command flag, calc max instead --- services/horizon/cmd/db.go | 14 ++--- .../ingest/fsm_history_range_state.go | 12 +++-- .../fsm_reingest_history_range_state.go | 12 +++-- .../ingest/ingest_history_range_state_test.go | 51 ++++++++++++++----- .../internal/ingest/processor_runner.go | 17 +++---- 5 files changed, 67 insertions(+), 39 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index f889f09841..85b26afd56 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -243,7 +243,6 @@ var dbReingestCmd = &cobra.Command{ var ( reingestForce bool parallelWorkers uint - maxLedgersPerFlush uint32 parallelJobSize uint32 retries uint retryBackoffSeconds uint @@ -276,14 +275,6 @@ func ingestRangeCmdOpts() support.ConfigOptions { FlagDefault: uint32(100000), Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", }, - { - Name: "ledgers-per-flush", - ConfigKey: &maxLedgersPerFlush, - OptType: types.Uint32, - Required: false, - FlagDefault: uint32(10), - Usage: "[optional] size of ledgers batch for tx processors to retain in memory first, before flushing once to the workers ongoing db transaction, default is 10, 0 disables batching, effectively flush to db tx per each ledger.", - }, { Name: "retries", ConfigKey: &retries, @@ -408,6 +399,11 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, return errors.New("--force is incompatible with --parallel-workers > 1") } + maxLedgersPerFlush := uint32(100) + if parallelWorkers > 0 { + maxLedgersPerFlush = uint32(parallelWorkers) + } + ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, HistoryArchiveURLs: config.HistoryArchiveURLs, diff --git a/services/horizon/internal/ingest/fsm_history_range_state.go b/services/horizon/internal/ingest/fsm_history_range_state.go index b967203dfb..0aa65795a9 100644 --- a/services/horizon/internal/ingest/fsm_history_range_state.go +++ b/services/horizon/internal/ingest/fsm_history_range_state.go @@ -33,6 +33,10 @@ func (h historyRangeState) run(s *system) (transition, error) { return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) } + if s.maxLedgerPerFlush < 1 { + return start(), errors.New("invalid maxLedgerPerFlush, must be greater than 0") + } + err := s.maybePrepareRange(s.ctx, h.fromLedger) if err != nil { return start(), err @@ -61,7 +65,7 @@ func (h historyRangeState) run(s *system) (transition, error) { return start(), nil } - ledgers := []xdr.LedgerCloseMeta{} + ledgers := make([]xdr.LedgerCloseMeta, 0, s.maxLedgerPerFlush) for cur := h.fromLedger; cur <= h.toLedger; cur++ { var ledgerCloseMeta xdr.LedgerCloseMeta @@ -70,7 +74,7 @@ func (h historyRangeState) run(s *system) (transition, error) { ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) if err != nil { - // Commit finished work in case of ledger backend error. + // Commit prior batches that have been flushed in case of ledger backend error. commitErr := s.historyQ.Commit() if commitErr != nil { log.WithError(commitErr).Error("Error committing partial range results") @@ -86,11 +90,11 @@ func (h historyRangeState) run(s *system) (transition, error) { }).Info("Ledger returned from the backend") ledgers = append(ledgers, ledgerCloseMeta) - if s.maxLedgerPerFlush < 1 || len(ledgers)%int(s.maxLedgerPerFlush) == 0 { + if len(ledgers) == cap(ledgers) { if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } - ledgers = []xdr.LedgerCloseMeta{} + ledgers = ledgers[0:0] } } diff --git a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go index cbc1bbf715..fdf7d793fb 100644 --- a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go +++ b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go @@ -35,6 +35,10 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u return errors.New("expected transaction to be present") } + if s.maxLedgerPerFlush < 1 { + return errors.New("invalid maxLedgerPerFlush, must be greater than 0") + } + // Clear history data before ingesting - used in `reingest range` command. start, end, err := toid.LedgerRangeInclusive( int32(fromLedger), @@ -49,7 +53,9 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u return errors.Wrap(err, "error in DeleteRangeAll") } - ledgers := []xdr.LedgerCloseMeta{} + // s.maxLedgerPerFlush has been validated to be at least 1 + ledgers := make([]xdr.LedgerCloseMeta, 0, s.maxLedgerPerFlush) + for cur := fromLedger; cur <= toLedger; cur++ { var ledgerCloseMeta xdr.LedgerCloseMeta @@ -68,11 +74,11 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u ledgers = append(ledgers, ledgerCloseMeta) - if s.maxLedgerPerFlush < 1 || len(ledgers)%int(s.maxLedgerPerFlush) == 0 { + if len(ledgers)%int(s.maxLedgerPerFlush) == 0 { if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) } - ledgers = []xdr.LedgerCloseMeta{} + ledgers = ledgers[0:0] } } diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index a0c32bcc7d..be0ebdaafc 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -37,11 +37,12 @@ func (s *IngestHistoryRangeStateTestSuite) SetupTest() { s.historyAdapter = &mockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} s.system = &system{ - ctx: s.ctx, - historyQ: s.historyQ, - historyAdapter: s.historyAdapter, - ledgerBackend: s.ledgerBackend, - runner: s.runner, + ctx: s.ctx, + historyQ: s.historyQ, + historyAdapter: s.historyAdapter, + ledgerBackend: s.ledgerBackend, + runner: s.runner, + maxLedgerPerFlush: 1, } s.system.initMetrics() s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() @@ -80,6 +81,16 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidRange() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidMaxFlush() { + *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + + s.system.maxLedgerPerFlush = 0 + next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) + s.Assert().Error(err) + s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0") + s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) +} + func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeFailPrepare() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() @@ -161,7 +172,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeRunTransactionProcess s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessNoFlushMax() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccess() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() @@ -183,6 +194,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessNoFlushMax() { s.historyQ.On("Commit").Return(nil).Once() + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().NoError(err) s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) @@ -271,11 +283,12 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { s.ledgerBackend = &mockLedgerBackend{} s.runner = &mockProcessorsRunner{} s.system = &system{ - ctx: s.ctx, - historyQ: s.historyQ, - historyAdapter: s.historyAdapter, - ledgerBackend: s.ledgerBackend, - runner: s.runner, + ctx: s.ctx, + historyQ: s.historyQ, + historyAdapter: s.historyAdapter, + ledgerBackend: s.ledgerBackend, + runner: s.runner, + maxLedgerPerFlush: 1, } s.historyQ.On("GetTx").Return(nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() @@ -304,6 +317,16 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvali s.Assert().EqualError(err, "Invalid range: {100 99} from > to") } +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidMaxFlush() { + s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.system.maxLedgerPerFlush = 0 + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) + s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0") +} + func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateBeginReturnsError() { // Recreate mock in this single test to remove Rollback assertion. s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() @@ -409,7 +432,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit s.Assert().EqualError(err, "Error committing db transaction: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessNoFlushMax() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccess() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() @@ -436,6 +459,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().NoError(err) } @@ -523,7 +547,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceG s.Assert().EqualError(err, "Error getting last ingested ledger: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceWithoutFlushMax() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() @@ -551,6 +575,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) s.Assert().NoError(err) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index fac7f92f99..fe7894d737 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -345,17 +345,14 @@ func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta, tradeStats := groupProcessors.tradeProcessor.GetStats() - curHeap, sysHeap := getMemStats() log.WithFields(transactionStats.Map()). WithFields(tradeStats.Map()). WithFields(logpkg.F{ - "currentHeapSizeMB": curHeap, - "systemHeapSizeMB": sysHeap, - "sequence": ledger.LedgerSequence(), - "state": false, - "ledger": true, - "commit": false, - "duration": time.Since(startTime).Seconds(), + "sequence": ledger.LedgerSequence(), + "state": false, + "ledger": true, + "commit": false, + "duration": time.Since(startTime).Seconds(), }).Info("Processed ledger") return nil @@ -422,7 +419,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.Ledger "state": false, "ledger": true, "commit": false, - }).Info("Running processors for batch of ledgers") + }).Infof("Running processors for batch of %v ledgers", len(ledgers)) for _, ledger := range ledgers { // ensure capture of the ledger to history regardless of whether it has transactions. @@ -455,7 +452,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.Ledger "ledger": true, "commit": false, "duration": time.Since(startTime).Seconds(), - }).Info("Flushed processors for batch of ledgers") + }).Infof("Flushed processors for batch of %v ledgers", len(ledgers)) return nil } From 2386a2083ca2e7b84e486e125b37958222d58d4b Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 27 Nov 2023 00:11:49 -0800 Subject: [PATCH 7/9] #5099: review feedback, commit already flushed batches prior to err --- .../fsm_reingest_history_range_state.go | 7 +- .../ingest/ingest_history_range_state_test.go | 68 +++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go index fdf7d793fb..4e60f71cd1 100644 --- a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go +++ b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go @@ -141,8 +141,11 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) { return stop(), errors.Wrap(err, getLastIngestedErrMsg) } - if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil { - return stop(), err + if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger); ingestErr != nil { + if err := s.historyQ.Commit(); err != nil { + return stop(), errors.Wrap(ingestErr, commitErrMsg) + } + return stop(), ingestErr } if err := s.historyQ.Commit(); err != nil { diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index be0ebdaafc..4598008eb8 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -580,6 +580,74 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce( s.Assert().NoError(err) } +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalError() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() + + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + + for i := 100; i <= 105; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + } + + s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() + + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) + s.Assert().EqualError(err, "error getting ledger: my error") +} + +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalAndCommitError() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() + s.historyQ.On("Commit").Return(errors.New("commit error")).Once() + + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + + for i := 100; i <= 105; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + } + + s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() + + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) + s.Assert().EqualError(err, "Error committing db transaction: error getting ledger: my error") +} + func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceWithFlushMax() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() From d3a39265c47aa0298f4e796a2bbbbb6e71dbb3c9 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 27 Nov 2023 22:42:33 -0800 Subject: [PATCH 8/9] #5099: set max flush size default in NewSystem --- services/horizon/cmd/db.go | 4 +++- services/horizon/internal/ingest/main.go | 9 ++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 85b26afd56..930d3f1ca2 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -399,7 +399,9 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, return errors.New("--force is incompatible with --parallel-workers > 1") } - maxLedgersPerFlush := uint32(100) + // set to 0 to allow system default to be applied for flush max, + // unless parallel workers are define, then set flush size to the number of workers. + maxLedgersPerFlush := uint32(0) if parallelWorkers > 0 { maxLedgersPerFlush = uint32(parallelWorkers) } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 24d6a91b34..32873dbea0 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -280,6 +280,13 @@ func NewSystem(config Config) (System, error) { historyAdapter := newHistoryArchiveAdapter(archive) filters := filters.NewFilters() + maxLedgersPerFlush := config.MaxLedgerPerFlush + if maxLedgersPerFlush < 1 { + // 100 ledgers per flush has shown in stress tests + // to be best point on performance curve, default to that. + maxLedgersPerFlush = 100 + } + system := &system{ cancel: cancel, config: config, @@ -306,7 +313,7 @@ func NewSystem(config Config) (System, error) { config.CheckpointFrequency, config.StateVerificationCheckpointFrequency, ), - maxLedgerPerFlush: config.MaxLedgerPerFlush, + maxLedgerPerFlush: maxLedgersPerFlush, } system.initMetrics() From 386891a100e98a5a10ded7a2487bcde33ba10071 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 27 Nov 2023 23:30:35 -0800 Subject: [PATCH 9/9] #5099: use parallelJobSize as the alternate default for reingest max flush size if lower than default of 100 --- services/horizon/cmd/db.go | 8 +++----- services/horizon/internal/ingest/main.go | 8 +++++--- services/horizon/internal/ingest/main_test.go | 1 + 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 930d3f1ca2..a83597932e 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -399,11 +399,9 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, return errors.New("--force is incompatible with --parallel-workers > 1") } - // set to 0 to allow system default to be applied for flush max, - // unless parallel workers are define, then set flush size to the number of workers. - maxLedgersPerFlush := uint32(0) - if parallelWorkers > 0 { - maxLedgersPerFlush = uint32(parallelWorkers) + maxLedgersPerFlush := ingest.MaxLedgersPerFlush + if parallelJobSize < maxLedgersPerFlush { + maxLedgersPerFlush = parallelJobSize } ingestConfig := ingest.Config{ diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 32873dbea0..0369f3a69d 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -68,6 +68,10 @@ const ( defaultCoreCursorName = "HORIZON" stateVerificationErrorThreshold = 3 + + // 100 ledgers per flush has shown in stress tests + // to be best point on performance curve, default to that. + MaxLedgersPerFlush uint32 = 100 ) var log = logpkg.DefaultLogger.WithField("service", "ingest") @@ -282,9 +286,7 @@ func NewSystem(config Config) (System, error) { maxLedgersPerFlush := config.MaxLedgerPerFlush if maxLedgersPerFlush < 1 { - // 100 ledgers per flush has shown in stress tests - // to be best point on performance curve, default to that. - maxLedgersPerFlush = 100 + maxLedgersPerFlush = MaxLedgersPerFlush } system := &system{ diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 569307b26b..3258db4899 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -106,6 +106,7 @@ func TestNewSystem(t *testing.T) { assert.Equal(t, config, system.runner.(*ProcessorRunner).config) assert.Equal(t, system.ctx, system.runner.(*ProcessorRunner).ctx) + assert.Equal(t, system.maxLedgerPerFlush, MaxLedgersPerFlush) } func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) {