diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 68712e113f..3f2c06dd3b 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,6 +5,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).x ## Unreleased +* Add `--parallel-workers` and `--parallel-job-size` to `horizon db reingest range`. `--parallel-workers` will parallelize reingestion using the supplied number of workers. * Add transaction set operation count to `history_ledger`([#2690](https://github.com/stellar/go/pull/2690)). Extend ingestion to store the total number of operations in the transaction set and expose it in the ledger resource via `tx_set_operation_count`. This feature allow you to assess the used capacity of a transaction set. * Remove `--ingest-failed-transactions` flag. From now on Horizon will always ingest failed transactions. WARNING: if your application is using Horizon DB directly (not recommended!) remember that now it will also contain failed txs. ([#2702](https://github.com/stellar/go/pull/2702)). diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 119ff21a13..0ecdf6a420 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/stellar/go/services/horizon/internal/db2/schema" "github.com/stellar/go/services/horizon/internal/expingest" support "github.com/stellar/go/support/config" @@ -122,16 +123,54 @@ var dbReingestCmd = &cobra.Command{ }, } -var reingestForce bool +var ( + reingestForce bool + parallelWorkers uint + parallelJobSize uint32 + retries uint + retryBackoffSeconds uint +) var reingestRangeCmdOpts = []*support.ConfigOption{ - &support.ConfigOption{ + { Name: "force", ConfigKey: &reingestForce, OptType: types.Bool, Required: false, FlagDefault: false, Usage: "[optional] if this flag is set, horizon will be blocked " + - "from ingesting until the reingestion command completes", + "from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)", + }, + { + Name: "parallel-workers", + ConfigKey: ¶llelWorkers, + OptType: types.Uint, + Required: false, + FlagDefault: uint(1), + Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers", + }, + { + Name: "parallel-job-size", + ConfigKey: ¶llelJobSize, + OptType: types.Uint32, + Required: false, + FlagDefault: uint32(100000), + Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size", + }, + { + Name: "retries", + ConfigKey: &retries, + OptType: types.Uint, + Required: false, + FlagDefault: uint(0), + Usage: "[optional] number of reingest retries", + }, + { + Name: "retry-backoff-seconds", + ConfigKey: &retryBackoffSeconds, + OptType: types.Uint, + Required: false, + FlagDefault: uint(5), + Usage: "[optional] backoff seconds between reingest retries", }, } @@ -144,6 +183,9 @@ var dbReingestRangeCmd = &cobra.Command{ co.Require() co.SetValue() } + if reingestForce && parallelWorkers > 1 { + log.Fatal("--force is incompatible with --parallel-workers > 1") + } if len(args) != 2 { cmd.Usage() @@ -173,25 +215,41 @@ var dbReingestRangeCmd = &cobra.Command{ } ingestConfig := expingest.Config{ - CoreSession: coreSession, - NetworkPassphrase: config.NetworkPassphrase, - HistorySession: horizonSession, - HistoryArchiveURL: config.HistoryArchiveURLs[0], + CoreSession: coreSession, + NetworkPassphrase: 
config.NetworkPassphrase, + HistorySession: horizonSession, + HistoryArchiveURL: config.HistoryArchiveURLs[0], + MaxReingestRetries: int(retries), + ReingesRetryBackoffSeconds: int(retryBackoffSeconds), } if config.EnableCaptiveCoreIngestion { ingestConfig.StellarCorePath = config.StellarCoreBinaryPath } - system, err := expingest.NewSystem(ingestConfig) - if err != nil { - log.Fatal(err) + if parallelWorkers < 2 { + system, systemErr := expingest.NewSystem(ingestConfig) + if systemErr != nil { + log.Fatal(systemErr) + } + + err = system.ReingestRange( + argsInt32[0], + argsInt32[1], + reingestForce, + ) + } else { + system, systemErr := expingest.NewParallelSystems(ingestConfig, parallelWorkers) + if systemErr != nil { + log.Fatal(systemErr) + } + + err = system.ReingestRange( + argsInt32[0], + argsInt32[1], + parallelJobSize, + ) } - err = system.ReingestRange( - argsInt32[0], - argsInt32[1], - reingestForce, - ) if err == nil { hlog.Info("Range run successfully!") return diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index e26ba84a7f..b9695f6027 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -69,7 +69,7 @@ type App struct { orderBookStream *expingest.OrderBookStream submitter *txsub.System paths paths.Finder - expingester *expingest.System + expingester expingest.System reaper *reap.System ticks *time.Ticker diff --git a/services/horizon/internal/expingest/build_state_test.go b/services/horizon/internal/expingest/build_state_test.go index f984f9d779..f17eca2773 100644 --- a/services/horizon/internal/expingest/build_state_test.go +++ b/services/horizon/internal/expingest/build_state_test.go @@ -19,7 +19,7 @@ type BuildStateTestSuite struct { suite.Suite historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter - system *System + system *system runner *mockProcessorsRunner stellarCoreClient *mockStellarCoreClient checkpointLedger uint32 @@ -33,7 +33,7 @@ func (s *BuildStateTestSuite) SetupTest() { s.stellarCoreClient = &mockStellarCoreClient{} s.checkpointLedger = uint32(63) s.lastLedger = 0 - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/db_integration_test.go b/services/horizon/internal/expingest/db_integration_test.go index fc4c5b9634..9045f8134d 100644 --- a/services/horizon/internal/expingest/db_integration_test.go +++ b/services/horizon/internal/expingest/db_integration_test.go @@ -60,7 +60,7 @@ type DBTestSuite struct { sequence uint32 ledgerBackend *ledgerbackend.MockDatabaseBackend historyAdapter *adapters.MockHistoryArchiveAdapter - system *System + system *system tt *test.T } @@ -77,7 +77,7 @@ func (s *DBTestSuite) SetupTest() { s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} var err error - s.system, err = NewSystem(Config{ + sIface, err := NewSystem(Config{ CoreSession: s.tt.CoreSession(), HistorySession: s.tt.HorizonSession(), HistoryArchiveURL: "http://ignore.test", @@ -85,6 +85,7 @@ func (s *DBTestSuite) SetupTest() { DisableStateVerification: false, }) s.Assert().NoError(err) + s.system = sIface.(*system) s.sequence = uint32(28660351) s.setupMocksForBuildState() diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index b2dc95b579..f4c4cb059b 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -18,7 +18,7 @@
var ( ) type stateMachineNode interface { - run(*System) (transition, error) + run(*system) (transition, error) String() string } @@ -92,7 +92,7 @@ func (stopState) String() string { return "stop" } -func (stopState) run(s *System) (transition, error) { +func (stopState) run(s *system) (transition, error) { return stop(), errors.New("Cannot run terminal state") } @@ -102,7 +102,7 @@ func (startState) String() string { return "start" } -func (startState) run(s *System) (transition, error) { +func (startState) run(s *system) (transition, error) { if err := s.historyQ.Begin(); err != nil { return start(), errors.Wrap(err, "Error starting a transaction") } @@ -213,7 +213,7 @@ func (b buildState) String() string { return fmt.Sprintf("buildFromCheckpoint(checkpointLedger=%d)", b.checkpointLedger) } -func (b buildState) run(s *System) (transition, error) { +func (b buildState) run(s *system) (transition, error) { if b.checkpointLedger == 0 { return start(), errors.New("unexpected checkpointLedger value") } @@ -307,7 +307,7 @@ func (r resumeState) String() string { return fmt.Sprintf("resume(latestSuccessfullyProcessedLedger=%d)", r.latestSuccessfullyProcessedLedger) } -func (r resumeState) run(s *System) (transition, error) { +func (r resumeState) run(s *system) (transition, error) { if r.latestSuccessfullyProcessedLedger == 0 { return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value") } @@ -404,7 +404,7 @@ func (r resumeState) run(s *System) (transition, error) { } duration := time.Since(startTime) - s.Metrics.LedgerIngestionTimer.Update(duration) + s.Metrics().LedgerIngestionTimer.Update(duration) log. WithFields(changeStats.Map()). WithFields(ledgerTransactionStats.Map()). @@ -436,7 +436,7 @@ func (h historyRangeState) String() string { } // historyRangeState is used when catching up history data -func (h historyRangeState) run(s *System) (transition, error) { +func (h historyRangeState) run(s *system) (transition, error) { if h.fromLedger == 0 || h.toLedger == 0 || h.fromLedger > h.toLedger { return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) @@ -478,7 +478,7 @@ func (h historyRangeState) run(s *System) (transition, error) { return start(), nil } -func runTransactionProcessorsOnLedger(s *System, ledger uint32) error { +func runTransactionProcessorsOnLedger(s *system, ledger uint32) error { log.WithFields(logpkg.F{ "sequence": ledger, "state": false, @@ -520,7 +520,7 @@ func (h reingestHistoryRangeState) String() string { ) } -func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger uint32) error { +func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error { if s.historyQ.GetTx() == nil { return errors.New("expected transaction to be present") } @@ -549,7 +549,7 @@ func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger u } // reingestHistoryRangeState is used as a command to reingest historical data -func (h reingestHistoryRangeState) run(s *System) (transition, error) { +func (h reingestHistoryRangeState) run(s *system) (transition, error) { if h.fromLedger == 0 || h.toLedger == 0 || h.fromLedger > h.toLedger { return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) @@ -634,7 +634,7 @@ func (waitForCheckpointState) String() string { return "waitForCheckpoint" } -func (waitForCheckpointState) run(*System) (transition, error) { +func (waitForCheckpointState) run(*system) (transition, error) { log.Info("Waiting for the next 
checkpoint...") time.Sleep(10 * time.Second) return start(), nil @@ -655,7 +655,7 @@ func (v verifyRangeState) String() string { ) } -func (v verifyRangeState) run(s *System) (transition, error) { +func (v verifyRangeState) run(s *system) (transition, error) { if v.fromLedger == 0 || v.toLedger == 0 || v.fromLedger > v.toLedger { return stop(), errors.Errorf("invalid range: [%d, %d]", v.fromLedger, v.toLedger) @@ -754,7 +754,7 @@ func (stressTestState) String() string { return "stressTest" } -func (stressTestState) run(s *System) (transition, error) { +func (stressTestState) run(s *system) (transition, error) { if err := s.historyQ.Begin(); err != nil { err = errors.Wrap(err, "Error starting a transaction") return stop(), err @@ -813,7 +813,7 @@ func (stressTestState) run(s *System) (transition, error) { return stop(), nil } -func (s *System) completeIngestion(ledger uint32) error { +func (s *system) completeIngestion(ledger uint32) error { if ledger == 0 { return errors.New("ledger must be positive") } diff --git a/services/horizon/internal/expingest/ingest_history_range_state_test.go b/services/horizon/internal/expingest/ingest_history_range_state_test.go index d9cbc7e5ab..8c62e3e24c 100644 --- a/services/horizon/internal/expingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/expingest/ingest_history_range_state_test.go @@ -21,14 +21,14 @@ type IngestHistoryRangeStateTestSuite struct { historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner - system *System + system *system } func (s *IngestHistoryRangeStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, @@ -168,7 +168,7 @@ type ReingestHistoryRangeStateTestSuite struct { historyAdapter *adapters.MockHistoryArchiveAdapter ledgerBackend *mockLedgerBackend runner *mockProcessorsRunner - system *System + system *system } func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { @@ -176,7 +176,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.ledgerBackend = &mockLedgerBackend{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/init_state_test.go b/services/horizon/internal/expingest/init_state_test.go index a27e345261..7e2773686b 100644 --- a/services/horizon/internal/expingest/init_state_test.go +++ b/services/horizon/internal/expingest/init_state_test.go @@ -17,13 +17,13 @@ type InitStateTestSuite struct { suite.Suite historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter - system *System + system *system } func (s *InitStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index ba5f390feb..b673d41c47 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -9,6 +9,7 @@ import ( "time" "github.com/rcrowley/go-metrics" + "github.com/stellar/go/clients/stellarcore" 
"github.com/stellar/go/exp/ingest/adapters" ingesterrors "github.com/stellar/go/exp/ingest/errors" @@ -65,7 +66,9 @@ type Config struct { // MaxStreamRetries determines how many times the reader will retry when encountering // errors while streaming xdr bucket entries from the history archive. // Set MaxStreamRetries to 0 if there should be no retry attempts - MaxStreamRetries int + MaxStreamRetries int + MaxReingestRetries int + ReingesRetryBackoffSeconds int } const ( @@ -80,23 +83,33 @@ type stellarCoreClient interface { SetCursor(ctx context.Context, id string, cursor int32) error } -type System struct { - Metrics struct { - // LedgerIngestionTimer exposes timing metrics about the rate and - // duration of ledger ingestion (including updating DB and graph). - LedgerIngestionTimer metrics.Timer +type Metrics struct { + // LedgerIngestionTimer exposes timing metrics about the rate and + // duration of ledger ingestion (including updating DB and graph). + LedgerIngestionTimer metrics.Timer - // LedgerInMemoryIngestionTimer exposes timing metrics about the rate and - // duration of ingestion into in-memory graph only. - LedgerInMemoryIngestionTimer metrics.Timer + // LedgerInMemoryIngestionTimer exposes timing metrics about the rate and + // duration of ingestion into in-memory graph only. + LedgerInMemoryIngestionTimer metrics.Timer - // StateVerifyTimer exposes timing metrics about the rate and - // duration of state verification. - StateVerifyTimer metrics.Timer - } + // StateVerifyTimer exposes timing metrics about the rate and + // duration of state verification. + StateVerifyTimer metrics.Timer +} + +type System interface { + Run() + Metrics() Metrics + StressTest(numTransactions, changesPerTransaction int) error + VerifyRange(fromLedger, toLedger uint32, verifyState bool) error + ReingestRange(fromLedger, toLedger uint32, force bool) error + Shutdown() +} - ctx context.Context - cancel context.CancelFunc +type system struct { + metrics Metrics + ctx context.Context + cancel context.CancelFunc config Config @@ -108,8 +121,10 @@ type System struct { stellarCoreClient stellarCoreClient - maxStreamRetries int - wg sync.WaitGroup + maxStreamRetries int + maxReingestRetries int + reingesRetryBackoffSeconds int + wg sync.WaitGroup // stateVerificationRunning is true when verification routine is currently // running. 
@@ -120,7 +135,7 @@ type System struct { disableStateVerification bool } -func NewSystem(config Config) (*System, error) { +func NewSystem(config Config) (System, error) { ctx, cancel := context.WithCancel(context.Background()) archive, err := historyarchive.Connect( @@ -157,15 +172,17 @@ func NewSystem(config Config) (*System, error) { historyAdapter := adapters.MakeHistoryArchiveAdapter(archive) - system := &System{ - ctx: ctx, - cancel: cancel, - historyAdapter: historyAdapter, - ledgerBackend: ledgerBackend, - config: config, - historyQ: historyQ, - disableStateVerification: config.DisableStateVerification, - maxStreamRetries: config.MaxStreamRetries, + system := &system{ + ctx: ctx, + cancel: cancel, + historyAdapter: historyAdapter, + ledgerBackend: ledgerBackend, + config: config, + historyQ: historyQ, + disableStateVerification: config.DisableStateVerification, + maxStreamRetries: config.MaxStreamRetries, + maxReingestRetries: config.MaxReingestRetries, + reingesRetryBackoffSeconds: config.ReingesRetryBackoffSeconds, stellarCoreClient: &stellarcore.Client{ URL: config.StellarCoreURL, }, @@ -182,10 +199,14 @@ func NewSystem(config Config) (*System, error) { return system, nil } -func (s *System) initMetrics() { - s.Metrics.LedgerIngestionTimer = metrics.NewTimer() - s.Metrics.LedgerInMemoryIngestionTimer = metrics.NewTimer() - s.Metrics.StateVerifyTimer = metrics.NewTimer() +func (s *system) initMetrics() { + s.metrics.LedgerIngestionTimer = metrics.NewTimer() + s.metrics.LedgerInMemoryIngestionTimer = metrics.NewTimer() + s.metrics.StateVerifyTimer = metrics.NewTimer() +} + +func (s *system) Metrics() Metrics { + return s.metrics } // Run starts ingestion system. Ingestion system supports distributed ingestion @@ -217,11 +238,11 @@ func (s *System) initMetrics() { // a database. // * If instances is a NOT leader, it runs ledger pipeline without updating a // a database so order book graph is updated but database is not overwritten. -func (s *System) Run() { +func (s *system) Run() { s.runStateMachine(startState{}) } -func (s *System) StressTest(numTransactions, changesPerTransaction int) error { +func (s *system) StressTest(numTransactions, changesPerTransaction int) error { if numTransactions <= 0 { return errors.New("transactions must be positive") } @@ -239,7 +260,7 @@ func (s *System) StressTest(numTransactions, changesPerTransaction int) error { // VerifyRange runs the ingestion pipeline on the range of ledgers. When // verifyState is true it verifies the state when ingestion is complete. -func (s *System) VerifyRange(fromLedger, toLedger uint32, verifyState bool) error { +func (s *system) VerifyRange(fromLedger, toLedger uint32, verifyState bool) error { return s.runStateMachine(verifyRangeState{ fromLedger: fromLedger, toLedger: toLedger, @@ -249,15 +270,24 @@ func (s *System) VerifyRange(fromLedger, toLedger uint32, verifyState bool) erro // ReingestRange runs the ingestion pipeline on the range of ledgers ingesting // history data only. 
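The hunk that follows wraps ReingestRange in a retry loop driven by the new MaxReingestRetries and ReingesRetryBackoffSeconds config fields. A minimal, self-contained sketch of that retry behavior, assuming a hypothetical retryReingest helper (illustration only; the real loop is the one added to (*system).ReingestRange below):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryReingest is a hypothetical stand-in for the retry loop added to
// (*system).ReingestRange: run the range once, then retry up to maxRetries
// times with a fixed backoff while it keeps failing.
func retryReingest(run func() error, maxRetries, backoffSeconds int) error {
	err := run()
	for retry := 0; err != nil && retry < maxRetries; retry++ {
		time.Sleep(time.Duration(backoffSeconds) * time.Second)
		err = run()
	}
	return err
}

func main() {
	attempts := 0
	err := retryReingest(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient reingestion error")
		}
		return nil
	}, 5, 0) // zero backoff keeps the example fast
	fmt.Println(attempts, err) // 3 <nil>: the range is attempted at most maxRetries+1 times
}

With the flag defaults in cmd/db.go (--retries defaults to 0, --retry-backoff-seconds to 5), a failed range is attempted once and only retried when --retries is set.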
-func (s *System) ReingestRange(fromLedger, toLedger uint32, force bool) error { - return s.runStateMachine(reingestHistoryRangeState{ - fromLedger: fromLedger, - toLedger: toLedger, - force: force, - }) +func (s *system) ReingestRange(fromLedger, toLedger uint32, force bool) error { + run := func() error { + return s.runStateMachine(reingestHistoryRangeState{ + fromLedger: fromLedger, + toLedger: toLedger, + force: force, + }) + } + err := run() + for retry := 0; err != nil && retry < s.maxReingestRetries; retry++ { + log.Warnf("reingest range [%d, %d] failed (%s), retrying", fromLedger, toLedger, err.Error()) + time.Sleep(time.Second * time.Duration(s.reingesRetryBackoffSeconds)) + err = run() + } + return err } -func (s *System) runStateMachine(cur stateMachineNode) error { +func (s *system) runStateMachine(cur stateMachineNode) error { defer func() { s.wg.Wait() }() @@ -310,7 +340,7 @@ func (s *System) runStateMachine(cur stateMachineNode) error { } } -func (s *System) maybeVerifyState(lastIngestedLedger uint32) { +func (s *system) maybeVerifyState(lastIngestedLedger uint32) { stateInvalid, err := s.historyQ.GetExpStateInvalid() if err != nil && !isCancelledError(err) { log.WithField("err", err).Error("Error getting state invalid value") @@ -348,20 +378,20 @@ func (s *System) maybeVerifyState(lastIngestedLedger uint32) { } } -func (s *System) incrementStateVerificationErrors() int { +func (s *system) incrementStateVerificationErrors() int { s.stateVerificationMutex.Lock() defer s.stateVerificationMutex.Unlock() s.stateVerificationErrors++ return s.stateVerificationErrors } -func (s *System) resetStateVerificationErrors() { +func (s *system) resetStateVerificationErrors() { s.stateVerificationMutex.Lock() defer s.stateVerificationMutex.Unlock() s.stateVerificationErrors = 0 } -func (s *System) updateCursor(ledgerSequence uint32) error { +func (s *system) updateCursor(ledgerSequence uint32) error { if s.stellarCoreClient == nil { return nil } @@ -381,7 +411,7 @@ func (s *System) updateCursor(ledgerSequence uint32) error { return nil } -func (s *System) Shutdown() { +func (s *system) Shutdown() { log.Info("Shutting down ingestion system...") s.stateVerificationMutex.Lock() defer s.stateVerificationMutex.Unlock() diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index 3baa1f333c..90eb1f6a97 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -7,6 +7,9 @@ import ( "testing" "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stellar/go/exp/ingest/adapters" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/exp/ingest/ledgerbackend" @@ -15,8 +18,6 @@ import ( "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) var ( @@ -89,8 +90,9 @@ func TestNewSystem(t *testing.T) { HistoryArchiveURL: "https://history.stellar.org/prd/core-live/core_live_001", } - system, err := NewSystem(config) + sIface, err := NewSystem(config) assert.NoError(t, err) + system := sIface.(*system) assert.Equal(t, config, system.config) assert.Equal(t, config.DisableStateVerification, system.disableStateVerification) @@ -102,7 +104,7 @@ func TestNewSystem(t *testing.T) { func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + 
system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -116,7 +118,7 @@ func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { func TestStateMachineTransition(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -133,7 +135,7 @@ func TestStateMachineTransition(t *testing.T) { func TestContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: ctx, } @@ -151,7 +153,7 @@ func TestContextCancel(t *testing.T) { // non-zero exit code. func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ ctx: context.Background(), historyQ: historyQ, } @@ -165,7 +167,7 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing. func TestMaybeVerifyStateGetExpStateInvalidDBErrCancelOrContextCanceled(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -191,7 +193,7 @@ func TestMaybeVerifyStateGetExpStateInvalidDBErrCancelOrContextCanceled(t *testi } func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { historyQ := &mockDBQ{} - system := &System{ + system := &system{ historyQ: historyQ, ctx: context.Background(), } @@ -433,3 +435,37 @@ func (m *mockStellarCoreClient) SetCursor(ctx context.Context, id string, cursor } var _ stellarCoreClient = (*mockStellarCoreClient)(nil) + +type mockSystem struct { + mock.Mock +} + +func (m *mockSystem) Run() { + m.Called() +} + +func (m *mockSystem) Metrics() Metrics { + args := m.Called() + return args.Get(0).(Metrics) +} + +func (m *mockSystem) StressTest(numTransactions, changesPerTransaction int) error { + args := m.Called(numTransactions, changesPerTransaction) + return args.Error(0) +} + +func (m *mockSystem) VerifyRange(fromLedger, toLedger uint32, verifyState bool) error { + args := m.Called(fromLedger, toLedger, verifyState) + return args.Error(0) +} + +func (m *mockSystem) ReingestRange(fromLedger, toLedger uint32, force bool) error { + args := m.Called(fromLedger, toLedger, force) + return args.Error(0) +} + +func (m *mockSystem) Shutdown() { + m.Called() +} + +var _ System = (*mockSystem)(nil) diff --git a/services/horizon/internal/expingest/parallel.go b/services/horizon/internal/expingest/parallel.go new file mode 100644 index 0000000000..2e48d3e8f2 --- /dev/null +++ b/services/horizon/internal/expingest/parallel.go @@ -0,0 +1,158 @@ +package expingest + +import ( + "fmt" + "sync" + + "github.com/stellar/go/support/errors" + logpkg "github.com/stellar/go/support/log" +) + +const ( + historyCheckpointLedgerInterval = 64 + minBatchSize = historyCheckpointLedgerInterval +) + +type ledgerRange struct { + from uint32 + to uint32 +} + +type rangeError struct { + err error + ledgerRange ledgerRange +} + +func (e rangeError) Error() string { + return fmt.Sprintf("error when processing [%d, %d] range: %s", e.ledgerRange.from, e.ledgerRange.to, e.err) +} + +type ParallelSystems struct { + config Config + workerCount uint + systemFactory func(Config) (System, error) +} + +func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) { + // Leaving this because used in tests, will update after a code review. 
+ return newParallelSystems(config, workerCount, NewSystem) } + +// newParallelSystems is the private version of NewParallelSystems, allowing a mock system factory to be injected +func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) { + if workerCount < 1 { + return nil, errors.New("workerCount must be > 0") + } + + return &ParallelSystems{ + config: config, + workerCount: workerCount, + systemFactory: systemFactory, + }, nil +} + +func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, reingestJobQueue <-chan ledgerRange) rangeError { + for { + select { + case <-stop: + return rangeError{} + case reingestRange := <-reingestJobQueue: + err := s.ReingestRange(reingestRange.from, reingestRange.to, false) + if err != nil { + return rangeError{ + err: err, + ledgerRange: reingestRange, + } + } + log.WithFields(logpkg.F{"from": reingestRange.from, "to": reingestRange.to}).Info("successfully reingested range") + } + } +} + +func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 { + batchSize := batchSizeSuggestion + if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) { + // let's try to make use of all the workers + batchSize = rangeSize / uint32(workerCount) + } + // Use a minimum batch size so the per-batch overhead stays worthwhile + if batchSize < minBatchSize { + batchSize = minBatchSize + } + + // Also, round the batch size down to the nearest multiple of 64 (the history checkpoint ledger interval) + return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval +} + +func (ps *ParallelSystems) ReingestRange(fromLedger, toLedger uint32, batchSizeSuggestion uint32) error { + var ( + batchSize = calculateParallelLedgerBatchSize(toLedger-fromLedger, batchSizeSuggestion, ps.workerCount) + reingestJobQueue = make(chan ledgerRange) + wg sync.WaitGroup + + // stopOnce is used to close the stop channel only once: closing a closed channel panics, and that can happen + // when multiple ranges fail. + stopOnce sync.Once + stop = make(chan struct{}) + + lowestRangeErrMutex sync.Mutex + // lowestRangeErr is the error of the failed range with the lowest starting ledger sequence; it is used to tell + // the user which range to reingest in case of errors. We use the fact that System.ReingestRange is blocking, + // jobs are sent to a queue (unbuffered channel) in sequence and there is a WaitGroup waiting for all the workers + // to exit. + // Because of this, when we reach `wg.Wait()` all jobs previously sent to the channel are processed (either success + // or failure). In case of a failure we save the range with the smallest sequence number because this is where + // the user needs to start again to prevent gaps.
+ lowestRangeErr *rangeError + ) + + for i := uint(0); i < ps.workerCount; i++ { + wg.Add(1) + s, err := ps.systemFactory(ps.config) + if err != nil { + return errors.Wrap(err, "error creating new system") + } + go func() { + defer wg.Done() + rangeErr := ps.runReingestWorker(s, stop, reingestJobQueue) + if rangeErr.err != nil { + log.WithError(rangeErr).Error("error in reingest worker") + lowestRangeErrMutex.Lock() + if lowestRangeErr == nil || lowestRangeErr.ledgerRange.from > rangeErr.ledgerRange.from { + lowestRangeErr = &rangeErr + } + lowestRangeErrMutex.Unlock() + stopOnce.Do(func() { + close(stop) + }) + return + } + }() + } + +rangeQueueLoop: + for subRangeFrom := fromLedger; subRangeFrom < toLedger; { + // job queuing + subRangeTo := subRangeFrom + (batchSize - 1) // we subtract one because both from and to are part of the batch + if subRangeTo > toLedger { + subRangeTo = toLedger + } + select { + case <-stop: + break rangeQueueLoop + case reingestJobQueue <- ledgerRange{subRangeFrom, subRangeTo}: + } + subRangeFrom = subRangeTo + 1 + } + + stopOnce.Do(func() { + close(stop) + }) + wg.Wait() + close(reingestJobQueue) + + if lowestRangeErr != nil { + return errors.Wrapf(lowestRangeErr, "job failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.from, toLedger) + } + return nil +} diff --git a/services/horizon/internal/expingest/parallel_test.go b/services/horizon/internal/expingest/parallel_test.go new file mode 100644 index 0000000000..83b0a340ad --- /dev/null +++ b/services/horizon/internal/expingest/parallel_test.go @@ -0,0 +1,110 @@ +package expingest + +import ( + "math/rand" + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/stellar/go/support/errors" +) + +func TestCalculateParallelLedgerBatchSize(t *testing.T) { + assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4)) + assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) +} + +type sorteableRanges []ledgerRange + +func (s sorteableRanges) Len() int { return len(s) } +func (s sorteableRanges) Less(i, j int) bool { return s[i].from < s[j].from } +func (s sorteableRanges) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func TestParallelReingestRange(t *testing.T) { + config := Config{} + var ( + rangesCalled sorteableRanges + m sync.Mutex + ) + factory := func(c Config) (System, error) { + result := &mockSystem{} + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Run( + func(args mock.Arguments) { + r := ledgerRange{ + from: args.Get(0).(uint32), + to: args.Get(1).(uint32), + } + m.Lock() + defer m.Unlock() + rangesCalled = append(rangesCalled, r) + // simulate call + time.Sleep(time.Millisecond * time.Duration(10+rand.Int31n(50))) + }).Return(error(nil)) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.NoError(t, err) + + sort.Sort(rangesCalled) + expected := sorteableRanges{ + {from: 0, to: 255}, {from: 256, to: 511}, {from: 512, to: 767}, {from: 768, to: 1023}, {from: 1024, to: 1279}, + {from: 1280, 
to: 1535}, {from: 1536, to: 1791}, {from: 1792, to: 2047}, {from: 2048, to: 2050}, + } + assert.Equal(t, expected, rangesCalled) + +} + +func TestParallelReingestRangeError(t *testing.T) { + config := Config{} + factory := func(c Config) (System, error) { + result := &mockSystem{} + // Fail on the second range + result.On("ReingestRange", uint32(1536), uint32(1791), mock.AnythingOfType("bool")).Return(errors.New("failed because of foo")) + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil)) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.Error(t, err) + assert.Equal(t, "job failed, recommended restart range: [1536, 2050]: error when processing [1536, 1791] range: failed because of foo", err.Error()) + +} + +func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) { + config := Config{} + var wg sync.WaitGroup + wg.Add(1) + factory := func(c Config) (System, error) { + result := &mockSystem{} + // Fail on a lower subrange after the first error + result.On("ReingestRange", uint32(1024), uint32(1279), mock.AnythingOfType("bool")).Run(func(mock.Arguments) { + // Wait for a more recent range to error + wg.Wait() + // This sleep should help ensure the result of this range is processed later than the one below + // (there are no guarantees without instrumenting ReingestRange(), but that's too complicated) + time.Sleep(50 * time.Millisecond) + }).Return(errors.New("failed because of foo")) + result.On("ReingestRange", uint32(1536), uint32(1791), mock.AnythingOfType("bool")).Run(func(mock.Arguments) { + wg.Done() + }).Return(errors.New("failed because of bar")) + result.On("ReingestRange", mock.AnythingOfType("uint32"), mock.AnythingOfType("uint32"), mock.AnythingOfType("bool")).Return(error(nil)) + return result, nil + } + system, err := newParallelSystems(config, 3, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 2050, 258) + assert.Error(t, err) + assert.Equal(t, "job failed, recommended restart range: [1024, 2050]: error when processing [1024, 1279] range: failed because of foo", err.Error()) + +} diff --git a/services/horizon/internal/expingest/resume_state_test.go b/services/horizon/internal/expingest/resume_state_test.go index 04ec556be1..ff6ca22bd9 100644 --- a/services/horizon/internal/expingest/resume_state_test.go +++ b/services/horizon/internal/expingest/resume_state_test.go @@ -23,7 +23,7 @@ type ResumeTestTestSuite struct { historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner stellarCoreClient *mockStellarCoreClient - system *System + system *system } func (s *ResumeTestTestSuite) SetupTest() { @@ -32,7 +32,7 @@ func (s *ResumeTestTestSuite) SetupTest() { s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} s.stellarCoreClient = &mockStellarCoreClient{} - s.system = &System{ + s.system = &system{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, diff --git a/services/horizon/internal/expingest/stress_test.go b/services/horizon/internal/expingest/stress_test.go index c0d02f1b44..5ea22bf87f 100644 --- a/services/horizon/internal/expingest/stress_test.go +++ b/services/horizon/internal/expingest/stress_test.go @@ -18,14 +18,14 @@ type StressTestStateTestSuite struct { historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner -
system *System + system *system } func (s *StressTestStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ historyQ: s.historyQ, historyAdapter: s.historyAdapter, runner: s.runner, diff --git a/services/horizon/internal/expingest/verify.go b/services/horizon/internal/expingest/verify.go index aa40fc9b68..8f8f24efdd 100644 --- a/services/horizon/internal/expingest/verify.go +++ b/services/horizon/internal/expingest/verify.go @@ -30,7 +30,7 @@ const stateVerifierExpectedIngestionVersion = 10 // verifyState is called as a go routine from pipeline post hook every 64 // ledgers. It checks if the state is correct. If another go routine is already // running it exits. -func (s *System) verifyState(verifyAgainstLatestCheckpoint bool) error { +func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { s.stateVerificationMutex.Lock() if s.stateVerificationRunning { log.Warn("State verification is already running...") @@ -57,7 +57,7 @@ func (s *System) verifyState(verifyAgainstLatestCheckpoint bool) error { defer func() { duration := time.Since(startTime) if updateMetrics { - s.Metrics.StateVerifyTimer.Update(duration) + s.Metrics().StateVerifyTimer.Update(duration) } log.WithField("duration", duration.Seconds()).Info("State verification finished") historyQ.Rollback() diff --git a/services/horizon/internal/expingest/verify_range_state_test.go b/services/horizon/internal/expingest/verify_range_state_test.go index a9cbcd99a8..be042c60f3 100644 --- a/services/horizon/internal/expingest/verify_range_state_test.go +++ b/services/horizon/internal/expingest/verify_range_state_test.go @@ -25,14 +25,14 @@ type VerifyRangeStateTestSuite struct { historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter runner *mockProcessorsRunner - system *System + system *system } func (s *VerifyRangeStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} - s.system = &System{ + s.system = &system{ historyQ: s.historyQ, historyAdapter: s.historyAdapter, runner: s.runner, diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index e8dc49fd55..8da9860d3f 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -161,9 +161,9 @@ func initIngestMetrics(app *App) { if app.expingester == nil { return } - app.metrics.Register("ingest.ledger_ingestion", app.expingester.Metrics.LedgerIngestionTimer) - app.metrics.Register("ingest.ledger_in_memory_ingestion", app.expingester.Metrics.LedgerInMemoryIngestionTimer) - app.metrics.Register("ingest.state_verify", app.expingester.Metrics.StateVerifyTimer) + app.metrics.Register("ingest.ledger_ingestion", app.expingester.Metrics().LedgerIngestionTimer) + app.metrics.Register("ingest.ledger_in_memory_ingestion", app.expingester.Metrics().LedgerInMemoryIngestionTimer) + app.metrics.Register("ingest.state_verify", app.expingester.Metrics().StateVerifyTimer) } func initTxSubMetrics(app *App) {
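For reviewers, a standalone worked example of how ParallelSystems.ReingestRange splits a range into worker jobs: the batch-size function is reproduced from parallel.go above (comments elided) and the queuing loop mirrors the one in ReingestRange, but the program itself is hypothetical and not part of this change.

package main

import "fmt"

const (
	historyCheckpointLedgerInterval = 64
	minBatchSize                    = historyCheckpointLedgerInterval
)

// Reproduced from parallel.go above.
func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
	batchSize := batchSizeSuggestion
	if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) {
		batchSize = rangeSize / uint32(workerCount)
	}
	if batchSize < minBatchSize {
		batchSize = minBatchSize
	}
	return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval
}

func main() {
	// Same inputs as TestParallelReingestRange: range [0, 2050], suggestion 258, 3 workers.
	from, to := uint32(0), uint32(2050)
	batchSize := calculateParallelLedgerBatchSize(to-from, 258, 3)
	fmt.Println("batch size:", batchSize) // 256: 2050/258 = 7 >= 3 workers, so 258 is kept and rounded down to a multiple of 64
	for subFrom := from; subFrom < to; {
		subTo := subFrom + batchSize - 1 // both ends are part of the batch
		if subTo > to {
			subTo = to
		}
		fmt.Printf("[%d, %d]\n", subFrom, subTo) // [0, 255], [256, 511], ..., [1792, 2047], [2048, 2050]
		subFrom = subTo + 1
	}
}

Driven from the CLI, the same splitting would be triggered by something like: horizon db reingest range 1 16000000 --parallel-workers=4 --parallel-job-size=100000 (flag names from cmd/db.go above; the concrete ledger numbers are illustrative only).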