Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tradeagg rebuild from reingest command with parallel workers #5168

Merged
merged 5 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Fixed
- Trade agg rebuild errors reported on `db reingest range` with parallel workers ([5168](https://github.com/stellar/go/pull/5168))

### Added

- Add a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051))
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
}
defer system.Shutdown()

err = system.ReingestRange(ledgerRanges, reingestForce)
err = system.ReingestRange(ledgerRanges, reingestForce, true)
if err != nil {
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (r resumeState) run(s *system) (transition, error) {
}

rebuildStart := time.Now()
err = s.historyQ.RebuildTradeAggregationBuckets(s.ctx, ingestLedger, ingestLedger, s.config.RoundingSlippageFilter)
err = s.RebuildTradeAggregationBuckets(ingestLedger, ingestLedger)
if err != nil {
return retryResume(r), errors.Wrap(err, "error rebuilding trade aggregations")
}
Expand Down Expand Up @@ -713,7 +713,7 @@ func (v verifyRangeState) run(s *system) (transition, error) {
Info("Processed ledger")
}

err = s.historyQ.RebuildTradeAggregationBuckets(s.ctx, v.fromLedger, v.toLedger, s.config.RoundingSlippageFilter)
err = s.RebuildTradeAggregationBuckets(v.fromLedger, v.toLedger)
if err != nil {
return stop(), errors.Wrap(err, "error rebuilding trade aggregations")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,6 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
}
}

err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter)
if err != nil {
return stop(), errors.Wrap(err, "Error rebuilding trade aggregations")
}

log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
Expand Down
40 changes: 20 additions & 20 deletions services/horizon/internal/ingest/ingest_history_range_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,16 @@ func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() {
func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidRange() {
// Recreate mock in this single test to remove Rollback assertion.
s.historyQ = &mockDBQ{}
err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false, true)
s.Assert().EqualError(err, "Invalid range: {0 0} genesis ledger starts at 1")

err = s.system.ReingestRange([]history.LedgerRange{{0, 100}}, false)
err = s.system.ReingestRange([]history.LedgerRange{{0, 100}}, false, true)
s.Assert().EqualError(err, "Invalid range: {0 100} genesis ledger starts at 1")

err = s.system.ReingestRange([]history.LedgerRange{{100, 0}}, false)
err = s.system.ReingestRange([]history.LedgerRange{{100, 0}}, false, true)
s.Assert().EqualError(err, "Invalid range: {100 0} from > to")

err = s.system.ReingestRange([]history.LedgerRange{{100, 99}}, false)
err = s.system.ReingestRange([]history.LedgerRange{{100, 99}}, false, true)
s.Assert().EqualError(err, "Invalid range: {100 99} from > to")
}

Expand All @@ -323,7 +323,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvali
s.historyQ.On("Rollback").Return(nil).Once()
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.system.maxLedgerPerFlush = 0
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0")
}

Expand All @@ -332,28 +332,28 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateBeginR
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once()
s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "Error starting a transaction: my error")
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateGetLastLedgerIngestNonBlockingError() {
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "Error getting last ingested ledger: my error")
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRangeOverlaps() {
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(190), nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().Equal(ErrReingestRangeConflict{190}, err)
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStatRangeOverlapsAtEnd() {
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(200), nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().Equal(ErrReingestRangeConflict{200}, err)
}

Expand All @@ -369,7 +369,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateClearH
"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "error in DeleteRangeAll: my error")
}

Expand Down Expand Up @@ -397,7 +397,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTra
s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error")
}

Expand Down Expand Up @@ -428,7 +428,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "Error committing db transaction: my error")
}

Expand Down Expand Up @@ -460,7 +460,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
}

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -500,7 +500,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once()
s.system.maxLedgerPerFlush = 60
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -534,7 +534,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false, true)
s.Assert().NoError(err)
}

Expand All @@ -543,7 +543,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceG
s.historyQ.On("Rollback").Return(nil).Once()
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().EqualError(err, "Error getting last ingested ledger: my error")
}

Expand Down Expand Up @@ -576,7 +576,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce(
}

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -610,7 +610,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL
s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().EqualError(err, "error getting ledger: my error")
}

Expand Down Expand Up @@ -644,7 +644,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL
s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().EqualError(err, "Error committing db transaction: error getting ledger: my error")
}

Expand Down Expand Up @@ -686,6 +686,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW
s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once()

s.system.maxLedgerPerFlush = 60
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().NoError(err)
}
15 changes: 13 additions & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ type System interface {
StressTest(numTransactions, changesPerTransaction int) error
VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
BuildState(sequence uint32, skipChecks bool) error
ReingestRange(ledgerRanges []history.LedgerRange, force bool) error
ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradAgg bool) error
sreuland marked this conversation as resolved.
Show resolved Hide resolved
BuildGenesisState() error
Shutdown()
GetCurrentState() State
RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error
}

type system struct {
Expand Down Expand Up @@ -509,7 +510,7 @@ func validateRanges(ledgerRanges []history.LedgerRange) error {

// ReingestRange runs the ingestion pipeline on the range of ledgers ingesting
// history data only.
func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool) error {
func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error {
if err := validateRanges(ledgerRanges); err != nil {
return err
}
Expand All @@ -530,10 +531,20 @@ func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool) e
if err != nil {
return err
}
if rebuildTradeAgg {
err = s.RebuildTradeAggregationBuckets(cur.StartSequence, cur.EndSequence)
if err != nil {
return errors.Wrap(err, "Error rebuilding trade aggregations")
}
}
}
return nil
}

