diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md
index a5d5c52dde..70cb6e833c 100644
--- a/services/horizon/CHANGELOG.md
+++ b/services/horizon/CHANGELOG.md
@@ -6,9 +6,11 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
 ## Unreleased
 
 ### Fixed
-- http archive requests include user agent and metrics ([5166](https://github.com/stellar/go/pull/5166))
+- Trade agg rebuild errors reported on `db reingest range` with parallel workers ([5168](https://github.com/stellar/go/pull/5168))
+- http archive requests include user agent ([5166](https://github.com/stellar/go/pull/5166))
 
 ### Added
+- http archive requests include metrics ([5166](https://github.com/stellar/go/pull/5166))
 - Add a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051))
 
 ### Breaking Changes
diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go
index a0d0e6c518..725df622b0 100644
--- a/services/horizon/cmd/db.go
+++ b/services/horizon/cmd/db.go
@@ -443,7 +443,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
 	}
 	defer system.Shutdown()
 
-	err = system.ReingestRange(ledgerRanges, reingestForce)
+	err = system.ReingestRange(ledgerRanges, reingestForce, true)
 	if err != nil {
 		if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
 			return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go
index d3831fca5c..59a1a7c969 100644
--- a/services/horizon/internal/ingest/fsm.go
+++ b/services/horizon/internal/ingest/fsm.go
@@ -499,7 +499,7 @@ func (r resumeState) run(s *system) (transition, error) {
 	}
 
 	rebuildStart := time.Now()
-	err = s.historyQ.RebuildTradeAggregationBuckets(s.ctx, ingestLedger, ingestLedger, s.config.RoundingSlippageFilter)
+	err = s.RebuildTradeAggregationBuckets(ingestLedger, ingestLedger)
 	if err != nil {
 		return retryResume(r), errors.Wrap(err, "error rebuilding trade aggregations")
 	}
@@ -741,7 +741,7 @@ func (v verifyRangeState) run(s *system) (transition, error) {
 			Info("Processed ledger")
 	}
 
-	err = s.historyQ.RebuildTradeAggregationBuckets(s.ctx, v.fromLedger, v.toLedger, s.config.RoundingSlippageFilter)
+	err = s.RebuildTradeAggregationBuckets(v.fromLedger, v.toLedger)
 	if err != nil {
 		return stop(), errors.Wrap(err, "error rebuilding trade aggregations")
 	}
diff --git a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go
index e2e7724d68..832898d021 100644
--- a/services/horizon/internal/ingest/fsm_reingest_history_range_state.go
+++ b/services/horizon/internal/ingest/fsm_reingest_history_range_state.go
@@ -183,11 +183,6 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
 		}
 	}
 
-	err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter)
-	if err != nil {
-		return stop(), errors.Wrap(err, "Error rebuilding trade aggregations")
-	}
-
 	log.WithFields(logpkg.F{
 		"from": h.fromLedger,
 		"to": h.toLedger,
diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go
index 4598008eb8..4f7d2c4944 100644
--- a/services/horizon/internal/ingest/ingest_history_range_state_test.go
+++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go
@@ -304,16 +304,16 @@ func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() {
 func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidRange() {
 	// Recreate mock in this single test to remove Rollback assertion.
 	s.historyQ = &mockDBQ{}
-	err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false, true)
 	s.Assert().EqualError(err, "Invalid range: {0 0} genesis ledger starts at 1")
 
-	err = s.system.ReingestRange([]history.LedgerRange{{0, 100}}, false)
+	err = s.system.ReingestRange([]history.LedgerRange{{0, 100}}, false, true)
 	s.Assert().EqualError(err, "Invalid range: {0 100} genesis ledger starts at 1")
 
-	err = s.system.ReingestRange([]history.LedgerRange{{100, 0}}, false)
+	err = s.system.ReingestRange([]history.LedgerRange{{100, 0}}, false, true)
 	s.Assert().EqualError(err, "Invalid range: {100 0} from > to")
 
-	err = s.system.ReingestRange([]history.LedgerRange{{100, 99}}, false)
+	err = s.system.ReingestRange([]history.LedgerRange{{100, 99}}, false, true)
 	s.Assert().EqualError(err, "Invalid range: {100 99} from > to")
 }
 
@@ -323,7 +323,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvali
 	s.historyQ.On("Rollback").Return(nil).Once()
 	s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
 	s.system.maxLedgerPerFlush = 0
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0")
 }
 
@@ -332,28 +332,28 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateBeginR
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once()
 	s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().EqualError(err, "Error starting a transaction: my error")
 }
 
 func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateGetLastLedgerIngestNonBlockingError() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), errors.New("my error")).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().EqualError(err, "Error getting last ingested ledger: my error")
 }
 
 func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRangeOverlaps() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(190), nil).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().Equal(ErrReingestRangeConflict{190}, err)
 }
 
 func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStatRangeOverlapsAtEnd() {
 	s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(200), nil).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().Equal(ErrReingestRangeConflict{200}, err)
 }
 
