From f96cbce3b42e9ceaf1683f2590ce459d0c6f4c16 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 18 Jun 2020 20:14:54 +0200 Subject: [PATCH 01/23] Parallelize `db reingest range` This change breaks down the ledger range to reingest in subranges which are submitted to a pre-defined number of workers, processing the subranges in parallel. For now, the workers are simply Go routines using their own `System` (with their own DB connections etc ...). In the future workers could be fully fledged Horizon instances running in multiple machines (e.g. orchestrated through Kubernetes Jobs or AWS Batch Jobs). New flags: --parallel-workers: [optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers --parallel-job-size: [optional] parallel workers will run jobs processing ledger batches of the supplied size --- services/horizon/cmd/db.go | 61 ++++- services/horizon/internal/app.go | 2 +- .../internal/expingest/build_state_test.go | 4 +- .../internal/expingest/db_integration_test.go | 5 +- services/horizon/internal/expingest/fsm.go | 28 +-- .../ingest_history_range_state_test.go | 8 +- .../internal/expingest/init_state_test.go | 4 +- services/horizon/internal/expingest/main.go | 217 +++++++++++++++--- .../horizon/internal/expingest/main_test.go | 105 ++++++++- .../internal/expingest/resume_state_test.go | 4 +- .../horizon/internal/expingest/stress_test.go | 4 +- services/horizon/internal/expingest/verify.go | 4 +- .../expingest/verify_range_state_test.go | 4 +- services/horizon/internal/init.go | 6 +- 14 files changed, 372 insertions(+), 84 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 119ff21a13..591619b28b 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/stellar/go/services/horizon/internal/db2/schema" "github.com/stellar/go/services/horizon/internal/expingest" support "github.com/stellar/go/support/config" @@ -122,16 +123,36 @@ var dbReingestCmd = &cobra.Command{ }, } -var reingestForce bool +var ( + reingestForce bool + parallelWorkers uint + parallelJobSize uint32 +) var reingestRangeCmdOpts = []*support.ConfigOption{ - &support.ConfigOption{ + { Name: "force", ConfigKey: &reingestForce, OptType: types.Bool, Required: false, FlagDefault: false, Usage: "[optional] if this flag is set, horizon will be blocked " + - "from ingesting until the reingestion command completes", + "from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)", + }, + { + Name: "parallel-workers", + ConfigKey: ¶llelWorkers, + OptType: types.Uint, + Required: false, + FlagDefault: uint(1), + Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", + }, + { + Name: "parallel-job-size", + ConfigKey: ¶llelJobSize, + OptType: types.Uint32, + Required: false, + FlagDefault: uint32(256), + Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", }, } @@ -144,6 +165,9 @@ var dbReingestRangeCmd = &cobra.Command{ co.Require() co.SetValue() } + if reingestForce && parallelWorkers > 1 { + log.Fatal("--force is incompatible with --parallel-workers > 1") + } if len(args) != 2 { cmd.Usage() @@ -182,16 +206,31 @@ var dbReingestRangeCmd = &cobra.Command{ ingestConfig.StellarCorePath = config.StellarCoreBinaryPath } - system, err := expingest.NewSystem(ingestConfig) - if err != nil { - log.Fatal(err) + if parallelWorkers < 2 { + system, err := expingest.NewSystem(ingestConfig) + if err != nil { + log.Fatal(err) + } + + err = system.ReingestRange( + argsInt32[0], + argsInt32[1], + reingestForce, + ) + } else { + system, err := expingest.NewParallelSystems(ingestConfig, parallelWorkers) + if err != nil { + log.Fatal(err) + } + defer system.Shutdown() + + err = system.ReingestRange( + argsInt32[0], + argsInt32[1], + parallelJobSize, + ) } - err = system.ReingestRange( - argsInt32[0], - argsInt32[1], - reingestForce, - ) if err == nil { hlog.Info("Range run successfully!") return diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index e26ba84a7f..b9695f6027 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -69,7 +69,7 @@ type App struct { orderBookStream *expingest.OrderBookStream submitter *txsub.System paths paths.Finder - expingester *expingest.System + expingester expingest.System reaper *reap.System ticks *time.Ticker diff --git a/services/horizon/internal/expingest/build_state_test.go b/services/horizon/internal/expingest/build_state_test.go index f984f9d779..f17eca2773 100644 --- a/services/horizon/internal/expingest/build_state_test.go +++ b/services/horizon/internal/expingest/build_state_test.go @@ -19,7 +19,7 @@ type BuildStateTestSuite struct { suite.Suite historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter - system *System + system *system runner *mockProcessorsRunner stellarCoreClient *mockStellarCoreClient checkpointLedger uint32 @@ -33,7 +33,7 @@ func (s *BuildStateTestSuite) SetupTest() { s.stellarCoreClient = &mockStellarCoreClient{} s.checkpointLedger = uint32(63) s.lastLedger = 0 - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/db_integration_test.go b/services/horizon/internal/expingest/db_integration_test.go index fc4c5b9634..9045f8134d 100644 --- a/services/horizon/internal/expingest/db_integration_test.go +++ b/services/horizon/internal/expingest/db_integration_test.go @@ -60,7 +60,7 @@ type DBTestSuite struct { sequence uint32 ledgerBackend *ledgerbackend.MockDatabaseBackend historyAdapter *adapters.MockHistoryArchiveAdapter - system *System + system *system tt *test.T } @@ -77,7 +77,7 @@ func (s *DBTestSuite) SetupTest() { s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} var err error - s.system, err = NewSystem(Config{ + sIface, err := NewSystem(Config{ CoreSession: s.tt.CoreSession(), HistorySession: s.tt.HorizonSession(), HistoryArchiveURL: "http://ignore.test", @@ -85,6 +85,7 @@ func (s *DBTestSuite) SetupTest() { DisableStateVerification: false, }) s.Assert().NoError(err) + s.system = sIface.(*system) s.sequence = uint32(28660351) s.setupMocksForBuildState() diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index b2dc95b579..f4c4cb059b 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -18,7 +18,7 @@ var ( ) type stateMachineNode interface { - run(*System) (transition, error) + run(*system) (transition, error) String() string } @@ -92,7 +92,7 @@ func (stopState) String() string { return "stop" } -func (stopState) run(s *System) (transition, error) { +func (stopState) run(s *system) (transition, error) { return stop(), errors.New("Cannot run terminal state") } @@ -102,7 +102,7 @@ func (startState) String() string { return "start" } -func (startState) run(s *System) (transition, error) { +func (startState) run(s *system) (transition, error) { if err := s.historyQ.Begin(); err != nil { return start(), errors.Wrap(err, "Error starting a transaction") } @@ -213,7 +213,7 @@ func (b buildState) String() string { return fmt.Sprintf("buildFromCheckpoint(checkpointLedger=%d)", b.checkpointLedger) } -func (b buildState) run(s *System) (transition, error) { +func (b buildState) run(s *system) (transition, error) { if b.checkpointLedger == 0 { return start(), errors.New("unexpected checkpointLedger value") } @@ -307,7 +307,7 @@ func (r resumeState) String() string { return fmt.Sprintf("resume(latestSuccessfullyProcessedLedger=%d)", r.latestSuccessfullyProcessedLedger) } -func (r resumeState) run(s *System) (transition, error) { +func (r resumeState) run(s *system) (transition, error) { if r.latestSuccessfullyProcessedLedger == 0 { return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value") } @@ -404,7 +404,7 @@ func (r resumeState) run(s *System) (transition, error) { } duration := time.Since(startTime) - s.Metrics.LedgerIngestionTimer.Update(duration) + s.Metrics().LedgerIngestionTimer.Update(duration) log. WithFields(changeStats.Map()). WithFields(ledgerTransactionStats.Map()). @@ -436,7 +436,7 @@ func (h historyRangeState) String() string { } // historyRangeState is used when catching up history data -func (h historyRangeState) run(s *System) (transition, error) { +func (h historyRangeState) run(s *system) (transition, error) { if h.fromLedger == 0 || h.toLedger == 0 || h.fromLedger > h.toLedger { return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) @@ -478,7 +478,7 @@ func (h historyRangeState) run(s *System) (transition, error) { return start(), nil } -func runTransactionProcessorsOnLedger(s *System, ledger uint32) error { +func runTransactionProcessorsOnLedger(s *system, ledger uint32) error { log.WithFields(logpkg.F{ "sequence": ledger, "state": false, @@ -520,7 +520,7 @@ func (h reingestHistoryRangeState) String() string { ) } -func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger uint32) error { +func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error { if s.historyQ.GetTx() == nil { return errors.New("expected transaction to be present") } @@ -549,7 +549,7 @@ func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger u } // reingestHistoryRangeState is used as a command to reingest historical data -func (h reingestHistoryRangeState) run(s *System) (transition, error) { +func (h reingestHistoryRangeState) run(s *system) (transition, error) { if h.fromLedger == 0 || h.toLedger == 0 || h.fromLedger > h.toLedger { return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) @@ -634,7 +634,7 @@ func (waitForCheckpointState) String() string { return "waitForCheckpoint" } -func (waitForCheckpointState) run(*System) (transition, error) { +func (waitForCheckpointState) run(*system) (transition, error) { log.Info("Waiting for the next checkpoint...") time.Sleep(10 * time.Second) return start(), nil @@ -655,7 +655,7 @@ func (v verifyRangeState) String() string { ) } -func (v verifyRangeState) run(s *System) (transition, error) { +func (v verifyRangeState) run(s *system) (transition, error) { if v.fromLedger == 0 || v.toLedger == 0 || v.fromLedger > v.toLedger { return stop(), errors.Errorf("invalid range: [%d, %d]", v.fromLedger, v.toLedger) @@ -754,7 +754,7 @@ func (stressTestState) String() string { return "stressTest" } -func (stressTestState) run(s *System) (transition, error) { +func (stressTestState) run(s *system) (transition, error) { if err := s.historyQ.Begin(); err != nil { err = errors.Wrap(err, "Error starting a transaction") return stop(), err @@ -813,7 +813,7 @@ func (stressTestState) run(s *System) (transition, error) { return stop(), nil } -func (s *System) completeIngestion(ledger uint32) error { +func (s *system) completeIngestion(ledger uint32) error { if ledger == 0 { return errors.New("ledger must be positive") } diff --git a/services/horizon/internal/expingest/ingest_history_range_state_test.go b/services/horizon/internal/expingest/ingest_history_range_state_test.go index d9cbc7e5ab..8c62e3e24c 100644 --- a/services/horizon/internal/expingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/expingest/ingest_history_range_state_test.go @@ -21,14 +21,14 @@ type IngestHistoryRangeStateTestSuite struct { historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner - system *System + system *system } func (s *IngestHistoryRangeStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, @@ -168,7 +168,7 @@ type ReingestHistoryRangeStateTestSuite struct { historyAdapter *adapters.MockHistoryArchiveAdapter ledgerBackend *mockLedgerBackend runner *mockProcessorsRunner - system *System + system *system } func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { @@ -176,7 +176,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.ledgerBackend = &mockLedgerBackend{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/init_state_test.go b/services/horizon/internal/expingest/init_state_test.go index a27e345261..7e2773686b 100644 --- a/services/horizon/internal/expingest/init_state_test.go +++ b/services/horizon/internal/expingest/init_state_test.go @@ -17,13 +17,13 @@ type InitStateTestSuite struct { suite.Suite historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter - system *System + system *system } func (s *InitStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index ba5f390feb..c1de154c47 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -5,10 +5,12 @@ package expingest import ( "context" + "fmt" "sync" "time" "github.com/rcrowley/go-metrics" + "github.com/stellar/go/clients/stellarcore" "github.com/stellar/go/exp/ingest/adapters" ingesterrors "github.com/stellar/go/exp/ingest/errors" @@ -80,23 +82,33 @@ type stellarCoreClient interface { SetCursor(ctx context.Context, id string, cursor int32) error } -type System struct { - Metrics struct { - // LedgerIngestionTimer exposes timing metrics about the rate and - // duration of ledger ingestion (including updating DB and graph). - LedgerIngestionTimer metrics.Timer +type Metrics struct { + // LedgerIngestionTimer exposes timing metrics about the rate and + // duration of ledger ingestion (including updating DB and graph). + LedgerIngestionTimer metrics.Timer - // LedgerInMemoryIngestionTimer exposes timing metrics about the rate and - // duration of ingestion into in-memory graph only. - LedgerInMemoryIngestionTimer metrics.Timer + // LedgerInMemoryIngestionTimer exposes timing metrics about the rate and + // duration of ingestion into in-memory graph only. + LedgerInMemoryIngestionTimer metrics.Timer - // StateVerifyTimer exposes timing metrics about the rate and - // duration of state verification. - StateVerifyTimer metrics.Timer - } + // StateVerifyTimer exposes timing metrics about the rate and + // duration of state verification. + StateVerifyTimer metrics.Timer +} + +type System interface { + Run() + Metrics() Metrics + StressTest(numTransactions, changesPerTransaction int) error + VerifyRange(fromLedger, toLedger uint32, verifyState bool) error + ReingestRange(fromLedger, toLedger uint32, force bool) error + Shutdown() +} - ctx context.Context - cancel context.CancelFunc +type system struct { + metrics Metrics + ctx context.Context + cancel context.CancelFunc config Config @@ -120,7 +132,7 @@ type System struct { disableStateVerification bool } -func NewSystem(config Config) (*System, error) { +func NewSystem(config Config) (System, error) { ctx, cancel := context.WithCancel(context.Background()) archive, err := historyarchive.Connect( @@ -157,7 +169,7 @@ func NewSystem(config Config) (*System, error) { historyAdapter := adapters.MakeHistoryArchiveAdapter(archive) - system := &System{ + system := &system{ ctx: ctx, cancel: cancel, historyAdapter: historyAdapter, @@ -182,10 +194,14 @@ func NewSystem(config Config) (*System, error) { return system, nil } -func (s *System) initMetrics() { - s.Metrics.LedgerIngestionTimer = metrics.NewTimer() - s.Metrics.LedgerInMemoryIngestionTimer = metrics.NewTimer() - s.Metrics.StateVerifyTimer = metrics.NewTimer() +func (s *system) initMetrics() { + s.metrics.LedgerIngestionTimer = metrics.NewTimer() + s.metrics.LedgerInMemoryIngestionTimer = metrics.NewTimer() + s.metrics.StateVerifyTimer = metrics.NewTimer() +} + +func (s *system) Metrics() Metrics { + return s.metrics } // Run starts ingestion system. Ingestion system supports distributed ingestion @@ -217,11 +233,11 @@ func (s *System) initMetrics() { // a database. // * If instances is a NOT leader, it runs ledger pipeline without updating a // a database so order book graph is updated but database is not overwritten. -func (s *System) Run() { +func (s *system) Run() { s.runStateMachine(startState{}) } -func (s *System) StressTest(numTransactions, changesPerTransaction int) error { +func (s *system) StressTest(numTransactions, changesPerTransaction int) error { if numTransactions <= 0 { return errors.New("transactions must be positive") } @@ -239,7 +255,7 @@ func (s *System) StressTest(numTransactions, changesPerTransaction int) error { // VerifyRange runs the ingestion pipeline on the range of ledgers. When // verifyState is true it verifies the state when ingestion is complete. -func (s *System) VerifyRange(fromLedger, toLedger uint32, verifyState bool) error { +func (s *system) VerifyRange(fromLedger, toLedger uint32, verifyState bool) error { return s.runStateMachine(verifyRangeState{ fromLedger: fromLedger, toLedger: toLedger, @@ -249,7 +265,7 @@ func (s *System) VerifyRange(fromLedger, toLedger uint32, verifyState bool) erro // ReingestRange runs the ingestion pipeline on the range of ledgers ingesting // history data only. -func (s *System) ReingestRange(fromLedger, toLedger uint32, force bool) error { +func (s *system) ReingestRange(fromLedger, toLedger uint32, force bool) error { return s.runStateMachine(reingestHistoryRangeState{ fromLedger: fromLedger, toLedger: toLedger, @@ -257,7 +273,7 @@ func (s *System) ReingestRange(fromLedger, toLedger uint32, force bool) error { }) } -func (s *System) runStateMachine(cur stateMachineNode) error { +func (s *system) runStateMachine(cur stateMachineNode) error { defer func() { s.wg.Wait() }() @@ -310,7 +326,7 @@ func (s *System) runStateMachine(cur stateMachineNode) error { } } -func (s *System) maybeVerifyState(lastIngestedLedger uint32) { +func (s *system) maybeVerifyState(lastIngestedLedger uint32) { stateInvalid, err := s.historyQ.GetExpStateInvalid() if err != nil && !isCancelledError(err) { log.WithField("err", err).Error("Error getting state invalid value") @@ -348,20 +364,20 @@ func (s *System) maybeVerifyState(lastIngestedLedger uint32) { } } -func (s *System) incrementStateVerificationErrors() int { +func (s *system) incrementStateVerificationErrors() int { s.stateVerificationMutex.Lock() defer s.stateVerificationMutex.Unlock() s.stateVerificationErrors++ return s.stateVerificationErrors } -func (s *System) resetStateVerificationErrors() { +func (s *system) resetStateVerificationErrors() { s.stateVerificationMutex.Lock() defer s.stateVerificationMutex.Unlock() s.stateVerificationErrors = 0 } -func (s *System) updateCursor(ledgerSequence uint32) error { +func (s *system) updateCursor(ledgerSequence uint32) error { if s.stellarCoreClient == nil { return nil } @@ -381,7 +397,7 @@ func (s *System) updateCursor(ledgerSequence uint32) error { return nil } -func (s *System) Shutdown() { +func (s *system) Shutdown() { log.Info("Shutting down ingestion system...") s.stateVerificationMutex.Lock() defer s.stateVerificationMutex.Unlock() @@ -403,3 +419,144 @@ func isCancelledError(err error) bool { cause := errors.Cause(err) return cause == context.Canceled || cause == db.ErrCancelled } + +type ledgerRange struct { + from uint32 + to uint32 +} + +type rangeResult struct { + err error + requestedRange ledgerRange +} + +type ParallelSystems struct { + workerCount uint + reingestJobQueue chan ledgerRange + shutdown chan struct{} + wait sync.WaitGroup + reingestJobResult chan rangeResult +} + +func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { + return newParallelSystems(config, workerCount, NewSystem) +} + +// private version of NewParallel systems, allowing to inject a mock system +func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { + if workerCount < 1 { + return nil, errors.New("workerCount must be > 0") + } + + result := ParallelSystems{ + workerCount: workerCount, + reingestJobQueue: make(chan ledgerRange), + shutdown: make(chan struct{}), + reingestJobResult: make(chan rangeResult), + } + for i := uint(0); i < workerCount; i++ { + s, err := systemFactory(config) + if err != nil { + result.Shutdown() + return nil, errors.Wrap(err, "cannot create new system") + } + result.wait.Add(1) + go result.work(s) + } + return &result, nil +} + +func (ps *ParallelSystems) work(s System) { + defer func() { + s.Shutdown() + ps.wait.Done() + }() + for { + select { + case <-ps.shutdown: + return + case ledgerRange := <-ps.reingestJobQueue: + err := s.ReingestRange(ledgerRange.from, ledgerRange.to, false) + select { + case <-ps.shutdown: + return + case ps.reingestJobResult <- rangeResult{err, ledgerRange}: + } + } + } +} + +const ( + historyCheckpointLedgerInterval = 64 + minBatchSize = historyCheckpointLedgerInterval +) + +func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { + rangeSize := toLedger - fromLedger + batchSize := batchSizeSuggestion + if rangeSize/batchSize < uint32(ps.workerCount) { + // let's try to make use of all the workers + batchSize = rangeSize / uint32(ps.workerCount) + } + + // Use a minimum batch size to make it worth it in terms of overhead + if batchSize < minBatchSize { + batchSize = minBatchSize + } + + // Also, round the batch size to the closest, lower or equal 64 multiple + batchSize = (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval + + pendingJobsCount := 0 + var result error + processSubRangeResult := func(subRangeResult rangeResult) { + pendingJobsCount-- + if result == nil && subRangeResult.err != nil { + // TODO: give account of what ledgers were correctly reingested? + errMsg := fmt.Sprintf("in subrange %d to %d", + subRangeResult.requestedRange.from, subRangeResult.requestedRange.to) + result = errors.Wrap(subRangeResult.err, errMsg) + } + } + + for subRangeFrom := fromLedger; subRangeFrom < toLedger && result == nil; { + // job queuing + subRangeTo := subRangeFrom + batchSize + if subRangeTo > toLedger { + subRangeTo = toLedger + } + subRange := ledgerRange{subRangeFrom, subRangeTo} + + select { + case <-ps.shutdown: + return errors.New("aborted") + case subRangeResult := <-ps.reingestJobResult: + processSubRangeResult(subRangeResult) + case ps.reingestJobQueue <- subRange: + pendingJobsCount++ + subRangeFrom = subRangeTo + } + } + + for pendingJobsCount > 0 { + // wait for any remaining running jobs to finish + select { + case <-ps.shutdown: + return errors.New("aborted") + case subRangeResult := <-ps.reingestJobResult: + processSubRangeResult(subRangeResult) + } + + } + + return result +} + +func (ps *ParallelSystems) Shutdown() { + if ps.shutdown != nil { + close(ps.shutdown) + ps.wait.Wait() + close(ps.reingestJobQueue) + close(ps.reingestJobResult) + } +} diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index 3baa1f333c..120e085be8 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -4,7 +4,11 @@ import ( "bytes" "context" "database/sql" + "math/rand" + "sort" + "sync" "testing" + "time" "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/ingest/adapters" @@ -89,8 +93,9 @@ func TestNewSystem(t *testing.T) { HistoryArchiveURL: "https://history.stellar.org/prd/core-live/core_live_001", } - system, err := NewSystem(config) + sIface, err := NewSystem(config) assert.NoError(t, err) + system := sIface.(*system) assert.Equal(t, config, system.config) assert.Equal(t, config.DisableStateVerification, system.disableStateVerification) @@ -100,9 +105,61 @@ func TestNewSystem(t *testing.T) { assert.Equal(t, system.ctx, system.runner.(*ProcessorRunner).ctx) } +type sorteableRanges []ledgerRange + +func (s sorteableRanges) Len() int { return len(s) } +func (s sorteableRanges) Less(i, j int) bool { return s[i].from < s[j].from } +func (s sorteableRanges) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func TestParallelReingestRange(t *testing.T) { + config := Config{} + var ( + systems []*mockSystem + rangesCalled sorteableRanges + shutdowns int + m sync.Mutex + ) + factory := func(c Config) (System, error) { + result := &mockSystem{} + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run( + func(args mock.Arguments) { + r := ledgerRange{ + from: args.Get(0).(uint32), + to: args.Get(1).(uint32), + } + m.Lock() + defer m.Unlock() + rangesCalled = append(rangesCalled, r) + // simulate call + time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) + }).Return(error(nil)) + result.On("Shutdown").Run(func(mock.Arguments) { + m.Lock() + defer m.Unlock() + shutdowns++ + }) + systems = append(systems, result) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.NoError(t, err) + + sort.Sort(rangesCalled) + expected := sorteableRanges{ + {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, + {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, + } + assert.Equal(t, expected, rangesCalled) + system.Shutdown() + assert.Equal(t, 3, shutdowns) + +} + func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -116,7 +173,7 @@ func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { func TestStateMachineTransition(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -133,7 +190,7 @@ func TestStateMachineTransition(t *testing.T) { func TestContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: ctx, } @@ -151,7 +208,7 @@ func TestContextCancel(t *testing.T) { // non-zero exit code. func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ ctx: context.Background(), historyQ: historyQ, } @@ -165,7 +222,7 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing. func TestMaybeVerifyStateGetExpStateInvalidDBErrCancelOrContextCanceled(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -191,7 +248,7 @@ func TestMaybeVerifyStateGetExpStateInvalidDBErrCancelOrContextCanceled(t *testi } func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -433,3 +490,37 @@ func (m *mockStellarCoreClient) SetCursor(ctx context.Context, id string, cursor } var _ stellarCoreClient = (*mockStellarCoreClient)(nil) + +type mockSystem struct { + mock.Mock +} + +func (m *mockSystem) Run() { + m.Called() +} + +func (m *mockSystem) Metrics() Metrics { + args := m.Called() + return args.Get(0).(Metrics) +} + +func (m *mockSystem) StressTest(numTransactions, changesPerTransaction int) error { + args := m.Called(numTransactions, changesPerTransaction) + return args.Error(0) +} + +func (m *mockSystem) VerifyRange(fromLedger, toLedger uint32, verifyState bool) error { + args := m.Called(fromLedger, toLedger, verifyState) + return args.Error(0) +} + +func (m *mockSystem) ReingestRange(fromLedger, toLedger uint32, force bool) error { + args := m.Called(fromLedger, toLedger, force) + return args.Error(0) +} + +func (m *mockSystem) Shutdown() { + m.Called() +} + +var _ System = (*mockSystem)(nil) diff --git a/services/horizon/internal/expingest/resume_state_test.go b/services/horizon/internal/expingest/resume_state_test.go index 04ec556be1..ff6ca22bd9 100644 --- a/services/horizon/internal/expingest/resume_state_test.go +++ b/services/horizon/internal/expingest/resume_state_test.go @@ -23,7 +23,7 @@ type ResumeTestTestSuite struct { historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner stellarCoreClient *mockStellarCoreClient - system *System + system *system } func (s *ResumeTestTestSuite) SetupTest() { @@ -32,7 +32,7 @@ func (s *ResumeTestTestSuite) SetupTest() { s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} s.stellarCoreClient = &mockStellarCoreClient{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/stress_test.go b/services/horizon/internal/expingest/stress_test.go index c0d02f1b44..5ea22bf87f 100644 --- a/services/horizon/internal/expingest/stress_test.go +++ b/services/horizon/internal/expingest/stress_test.go @@ -18,14 +18,14 @@ type StressTestStateTestSuite struct { historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner - system *System + system *system } func (s *StressTestStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ historyQ: s.historyQ, historyAdapter: s.historyAdapter, runner: s.runner, diff --git a/services/horizon/internal/expingest/verify.go b/services/horizon/internal/expingest/verify.go index aa40fc9b68..8f8f24efdd 100644 --- a/services/horizon/internal/expingest/verify.go +++ b/services/horizon/internal/expingest/verify.go @@ -30,7 +30,7 @@ const stateVerifierExpectedIngestionVersion = 10 // verifyState is called as a go routine from pipeline post hook every 64 // ledgers. It checks if the state is correct. If another go routine is already // running it exits. -func (s *System) verifyState(verifyAgainstLatestCheckpoint bool) error { +func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { s.stateVerificationMutex.Lock() if s.stateVerificationRunning { log.Warn("State verification is already running...") @@ -57,7 +57,7 @@ func (s *System) verifyState(verifyAgainstLatestCheckpoint bool) error { defer func() { duration := time.Since(startTime) if updateMetrics { - s.Metrics.StateVerifyTimer.Update(duration) + s.Metrics().StateVerifyTimer.Update(duration) } log.WithField("duration", duration.Seconds()).Info("State verification finished") historyQ.Rollback() diff --git a/services/horizon/internal/expingest/verify_range_state_test.go b/services/horizon/internal/expingest/verify_range_state_test.go index a9cbcd99a8..be042c60f3 100644 --- a/services/horizon/internal/expingest/verify_range_state_test.go +++ b/services/horizon/internal/expingest/verify_range_state_test.go @@ -25,14 +25,14 @@ type VerifyRangeStateTestSuite struct { historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner - system *System + system *system } func (s *VerifyRangeStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ historyQ: s.historyQ, historyAdapter: s.historyAdapter, runner: s.runner, diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index e8dc49fd55..8da9860d3f 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -161,9 +161,9 @@ func initIngestMetrics(app *App) { if app.expingester == nil { return } - app.metrics.Register("ingest.ledger_ingestion", app.expingester.Metrics.LedgerIngestionTimer) - app.metrics.Register("ingest.ledger_in_memory_ingestion", app.expingester.Metrics.LedgerInMemoryIngestionTimer) - app.metrics.Register("ingest.state_verify", app.expingester.Metrics.StateVerifyTimer) + app.metrics.Register("ingest.ledger_ingestion", app.expingester.Metrics().LedgerIngestionTimer) + app.metrics.Register("ingest.ledger_in_memory_ingestion", app.expingester.Metrics().LedgerInMemoryIngestionTimer) + app.metrics.Register("ingest.state_verify", app.expingester.Metrics().StateVerifyTimer) } func initTxSubMetrics(app *App) { From 55cc50fe6867ad05f27b012eac33c59e1499b34b Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 22 Jun 2020 17:24:51 +0200 Subject: [PATCH 02/23] Remove job size (the larger, the faster it is) --- services/horizon/cmd/db.go | 11 +---- .../horizon/internal/expingest/main_test.go | 44 +++++++++++++++++++ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 591619b28b..94c91f1b94 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -126,7 +126,6 @@ var dbReingestCmd = &cobra.Command{ var ( reingestForce bool parallelWorkers uint - parallelJobSize uint32 ) var reingestRangeCmdOpts = []*support.ConfigOption{ { @@ -146,14 +145,6 @@ var reingestRangeCmdOpts = []*support.ConfigOption{ FlagDefault: uint(1), Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", }, - { - Name: "parallel-job-size", - ConfigKey: ¶llelJobSize, - OptType: types.Uint32, - Required: false, - FlagDefault: uint32(256), - Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", - }, } var dbReingestRangeCmd = &cobra.Command{ @@ -227,7 +218,7 @@ var dbReingestRangeCmd = &cobra.Command{ err = system.ReingestRange( argsInt32[0], argsInt32[1], - parallelJobSize, + argsInt32[1]-argsInt32[0], ) } diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index 120e085be8..132a637b0c 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -112,6 +112,50 @@ func (s sorteableRanges) Less(i, j int) bool { return s[i].from < s[j].from } func (s sorteableRanges) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func TestParallelReingestRange(t *testing.T) { + config := Config{} + var ( + rangesCalled sorteableRanges + shutdowns int + m sync.Mutex + ) + factory := func(c Config) (System, error) { + result := &mockSystem{} + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run( + func(args mock.Arguments) { + r := ledgerRange{ + from: args.Get(0).(uint32), + to: args.Get(1).(uint32), + } + m.Lock() + defer m.Unlock() + rangesCalled = append(rangesCalled, r) + // simulate call + time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) + }).Return(error(nil)) + result.On("Shutdown").Run(func(mock.Arguments) { + m.Lock() + defer m.Unlock() + shutdowns++ + }) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.NoError(t, err) + + sort.Sort(rangesCalled) + expected := sorteableRanges{ + {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, + {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, + } + assert.Equal(t, expected, rangesCalled) + system.Shutdown() + assert.Equal(t, 3, shutdowns) + +} + +func TestParallelReingestRangeError(t *testing.T) { config := Config{} var ( systems []*mockSystem From 71916fce9d8dc4b40f0ce0e1c71e35c64eb29b8d Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 22 Jun 2020 17:50:34 +0200 Subject: [PATCH 03/23] Test batch size calculation --- services/horizon/internal/expingest/main.go | 13 ++--- .../horizon/internal/expingest/main_test.go | 51 +++---------------- 2 files changed, 14 insertions(+), 50 deletions(-) diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index c1de154c47..ef47e48457 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -491,22 +491,23 @@ const ( minBatchSize = historyCheckpointLedgerInterval ) -func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { - rangeSize := toLedger - fromLedger +func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { batchSize := batchSizeSuggestion - if rangeSize/batchSize < uint32(ps.workerCount) { + if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { // let's try to make use of all the workers - batchSize = rangeSize / uint32(ps.workerCount) + batchSize = rangeSize / uint32(workerCount) } - // Use a minimum batch size to make it worth it in terms of overhead if batchSize < minBatchSize { batchSize = minBatchSize } // Also, round the batch size to the closest, lower or equal 64 multiple - batchSize = (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval + return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval +} +func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { + batchSize := calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) pendingJobsCount := 0 var result error processSubRangeResult := func(subRangeResult rangeResult) { diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index 132a637b0c..70f91608e8 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -155,50 +155,13 @@ func TestParallelReingestRange(t *testing.T) { } -func TestParallelReingestRangeError(t *testing.T) { - config := Config{} - var ( - systems []*mockSystem - rangesCalled sorteableRanges - shutdowns int - m sync.Mutex - ) - factory := func(c Config) (System, error) { - result := &mockSystem{} - result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run( - func(args mock.Arguments) { - r := ledgerRange{ - from: args.Get(0).(uint32), - to: args.Get(1).(uint32), - } - m.Lock() - defer m.Unlock() - rangesCalled = append(rangesCalled, r) - // simulate call - time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) - }).Return(error(nil)) - result.On("Shutdown").Run(func(mock.Arguments) { - m.Lock() - defer m.Unlock() - shutdowns++ - }) - systems = append(systems, result) - return result, nil - } - system, err := newParallelSystems(config, 3, factory) - assert.NoError(t, err) - err = system.ReingestRange(0, 2050, 258) - assert.NoError(t, err) - - sort.Sort(rangesCalled) - expected := sorteableRanges{ - {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, - {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, - } - assert.Equal(t, expected, rangesCalled) - system.Shutdown() - assert.Equal(t, 3, shutdowns) - +func TestCalculateParallelLedgerBatchSize(t *testing.T) { + assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) } func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { From 9d7cdc173fd01b79368a932db286bbe087e7208b Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 22 Jun 2020 18:14:59 +0200 Subject: [PATCH 04/23] Update CHANGELOG --- services/horizon/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 68712e113f..e16c64d2b1 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,6 +5,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).x ## Unreleased +* Add `--parallel-workers` to `horizon db reingest range`. `--parallel-workers` will parallelize reingestion using the supplied number of workers cores to * Add transaction set operation count to `history_ledger`([#2690](https://github.com/stellar/go/pull/2690)). Extend ingestion to store the total number of operations in the transaction set and expose it in the ledger resource via `tx_set_operation_count`. This feature allow you to assess the used capacity of a transaction set. * Remove `--ingest-failed-transactions` flag. From now on Horizon will always ingest failed transactions. WARNING: if your application is using Horizon DB directly (not recommended!) remember that now it will also contain failed txs. ([#2702](https://github.com/stellar/go/pull/2702)). From 55b33392b1223c7340c62022b00bfdca6928af7e Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 22 Jun 2020 18:22:08 +0200 Subject: [PATCH 05/23] Appease go vet --- services/horizon/cmd/db.go | 8 ++++---- services/horizon/internal/expingest/main.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 94c91f1b94..d144b28019 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -198,9 +198,9 @@ var dbReingestRangeCmd = &cobra.Command{ } if parallelWorkers < 2 { - system, err := expingest.NewSystem(ingestConfig) + system, systemErr := expingest.NewSystem(ingestConfig) if err != nil { - log.Fatal(err) + log.Fatal(systemErr) } err = system.ReingestRange( @@ -209,9 +209,9 @@ var dbReingestRangeCmd = &cobra.Command{ reingestForce, ) } else { - system, err := expingest.NewParallelSystems(ingestConfig, parallelWorkers) + system, systemErr := expingest.NewParallelSystems(ingestConfig, parallelWorkers) if err != nil { - log.Fatal(err) + log.Fatal(systemErr) } defer system.Shutdown() diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index ef47e48457..1483d1f64d 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -475,12 +475,12 @@ func (ps *ParallelSystems) work(s System) { select { case <-ps.shutdown: return - case ledgerRange := <-ps.reingestJobQueue: - err := s.ReingestRange(ledgerRange.from, ledgerRange.to, false) + case reingestRange := <-ps.reingestJobQueue: + err := s.ReingestRange(reingestRange.from, reingestRange.to, false) select { case <-ps.shutdown: return - case ps.reingestJobResult <- rangeResult{err, ledgerRange}: + case ps.reingestJobResult <- rangeResult{err, reingestRange}: } } } From 0a6bb1e6669a0c24a70b3c9117507c3fb745b524 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 24 Jun 2020 15:08:07 +0200 Subject: [PATCH 06/23] Address review feedback --- services/horizon/CHANGELOG.md | 3 +- services/horizon/internal/expingest/main.go | 143 ---------------- .../horizon/internal/expingest/main_test.go | 68 +------- .../horizon/internal/expingest/parallel.go | 156 ++++++++++++++++++ .../internal/expingest/parallel_test.go | 102 ++++++++++++ 5 files changed, 263 insertions(+), 209 deletions(-) create mode 100644 services/horizon/internal/expingest/parallel.go create mode 100644 services/horizon/internal/expingest/parallel_test.go diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index e16c64d2b1..254244af33 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,7 +5,8 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).x ## Unreleased -* Add `--parallel-workers` to `horizon db reingest range`. `--parallel-workers` will parallelize reingestion using the supplied number of workers cores to +* Add `--parallel-workers` to `horizon db reingest range`. `--parallel +-workers` will parallelize reingestion using the supplied number of workers. * Add transaction set operation count to `history_ledger`([#2690](https://github.com/stellar/go/pull/2690)). Extend ingestion to store the total number of operations in the transaction set and expose it in the ledger resource via `tx_set_operation_count`. This feature allow you to assess the used capacity of a transaction set. * Remove `--ingest-failed-transactions` flag. From now on Horizon will always ingest failed transactions. WARNING: if your application is using Horizon DB directly (not recommended!) remember that now it will also contain failed txs. ([#2702](https://github.com/stellar/go/pull/2702)). diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 1483d1f64d..6fbe9e2b7c 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -5,7 +5,6 @@ package expingest import ( "context" - "fmt" "sync" "time" @@ -419,145 +418,3 @@ func isCancelledError(err error) bool { cause := errors.Cause(err) return cause == context.Canceled || cause == db.ErrCancelled } - -type ledgerRange struct { - from uint32 - to uint32 -} - -type rangeResult struct { - err error - requestedRange ledgerRange -} - -type ParallelSystems struct { - workerCount uint - reingestJobQueue chan ledgerRange - shutdown chan struct{} - wait sync.WaitGroup - reingestJobResult chan rangeResult -} - -func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { - return newParallelSystems(config, workerCount, NewSystem) -} - -// private version of NewParallel systems, allowing to inject a mock system -func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { - if workerCount < 1 { - return nil, errors.New("workerCount must be > 0") - } - - result := ParallelSystems{ - workerCount: workerCount, - reingestJobQueue: make(chan ledgerRange), - shutdown: make(chan struct{}), - reingestJobResult: make(chan rangeResult), - } - for i := uint(0); i < workerCount; i++ { - s, err := systemFactory(config) - if err != nil { - result.Shutdown() - return nil, errors.Wrap(err, "cannot create new system") - } - result.wait.Add(1) - go result.work(s) - } - return &result, nil -} - -func (ps *ParallelSystems) work(s System) { - defer func() { - s.Shutdown() - ps.wait.Done() - }() - for { - select { - case <-ps.shutdown: - return - case reingestRange := <-ps.reingestJobQueue: - err := s.ReingestRange(reingestRange.from, reingestRange.to, false) - select { - case <-ps.shutdown: - return - case ps.reingestJobResult <- rangeResult{err, reingestRange}: - } - } - } -} - -const ( - historyCheckpointLedgerInterval = 64 - minBatchSize = historyCheckpointLedgerInterval -) - -func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { - batchSize := batchSizeSuggestion - if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { - // let's try to make use of all the workers - batchSize = rangeSize / uint32(workerCount) - } - // Use a minimum batch size to make it worth it in terms of overhead - if batchSize < minBatchSize { - batchSize = minBatchSize - } - - // Also, round the batch size to the closest, lower or equal 64 multiple - return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval -} - -func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { - batchSize := calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) - pendingJobsCount := 0 - var result error - processSubRangeResult := func(subRangeResult rangeResult) { - pendingJobsCount-- - if result == nil && subRangeResult.err != nil { - // TODO: give account of what ledgers were correctly reingested? - errMsg := fmt.Sprintf("in subrange %d to %d", - subRangeResult.requestedRange.from, subRangeResult.requestedRange.to) - result = errors.Wrap(subRangeResult.err, errMsg) - } - } - - for subRangeFrom := fromLedger; subRangeFrom < toLedger && result == nil; { - // job queuing - subRangeTo := subRangeFrom + batchSize - if subRangeTo > toLedger { - subRangeTo = toLedger - } - subRange := ledgerRange{subRangeFrom, subRangeTo} - - select { - case <-ps.shutdown: - return errors.New("aborted") - case subRangeResult := <-ps.reingestJobResult: - processSubRangeResult(subRangeResult) - case ps.reingestJobQueue <- subRange: - pendingJobsCount++ - subRangeFrom = subRangeTo - } - } - - for pendingJobsCount > 0 { - // wait for any remaining running jobs to finish - select { - case <-ps.shutdown: - return errors.New("aborted") - case subRangeResult := <-ps.reingestJobResult: - processSubRangeResult(subRangeResult) - } - - } - - return result -} - -func (ps *ParallelSystems) Shutdown() { - if ps.shutdown != nil { - close(ps.shutdown) - ps.wait.Wait() - close(ps.reingestJobQueue) - close(ps.reingestJobResult) - } -} diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index 70f91608e8..90eb1f6a97 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -4,13 +4,12 @@ import ( "bytes" "context" "database/sql" - "math/rand" - "sort" - "sync" "testing" - "time" "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stellar/go/exp/ingest/adapters" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/exp/ingest/ledgerbackend" @@ -19,8 +18,6 @@ import ( "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) var ( @@ -105,65 +102,6 @@ func TestNewSystem(t *testing.T) { assert.Equal(t, system.ctx, system.runner.(*ProcessorRunner).ctx) } -type sorteableRanges []ledgerRange - -func (s sorteableRanges) Len() int { return len(s) } -func (s sorteableRanges) Less(i, j int) bool { return s[i].from < s[j].from } -func (s sorteableRanges) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func TestParallelReingestRange(t *testing.T) { - config := Config{} - var ( - rangesCalled sorteableRanges - shutdowns int - m sync.Mutex - ) - factory := func(c Config) (System, error) { - result := &mockSystem{} - result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run( - func(args mock.Arguments) { - r := ledgerRange{ - from: args.Get(0).(uint32), - to: args.Get(1).(uint32), - } - m.Lock() - defer m.Unlock() - rangesCalled = append(rangesCalled, r) - // simulate call - time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) - }).Return(error(nil)) - result.On("Shutdown").Run(func(mock.Arguments) { - m.Lock() - defer m.Unlock() - shutdowns++ - }) - return result, nil - } - system, err := newParallelSystems(config, 3, factory) - assert.NoError(t, err) - err = system.ReingestRange(0, 2050, 258) - assert.NoError(t, err) - - sort.Sort(rangesCalled) - expected := sorteableRanges{ - {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, - {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, - } - assert.Equal(t, expected, rangesCalled) - system.Shutdown() - assert.Equal(t, 3, shutdowns) - -} - -func TestCalculateParallelLedgerBatchSize(t *testing.T) { - assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4)) - assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) - assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) -} - func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { historyQ := &mockDBQ{} system := &system{ diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go new file mode 100644 index 0000000000..7630addcd6 --- /dev/null +++ b/services/horizon/internal/expingest/parallel.go @@ -0,0 +1,156 @@ +package expingest + +import ( + "fmt" + "math" + "sync" + + "github.com/stellar/go/support/errors" +) + +type ledgerRange struct { + from uint32 + to uint32 +} + +type rangeResult struct { + err error + requestedRange ledgerRange +} + +type ParallelSystems struct { + workerCount uint + reingestJobQueue chan ledgerRange + shutdown chan struct{} + wait sync.WaitGroup + reingestJobResult chan rangeResult +} + +func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { + return newParallelSystems(config, workerCount, NewSystem) +} + +// private version of NewParallel systems, allowing to inject a mock system +func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { + if workerCount < 1 { + return nil, errors.New("workerCount must be > 0") + } + + result := ParallelSystems{ + workerCount: workerCount, + reingestJobQueue: make(chan ledgerRange), + shutdown: make(chan struct{}), + reingestJobResult: make(chan rangeResult), + } + for i := uint(0); i < workerCount; i++ { + s, err := systemFactory(config) + if err != nil { + result.Shutdown() + return nil, errors.Wrap(err, "cannot create new system") + } + result.wait.Add(1) + go result.work(s) + } + return &result, nil +} + +func (ps *ParallelSystems) work(s System) { + defer func() { + s.Shutdown() + ps.wait.Done() + }() + for { + select { + case <-ps.shutdown: + return + case reingestRange := <-ps.reingestJobQueue: + err := s.ReingestRange(reingestRange.from, reingestRange.to, false) + select { + case <-ps.shutdown: + return + case ps.reingestJobResult <- rangeResult{err, reingestRange}: + } + } + } +} + +const ( + historyCheckpointLedgerInterval = 64 + minBatchSize = historyCheckpointLedgerInterval +) + +func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { + batchSize := batchSizeSuggestion + if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { + // let's try to make use of all the workers + batchSize = rangeSize / uint32(workerCount) + } + // Use a minimum batch size to make it worth it in terms of overhead + if batchSize < minBatchSize { + batchSize = minBatchSize + } + + // Also, round the batch size to the closest, lower or equal 64 multiple + return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval +} + +func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { + var ( + wait sync.WaitGroup + stop = make(chan struct{}) + batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) + totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger) / float64(batchSize))) + ) + + wait.Add(1) + defer func() { + close(stop) + wait.Wait() + }() + + // queue subranges + go func() { + defer wait.Done() + for subRangeFrom := fromLedger; subRangeFrom < toLedger; { + // job queuing + subRangeTo := subRangeFrom + batchSize + if subRangeTo > toLedger { + subRangeTo = toLedger + } + select { + case <-stop: + return + case ps.reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: + } + subRangeFrom = subRangeTo + } + }() + + // collect subrange results + for i := uint32(0); i < totalJobs; i++ { + // collect results + select { + case <-ps.shutdown: + return errors.New("aborted") + case subRangeResult := <-ps.reingestJobResult: + if subRangeResult.err != nil { + // TODO: give account of what ledgers were correctly reingested? + errMsg := fmt.Sprintf("in subrange %d to %d", + subRangeResult.requestedRange.from, subRangeResult.requestedRange.to) + return errors.Wrap(subRangeResult.err, errMsg) + } + } + + } + + return nil +} + +func (ps *ParallelSystems) Shutdown() { + if ps.shutdown != nil { + close(ps.shutdown) + ps.wait.Wait() + close(ps.reingestJobQueue) + close(ps.reingestJobResult) + } +} diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go new file mode 100644 index 0000000000..ad6165b821 --- /dev/null +++ b/services/horizon/internal/expingest/parallel_test.go @@ -0,0 +1,102 @@ +package expingest + +import ( + "math/rand" + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/stellar/go/support/errors" +) + +func TestCalculateParallelLedgerBatchSize(t *testing.T) { + assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) +} + +type sorteableRanges []ledgerRange + +func (s sorteableRanges) Len() int { return len(s) } +func (s sorteableRanges) Less(i, j int) bool { return s[i].from < s[j].from } +func (s sorteableRanges) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func TestParallelReingestRange(t *testing.T) { + config := Config{} + var ( + rangesCalled sorteableRanges + shutdowns int + m sync.Mutex + ) + factory := func(c Config) (System, error) { + result := &mockSystem{} + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run( + func(args mock.Arguments) { + r := ledgerRange{ + from: args.Get(0).(uint32), + to: args.Get(1).(uint32), + } + m.Lock() + defer m.Unlock() + rangesCalled = append(rangesCalled, r) + // simulate call + time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) + }).Return(error(nil)) + result.On("Shutdown").Run(func(mock.Arguments) { + m.Lock() + defer m.Unlock() + shutdowns++ + }) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.NoError(t, err) + + sort.Sort(rangesCalled) + expected := sorteableRanges{ + {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, + {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, + } + assert.Equal(t, expected, rangesCalled) + system.Shutdown() + assert.Equal(t, 3, shutdowns) + +} + +func TestParallelReingestRangeError(t *testing.T) { + config := Config{} + var ( + shutdowns int + m sync.Mutex + ) + factory := func(c Config) (System, error) { + result := &mockSystem{} + // Fail on the second range + result.On("ReingestRange", uint32(1536), uint32(1792), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo")) + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil)) + result.On("Shutdown").Run(func(mock.Arguments) { + m.Lock() + defer m.Unlock() + shutdowns++ + }) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.Error(t, err) + assert.Equal(t, "in subrange 1536 to 1792: failed because of foo", err.Error()) + + system.Shutdown() + assert.Equal(t, 3, shutdowns) + +} From 295a506f48432c5eac3f5ab9001dea608cdc3ad3 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 24 Jun 2020 15:11:54 +0200 Subject: [PATCH 07/23] Revert "Remove job size (the larger, the faster it is)" This reverts commit 2cd8e4ee41623150b3a631c2f3df766df1428bd2. --- services/horizon/cmd/db.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index d144b28019..d3a0d04ac2 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -126,6 +126,7 @@ var dbReingestCmd = &cobra.Command{ var ( reingestForce bool parallelWorkers uint + parallelJobSize uint32 ) var reingestRangeCmdOpts = []*support.ConfigOption{ { @@ -145,6 +146,14 @@ var reingestRangeCmdOpts = []*support.ConfigOption{ FlagDefault: uint(1), Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", }, + { + Name: "parallel-job-size", + ConfigKey: ¶llelJobSize, + OptType: types.Uint32, + Required: false, + FlagDefault: uint32(256), + Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", + }, } var dbReingestRangeCmd = &cobra.Command{ @@ -218,7 +227,7 @@ var dbReingestRangeCmd = &cobra.Command{ err = system.ReingestRange( argsInt32[0], argsInt32[1], - argsInt32[1]-argsInt32[0], + parallelJobSize, ) } From e9a5f968908b1d126c10951ee73cc055ff31854a Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 24 Jun 2020 15:13:40 +0200 Subject: [PATCH 08/23] Use a large default job size --- services/horizon/cmd/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index d3a0d04ac2..4275029c7f 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -151,7 +151,7 @@ var reingestRangeCmdOpts = []*support.ConfigOption{ ConfigKey: ¶llelJobSize, OptType: types.Uint32, Required: false, - FlagDefault: uint32(256), + FlagDefault: uint32(20096), Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", }, } From f4c8d4b2af4c4041004c587337792de201c28dd8 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 24 Jun 2020 15:16:07 +0200 Subject: [PATCH 09/23] Forgot to fix the off-by-one issue --- services/horizon/internal/expingest/parallel.go | 7 ++++--- services/horizon/internal/expingest/parallel_test.go | 8 ++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index 7630addcd6..620d98a77d 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -99,7 +99,8 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS wait sync.WaitGroup stop = make(chan struct{}) batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) - totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger) / float64(batchSize))) + // we add one because both toLedger and fromLedger are included in the rabge + totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger+1) / float64(batchSize))) ) wait.Add(1) @@ -113,7 +114,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS defer wait.Done() for subRangeFrom := fromLedger; subRangeFrom < toLedger; { // job queuing - subRangeTo := subRangeFrom + batchSize + subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch if subRangeTo > toLedger { subRangeTo = toLedger } @@ -122,7 +123,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS return case ps.reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: } - subRangeFrom = subRangeTo + subRangeFrom = subRangeTo + 1 } }() diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go index ad6165b821..676b6b0ce1 100644 --- a/services/horizon/internal/expingest/parallel_test.go +++ b/services/horizon/internal/expingest/parallel_test.go @@ -63,8 +63,8 @@ func TestParallelReingestRange(t *testing.T) { sort.Sort(rangesCalled) expected := sorteableRanges{ - {from: 0, to: 256}, {from: 256, to: 512}, {from: 512, to: 768}, {from: 768, to: 1024}, {from: 1024, to: 1280}, - {from: 1280, to: 1536}, {from: 1536, to: 1792}, {from: 1792, to: 2048}, {from: 2048, to: 2050}, + {from: 0, to: 255}, {from: 256, to: 511}, {from: 512, to: 767}, {from: 768, to: 1023}, {from: 1024, to: 1279}, + {from: 1280, to: 1535}, {from: 1536, to: 1791}, {from: 1792, to: 2047}, {from: 2048, to: 2050}, } assert.Equal(t, expected, rangesCalled) system.Shutdown() @@ -81,7 +81,7 @@ func TestParallelReingestRangeError(t *testing.T) { factory := func(c Config) (System, error) { result := &mockSystem{} // Fail on the second range - result.On("ReingestRange", uint32(1536), uint32(1792), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo")) + result.On("ReingestRange", uint32(1536), uint32(1791), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo")) result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil)) result.On("Shutdown").Run(func(mock.Arguments) { m.Lock() @@ -94,7 +94,7 @@ func TestParallelReingestRangeError(t *testing.T) { assert.NoError(t, err) err = system.ReingestRange(0, 2050, 258) assert.Error(t, err) - assert.Equal(t, "in subrange 1536 to 1792: failed because of foo", err.Error()) + assert.Equal(t, "in subrange 1536 to 1791: failed because of foo", err.Error()) system.Shutdown() assert.Equal(t, 3, shutdowns) From 71142d1ac469db1c08af39663bd911afcf872036 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 24 Jun 2020 15:58:17 +0200 Subject: [PATCH 10/23] Wait for pending jobs --- .../horizon/internal/expingest/parallel.go | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index 620d98a77d..fa6d1fe5c2 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -96,11 +96,14 @@ func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { var ( + firstErr error wait sync.WaitGroup stop = make(chan struct{}) batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) // we add one because both toLedger and fromLedger are included in the rabge - totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger+1) / float64(batchSize))) + totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger+1) / float64(batchSize))) + pendingJobs = 0 + pendingJobsMutex sync.Mutex ) wait.Add(1) @@ -122,29 +125,39 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS case <-stop: return case ps.reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: + pendingJobsMutex.Lock() + pendingJobs++ + pendingJobsMutex.Unlock() } subRangeFrom = subRangeTo + 1 } }() // collect subrange results +collect: for i := uint32(0); i < totalJobs; i++ { - // collect results select { case <-ps.shutdown: return errors.New("aborted") case subRangeResult := <-ps.reingestJobResult: - if subRangeResult.err != nil { + pendingJobsMutex.Lock() + pendingJobs-- + if firstErr != nil { + if pendingJobs == 0 { + break collect + } + } else if subRangeResult.err != nil { // TODO: give account of what ledgers were correctly reingested? errMsg := fmt.Sprintf("in subrange %d to %d", subRangeResult.requestedRange.from, subRangeResult.requestedRange.to) - return errors.Wrap(subRangeResult.err, errMsg) + firstErr = errors.Wrap(subRangeResult.err, errMsg) } + pendingJobsMutex.Unlock() } } - return nil + return firstErr } func (ps *ParallelSystems) Shutdown() { From 347c20ec22cd26682633ba1b26dc1a813458235f Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Wed, 24 Jun 2020 20:26:52 +0200 Subject: [PATCH 11/23] Update --- .../horizon/internal/expingest/parallel.go | 174 +++++++++--------- 1 file changed, 83 insertions(+), 91 deletions(-) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index fa6d1fe5c2..d465cea48e 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -2,83 +2,76 @@ package expingest import ( "fmt" - "math" "sync" "github.com/stellar/go/support/errors" ) +const ( + historyCheckpointLedgerInterval = 64 + minBatchSize = historyCheckpointLedgerInterval +) + type ledgerRange struct { from uint32 to uint32 } -type rangeResult struct { - err error - requestedRange ledgerRange +type rangeError struct { + err error + ledgerRange ledgerRange +} + +func (e rangeError) Error() string { + return fmt.Sprintf("error when processing [%d, %d] range: %s", e.ledgerRange.from, e.ledgerRange.to, e.err) } type ParallelSystems struct { - workerCount uint - reingestJobQueue chan ledgerRange - shutdown chan struct{} - wait sync.WaitGroup - reingestJobResult chan rangeResult + config Config + workerCount uint + shutdownOnce sync.Once + shutdown chan struct{} } func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { - return newParallelSystems(config, workerCount, NewSystem) + // Leaving this because used in tests, will update after a code review. + return newParallelSystems(config, workerCount) } // private version of NewParallel systems, allowing to inject a mock system -func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { +func newParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { if workerCount < 1 { return nil, errors.New("workerCount must be > 0") } - result := ParallelSystems{ - workerCount: workerCount, - reingestJobQueue: make(chan ledgerRange), - shutdown: make(chan struct{}), - reingestJobResult: make(chan rangeResult), - } - for i := uint(0); i < workerCount; i++ { - s, err := systemFactory(config) - if err != nil { - result.Shutdown() - return nil, errors.Wrap(err, "cannot create new system") - } - result.wait.Add(1) - go result.work(s) - } - return &result, nil + return &ParallelSystems{ + config: config, + workerCount: workerCount, + shutdown: make(chan struct{}), + }, nil } -func (ps *ParallelSystems) work(s System) { - defer func() { - s.Shutdown() - ps.wait.Done() - }() +func (ps *ParallelSystems) reingestWorker(reingestJobQueue <-chan ledgerRange) error { + s, err := NewSystem(ps.config) + if err != nil { + return errors.Wrap(err, "error creating new system") + } for { select { case <-ps.shutdown: - return - case reingestRange := <-ps.reingestJobQueue: + return nil + case reingestRange := <-reingestJobQueue: err := s.ReingestRange(reingestRange.from, reingestRange.to, false) - select { - case <-ps.shutdown: - return - case ps.reingestJobResult <- rangeResult{err, reingestRange}: + if err != nil { + return rangeError{ + err: err, + ledgerRange: reingestRange, + } } } } } -const ( - historyCheckpointLedgerInterval = 64 - minBatchSize = historyCheckpointLedgerInterval -) - func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { batchSize := batchSizeSuggestion if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { @@ -96,25 +89,24 @@ func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { var ( - firstErr error - wait sync.WaitGroup - stop = make(chan struct{}) - batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) - // we add one because both toLedger and fromLedger are included in the rabge - totalJobs = uint32(math.Ceil(float64(toLedger-fromLedger+1) / float64(batchSize))) - pendingJobs = 0 - pendingJobsMutex sync.Mutex + batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) + reingestJobQueue = make(chan ledgerRange) + wg sync.WaitGroup + + lowestRangeErrMutex sync.Mutex + // lowestRangeErr is an error of the failed range with the lowest starting ledger sequence that is used to tell + // the user which range to reingest in case of errors. We use that fact that System.ReingestRange is blocking, + // jobs are sent to a queue (unbuffered channel) in sequence and there is a WaitGroup waiting for all the workers + // to exit. + // Because of this when we reach `wg.Wait()` all jobs previously sent to a channel are processed (either success + // or failure). In case of a failure we save the range with the smallest sequence number because this is where + // the user needs to start again to prevent the gaps. + lowestRangeErr *rangeError ) - wait.Add(1) - defer func() { - close(stop) - wait.Wait() - }() - - // queue subranges + wg.Add(1) go func() { - defer wait.Done() + defer wg.Done() for subRangeFrom := fromLedger; subRangeFrom < toLedger; { // job queuing subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch @@ -122,49 +114,49 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS subRangeTo = toLedger } select { - case <-stop: + case <-ps.shutdown: return - case ps.reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: - pendingJobsMutex.Lock() - pendingJobs++ - pendingJobsMutex.Unlock() + case reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: } subRangeFrom = subRangeTo + 1 } }() - // collect subrange results -collect: - for i := uint32(0); i < totalJobs; i++ { - select { - case <-ps.shutdown: - return errors.New("aborted") - case subRangeResult := <-ps.reingestJobResult: - pendingJobsMutex.Lock() - pendingJobs-- - if firstErr != nil { - if pendingJobs == 0 { - break collect + for i := uint(0); i < ps.workerCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := ps.reingestWorker(reingestJobQueue) + if err != nil { + log.WithError(err).Error("error in reingest worker") + if rangeErr, ok := err.(rangeError); ok { + lowestRangeErrMutex.Lock() + if lowestRangeErr == nil || (lowestRangeErr != nil && lowestRangeErr.ledgerRange.from > rangeErr.ledgerRange.from) { + lowestRangeErr = &rangeErr + } + lowestRangeErrMutex.Unlock() } - } else if subRangeResult.err != nil { - // TODO: give account of what ledgers were correctly reingested? - errMsg := fmt.Sprintf("in subrange %d to %d", - subRangeResult.requestedRange.from, subRangeResult.requestedRange.to) - firstErr = errors.Wrap(subRangeResult.err, errMsg) + ps.Shutdown() + return } - pendingJobsMutex.Unlock() - } + }() + } + wg.Wait() + close(reingestJobQueue) + + if lowestRangeErr != nil { + return errors.Errorf("one or more jobs failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.from, toLedger) } + return nil +} + +func (ps *ParallelSystems) doShutdown() { + close(ps.shutdown) - return firstErr } -func (ps *ParallelSystems) Shutdown() { - if ps.shutdown != nil { - close(ps.shutdown) - ps.wait.Wait() - close(ps.reingestJobQueue) - close(ps.reingestJobResult) - } +func (ps *ParallelSystems) Shutdown() error { + ps.shutdownOnce.Do(ps.doShutdown) + return nil } From 8358a9f9f8a7f05417c852c44e1d506f71fc5052 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 25 Jun 2020 02:14:52 +0200 Subject: [PATCH 12/23] Review updates & tests --- services/horizon/cmd/db.go | 1 - .../horizon/internal/expingest/parallel.go | 84 +++++++++---------- .../internal/expingest/parallel_test.go | 22 +---- 3 files changed, 41 insertions(+), 66 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 4275029c7f..3949c6c9a8 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -222,7 +222,6 @@ var dbReingestRangeCmd = &cobra.Command{ if err != nil { log.Fatal(systemErr) } - defer system.Shutdown() err = system.ReingestRange( argsInt32[0], diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index d465cea48e..2a4888157b 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -27,40 +27,43 @@ func (e rangeError) Error() string { } type ParallelSystems struct { - config Config - workerCount uint - shutdownOnce sync.Once - shutdown chan struct{} + config Config + workerCount uint + systemFactory func(Config) (System, error) } func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { // Leaving this because used in tests, will update after a code review. - return newParallelSystems(config, workerCount) + return newParallelSystems(config, workerCount, NewSystem) } // private version of NewParallel systems, allowing to inject a mock system -func newParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { +func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { if workerCount < 1 { return nil, errors.New("workerCount must be > 0") } return &ParallelSystems{ - config: config, - workerCount: workerCount, - shutdown: make(chan struct{}), + config: config, + workerCount: workerCount, + systemFactory: systemFactory, }, nil } -func (ps *ParallelSystems) reingestWorker(reingestJobQueue <-chan ledgerRange) error { - s, err := NewSystem(ps.config) +func (ps *ParallelSystems) runReingestWorker(stop <-chan struct{}, reingestJobQueue <-chan ledgerRange) error { + s, err := ps.systemFactory(ps.config) if err != nil { return errors.Wrap(err, "error creating new system") } for { select { - case <-ps.shutdown: + case <-stop: return nil - case reingestRange := <-reingestJobQueue: + case reingestRange, more := <-reingestJobQueue: + if !more { + // Channel closed - no more jobs + return nil + } err := s.ReingestRange(reingestRange.from, reingestRange.to, false) if err != nil { return rangeError{ @@ -93,6 +96,11 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS reingestJobQueue = make(chan ledgerRange) wg sync.WaitGroup + // stopOnce is used to close the stop channel once: closing a closed channel panics and it can happen in case + // of errors in multiple ranges. + stopOnce sync.Once + stop = make(chan struct{}) + lowestRangeErrMutex sync.Mutex // lowestRangeErr is an error of the failed range with the lowest starting ledger sequence that is used to tell // the user which range to reingest in case of errors. We use that fact that System.ReingestRange is blocking, @@ -104,29 +112,11 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS lowestRangeErr *rangeError ) - wg.Add(1) - go func() { - defer wg.Done() - for subRangeFrom := fromLedger; subRangeFrom < toLedger; { - // job queuing - subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch - if subRangeTo > toLedger { - subRangeTo = toLedger - } - select { - case <-ps.shutdown: - return - case reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: - } - subRangeFrom = subRangeTo + 1 - } - }() - for i := uint(0); i < ps.workerCount; i++ { wg.Add(1) go func() { defer wg.Done() - err := ps.reingestWorker(reingestJobQueue) + err := ps.runReingestWorker(stop, reingestJobQueue) if err != nil { log.WithError(err).Error("error in reingest worker") if rangeErr, ok := err.(rangeError); ok { @@ -136,27 +126,33 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS } lowestRangeErrMutex.Unlock() } - ps.Shutdown() + stopOnce.Do(func() { + close(stop) + }) return } }() } - wg.Wait() + for subRangeFrom := fromLedger; subRangeFrom < toLedger; { + // job queuing + subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch + if subRangeTo > toLedger { + subRangeTo = toLedger + } + select { + case <-stop: + break + case reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: + } + subRangeFrom = subRangeTo + 1 + } + close(reingestJobQueue) + wg.Wait() if lowestRangeErr != nil { return errors.Errorf("one or more jobs failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.from, toLedger) } return nil } - -func (ps *ParallelSystems) doShutdown() { - close(ps.shutdown) - -} - -func (ps *ParallelSystems) Shutdown() error { - ps.shutdownOnce.Do(ps.doShutdown) - return nil -} diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go index 676b6b0ce1..a878490b20 100644 --- a/services/horizon/internal/expingest/parallel_test.go +++ b/services/horizon/internal/expingest/parallel_test.go @@ -32,7 +32,6 @@ func TestParallelReingestRange(t *testing.T) { config := Config{} var ( rangesCalled sorteableRanges - shutdowns int m sync.Mutex ) factory := func(c Config) (System, error) { @@ -49,11 +48,6 @@ func TestParallelReingestRange(t *testing.T) { // simulate call time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) }).Return(error(nil)) - result.On("Shutdown").Run(func(mock.Arguments) { - m.Lock() - defer m.Unlock() - shutdowns++ - }) return result, nil } system, err := newParallelSystems(config, 3, factory) @@ -67,36 +61,22 @@ func TestParallelReingestRange(t *testing.T) { {from: 1280, to: 1535}, {from: 1536, to: 1791}, {from: 1792, to: 2047}, {from: 2048, to: 2050}, } assert.Equal(t, expected, rangesCalled) - system.Shutdown() - assert.Equal(t, 3, shutdowns) } func TestParallelReingestRangeError(t *testing.T) { config := Config{} - var ( - shutdowns int - m sync.Mutex - ) factory := func(c Config) (System, error) { result := &mockSystem{} // Fail on the second range result.On("ReingestRange", uint32(1536), uint32(1791), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo")) result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil)) - result.On("Shutdown").Run(func(mock.Arguments) { - m.Lock() - defer m.Unlock() - shutdowns++ - }) return result, nil } system, err := newParallelSystems(config, 3, factory) assert.NoError(t, err) err = system.ReingestRange(0, 2050, 258) assert.Error(t, err) - assert.Equal(t, "in subrange 1536 to 1791: failed because of foo", err.Error()) - - system.Shutdown() - assert.Equal(t, 3, shutdowns) + assert.Equal(t, "one or more jobs failed, recommended restart range: [1536, 2050]", err.Error()) } From fec4a16463ba9501d93600e43e483ca9fddf7c8c Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 25 Jun 2020 02:19:31 +0200 Subject: [PATCH 13/23] Fix static check --- services/horizon/internal/expingest/parallel.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index 2a4888157b..60515250b6 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -134,6 +134,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS }() } +rangeQueueLoop: for subRangeFrom := fromLedger; subRangeFrom < toLedger; { // job queuing subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch @@ -142,7 +143,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS } select { case <-stop: - break + break rangeQueueLoop case reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: } subRangeFrom = subRangeTo + 1 From d60b97f9d8b214f2a135de616fa232dc689902ed Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 11:02:19 +0200 Subject: [PATCH 14/23] Fix channel leak --- services/horizon/internal/expingest/parallel.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index 60515250b6..03ea297845 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -59,11 +59,7 @@ func (ps *ParallelSystems) runReingestWorker(stop <-chan struct{}, reingestJobQu select { case <-stop: return nil - case reingestRange, more := <-reingestJobQueue: - if !more { - // Channel closed - no more jobs - return nil - } + case reingestRange := <-reingestJobQueue: err := s.ReingestRange(reingestRange.from, reingestRange.to, false) if err != nil { return rangeError{ @@ -149,8 +145,11 @@ rangeQueueLoop: subRangeFrom = subRangeTo + 1 } - close(reingestJobQueue) + stopOnce.Do(func() { + close(stop) + }) wg.Wait() + close(reingestJobQueue) if lowestRangeErr != nil { return errors.Errorf("one or more jobs failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.from, toLedger) From a59a4712edca3d280c9dd9a1b1a7b74a0e8137aa Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 11:22:33 +0200 Subject: [PATCH 15/23] Tweak and simplify error management --- .../horizon/internal/expingest/parallel.go | 28 +++++++++---------- .../internal/expingest/parallel_test.go | 2 +- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index 03ea297845..e76502a0bf 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -50,15 +50,11 @@ func newParallelSystems(config Config, workerCount uint, systemFactory func(Conf }, nil } -func (ps *ParallelSystems) runReingestWorker(stop <-chan struct{}, reingestJobQueue <-chan ledgerRange) error { - s, err := ps.systemFactory(ps.config) - if err != nil { - return errors.Wrap(err, "error creating new system") - } +func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan ledgerRange) rangeError { for { select { case <-stop: - return nil + return rangeError{} case reingestRange := <-reingestJobQueue: err := s.ReingestRange(reingestRange.from, reingestRange.to, false) if err != nil { @@ -110,18 +106,20 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS for i := uint(0); i < ps.workerCount; i++ { wg.Add(1) + s, err := ps.systemFactory(ps.config) + if err != nil { + return errors.Wrap(err, "error creating new system") + } go func() { defer wg.Done() - err := ps.runReingestWorker(stop, reingestJobQueue) - if err != nil { + rangeErr := ps.runReingestWorker(s, stop, reingestJobQueue) + if rangeErr.err != nil { log.WithError(err).Error("error in reingest worker") - if rangeErr, ok := err.(rangeError); ok { - lowestRangeErrMutex.Lock() - if lowestRangeErr == nil || (lowestRangeErr != nil && lowestRangeErr.ledgerRange.from > rangeErr.ledgerRange.from) { - lowestRangeErr = &rangeErr - } - lowestRangeErrMutex.Unlock() + lowestRangeErrMutex.Lock() + if lowestRangeErr == nil || lowestRangeErr.ledgerRange.from > rangeErr.ledgerRange.from { + lowestRangeErr = &rangeErr } + lowestRangeErrMutex.Unlock() stopOnce.Do(func() { close(stop) }) @@ -152,7 +150,7 @@ rangeQueueLoop: close(reingestJobQueue) if lowestRangeErr != nil { - return errors.Errorf("one or more jobs failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.from, toLedger) + return errors.Wrapf(lowestRangeErr, "job failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.from, toLedger) } return nil } diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go index a878490b20..31fe00bddf 100644 --- a/services/horizon/internal/expingest/parallel_test.go +++ b/services/horizon/internal/expingest/parallel_test.go @@ -77,6 +77,6 @@ func TestParallelReingestRangeError(t *testing.T) { assert.NoError(t, err) err = system.ReingestRange(0, 2050, 258) assert.Error(t, err) - assert.Equal(t, "one or more jobs failed, recommended restart range: [1536, 2050]", err.Error()) + assert.Equal(t, "job failed, recommended restart range: [1536, 2050]: error when processing [1536, 1791] range: failed because of foo", err.Error()) } From 5447f9ab2bc80bc208923b60632c810e214c8f24 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 11:39:28 +0200 Subject: [PATCH 16/23] Add extra test for error ordering --- .../internal/expingest/parallel_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go index 31fe00bddf..4c46bc6259 100644 --- a/services/horizon/internal/expingest/parallel_test.go +++ b/services/horizon/internal/expingest/parallel_test.go @@ -80,3 +80,31 @@ func TestParallelReingestRangeError(t *testing.T) { assert.Equal(t, "job failed, recommended restart range: [1536, 2050]: error when processing [1536, 1791] range: failed because of foo", err.Error()) } + +func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { + config := Config{} + var wg sync.WaitGroup + wg.Add(1) + factory := func(c Config) (System, error) { + result := &mockSystem{} + // Fail on the second range + result.On("ReingestRange", uint32(1024), uint32(1279), mock.AnythingOfType("bool")).Run(func(mock.Arguments) { + // Wait for a more recent range to error + wg.Wait() + // This sleep should help making sure the result of this range is processed later than the earlier ones + // (there are no guarantees without instrumenting ReingestRange(), but that's too complicated) + time.Sleep(50 * time.Millisecond) + }).Return(errors.New("failed because of foo")) + result.On("ReingestRange", uint32(1536), uint32(1791), mock.AnythingOfType("bool")).Run(func(mock.Arguments) { + wg.Done() + }).Return(errors.New("failed because of bar")) + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil)) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.Error(t, err) + assert.Equal(t, "job failed, recommended restart range: [1024, 2050]: error when processing [1024, 1279] range: failed because of foo", err.Error()) + +} From 43a04ee95ffa3db1e14c4fcc3b364c6397b89c8a Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 11:39:47 +0200 Subject: [PATCH 17/23] Use correct error in log --- services/horizon/internal/expingest/parallel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index e76502a0bf..619c9f252e 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -114,7 +114,7 @@ func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeS defer wg.Done() rangeErr := ps.runReingestWorker(s, stop, reingestJobQueue) if rangeErr.err != nil { - log.WithError(err).Error("error in reingest worker") + log.WithError(rangeErr).Error("error in reingest worker") lowestRangeErrMutex.Lock() if lowestRangeErr == nil || lowestRangeErr.ledgerRange.from > rangeErr.ledgerRange.from { lowestRangeErr = &rangeErr From b9f93d8bee9fd78bc8107c7557c3261e6f71e337 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 11:44:32 +0200 Subject: [PATCH 18/23] Tweak CHANGELOG --- services/horizon/CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 254244af33..3f2c06dd3b 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,8 +5,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).x ## Unreleased -* Add `--parallel-workers` to `horizon db reingest range`. `--parallel --workers` will parallelize reingestion using the supplied number of workers. +* Add `--parallel-workers` and `--parallel-job-size` to `horizon db reingest range`. `--parallel-workers` will parallelize reingestion using the supplied number of workers. * Add transaction set operation count to `history_ledger`([#2690](https://github.com/stellar/go/pull/2690)). Extend ingestion to store the total number of operations in the transaction set and expose it in the ledger resource via `tx_set_operation_count`. This feature allow you to assess the used capacity of a transaction set. * Remove `--ingest-failed-transactions` flag. From now on Horizon will always ingest failed transactions. WARNING: if your application is using Horizon DB directly (not recommended!) remember that now it will also contain failed txs. ([#2702](https://github.com/stellar/go/pull/2702)). From 4049da82c68f8f7f0b2e79e6da7c232312c51e9b Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 11:49:09 +0200 Subject: [PATCH 19/23] Minor tweak --- services/horizon/internal/expingest/parallel_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go index 4c46bc6259..83b0a340ad 100644 --- a/services/horizon/internal/expingest/parallel_test.go +++ b/services/horizon/internal/expingest/parallel_test.go @@ -87,11 +87,11 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { wg.Add(1) factory := func(c Config) (System, error) { result := &mockSystem{} - // Fail on the second range + // Fail on an lower subrange after the first error result.On("ReingestRange", uint32(1024), uint32(1279), mock.AnythingOfType("bool")).Run(func(mock.Arguments) { // Wait for a more recent range to error wg.Wait() - // This sleep should help making sure the result of this range is processed later than the earlier ones + // This sleep should help making sure the result of this range is processed later than the one below // (there are no guarantees without instrumenting ReingestRange(), but that's too complicated) time.Sleep(50 * time.Millisecond) }).Return(errors.New("failed because of foo")) From e0ec2df1b5b0651b7752d4a83b21c987b2ba54f5 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 14:36:30 +0200 Subject: [PATCH 20/23] Use a 100K default batch size as recommended by Graydon --- services/horizon/cmd/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 3949c6c9a8..4f2a3e6324 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -151,7 +151,7 @@ var reingestRangeCmdOpts = []*support.ConfigOption{ ConfigKey: ¶llelJobSize, OptType: types.Uint32, Required: false, - FlagDefault: uint32(20096), + FlagDefault: uint32(100000), Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", }, } From 1705ad1dc43dd7926d949aa7fbefadc5c5718ee6 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 25 Jun 2020 21:21:17 +0200 Subject: [PATCH 21/23] Add simple retries to ReingestRange() --- services/horizon/cmd/db.go | 34 +++++++++++---- services/horizon/internal/expingest/main.go | 46 ++++++++++++++------- 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 4f2a3e6324..0ecdf6a420 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -124,9 +124,11 @@ var dbReingestCmd = &cobra.Command{ } var ( - reingestForce bool - parallelWorkers uint - parallelJobSize uint32 + reingestForce bool + parallelWorkers uint + parallelJobSize uint32 + retries uint + retryBackoffSeconds uint ) var reingestRangeCmdOpts = []*support.ConfigOption{ { @@ -154,6 +156,22 @@ var reingestRangeCmdOpts = []*support.ConfigOption{ FlagDefault: uint32(100000), Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", }, + { + Name: "retries", + ConfigKey: &retries, + OptType: types.Uint, + Required: false, + FlagDefault: uint(0), + Usage: "[optional] number of reingest retries", + }, + { + Name: "retry-backoff-seconds", + ConfigKey: &retryBackoffSeconds, + OptType: types.Uint, + Required: false, + FlagDefault: uint(5), + Usage: "[optional] backoff seconds between reingest retries", + }, } var dbReingestRangeCmd = &cobra.Command{ @@ -197,10 +215,12 @@ var dbReingestRangeCmd = &cobra.Command{ } ingestConfig := expingest.Config{ - CoreSession: coreSession, - NetworkPassphrase: config.NetworkPassphrase, - HistorySession: horizonSession, - HistoryArchiveURL: config.HistoryArchiveURLs[0], + CoreSession: coreSession, + NetworkPassphrase: config.NetworkPassphrase, + HistorySession: horizonSession, + HistoryArchiveURL: config.HistoryArchiveURLs[0], + MaxReingestRetries: int(retries), + ReingesRetryBackoffSeconds: int(retryBackoffSeconds), } if config.EnableCaptiveCoreIngestion { ingestConfig.StellarCorePath = config.StellarCoreBinaryPath diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 6fbe9e2b7c..695ebada6d 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -66,7 +66,9 @@ type Config struct { // MaxStreamRetries determines how many times the reader will retry when encountering // errors while streaming xdr bucket entries from the history archive. // Set MaxStreamRetries to 0 if there should be no retry attempts - MaxStreamRetries int + MaxStreamRetries int + MaxReingestRetries int + ReingesRetryBackoffSeconds int } const ( @@ -119,8 +121,10 @@ type system struct { stellarCoreClient stellarCoreClient - maxStreamRetries int - wg sync.WaitGroup + maxStreamRetries int + maxReingestRetries int + reingesRetryBackoffSeconds int + wg sync.WaitGroup // stateVerificationRunning is true when verification routine is currently // running. @@ -169,14 +173,16 @@ func NewSystem(config Config) (System, error) { historyAdapter := adapters.MakeHistoryArchiveAdapter(archive) system := &system{ - ctx: ctx, - cancel: cancel, - historyAdapter: historyAdapter, - ledgerBackend: ledgerBackend, - config: config, - historyQ: historyQ, - disableStateVerification: config.DisableStateVerification, - maxStreamRetries: config.MaxStreamRetries, + ctx: ctx, + cancel: cancel, + historyAdapter: historyAdapter, + ledgerBackend: ledgerBackend, + config: config, + historyQ: historyQ, + disableStateVerification: config.DisableStateVerification, + maxStreamRetries: config.MaxStreamRetries, + maxReingestRetries: config.MaxReingestRetries, + reingesRetryBackoffSeconds: config.ReingesRetryBackoffSeconds, stellarCoreClient: &stellarcore.Client{ URL: config.StellarCoreURL, }, @@ -265,11 +271,19 @@ func (s *system) VerifyRange(fromLedger, toLedger uint32, verifyState bool) erro // ReingestRange runs the ingestion pipeline on the range of ledgers ingesting // history data only. func (s *system) ReingestRange(fromLedger, toLedger uint32, force bool) error { - return s.runStateMachine(reingestHistoryRangeState{ - fromLedger: fromLedger, - toLedger: toLedger, - force: force, - }) + run := func() error { + return s.runStateMachine(reingestHistoryRangeState{ + fromLedger: fromLedger, + toLedger: toLedger, + force: force, + }) + } + err := run() + for retry := 0; err != nil && retry < s.maxReingestRetries; retry++ { + time.Sleep(time.Second * time.Duration(s.reingesRetryBackoffSeconds)) + err = run() + } + return err } func (s *system) runStateMachine(cur stateMachineNode) error { From 5c89c05e200bf570f9b36b65fc4ec6402e423905 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Fri, 26 Jun 2020 10:24:26 +0200 Subject: [PATCH 22/23] Add successful range logging --- services/horizon/internal/expingest/parallel.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go index 619c9f252e..2e48d3e8f2 100644 --- a/services/horizon/internal/expingest/parallel.go +++ b/services/horizon/internal/expingest/parallel.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" ) const ( @@ -63,6 +64,7 @@ func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, rei ledgerRange: reingestRange, } } + log.WithFields(logpkg.F{"from": reingestRange.from, "to": reingestRange.to}).Info("successfully reingested range") } } } From 526d03d81ed9e0a3d8a07eedcdc2d54cfb0da787 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 29 Jun 2020 15:25:43 +0200 Subject: [PATCH 23/23] Print error when retrying --- services/horizon/internal/expingest/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 695ebada6d..b673d41c47 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -280,6 +280,7 @@ func (s *system) ReingestRange(fromLedger, toLedger uint32, force bool) error { } err := run() for retry := 0; err != nil && retry < s.maxReingestRetries; retry++ { + log.Warnf("reingest range [%d, %d] failed (%s), retrying", fromLedger, toLedger, err.Error()) time.Sleep(time.Second * time.Duration(s.reingesRetryBackoffSeconds)) err = run() }