// RebuildTradeAggregationBuckets asks the history query layer to rebuild the
// trade aggregation buckets for the ledgers between fromLedger and toLedger.
// The call is made with the system's own context and its configured
// RoundingSlippageFilter; whatever error the query layer reports is returned
// unchanged.
func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error {
	slippageFilter := s.config.RoundingSlippageFilter
	rebuildErr := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, slippageFilter)
	return rebuildErr
}

// BuildGenesisState runs the ingestion pipeline on genesis ledger. Transitions
// to stopState when done.
func (s *system) BuildGenesisState() error {
Expand Down
9 changes: 7 additions & 2 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,8 +592,8 @@ func (m *mockSystem) BuildState(sequence uint32, skipChecks bool) error {
return args.Error(0)
}

func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool) error {
args := m.Called(ledgerRanges, force)
func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error {
args := m.Called(ledgerRanges, force, rebuildTradeAgg)
return args.Error(0)
}

Expand All @@ -607,6 +607,11 @@ func (m *mockSystem) GetCurrentState() State {
return args.Get(0).(State)
}

// RebuildTradeAggregationBuckets is the mock counterpart of the System method:
// it records the call with both ledger bounds and hands back the error that
// was configured as the first return value for this expectation.
func (m *mockSystem) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error {
	return m.Called(fromLedger, toLedger).Error(0)
}

// Shutdown records the call on the mock. It takes no arguments and has no
// return value, so there is nothing to read back from the recorded call.
func (m *mockSystem) Shutdown() {
	m.Called()
}
Expand Down
36 changes: 32 additions & 4 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"fmt"
"math"
"sync"

"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -61,7 +62,7 @@
case <-stop:
return rangeError{}
case reingestRange := <-reingestJobQueue:
err := s.ReingestRange([]history.LedgerRange{reingestRange}, false)
err := s.ReingestRange([]history.LedgerRange{reingestRange}, false, false)
if err != nil {
return rangeError{
err: err,
Expand All @@ -73,7 +74,24 @@
}
}

func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, stop <-chan struct{}, reingestJobQueue chan<- history.LedgerRange) {
// rebuildTradeAggRanges rebuilds the trade aggregation buckets for each of the
// given ledger ranges, in order, using a dedicated System created through
// ps.systemFactory.
//
// It returns the factory error as-is if the System cannot be created, and
// otherwise stops at the first range whose rebuild fails, returning that error
// wrapped with the start and end of the offending range. It returns nil when
// every range was rebuilt (including when ledgerRanges is empty).
func (ps *ParallelSystems) rebuildTradeAggRanges(ledgerRanges []history.LedgerRange) error {
	s, err := ps.systemFactory(ps.config)
	if err != nil {
		return err
	}
	// The System is created only for this call, so release it on every exit
	// path rather than leaking whatever it holds.
	defer s.Shutdown()

	for _, cur := range ledgerRanges {
		err = s.RebuildTradeAggregationBuckets(cur.StartSequence, cur.EndSequence)
		if err != nil {
			return errors.Wrapf(err, "Error rebuilding trade aggregations for range start=%v, stop=%v", cur.StartSequence, cur.EndSequence)
		}
	}
	return nil
}

// enqueueReingestTasks returns the lowest start ledger among the sub-ranges it
// enqueued, or math.MaxUint32 if nothing was enqueued.
func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, stop <-chan struct{}, reingestJobQueue chan<- history.LedgerRange) uint32 {

Check failure on line 93 in services/horizon/internal/ingest/parallel.go

View workflow job for this annotation

GitHub Actions / golangci

line is 155 characters (lll)
lowestLedger := uint32(math.MaxUint32)
for _, cur := range ledgerRanges {
for subRangeFrom := cur.StartSequence; subRangeFrom < cur.EndSequence; {
// job queuing
Expand All @@ -83,12 +101,16 @@
}
select {
case <-stop:
return
return lowestLedger
case reingestJobQueue <- history.LedgerRange{StartSequence: subRangeFrom, EndSequence: subRangeTo}:
}
if subRangeFrom < lowestLedger {
lowestLedger = subRangeFrom
}
subRangeFrom = subRangeTo + 1
}
}
return lowestLedger
}

func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
Expand Down Expand Up @@ -166,7 +188,7 @@
}()
}

enqueueReingestTasks(ledgerRanges, batchSize, stop, reingestJobQueue)
lowestLedger := enqueueReingestTasks(ledgerRanges, batchSize, stop, reingestJobQueue)

stopOnce.Do(func() {
close(stop)
Expand All @@ -176,7 +198,13 @@

if lowestRangeErr != nil {
lastLedger := ledgerRanges[len(ledgerRanges)-1].EndSequence
if err := ps.rebuildTradeAggRanges([]history.LedgerRange{{StartSequence: lowestLedger, EndSequence: lowestRangeErr.ledgerRange.StartSequence}}); err != nil {

Check failure on line 201 in services/horizon/internal/ingest/parallel.go

View workflow job for this annotation

GitHub Actions / golangci

line is 159 characters (lll)
log.WithError(err).Errorf("error when trying to rebuild trade agg for partially completed portion of overall parallel reingestion range, start=%v, stop=%v", lowestLedger, lowestRangeErr.ledgerRange.StartSequence)

Check failure on line 202 in services/horizon/internal/ingest/parallel.go

View workflow job for this annotation

GitHub Actions / golangci

line is 215 characters (lll)
}
return errors.Wrapf(lowestRangeErr, "job failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.StartSequence, lastLedger)
}
if err := ps.rebuildTradeAggRanges(ledgerRanges); err != nil {
return err
}
return nil
}
Loading
Loading