@@ -369,7 +369,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateClearH
 		"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
 	).Return(errors.New("my error")).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().EqualError(err, "error in DeleteRangeAll: my error")
 }
 
@@ -397,7 +397,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTra
 	s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
 	s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error")
 }
 
@@ -428,7 +428,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit
 		s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
 	}
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().EqualError(err, "Error committing db transaction: my error")
 }
 
@@ -460,7 +460,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
 	}
 
 	// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().NoError(err)
 }
 
@@ -500,7 +500,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
 	s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once()
 	s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once()
 	s.system.maxLedgerPerFlush = 60
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
 	s.Assert().NoError(err)
 }
 
@@ -534,7 +534,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
 	s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
 	s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false, true)
 	s.Assert().NoError(err)
 }
 
@@ -543,7 +543,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceG
 	s.historyQ.On("Rollback").Return(nil).Once()
 	s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), errors.New("my error")).Once()
 
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
 	s.Assert().EqualError(err, "Error getting last ingested ledger: my error")
 }
 
@@ -576,7 +576,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce(
 	}
 
 	// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
 	s.Assert().NoError(err)
 }
 
@@ -610,7 +610,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL
 	s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()
 
 	// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
 	s.Assert().EqualError(err, "error getting ledger: my error")
 }
 
@@ -644,7 +644,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL
 	s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()
 
 	// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
 	s.Assert().EqualError(err, "Error committing db transaction: error getting ledger: my error")
 }
 
@@ -686,6 +686,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW
 	s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once()
 	s.system.maxLedgerPerFlush = 60
-	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
+	err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
 	s.Assert().NoError(err)
 }
diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go
index 508476a55b..f064c4695c 100644
--- a/services/horizon/internal/ingest/main.go
+++ b/services/horizon/internal/ingest/main.go
@@ -173,10 +173,11 @@ type System interface {
 	StressTest(numTransactions, changesPerTransaction int) error
 	VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
 	BuildState(sequence uint32, skipChecks bool) error
-	ReingestRange(ledgerRanges []history.LedgerRange, force bool) error
+	ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error
 	BuildGenesisState() error
 	Shutdown()
 	GetCurrentState() State
+	RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error
 }
 
 type system struct {
@@ -521,7 +522,7 @@ func validateRanges(ledgerRanges []history.LedgerRange) error {
 
 // ReingestRange runs the ingestion pipeline on the range of ledgers ingesting
 // history data only.
-func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool) error {
+func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error {
 	if err := validateRanges(ledgerRanges); err != nil {
 		return err
 	}
@@ -542,10 +543,20 @@ func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool) e
 		if err != nil {
 			return err
 		}
+		if rebuildTradeAgg {
+			err = s.RebuildTradeAggregationBuckets(cur.StartSequence, cur.EndSequence)
+			if err != nil {
+				return errors.Wrap(err, "Error rebuilding trade aggregations")
+			}
+		}
 	}
 	return nil
 }
 
