diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 45ecd90489..a83597932e 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -399,6 +399,11 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, return errors.New("--force is incompatible with --parallel-workers > 1") } + maxLedgersPerFlush := ingest.MaxLedgersPerFlush + if parallelJobSize < maxLedgersPerFlush { + maxLedgersPerFlush = parallelJobSize + } + ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, HistoryArchiveURLs: config.HistoryArchiveURLs, @@ -415,6 +420,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, StellarCoreURL: config.StellarCoreURL, RoundingSlippageFilter: config.RoundingSlippageFilter, EnableIngestionFiltering: config.EnableIngestionFiltering, + MaxLedgerPerFlush: maxLedgersPerFlush, } if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 2f9a40783c..4d7640932b 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -12,7 +12,6 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" - "github.com/stellar/go/toid" "github.com/stellar/go/xdr" ) @@ -585,288 +584,6 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string] } } -type historyRangeState struct { - fromLedger uint32 - toLedger uint32 -} - -func (h historyRangeState) String() string { - return fmt.Sprintf( - "historyRange(fromLedger=%d, toLedger=%d)", - h.fromLedger, - h.toLedger, - ) -} - -func (historyRangeState) GetState() State { - return HistoryRange -} - -// historyRangeState is used when catching up history data -func (h historyRangeState) run(s *system) (transition, error) { - if h.fromLedger == 0 || h.toLedger == 0 || - h.fromLedger > h.toLedger { - return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) - } - - err := s.maybePrepareRange(s.ctx, h.fromLedger) - if err != nil { - return start(), err - } - - if err = s.historyQ.Begin(s.ctx); err != nil { - return start(), errors.Wrap(err, "Error starting a transaction") - } - defer s.historyQ.Rollback() - - // acquire distributed lock so no one else can perform ingestion operations. - if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil { - return start(), errors.Wrap(err, getLastIngestedErrMsg) - } - - lastHistoryLedger, err := s.historyQ.GetLatestHistoryLedger(s.ctx) - if err != nil { - return start(), errors.Wrap(err, "could not get latest history ledger") - } - - // We should be ingesting the ledger which occurs after - // lastHistoryLedger. Otherwise, some other horizon node has - // already completed the ingest history range operation and - // we should go back to the init state - if lastHistoryLedger != h.fromLedger-1 { - return start(), nil - } - - for cur := h.fromLedger; cur <= h.toLedger; cur++ { - var ledgerCloseMeta xdr.LedgerCloseMeta - - log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...") - startTime := time.Now() - - ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) - if err != nil { - // Commit finished work in case of ledger backend error. 
- commitErr := s.historyQ.Commit() - if commitErr != nil { - log.WithError(commitErr).Error("Error committing partial range results") - } else { - log.Info("Committed partial range results") - } - return start(), errors.Wrap(err, "error getting ledger") - } - - log.WithFields(logpkg.F{ - "sequence": cur, - "duration": time.Since(startTime).Seconds(), - }).Info("Ledger returned from the backend") - - if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil { - return start(), err - } - } - - if err = s.historyQ.Commit(); err != nil { - return start(), errors.Wrap(err, commitErrMsg) - } - - return start(), nil -} - -func runTransactionProcessorsOnLedger(s *system, ledger xdr.LedgerCloseMeta) error { - log.WithFields(logpkg.F{ - "sequence": ledger.LedgerSequence(), - "state": false, - "ledger": true, - "commit": false, - }).Info("Processing ledger") - startTime := time.Now() - - ledgerTransactionStats, _, tradeStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger.LedgerSequence())) - } - - log. - WithFields(ledgerTransactionStats.Map()). - WithFields(tradeStats.Map()). - WithFields(logpkg.F{ - "sequence": ledger.LedgerSequence(), - "duration": time.Since(startTime).Seconds(), - "state": false, - "ledger": true, - "commit": false, - }). - Info("Processed ledger") - return nil -} - -type reingestHistoryRangeState struct { - fromLedger uint32 - toLedger uint32 - force bool -} - -func (h reingestHistoryRangeState) String() string { - return fmt.Sprintf( - "reingestHistoryRange(fromLedger=%d, toLedger=%d, force=%t)", - h.fromLedger, - h.toLedger, - h.force, - ) -} - -func (reingestHistoryRangeState) GetState() State { - return ReingestHistoryRange -} - -func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error { - if s.historyQ.GetTx() == nil { - return errors.New("expected transaction to be present") - } - - // Clear history data before ingesting - used in `reingest range` command. 
- start, end, err := toid.LedgerRangeInclusive( - int32(fromLedger), - int32(toLedger), - ) - if err != nil { - return errors.Wrap(err, "Invalid range") - } - - err = s.historyQ.DeleteRangeAll(s.ctx, start, end) - if err != nil { - return errors.Wrap(err, "error in DeleteRangeAll") - } - - for cur := fromLedger; cur <= toLedger; cur++ { - var ledgerCloseMeta xdr.LedgerCloseMeta - ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) - if err != nil { - return errors.Wrap(err, "error getting ledger") - } - - if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil { - return err - } - } - - return nil -} - -func (h reingestHistoryRangeState) prepareRange(s *system) (transition, error) { - log.WithFields(logpkg.F{ - "from": h.fromLedger, - "to": h.toLedger, - }).Info("Preparing ledger backend to retrieve range") - startTime := time.Now() - - err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger)) - if err != nil { - return stop(), errors.Wrap(err, "error preparing range") - } - - log.WithFields(logpkg.F{ - "from": h.fromLedger, - "to": h.toLedger, - "duration": time.Since(startTime).Seconds(), - }).Info("Range ready") - - return transition{}, nil -} - -// reingestHistoryRangeState is used as a command to reingest historical data -func (h reingestHistoryRangeState) run(s *system) (transition, error) { - if h.fromLedger == 0 || h.toLedger == 0 || - h.fromLedger > h.toLedger { - return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) - } - - if h.fromLedger == 1 { - log.Warn("Ledger 1 is pregenerated and not available, starting from ledger 2.") - h.fromLedger = 2 - } - - var startTime time.Time - - if h.force { - if t, err := h.prepareRange(s); err != nil { - return t, err - } - startTime = time.Now() - - if err := s.historyQ.Begin(s.ctx); err != nil { - return stop(), errors.Wrap(err, "Error starting a transaction") - } - defer s.historyQ.Rollback() - - // acquire distributed lock so no one else can perform ingestion operations. 
- if _, err := s.historyQ.GetLastLedgerIngest(s.ctx); err != nil { - return stop(), errors.Wrap(err, getLastIngestedErrMsg) - } - - if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil { - return stop(), err - } - - if err := s.historyQ.Commit(); err != nil { - return stop(), errors.Wrap(err, commitErrMsg) - } - } else { - lastIngestedLedger, err := s.historyQ.GetLastLedgerIngestNonBlocking(s.ctx) - if err != nil { - return stop(), errors.Wrap(err, getLastIngestedErrMsg) - } - - if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger { - return stop(), ErrReingestRangeConflict{lastIngestedLedger} - } - - // Only prepare the range after checking the bounds to enable an early error return - var t transition - if t, err = h.prepareRange(s); err != nil { - return t, err - } - startTime = time.Now() - - for cur := h.fromLedger; cur <= h.toLedger; cur++ { - err = func(ledger uint32) error { - if e := s.historyQ.Begin(s.ctx); e != nil { - return errors.Wrap(e, "Error starting a transaction") - } - defer s.historyQ.Rollback() - - // ingest each ledger in a separate transaction to prevent deadlocks - // when acquiring ShareLocks from multiple parallel reingest range processes - if e := h.ingestRange(s, ledger, ledger); e != nil { - return e - } - - if e := s.historyQ.Commit(); e != nil { - return errors.Wrap(e, commitErrMsg) - } - - return nil - }(cur) - if err != nil { - return stop(), err - } - } - } - - err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter) - if err != nil { - return stop(), errors.Wrap(err, "Error rebuilding trade aggregations") - } - - log.WithFields(logpkg.F{ - "from": h.fromLedger, - "to": h.toLedger, - "duration": time.Since(startTime).Seconds(), - }).Info("Reingestion done") - - return stop(), nil -} - type waitForCheckpointState struct{} func (waitForCheckpointState) String() string { diff --git a/services/horizon/internal/ingest/fsm_history_range_state.go b/services/horizon/internal/ingest/fsm_history_range_state.go new file mode 100644 index 0000000000..0aa65795a9 --- /dev/null +++ b/services/horizon/internal/ingest/fsm_history_range_state.go @@ -0,0 +1,111 @@ +package ingest + +import ( + "fmt" + "time" + + "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" +) + +type historyRangeState struct { + fromLedger uint32 + toLedger uint32 +} + +func (h historyRangeState) String() string { + return fmt.Sprintf( + "historyRange(fromLedger=%d, toLedger=%d)", + h.fromLedger, + h.toLedger, + ) +} + +func (historyRangeState) GetState() State { + return HistoryRange +} + +// historyRangeState is used when catching up history data +func (h historyRangeState) run(s *system) (transition, error) { + if h.fromLedger == 0 || h.toLedger == 0 || + h.fromLedger > h.toLedger { + return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) + } + + if s.maxLedgerPerFlush < 1 { + return start(), errors.New("invalid maxLedgerPerFlush, must be greater than 0") + } + + err := s.maybePrepareRange(s.ctx, h.fromLedger) + if err != nil { + return start(), err + } + + if err = s.historyQ.Begin(s.ctx); err != nil { + return start(), errors.Wrap(err, "Error starting a transaction") + } + defer s.historyQ.Rollback() + + // acquire distributed lock so no one else can perform ingestion operations. 
+ if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil { + return start(), errors.Wrap(err, getLastIngestedErrMsg) + } + + lastHistoryLedger, err := s.historyQ.GetLatestHistoryLedger(s.ctx) + if err != nil { + return start(), errors.Wrap(err, "could not get latest history ledger") + } + + // We should be ingesting the ledger which occurs after + // lastHistoryLedger. Otherwise, some other horizon node has + // already completed the ingest history range operation and + // we should go back to the init state + if lastHistoryLedger != h.fromLedger-1 { + return start(), nil + } + + ledgers := make([]xdr.LedgerCloseMeta, 0, s.maxLedgerPerFlush) + for cur := h.fromLedger; cur <= h.toLedger; cur++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...") + startTime := time.Now() + + ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) + if err != nil { + // Commit prior batches that have been flushed in case of ledger backend error. + commitErr := s.historyQ.Commit() + if commitErr != nil { + log.WithError(commitErr).Error("Error committing partial range results") + } else { + log.Info("Committed partial range results") + } + return start(), errors.Wrap(err, "error getting ledger") + } + + log.WithFields(logpkg.F{ + "sequence": cur, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + ledgers = append(ledgers, ledgerCloseMeta) + + if len(ledgers) == cap(ledgers) { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + ledgers = ledgers[0:0] + } + } + + if len(ledgers) > 0 { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + } + if err = s.historyQ.Commit(); err != nil { + return start(), errors.Wrap(err, commitErrMsg) + } + + return start(), nil +} diff --git a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go new file mode 100644 index 0000000000..4e60f71cd1 --- /dev/null +++ b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go @@ -0,0 +1,196 @@ +package ingest + +import ( + "fmt" + "time" + + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" + "github.com/stellar/go/toid" + "github.com/stellar/go/xdr" +) + +type reingestHistoryRangeState struct { + fromLedger uint32 + toLedger uint32 + force bool +} + +func (h reingestHistoryRangeState) String() string { + return fmt.Sprintf( + "reingestHistoryRange(fromLedger=%d, toLedger=%d, force=%t)", + h.fromLedger, + h.toLedger, + h.force, + ) +} + +func (reingestHistoryRangeState) GetState() State { + return ReingestHistoryRange +} + +func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error { + if s.historyQ.GetTx() == nil { + return errors.New("expected transaction to be present") + } + + if s.maxLedgerPerFlush < 1 { + return errors.New("invalid maxLedgerPerFlush, must be greater than 0") + } + + // Clear history data before ingesting - used in `reingest range` command. 
+ start, end, err := toid.LedgerRangeInclusive( + int32(fromLedger), + int32(toLedger), + ) + if err != nil { + return errors.Wrap(err, "Invalid range") + } + + err = s.historyQ.DeleteRangeAll(s.ctx, start, end) + if err != nil { + return errors.Wrap(err, "error in DeleteRangeAll") + } + + // s.maxLedgerPerFlush has been validated to be at least 1 + ledgers := make([]xdr.LedgerCloseMeta, 0, s.maxLedgerPerFlush) + + for cur := fromLedger; cur <= toLedger; cur++ { + var ledgerCloseMeta xdr.LedgerCloseMeta + + log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...") + startTime := time.Now() + ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur) + + if err != nil { + return errors.Wrap(err, "error getting ledger") + } + + log.WithFields(logpkg.F{ + "sequence": cur, + "duration": time.Since(startTime).Seconds(), + }).Info("Ledger returned from the backend") + + ledgers = append(ledgers, ledgerCloseMeta) + + if len(ledgers)%int(s.maxLedgerPerFlush) == 0 { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + ledgers = ledgers[0:0] + } + } + + if len(ledgers) > 0 { + if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil { + return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence()) + } + } + + return nil +} + +func (h reingestHistoryRangeState) prepareRange(s *system) (transition, error) { + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + }).Info("Preparing ledger backend to retrieve range") + startTime := time.Now() + + err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger)) + if err != nil { + return stop(), errors.Wrap(err, "error preparing range") + } + + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + "duration": time.Since(startTime).Seconds(), + }).Info("Range ready") + + return transition{}, nil +} + +// reingestHistoryRangeState is used as a command to reingest historical data +func (h reingestHistoryRangeState) run(s *system) (transition, error) { + if h.fromLedger == 0 || h.toLedger == 0 || + h.fromLedger > h.toLedger { + return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) + } + + if h.fromLedger == 1 { + log.Warn("Ledger 1 is pregenerated and not available, starting from ledger 2.") + h.fromLedger = 2 + } + + startTime := time.Now() + + if h.force { + if t, err := h.prepareRange(s); err != nil { + return t, err + } + + if err := s.historyQ.Begin(s.ctx); err != nil { + return stop(), errors.Wrap(err, "Error starting a transaction") + } + defer s.historyQ.Rollback() + + // acquire distributed lock so no one else can perform ingestion operations. 
+		if _, err := s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
+			return stop(), errors.Wrap(err, getLastIngestedErrMsg)
+		}
+
+		if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger); ingestErr != nil {
+			if err := s.historyQ.Commit(); err != nil {
+				return stop(), errors.Wrap(ingestErr, commitErrMsg)
+			}
+			return stop(), ingestErr
+		}
+
+		if err := s.historyQ.Commit(); err != nil {
+			return stop(), errors.Wrap(err, commitErrMsg)
+		}
+	} else {
+		lastIngestedLedger, err := s.historyQ.GetLastLedgerIngestNonBlocking(s.ctx)
+		if err != nil {
+			return stop(), errors.Wrap(err, getLastIngestedErrMsg)
+		}
+
+		if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger {
+			return stop(), ErrReingestRangeConflict{lastIngestedLedger}
+		}
+
+		// Only prepare the range after checking the bounds to enable an early error return
+		var t transition
+		if t, err = h.prepareRange(s); err != nil {
+			return t, err
+		}
+
+		if err := s.historyQ.Begin(s.ctx); err != nil {
+			return stop(), errors.Wrap(err, "Error starting a transaction")
+		}
+		defer s.historyQ.Rollback()
+
+		if e := h.ingestRange(s, h.fromLedger, h.toLedger); e != nil {
+			return stop(), e
+		}
+
+		if e := s.historyQ.Commit(); e != nil {
+			return stop(), errors.Wrap(e, commitErrMsg)
+		}
+	}
+
+	err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter)
+	if err != nil {
+		return stop(), errors.Wrap(err, "Error rebuilding trade aggregations")
+	}
+
+	log.WithFields(logpkg.F{
+		"from":     h.fromLedger,
+		"to":       h.toLedger,
+		"duration": time.Since(startTime).Seconds(),
+	}).Info("Reingestion done")
+
+	return stop(), nil
+}
diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go
index af486a35cf..5af3d024ef 100644
--- a/services/horizon/internal/ingest/group_processors.go
+++ b/services/horizon/internal/ingest/group_processors.go
@@ -56,13 +56,31 @@ type groupTransactionProcessors struct {
 	processors []horizonTransactionProcessor
 	lazyLoaders []horizonLazyLoader
 	processorsRunDurations
+	transactionStatsProcessor *processors.StatsLedgerTransactionProcessor
+	tradeProcessor            *processors.TradeProcessor
 }
 
-func newGroupTransactionProcessors(processors []horizonTransactionProcessor, lazyLoaders []horizonLazyLoader) *groupTransactionProcessors {
+// newGroupTransactionProcessors builds the group processor for all tx processors.
+//
+// processors - the list of processors; it should include the
+// StatsLedgerTransactionProcessor and the TradeProcessor.
+//
+// transactionStatsProcessor - a direct reference to the stats processor in
+// processors, or nil; group processing will reset its stats as needed.
+//
+// tradeProcessor - same, for the trade processor in processors.
+func newGroupTransactionProcessors(processors []horizonTransactionProcessor,
+	lazyLoaders []horizonLazyLoader,
+	transactionStatsProcessor *processors.StatsLedgerTransactionProcessor,
+	tradeProcessor *processors.TradeProcessor,
+) *groupTransactionProcessors {
+
 	return &groupTransactionProcessors{
-		processors:             processors,
-		processorsRunDurations: make(map[string]time.Duration),
-		lazyLoaders:            lazyLoaders,
+		processors:                processors,
+		processorsRunDurations:    make(map[string]time.Duration),
+		lazyLoaders:               lazyLoaders,
+		transactionStatsProcessor: transactionStatsProcessor,
+		tradeProcessor:            tradeProcessor,
 	}
 }
 
@@ -98,6 +116,16 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio
 	return nil
 }
 
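Review note: the ResetStats additions below allow one groupTransactionProcessors instance to be reused across ledger batches instead of being rebuilt for every ledger. A minimal sketch of the reset-and-reuse pattern, using illustrative stand-in names (runDurations, group) rather than the actual Horizon types:

package main

import (
	"fmt"
	"time"
)

// runDurations mirrors processorsRunDurations: accumulated time per processor.
type runDurations map[string]time.Duration

// group stands in for groupTransactionProcessors.
type group struct {
	durations runDurations
}

func (g *group) record(name string, d time.Duration) {
	g.durations[name] += d
}

// ResetStats clears accumulated metrics so the same instance can serve the
// next batch; the additions below do the same for run durations, trade
// stats, and transaction stats.
func (g *group) ResetStats() {
	g.durations = make(runDurations)
}

func main() {
	g := &group{durations: make(runDurations)}
	for batch := 1; batch <= 2; batch++ {
		g.record("transactions", 5*time.Millisecond)
		fmt.Printf("batch %d: %v\n", batch, g.durations)
		g.ResetStats() // reuse the same group for the next batch
	}
}
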
+func (g *groupTransactionProcessors) ResetStats() { + g.processorsRunDurations = make(map[string]time.Duration) + if g.tradeProcessor != nil { + g.tradeProcessor.ResetStats() + } + if g.transactionStatsProcessor != nil { + g.transactionStatsProcessor.ResetStats() + } +} + type groupTransactionFilterers struct { filterers []processors.LedgerTransactionFilterer processorsRunDurations @@ -127,3 +155,8 @@ func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx in } return true, nil } + +func (g *groupTransactionFilterers) ResetStats() { + g.droppedTransactions = 0 + g.processorsRunDurations = make(map[string]time.Duration) +} diff --git a/services/horizon/internal/ingest/group_processors_test.go b/services/horizon/internal/ingest/group_processors_test.go index 73d4f56f3f..71e50d3911 100644 --- a/services/horizon/internal/ingest/group_processors_test.go +++ b/services/horizon/internal/ingest/group_processors_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest" + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -135,12 +137,19 @@ func TestGroupTransactionProcessorsTestSuiteLedger(t *testing.T) { func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() { s.ctx = context.Background() + statsProcessor := processors.NewStatsLedgerTransactionProcessor() + + tradesProcessor := processors.NewTradeProcessor(history.NewAccountLoaderStub().Loader, + history.NewLiquidityPoolLoaderStub().Loader, + history.NewAssetLoaderStub().Loader, + &history.MockTradeBatchInsertBuilder{}) + s.processorA = &mockHorizonTransactionProcessor{} s.processorB = &mockHorizonTransactionProcessor{} s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{ s.processorA, s.processorB, - }, nil) + }, nil, statsProcessor, tradesProcessor) s.session = &db.MockSession{} } diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index c757703822..4598008eb8 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -11,7 +11,6 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/db2/history" - "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/errors" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" @@ -38,16 +37,14 @@ func (s *IngestHistoryRangeStateTestSuite) SetupTest() { s.historyAdapter = &mockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} s.system = &system{ - ctx: s.ctx, - historyQ: s.historyQ, - historyAdapter: s.historyAdapter, - ledgerBackend: s.ledgerBackend, - runner: s.runner, + ctx: s.ctx, + historyQ: s.historyQ, + historyAdapter: s.historyAdapter, + ledgerBackend: s.ledgerBackend, + runner: s.runner, + maxLedgerPerFlush: 1, } s.system.initMetrics() - - s.historyQ.On("Rollback").Return(nil).Once() - s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(nil).Once() } @@ -60,9 +57,7 @@ func (s *IngestHistoryRangeStateTestSuite) TearDownTest() { s.runner.AssertExpectations(t) } -func (s *IngestHistoryRangeStateTestSuite) TestInvalidRange() { - // 
Recreate mock in this single test to remove Rollback assertion. - *s.historyQ = mockDBQ{} +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidRange() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} next, err := historyRangeState{fromLedger: 0, toLedger: 0}.run(s.system) @@ -86,11 +81,18 @@ func (s *IngestHistoryRangeStateTestSuite) TestInvalidRange() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestRangeNotPreparedFailPrepare() { - // Recreate mock in this single test to remove assertions. - *s.historyQ = mockDBQ{} +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeInvalidMaxFlush() { *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} + s.system.maxLedgerPerFlush = 0 + next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) + s.Assert().Error(err) + s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0") + s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) +} + +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeFailPrepare() { + *s.ledgerBackend = ledgerbackend.MockDatabaseBackend{} s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(100)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(100)).Return(errors.New("my error")).Once() @@ -100,10 +102,7 @@ func (s *IngestHistoryRangeStateTestSuite) TestRangeNotPreparedFailPrepare() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestBeginReturnsError() { - // Recreate mock in this single test to remove Rollback assertion. - *s.historyQ = mockDBQ{} - +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeBeginReturnsError() { s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -112,8 +111,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestBeginReturnsError() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestGetLastLedgerIngestReturnsError() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeGetLastLedgerIngestReturnsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) @@ -122,8 +122,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestGetLastLedgerIngestReturnsError() s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestGetLatestLedgerReturnsError() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeGetLatestLedgerReturnsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(0), errors.New("my error")).Once() @@ -135,8 +136,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestGetLatestLedgerReturnsError() { // TestAnotherNodeIngested tests the case when another node has ingested the range. // In such case we go back to `init` state without processing. 
-func (s *IngestHistoryRangeStateTestSuite) TestAnotherNodeIngested() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeAnotherNodeIngested() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(200), nil).Once() @@ -145,8 +147,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestAnotherNodeIngested() { s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerReturnsError() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeRunTransactionProcessorsReturnsError() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() @@ -154,28 +157,24 @@ func (s *IngestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerR V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ - LedgerSeq: 100, + LedgerSeq: xdr.Uint32(100), }, }, }, } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - errors.New("my error"), - ).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once() next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().Error(err) - s.Assert().EqualError(err, "error processing ledger sequence=100: my error") + s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error") s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestSuccess() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccess() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() @@ -190,56 +189,58 @@ func (s *IngestHistoryRangeStateTestSuite) TestSuccess() { }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() - - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } s.historyQ.On("Commit").Return(nil).Once() + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().NoError(err) s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestSuccessOneLedger() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessWithFlushMax() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", 
s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(100), + firstLedgersBatch := []xdr.LedgerCloseMeta{} + secondLedgersBatch := []xdr.LedgerCloseMeta{} + for i := 100; i <= 200; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, }, }, - }, + } + if i-60 < 100 { + firstLedgersBatch = append(firstLedgersBatch, meta) + } else { + secondLedgersBatch = append(secondLedgersBatch, meta) + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() - - s.historyQ.On("Commit").Return(nil).Once() - - next, err := historyRangeState{fromLedger: 100, toLedger: 100}.run(s.system) + s.system.maxLedgerPerFlush = 60 + next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system) s.Assert().NoError(err) s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) } -func (s *IngestHistoryRangeStateTestSuite) TestCommitsWorkOnLedgerBackendFailure() { +func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeCommitsWorkOnLedgerBackendFailure() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once() s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ @@ -251,19 +252,11 @@ func (s *IngestHistoryRangeStateTestSuite) TestCommitsWorkOnLedgerBackendFailure }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)). 
Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()
 
-	s.runner.On("RunTransactionProcessorsOnLedger", meta).Return(
-		processors.StatsLedgerTransactionProcessorResults{},
-		processorsRunDurations{},
-		processors.TradeStats{},
-		nil,
-	).Once()
-
-	s.historyQ.On("Commit").Return(nil).Once()
-
-	next, err := historyRangeState{fromLedger: 100, toLedger: 102}.run(s.system)
+	next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system)
 	s.Assert().Error(err)
 	s.Assert().EqualError(err, "error getting ledger: my error")
 	s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
@@ -290,17 +283,14 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() {
 	s.ledgerBackend = &mockLedgerBackend{}
 	s.runner = &mockProcessorsRunner{}
 	s.system = &system{
-		ctx:            s.ctx,
-		historyQ:       s.historyQ,
-		historyAdapter: s.historyAdapter,
-		ledgerBackend:  s.ledgerBackend,
-		runner:         s.runner,
+		ctx:               s.ctx,
+		historyQ:          s.historyQ,
+		historyAdapter:    s.historyAdapter,
+		ledgerBackend:     s.ledgerBackend,
+		runner:            s.runner,
+		maxLedgerPerFlush: 1,
 	}
 
-	s.historyQ.On("GetTx").Return(nil).Once()
-	s.historyQ.On("Rollback").Return(nil).Once()
-	s.historyQ.On("Begin", s.ctx).Return(nil).Once()
-
 	s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once()
 }
 
@@ -311,10 +301,9 @@ func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() {
 	s.runner.AssertExpectations(t)
 }
 
-func (s *ReingestHistoryRangeStateTestSuite) TestInvalidRange() {
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidRange() {
 	// Recreate mock in this single test to remove Rollback assertion.
-	*s.historyQ = mockDBQ{}
-
+	s.historyQ = &mockDBQ{}
 	err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false)
 	s.Assert().EqualError(err, "Invalid range: {0 0} genesis ledger starts at 1")
 
@@ -328,76 +317,69 @@ func (s *ReingestHistoryRangeStateTestSuite) TestInvalidRange() {
 	s.Assert().EqualError(err, "Invalid range: {100 99} from > to")
 }
 
-func (s *ReingestHistoryRangeStateTestSuite) TestBeginReturnsError() {
-	// Recreate mock in this single test to remove Rollback assertion.
-	*s.historyQ = mockDBQ{}
-	s.historyQ.On("GetTx").Return(nil)
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidMaxFlush() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once()
+	s.historyQ.On("Begin", s.ctx).Return(nil).Once()
+	s.historyQ.On("Rollback").Return(nil).Once()
+	s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
+	s.system.maxLedgerPerFlush = 0
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0")
+}
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateBeginReturnsError() {
+	// Begin fails before a transaction starts, so no Rollback assertion is needed.
+	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once()
 	s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once()
 
 	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
 	s.Assert().EqualError(err, "Error starting a transaction: my error")
 }
 
-func (s *ReingestHistoryRangeStateTestSuite) TestGetLastLedgerIngestNonBlockingError() {
-	*s.historyQ = mockDBQ{}
-	s.historyQ.On("GetTx").Return(nil).Once()
-
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateGetLastLedgerIngestNonBlockingError() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), errors.New("my error")).Once()
 
 	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
 	s.Assert().EqualError(err, "Error getting last ingested ledger: my error")
 }
 
-func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlaps() {
-	*s.historyQ = mockDBQ{}
-	s.historyQ.On("GetTx").Return(nil).Once()
-
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRangeOverlaps() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(190), nil).Once()
 
 	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
 	s.Assert().Equal(ErrReingestRangeConflict{190}, err)
 }
 
-func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlapsAtEnd() {
-	*s.historyQ = mockDBQ{}
-	s.historyQ.On("GetTx").Return(nil).Once()
-
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRangeOverlapsAtEnd() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(200), nil).Once()
 
 	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
 	s.Assert().Equal(ErrReingestRangeConflict{200}, err)
}
 
-func (s *ReingestHistoryRangeStateTestSuite) TestClearHistoryFails() {
-	*s.historyQ = mockDBQ{}
-	s.historyQ.On("GetTx").Return(nil).Once()
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateClearHistoryFails() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once()
-
 	s.historyQ.On("Begin", s.ctx).Return(nil).Once()
+	s.historyQ.On("Rollback").Return(nil).Once()
 	s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
 
 	toidFrom := toid.New(100, 0, 0)
-	toidTo := toid.New(101, 0, 0)
+	// the state deletes the whole range once; toid.LedgerRangeInclusive() adjusts the exclusive end to to+1
+	toidTo := toid.New(201, 0, 0)
 	s.historyQ.On(
 		"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
 	).Return(errors.New("my error")).Once()
 
-	s.historyQ.On("Rollback").Return(nil).Once()
-
 	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
 	s.Assert().EqualError(err, "error in DeleteRangeAll: my error")
 }
 
-func (s *ReingestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerReturnsError() {
-	*s.historyQ = mockDBQ{}
-	s.historyQ.On("GetTx").Return(nil).Once()
+func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTransactionProcessorsReturnsError() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once()
-
 	s.historyQ.On("Begin", s.ctx).Return(nil).Once()
+	s.historyQ.On("Rollback").Return(nil).Once()
 	s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
 
 	toidFrom := toid.New(100, 0, 0)
-	toidTo := toid.New(101, 0, 0)
+	toidTo := toid.New(201, 0, 0)
 	s.historyQ.On(
 		"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
 	).Return(nil).Once()
@@ -411,74 +393,59 @@ func (s *ReingestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedge
 			},
 		},
 	}
-	
s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - s.runner.On("RunTransactionProcessorsOnLedger", meta). - Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - errors.New("my error"), - ).Once() - s.historyQ.On("Rollback").Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) - s.Assert().EqualError(err, "error processing ledger sequence=100: my error") + s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestCommitFails() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommitFails() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() - s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("Commit").Return(errors.New("my error")).Once() + toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(101, 0, 0) + toidTo := toid.New(201, 0, 0) s.historyQ.On( "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - meta := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(100), + for i := uint32(100); i <= uint32(200); i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, }, }, - }, + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } - s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() - - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() - - s.historyQ.On("Commit").Return(errors.New("my error")).Once() - s.historyQ.On("Rollback").Return(nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().EqualError(err, "Error committing db transaction: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { - *s.historyQ = mockDBQ{} - s.historyQ.On("GetTx").Return(nil).Once() +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccess() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() for i := uint32(100); i <= uint32(200); i++ { - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() - - toidFrom := toid.New(int32(i), 
0, 0) - toidTo := toid.New(int32(i+1), 0, 0) - s.historyQ.On( - "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), - ).Return(nil).Once() - meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ @@ -489,26 +456,64 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + } - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) + s.Assert().NoError(err) +} - s.historyQ.On("Commit").Return(nil).Once() - s.historyQ.On("Rollback").Return(nil).Once() - } +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessWithFlushMax() { + s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() + firstLedgersBatch := []xdr.LedgerCloseMeta{} + secondLedgersBatch := []xdr.LedgerCloseMeta{} + for i := uint32(100); i <= uint32(200); i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + // triggers two flushes, one for max during loop and one for remainder + if i-60 < 100 { + firstLedgersBatch = append(firstLedgersBatch, meta) + } else { + secondLedgersBatch = append(secondLedgersBatch, meta) + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + } + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() + s.system.maxLedgerPerFlush = 60 err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false) s.Assert().NoError(err) } -func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSuccessOneLedger() { s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once() + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("Commit").Return(nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(100), 0).Return(nil).Once() + // Recreate mock in this single ledger test to remove setup assertion on ledger range. 
+ *s.ledgerBackend = mockLedgerBackend{} + s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 100)).Return(nil).Once() toidFrom := toid.New(100, 0, 0) toidTo := toid.New(101, 0, 0) @@ -526,35 +531,61 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { }, } - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() - s.historyQ.On("Commit").Return(nil).Once() - s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(100), 0).Return(nil).Once() - - // Recreate mock in this single test to remove previous assertion. - *s.ledgerBackend = mockLedgerBackend{} - s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 100)).Return(nil).Once() s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false) s.Assert().NoError(err) } -func (s *ReingestHistoryRangeStateTestSuite) TestGetLastLedgerIngestError() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceGetLastLedgerIngestError() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), errors.New("my error")).Once() err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) s.Assert().EqualError(err, "Error getting last ingested ledger: my error") } -func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() + s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() + + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + + for i := 100; i <= 200; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + } + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) + s.Assert().NoError(err) +} + +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalError() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() toidFrom := toid.New(100, 0, 0) toidTo := toid.New(201, 0, 0) @@ -562,7 +593,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() 
- for i := 100; i <= 200; i++ { + for i := 100; i <= 105; i++ { meta := xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ @@ -573,19 +604,88 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { }, } s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() + } + + s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() + + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) + s.Assert().EqualError(err, "error getting ledger: my error") +} + +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalAndCommitError() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() + s.historyQ.On("Commit").Return(errors.New("commit error")).Once() + + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() - s.runner.On("RunTransactionProcessorsOnLedger", meta).Return( - processors.StatsLedgerTransactionProcessorResults{}, - processorsRunDurations{}, - processors.TradeStats{}, - nil, - ).Once() + for i := 100; i <= 105; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once() } - s.historyQ.On("Commit").Return(nil).Once() + s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once() + + // system.maxLedgerPerFlush has been set by default to 1 in test suite setup + err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) + s.Assert().EqualError(err, "Error committing db transaction: error getting ledger: my error") +} +func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceWithFlushMax() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once() + s.historyQ.On("Commit").Return(nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once() + toidFrom := toid.New(100, 0, 0) + toidTo := toid.New(201, 0, 0) + s.historyQ.On( + "DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() + + firstLedgersBatch := []xdr.LedgerCloseMeta{} + secondLedgersBatch := []xdr.LedgerCloseMeta{} + for i := 100; i <= 200; i++ { + meta := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(i), + }, + }, + }, + } + // triggers two flushes, one for max during loop and one for remainder + if i-60 < 100 { + firstLedgersBatch = append(firstLedgersBatch, meta) + } else { + secondLedgersBatch = append(secondLedgersBatch, meta) + } + 
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once() + } + s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once() + + s.system.maxLedgerPerFlush = 60 err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true) s.Assert().NoError(err) } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 45f32e36fa..0369f3a69d 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -68,6 +68,10 @@ const ( defaultCoreCursorName = "HORIZON" stateVerificationErrorThreshold = 3 + + // 100 ledgers per flush has shown in stress tests + // to be best point on performance curve, default to that. + MaxLedgersPerFlush uint32 = 100 ) var log = logpkg.DefaultLogger.WithField("service", "ingest") @@ -102,6 +106,7 @@ type Config struct { RoundingSlippageFilter int EnableIngestionFiltering bool + MaxLedgerPerFlush uint32 } // LocalCaptiveCoreEnabled returns true if configured to run @@ -214,7 +219,8 @@ type system struct { runStateVerificationOnLedger func(uint32) bool - reapOffsets map[string]int64 + reapOffsets map[string]int64 + maxLedgerPerFlush uint32 currentStateMutex sync.Mutex currentState State @@ -278,6 +284,11 @@ func NewSystem(config Config) (System, error) { historyAdapter := newHistoryArchiveAdapter(archive) filters := filters.NewFilters() + maxLedgersPerFlush := config.MaxLedgerPerFlush + if maxLedgersPerFlush < 1 { + maxLedgersPerFlush = MaxLedgersPerFlush + } + system := &system{ cancel: cancel, config: config, @@ -304,6 +315,7 @@ func NewSystem(config Config) (System, error) { config.CheckpointFrequency, config.StateVerificationCheckpointFrequency, ), + maxLedgerPerFlush: maxLedgersPerFlush, } system.initMetrics() diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index be2687b8e7..3258db4899 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -106,6 +106,7 @@ func TestNewSystem(t *testing.T) { assert.Equal(t, config, system.runner.(*ProcessorRunner).config) assert.Equal(t, system.ctx, system.runner.(*ProcessorRunner).ctx) + assert.Equal(t, system.maxLedgerPerFlush, MaxLedgersPerFlush) } func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { @@ -542,6 +543,11 @@ func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(ledger xdr.Ledge args.Error(3) } +func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error { + args := m.Called(ledgers) + return args.Error(0) +} + var _ ProcessorRunnerInterface = (*mockProcessorsRunner)(nil) type mockStellarCoreClient struct { diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index cff8960c1d..fe7894d737 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -46,10 +47,6 @@ func (statsChangeProcessor) Commit(ctx context.Context) error { return nil } -type statsLedgerTransactionProcessor struct { - *processors.StatsLedgerTransactionProcessor -} - type 
ledgerStats struct { changeStats ingest.StatsChangeProcessorResults changeDurations processorsRunDurations @@ -75,6 +72,7 @@ type ProcessorRunnerInterface interface { tradeStats processors.TradeStats, err error, ) + RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats ledgerStats, err error, @@ -132,22 +130,16 @@ func buildChangeProcessor( }) } -func (s *ProcessorRunner) buildTransactionProcessor( - ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, - tradeProcessor *processors.TradeProcessor, - ledgersProcessor *processors.LedgersProcessor, -) *groupTransactionProcessors { +func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) *groupTransactionProcessors { accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() lpLoader := history.NewLiquidityPoolLoader() cbLoader := history.NewClaimableBalanceLoader() lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader} + statsLedgerTransactionProcessor := processors.NewStatsLedgerTransactionProcessor() - statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{ - StatsLedgerTransactionProcessor: ledgerTransactionStats, - } - *tradeProcessor = *processors.NewTradeProcessor(accountLoader, + tradeProcessor := processors.NewTradeProcessor(accountLoader, lpLoader, assetLoader, s.historyQ.NewTradeBatchInsertBuilder()) processors := []horizonTransactionProcessor{ @@ -164,7 +156,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( processors.NewLiquidityPoolsTransactionProcessor(lpLoader, s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())} - return newGroupTransactionProcessors(processors, lazyLoaders) + return newGroupTransactionProcessors(processors, lazyLoaders, statsLedgerTransactionProcessor, tradeProcessor) } func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers { @@ -184,7 +176,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessor p = append(p, txSubProc) } - return newGroupTransactionProcessors(p, nil) + return newGroupTransactionProcessors(p, nil, nil, nil) } // checkIfProtocolVersionSupported checks if this Horizon version supports the @@ -317,49 +309,155 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( return nil } -func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( - transactionStats processors.StatsLedgerTransactionProcessorResults, - transactionDurations processorsRunDurations, - tradeStats processors.TradeStats, - err error, -) { +func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta, + groupFilterers *groupTransactionFilterers, + groupFilteredOutProcessors *groupTransactionProcessors, + groupProcessors *groupTransactionProcessors) error { var ( - ledgerTransactionStats processors.StatsLedgerTransactionProcessor - tradeProcessor processors.TradeProcessor - transactionReader *ingest.LedgerTransactionReader + transactionReader *ingest.LedgerTransactionReader ) - if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { + if err := s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { err = errors.Wrap(err, "Error while checking for supported protocol version") - return + return err } - // ensure capture of the ledger to history regardless of whether it has transactions. 
- ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) - ledgersProcessor.ProcessLedger(ledger) - - transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) + startTime := time.Now() + transactionReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) if err != nil { err = errors.Wrap(err, "Error creating ledger reader") - return + return err } + err = processors.StreamLedgerTransactions(s.ctx, + groupFilterers, + groupFilteredOutProcessors, + groupProcessors, + transactionReader, + ledger, + ) + if err != nil { + return errors.Wrap(err, "Error streaming changes from ledger") + } + + transactionStats := groupProcessors.transactionStatsProcessor.GetResults() + transactionStats.TransactionsFiltered = groupFilterers.droppedTransactions + + tradeStats := groupProcessors.tradeProcessor.GetStats() + + log.WithFields(transactionStats.Map()). + WithFields(tradeStats.Map()). + WithFields(logpkg.F{ + "sequence": ledger.LedgerSequence(), + "state": false, + "ledger": true, + "commit": false, + "duration": time.Since(startTime).Seconds(), + }).Info("Processed ledger") + + return nil +} + +func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( + transactionStats processors.StatsLedgerTransactionProcessorResults, + transactionDurations processorsRunDurations, + tradeStats processors.TradeStats, + err error, +) { + // ensure capture of the ledger to history regardless of whether it has transactions. + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) + ledgersProcessor.ProcessLedger(ledger) + groupTransactionFilterers := s.buildTransactionFilterer() groupFilteredOutProcessors := s.buildFilteredOutProcessor() - groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, ledgersProcessor) - err = processors.StreamLedgerTransactions(s.ctx, + groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + + err = s.streamLedger(ledger, groupTransactionFilterers, groupFilteredOutProcessors, groupTransactionProcessors, - transactionReader, - ledger, ) if err != nil { err = errors.Wrap(err, "Error streaming changes from ledger") return } + err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors) + if err != nil { + return + } + + transactionStats = groupTransactionProcessors.transactionStatsProcessor.GetResults() + transactionStats.TransactionsFiltered = groupTransactionFilterers.droppedTransactions + + transactionDurations = groupTransactionProcessors.processorsRunDurations + for key, duration := range groupFilteredOutProcessors.processorsRunDurations { + transactionDurations[key] = duration + } + for key, duration := range groupTransactionFilterers.processorsRunDurations { + transactionDurations[key] = duration + } + + tradeStats = groupTransactionProcessors.tradeProcessor.GetStats() + + return +} + +func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) (err error) { + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) + + groupTransactionFilterers := s.buildTransactionFilterer() + groupFilteredOutProcessors := s.buildFilteredOutProcessor() + groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + + startTime := time.Now() + curHeap, sysHeap := getMemStats() + 
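+	// getMemStats is not defined in this diff; it presumably samples
+	// runtime.MemStats and reports sizes in MB, matching the
+	// currentHeapSizeMB/systemHeapSizeMB log fields below. A hypothetical
+	// sketch of such a helper (assumption, not the actual implementation):
+	//
+	//	func getMemStats() (uint64, uint64) {
+	//		var m runtime.MemStats
+	//		runtime.ReadMemStats(&m) // snapshot of the Go runtime's memory stats
+	//		return m.HeapAlloc / 1024 / 1024, m.Sys / 1024 / 1024
+	//	}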
log.WithFields(logpkg.F{ + "currentHeapSizeMB": curHeap, + "systemHeapSizeMB": sysHeap, + "ledgerBatchSize": len(ledgers), + "state": false, + "ledger": true, + "commit": false, + }).Infof("Running processors for batch of %v ledgers", len(ledgers)) + + for _, ledger := range ledgers { + // ensure capture of the ledger to history regardless of whether it has transactions. + ledgersProcessor.ProcessLedger(ledger) + + err = s.streamLedger(ledger, + groupTransactionFilterers, + groupFilteredOutProcessors, + groupTransactionProcessors, + ) + if err != nil { + err = errors.Wrap(err, "Error streaming changes during ledger batch") + return + } + groupTransactionProcessors.ResetStats() + groupFilteredOutProcessors.ResetStats() + groupTransactionFilterers.ResetStats() + } + + err = s.flushProcessors(groupFilteredOutProcessors, groupTransactionProcessors) + if err != nil { + return + } + curHeap, sysHeap = getMemStats() + log.WithFields(logpkg.F{ + "currentHeapSizeMB": curHeap, + "systemHeapSizeMB": sysHeap, + "ledgers": len(ledgers), + "state": false, + "ledger": true, + "commit": false, + "duration": time.Since(startTime).Seconds(), + }).Infof("Flushed processors for batch of %v ledgers", len(ledgers)) + + return nil +} + +func (s *ProcessorRunner) flushProcessors(groupFilteredOutProcessors *groupTransactionProcessors, groupTransactionProcessors *groupTransactionProcessors) (err error) { if s.config.EnableIngestionFiltering { err = groupFilteredOutProcessors.Flush(s.ctx, s.session) if err != nil { @@ -374,20 +472,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos err = groupTransactionProcessors.Flush(s.ctx, s.session) if err != nil { err = errors.Wrap(err, "Error flushing changes from processor") - return } - - transactionStats = ledgerTransactionStats.GetResults() - transactionStats.TransactionsFiltered = groupTransactionFilterers.droppedTransactions - transactionDurations = groupTransactionProcessors.processorsRunDurations - for key, duration := range groupFilteredOutProcessors.processorsRunDurations { - transactionDurations[key] = duration - } - for key, duration := range groupTransactionFilterers.processorsRunDurations { - transactionDurations[key] = duration - } - - tradeStats = tradeProcessor.GetStats() return } @@ -414,11 +499,13 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( return } + transactionStats, transactionDurations, tradeStats, err := s.RunTransactionProcessorsOnLedger(ledger) + stats.changeStats = changeStatsProcessor.GetResults() stats.changeDurations = groupChangeProcessors.processorsRunDurations - - stats.transactionStats, stats.transactionDurations, stats.tradeStats, err = - s.RunTransactionProcessorsOnLedger(ledger) + stats.transactionStats = transactionStats + stats.transactionDurations = transactionDurations + stats.tradeStats = tradeStats return } diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index fc82ed3cfe..66bb316d18 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -251,14 +251,11 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { historyQ: q, } - stats := &processors.StatsLedgerTransactionProcessor{} - trades := &processors.TradeProcessor{} - ledgersProcessor := &processors.LedgersProcessor{} - processor := runner.buildTransactionProcessor(stats, trades, ledgersProcessor) + processor := 
runner.buildTransactionProcessor(ledgersProcessor) assert.IsType(t, &groupTransactionProcessors{}, processor) - assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) + assert.IsType(t, &processors.StatsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) assert.IsType(t, &processors.LedgersProcessor{}, processor.processors[2]) assert.IsType(t, &processors.OperationProcessor{}, processor.processors[3]) @@ -290,17 +287,16 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { }, } - // Batches mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockTransactionsFilteredTmpBatchInsertBuilder) mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). Return(mockTransactionsFilteredTmpBatchInsertBuilder) - q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) - defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx)...) + defer mock.AssertExpectationsForObjects(t, mockTxProcessorBatchBuilders(q, mockSession, ctx)...) + defer mock.AssertExpectationsForObjects(t, mockChangeProcessorBatchBuilders(q, ctx, true)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -312,6 +308,7 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { ctx, mockSession, ).Return(nil) + defer mock.AssertExpectationsForObjects(t, mockBatchInsertBuilder) runner := ProcessorRunner{ ctx: ctx, @@ -347,18 +344,21 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx)...) + defer mock.AssertExpectationsForObjects(t, mockTxProcessorBatchBuilders(q, mockSession, ctx)...) + defer mock.AssertExpectationsForObjects(t, mockChangeProcessorBatchBuilders(q, ctx, true)...) 
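+	// The ledgers batch-insert expectations below are pinned with .Once():
+	// a single-ledger run should Add the ledger header once and Exec the
+	// batch exactly once.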
mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) mockBatchInsertBuilder.On( "Add", - ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil) + ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() mockBatchInsertBuilder.On( "Exec", ctx, mockSession, - ).Return(nil) + ).Return(nil).Once() + + defer mock.AssertExpectationsForObjects(t, mockBatchInsertBuilder) runner := ProcessorRunner{ ctx: ctx, @@ -372,6 +372,84 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { assert.NoError(t, err) } +func TestProcessorRunnerRunTransactionsProcessorsOnLedgers(t *testing.T) { + ctx := context.Background() + + config := Config{ + NetworkPassphrase: network.PublicNetworkPassphrase, + } + + mockSession := &db.MockSession{} + q := &mockDBQ{} + defer mock.AssertExpectationsForObjects(t, q) + + ledgers := []xdr.LedgerCloseMeta{{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + BucketListHash: xdr.Hash([32]byte{0, 1, 2}), + LedgerSeq: xdr.Uint32(1), + }, + }, + }, + }, + { + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + BucketListHash: xdr.Hash([32]byte{3, 4, 5}), + LedgerSeq: xdr.Uint32(2), + }, + }, + }, + }, + { + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + BucketListHash: xdr.Hash([32]byte{6, 7, 8}), + LedgerSeq: xdr.Uint32(3), + }, + }, + }, + }, + } + + // Batches + defer mock.AssertExpectationsForObjects(t, mockTxProcessorBatchBuilders(q, mockSession, ctx)...) + + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} + q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) + mockBatchInsertBuilder.On( + "Add", + ledgers[0].V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() + mockBatchInsertBuilder.On( + "Add", + ledgers[1].V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() + mockBatchInsertBuilder.On( + "Add", + ledgers[2].V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil).Once() + + mockBatchInsertBuilder.On( + "Exec", + ctx, + mockSession, + ).Return(nil).Once() + + defer mock.AssertExpectationsForObjects(t, mockBatchInsertBuilder) + + runner := ProcessorRunner{ + ctx: ctx, + config: config, + historyQ: q, + session: mockSession, + filters: &MockFilters{}, + } + + err := runner.RunTransactionProcessorsOnLedgers(ledgers) + assert.NoError(t, err) +} + func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *testing.T) { ctx := context.Background() maxBatchSize := 100000 @@ -426,62 +504,68 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t ) } -func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context) []interface{} { +func mockTxProcessorBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context) []interface{} { + // no mocking of builder Add methods needed, the fake ledgers used in tests don't have any operations + // that would trigger the respective processors to invoke Add, each test locally decides to use + // MockLedgersBatchInsertBuilder with asserts on Add invocations, as those are fired once per ledger. 
+ mockTradeBatchInsertBuilder := &history.MockTradeBatchInsertBuilder{} + q.On("NewTradeBatchInsertBuilder").Return(mockTradeBatchInsertBuilder).Once() + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) + Return(mockTransactionsBatchInsertBuilder).Once() mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() + Return(mockOperationsBatchInsertBuilder).Once() mockEffectBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} mockEffectBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQEffects.On("NewEffectBatchInsertBuilder"). - Return(mockEffectBatchInsertBuilder) + Return(mockEffectBatchInsertBuilder).Once() mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} - mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.On("NewTransactionParticipantsBatchInsertBuilder"). - Return(mockTransactionsParticipantsBatchInsertBuilder) + Return(mockTransactionsParticipantsBatchInsertBuilder).Once() mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} - mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.On("NewOperationParticipantBatchInsertBuilder"). - Return(mockOperationParticipantBatchInsertBuilder) + Return(mockOperationParticipantBatchInsertBuilder).Once() mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} - mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). - Return(mockTransactionClaimableBalanceBatchInsertBuilder) + Return(mockTransactionClaimableBalanceBatchInsertBuilder).Once() mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} - mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). - Return(mockOperationClaimableBalanceBatchInsertBuilder) + Return(mockOperationClaimableBalanceBatchInsertBuilder).Once() mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} - mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). 
- Return(mockTransactionLiquidityPoolBatchInsertBuilder) + Return(mockTransactionLiquidityPoolBatchInsertBuilder).Once() mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} - mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). - Return(mockOperationLiquidityPoolBatchInsertBuilder) + Return(mockOperationLiquidityPoolBatchInsertBuilder).Once() - q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) - - return append([]interface{}{ - mockOperationsBatchInsertBuilder, + return []interface{}{mockTradeBatchInsertBuilder, mockTransactionsBatchInsertBuilder, + mockOperationsBatchInsertBuilder, + mockEffectBatchInsertBuilder, + mockTransactionsParticipantsBatchInsertBuilder, + mockOperationParticipantBatchInsertBuilder, mockTransactionClaimableBalanceBatchInsertBuilder, mockOperationClaimableBalanceBatchInsertBuilder, mockTransactionLiquidityPoolBatchInsertBuilder, - mockOperationLiquidityPoolBatchInsertBuilder, - }, mockChangeProcessorBatchBuilders(q, ctx, true)[:]...) + mockOperationLiquidityPoolBatchInsertBuilder} } func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec bool) []interface{} { diff --git a/services/horizon/internal/ingest/processors/ledgers_processor.go b/services/horizon/internal/ingest/processors/ledgers_processor.go index 942a5f8522..5ecfb2ad7f 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor.go @@ -82,7 +82,7 @@ func (p *LedgersProcessor) Flush(ctx context.Context, session db.SessionInterfac } if err := p.batch.Exec(ctx, session); err != nil { - return errors.Wrapf(err, "error committing ledgers %d - %d", min, max) + return errors.Wrapf(err, "error flushing ledgers %d - %d", min, max) } return nil diff --git a/services/horizon/internal/ingest/processors/ledgers_processor_test.go b/services/horizon/internal/ingest/processors/ledgers_processor_test.go index 308bb6995d..1df6640266 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor_test.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor_test.go @@ -220,6 +220,6 @@ func (s *LedgersProcessorTestSuiteLedger) TestExecFails() { s.Assert().EqualError(s.processor.Flush( context.Background(), s.mockSession), - "error committing ledgers 20 - 20: transient exec error", + "error flushing ledgers 20 - 20: transient exec error", ) } diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go index 1937331d6a..8fa886ab52 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go @@ -15,6 +15,10 @@ type StatsLedgerTransactionProcessor struct { results StatsLedgerTransactionProcessorResults } +func NewStatsLedgerTransactionProcessor() *StatsLedgerTransactionProcessor { + return &StatsLedgerTransactionProcessor{} +} + // StatsLedgerTransactionProcessorResults contains results after running StatsLedgerTransactionProcessor. 
type StatsLedgerTransactionProcessorResults struct { Transactions int64 @@ -179,5 +183,9 @@ func (stats *StatsLedgerTransactionProcessorResults) Map() map[string]interface{ } } +func (p *StatsLedgerTransactionProcessor) ResetStats() { + p.results = StatsLedgerTransactionProcessorResults{} +} + // Ensure the StatsChangeProcessor conforms to the ChangeProcessor interface. var _ LedgerTransactionProcessor = (*StatsLedgerTransactionProcessor)(nil) diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go index c7fc6d7967..1f2f8c370e 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go @@ -44,8 +44,38 @@ func TestStatsLedgerTransactionProcessoAllOpTypesCovered(t *testing.T) { assert.Panics(t, f) } +func TestStatsLedgerTransactionProcessorReset(t *testing.T) { + processor := NewStatsLedgerTransactionProcessor() + lcm := xdr.LedgerCloseMeta{} + + assert.NoError(t, processor.ProcessTransaction(lcm, ingest.LedgerTransaction{ + Result: xdr.TransactionResultPair{ + Result: xdr.TransactionResult{ + Result: xdr.TransactionResultResult{ + Code: xdr.TransactionResultCodeTxSuccess, + }, + }, + }, + Envelope: xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + {Body: xdr.OperationBody{Type: xdr.OperationTypeCreateAccount}}, + {Body: xdr.OperationBody{Type: xdr.OperationTypePayment}}, + }, + }, + }, + }, + })) + + assert.Equal(t, processor.GetResults().Operations, int64(2)) + processor.ResetStats() + assert.Equal(t, processor.GetResults().Operations, int64(0)) +} + func TestStatsLedgerTransactionProcessor(t *testing.T) { - processor := &StatsLedgerTransactionProcessor{} + processor := NewStatsLedgerTransactionProcessor() lcm := xdr.LedgerCloseMeta{} // Successful diff --git a/services/horizon/internal/ingest/processors/trades_processor.go b/services/horizon/internal/ingest/processors/trades_processor.go index 2cb702e14b..66c6aeb8d3 100644 --- a/services/horizon/internal/ingest/processors/trades_processor.go +++ b/services/horizon/internal/ingest/processors/trades_processor.go @@ -48,6 +48,7 @@ type TradeStats struct { func (p *TradeProcessor) GetStats() TradeStats { return p.stats } + func (stats *TradeStats) Map() map[string]interface{} { return map[string]interface{}{ "stats_count": stats.count, @@ -460,3 +461,7 @@ func (p *TradeProcessor) extractTrades( return result, nil } + +func (p *TradeProcessor) ResetStats() { + p.stats = TradeStats{} +} diff --git a/services/horizon/internal/ingest/processors/trades_processor_test.go b/services/horizon/internal/ingest/processors/trades_processor_test.go index 5b2a2f20e3..8a7733f4d1 100644 --- a/services/horizon/internal/ingest/processors/trades_processor_test.go +++ b/services/horizon/internal/ingest/processors/trades_processor_test.go @@ -757,6 +757,9 @@ func (s *TradeProcessorTestSuiteLedger) TestIngestTradesSucceeds() { err := s.processor.Flush(ctx, s.mockSession) s.Assert().NoError(err) + s.Assert().Equal(s.processor.GetStats().count, int64(8)) + s.processor.ResetStats() + s.Assert().Equal(s.processor.GetStats().count, int64(0)) } func (s *TradeProcessorTestSuiteLedger) TestBatchAddError() {
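
Taken together, these changes replace per-ledger flushing on the reingestion path with an accumulate-and-flush loop: ledgers are buffered up to maxLedgerPerFlush and handed to RunTransactionProcessorsOnLedgers as one batch, so the batch-insert builders flush once per batch rather than once per ledger. A minimal sketch of the calling pattern, assuming the LedgerBackend and ProcessorRunnerInterface types shown above — the helper batchReingest and its standalone shape are hypothetical; the real logic lives in reingestHistoryRangeState and additionally manages the surrounding db transaction:

package ingest

import (
	"context"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/errors"
	"github.com/stellar/go/xdr"
)

// batchReingest streams ledgers [from, to] from the backend and hands them to
// the runner in slices of at most maxPerFlush (assumed >= 1, as NewSystem
// enforces). Hypothetical sketch, not code from this PR.
func batchReingest(
	ctx context.Context,
	backend ledgerbackend.LedgerBackend,
	runner ProcessorRunnerInterface,
	from, to, maxPerFlush uint32,
) error {
	batch := make([]xdr.LedgerCloseMeta, 0, maxPerFlush)
	for cur := from; cur <= to; cur++ {
		meta, err := backend.GetLedger(ctx, cur)
		if err != nil {
			return errors.Wrap(err, "error getting ledger")
		}
		batch = append(batch, meta)
		// Flush as soon as the batch reaches the configured cap...
		if uint32(len(batch)) == maxPerFlush {
			if err = runner.RunTransactionProcessorsOnLedgers(batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
	}
	// ...and once more for any remainder, which is why the flush-max test
	// above expects exactly two RunTransactionProcessorsOnLedgers calls.
	if len(batch) > 0 {
		return runner.RunTransactionProcessorsOnLedgers(batch)
	}
	return nil
}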