+func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error {
+	return s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, s.config.RoundingSlippageFilter)
+}
+
 // BuildGenesisState runs the ingestion pipeline on genesis ledger. Transitions
 // to stopState when done.
 func (s *system) BuildGenesisState() error {
diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go
index 460c27e062..80b5a40ed1 100644
--- a/services/horizon/internal/ingest/main_test.go
+++ b/services/horizon/internal/ingest/main_test.go
@@ -592,8 +592,8 @@ func (m *mockSystem) BuildState(sequence uint32, skipChecks bool) error {
 	return args.Error(0)
 }
 
-func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool) error {
-	args := m.Called(ledgerRanges, force)
+func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error {
+	args := m.Called(ledgerRanges, force, rebuildTradeAgg)
 	return args.Error(0)
 }
 
@@ -607,6 +607,11 @@ func (m *mockSystem) GetCurrentState() State {
 	return args.Get(0).(State)
 }
 
+func (m *mockSystem) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error {
+	args := m.Called(fromLedger, toLedger)
+	return args.Error(0)
+}
+
 func (m *mockSystem) Shutdown() {
 	m.Called()
 }
diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go
index 525f153b81..4f07c21cc4 100644
--- a/services/horizon/internal/ingest/parallel.go
+++ b/services/horizon/internal/ingest/parallel.go
@@ -2,6 +2,7 @@ package ingest
 
 import (
 	"fmt"
+	"math"
 	"sync"
 
 	"github.com/stellar/go/services/horizon/internal/db2/history"
@@ -61,7 +62,7 @@ func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, rei
 		case <-stop:
 			return rangeError{}
 		case reingestRange := <-reingestJobQueue:
-			err := s.ReingestRange([]history.LedgerRange{reingestRange}, false)
+			err := s.ReingestRange([]history.LedgerRange{reingestRange}, false, false)
 			if err != nil {
 				return rangeError{
 					err: err,
@@ -73,7 +74,24 @@
 	}
 }
 
-func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, stop <-chan struct{}, reingestJobQueue chan<- history.LedgerRange) {
+func (ps *ParallelSystems) rebuildTradeAggRanges(ledgerRanges []history.LedgerRange) error {
+	s, err := ps.systemFactory(ps.config)
+	if err != nil {
+		return err
+	}
+
+	for _, cur := range ledgerRanges {
+		err := s.RebuildTradeAggregationBuckets(cur.StartSequence, cur.EndSequence)
+		if err != nil {
+			return errors.Wrapf(err, "Error rebuilding trade aggregations for range start=%v, stop=%v", cur.StartSequence, cur.EndSequence)
+		}
+	}
+	return nil
+}
+
+// returns the lowest ledger to start from of all ledgerRanges
+func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, stop <-chan struct{}, reingestJobQueue chan<- history.LedgerRange) uint32 {
+	lowestLedger := uint32(math.MaxUint32)
 	for _, cur := range ledgerRanges {
 		for subRangeFrom := cur.StartSequence; subRangeFrom < cur.EndSequence; {
 			// job queuing
@@ -83,12 +101,16 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32,
 			}
 			select {
 			case <-stop:
-				return
+				return lowestLedger
 			case reingestJobQueue <- history.LedgerRange{StartSequence: subRangeFrom, EndSequence: subRangeTo}:
 			}
+			if subRangeFrom < lowestLedger {
+				lowestLedger = subRangeFrom
+			}
 			subRangeFrom = subRangeTo + 1
 		}
 	}
+	return lowestLedger
 }
 
 func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
@@ -166,7 +188,7 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat
 		}()
 	}
 
-	enqueueReingestTasks(ledgerRanges, batchSize, stop, reingestJobQueue)
+	lowestLedger := enqueueReingestTasks(ledgerRanges, batchSize, stop, reingestJobQueue)
 
 	stopOnce.Do(func() {
 		close(stop)
@@ -176,7 +198,13 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat
 
 	if lowestRangeErr != nil {
 		lastLedger := ledgerRanges[len(ledgerRanges)-1].EndSequence
+		if err := ps.rebuildTradeAggRanges([]history.LedgerRange{{StartSequence: lowestLedger, EndSequence: lowestRangeErr.ledgerRange.StartSequence}}); err != nil {
+			log.WithError(err).Errorf("error when trying to rebuild trade agg for partially completed portion of overall parallel reingestion range, start=%v, stop=%v", lowestLedger, lowestRangeErr.ledgerRange.StartSequence)
+		}
 		return errors.Wrapf(lowestRangeErr, "job failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.StartSequence, lastLedger)
 	}
+	if err := ps.rebuildTradeAggRanges(ledgerRanges); err != nil {
+		return err
+	}
 	return nil
 }
diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go
index 27ab0c459f..8004a4048c 100644
--- a/services/horizon/internal/ingest/parallel_test.go
+++ b/services/horizon/internal/ingest/parallel_test.go
@@ -31,7 +31,7 @@ func TestParallelReingestRange(t *testing.T) {
 		m sync.Mutex
 	)
 	result := &mockSystem{}
-	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), mock.AnythingOfType("bool")).Run(
+	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Run(
 		func(args mock.Arguments) {
 			m.Lock()
 			defer m.Unlock()
@@ -39,6 +39,7 @@ func TestParallelReingestRange(t *testing.T) {
 			// simulate call
 			time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50)))
 		}).Return(error(nil))
+	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(2050)).Return(nil).Once()
 	factory := func(c Config) (System, error) {
 		return result, nil
 	}
@@ -59,6 +60,7 @@ func TestParallelReingestRange(t *testing.T) {
 	rangesCalled = nil
 	system, err = newParallelSystems(config, 1, factory)
 	assert.NoError(t, err)
+	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once()
 	err = system.ReingestRange([]history.LedgerRange{{1, 1024}}, 64)
 	result.AssertExpectations(t)
 	expected = []history.LedgerRange{
@@ -75,8 +77,10 @@ func TestParallelReingestRangeError(t *testing.T) {
 	config := Config{}
 	result := &mockSystem{}
 	// Fail on the second range
-	result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, mock.AnythingOfType("bool")).Return(errors.New("failed because of foo"))
-	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), mock.AnythingOfType("bool")).Return(error(nil))
+	result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Return(errors.New("failed because of foo")).Once()
+	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(nil)
+	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1537)).Return(nil).Once()
+
 	factory := func(c Config) (System, error) {
 		return result, nil
 	}
@@ -94,17 +98,18 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) {
 	wg.Add(1)
 	result := &mockSystem{}
 	// Fail on an lower subrange after the first error
-	result.On("ReingestRange", []history.LedgerRange{{1025, 1280}}, mock.AnythingOfType("bool")).Run(func(mock.Arguments) {
+	result.On("ReingestRange", []history.LedgerRange{{1025, 1280}}, false, false).Run(func(mock.Arguments) {
 		// Wait for a more recent range to error
 		wg.Wait()
 		// This sleep should help making sure the result of this range is processed later than the one below
 		// (there are no guarantees without instrumenting ReingestRange(), but that's too complicated)
 		time.Sleep(50 * time.Millisecond)
-	}).Return(errors.New("failed because of foo"))
-	result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, mock.AnythingOfType("bool")).Run(func(mock.Arguments) {
+	}).Return(errors.New("failed because of foo")).Once()
+	result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Run(func(mock.Arguments) {
 		wg.Done()
-	}).Return(errors.New("failed because of bar"))
-	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), mock.AnythingOfType("bool")).Return(error(nil))
+	}).Return(errors.New("failed because of bar")).Once()
+	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(error(nil))
+	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1025)).Return(nil).Once()
 	factory := func(c Config) (System, error) {
 		return result, nil