Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Parallelize db reingest range #2724

Merged
merged 23 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).x

## Unreleased

* Add `--parallel-workers` and `--parallel-job-size` to `horizon db reingest range`. `--parallel-workers` will parallelize reingestion using the supplied number of workers.
* Add transaction set operation count to `history_ledger` ([#2690](https://github.com/stellar/go/pull/2690)).
Extend ingestion to store the total number of operations in the transaction set and expose it in the ledger resource via `tx_set_operation_count`. This feature allows you to assess the used capacity of a transaction set.
* Remove `--ingest-failed-transactions` flag. From now on Horizon will always ingest failed transactions. WARNING: if your application is using Horizon DB directly (not recommended!) remember that now it will also contain failed txs. ([#2702](https://github.com/stellar/go/pull/2702)).
Expand Down
88 changes: 73 additions & 15 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/stellar/go/services/horizon/internal/db2/schema"
"github.com/stellar/go/services/horizon/internal/expingest"
support "github.com/stellar/go/support/config"
Expand Down Expand Up @@ -122,16 +123,54 @@ var dbReingestCmd = &cobra.Command{
},
}

var reingestForce bool
var (
reingestForce bool
parallelWorkers uint
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
)
var reingestRangeCmdOpts = []*support.ConfigOption{
&support.ConfigOption{
{
Name: "force",
ConfigKey: &reingestForce,
OptType: types.Bool,
Required: false,
FlagDefault: false,
Usage: "[optional] if this flag is set, horizon will be blocked " +
"from ingesting until the reingestion command completes",
"from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)",
},
{
Name: "parallel-workers",
ConfigKey: &parallelWorkers,
OptType: types.Uint,
Required: false,
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
OptType: types.Uint,
Required: false,
FlagDefault: uint(0),
Usage: "[optional] number of reingest retries",
},
{
Name: "retry-backoff-seconds",
ConfigKey: &retryBackoffSeconds,
OptType: types.Uint,
Required: false,
FlagDefault: uint(5),
Usage: "[optional] backoff seconds between reingest retries",
},
}

Expand All @@ -144,6 +183,9 @@ var dbReingestRangeCmd = &cobra.Command{
co.Require()
co.SetValue()
}
if reingestForce && parallelWorkers > 1 {
log.Fatal("--force is incompatible with --parallel-workers > 1")
}

if len(args) != 2 {
cmd.Usage()
Expand Down Expand Up @@ -173,25 +215,41 @@ var dbReingestRangeCmd = &cobra.Command{
}

ingestConfig := expingest.Config{
CoreSession: coreSession,
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
CoreSession: coreSession,
NetworkPassphrase: config.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
MaxReingestRetries: int(retries),
ReingesRetryBackoffSeconds: int(retryBackoffSeconds),
}
if config.EnableCaptiveCoreIngestion {
ingestConfig.StellarCorePath = config.StellarCoreBinaryPath
}

system, err := expingest.NewSystem(ingestConfig)
if err != nil {
log.Fatal(err)
// Pick the reingestion strategy: a single system when fewer than two
// workers are requested, otherwise a set of parallel systems that split
// the range into jobs of parallelJobSize ledgers.
if parallelWorkers < 2 {
	system, systemErr := expingest.NewSystem(ingestConfig)
	// Check the constructor's own error (systemErr), not the outer err,
	// which only reflects earlier steps and would let a failed
	// construction slip through.
	if systemErr != nil {
		log.Fatal(systemErr)
	}

	// The result is stored in the outer err so the success/failure
	// handling that follows this block applies to both branches.
	err = system.ReingestRange(
		argsInt32[0],
		argsInt32[1],
		reingestForce,
	)
} else {
	system, systemErr := expingest.NewParallelSystems(ingestConfig, parallelWorkers)
	// Same as above: the error to test is the one returned by the
	// constructor call on the previous line.
	if systemErr != nil {
		log.Fatal(systemErr)
	}

	err = system.ReingestRange(
		argsInt32[0],
		argsInt32[1],
		parallelJobSize,
	)
}

err = system.ReingestRange(
argsInt32[0],
argsInt32[1],
reingestForce,
)
if err == nil {
hlog.Info("Range run successfully!")
return
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type App struct {
orderBookStream *expingest.OrderBookStream
submitter *txsub.System
paths paths.Finder
expingester *expingest.System
expingester expingest.System
reaper *reap.System
ticks *time.Ticker

Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/expingest/build_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type BuildStateTestSuite struct {
suite.Suite
historyQ *mockDBQ
historyAdapter *adapters.MockHistoryArchiveAdapter
system *System
system *system
runner *mockProcessorsRunner
stellarCoreClient *mockStellarCoreClient
checkpointLedger uint32
Expand All @@ -33,7 +33,7 @@ func (s *BuildStateTestSuite) SetupTest() {
s.stellarCoreClient = &mockStellarCoreClient{}
s.checkpointLedger = uint32(63)
s.lastLedger = 0
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/expingest/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type DBTestSuite struct {
sequence uint32
ledgerBackend *ledgerbackend.MockDatabaseBackend
historyAdapter *adapters.MockHistoryArchiveAdapter
system *System
system *system
tt *test.T
}

Expand All @@ -77,14 +77,15 @@ func (s *DBTestSuite) SetupTest() {
s.ledgerBackend = &ledgerbackend.MockDatabaseBackend{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
var err error
s.system, err = NewSystem(Config{
sIface, err := NewSystem(Config{
CoreSession: s.tt.CoreSession(),
HistorySession: s.tt.HorizonSession(),
HistoryArchiveURL: "http://ignore.test",
MaxStreamRetries: 3,
DisableStateVerification: false,
})
s.Assert().NoError(err)
s.system = sIface.(*system)

s.sequence = uint32(28660351)
s.setupMocksForBuildState()
Expand Down
28 changes: 14 additions & 14 deletions services/horizon/internal/expingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
)

type stateMachineNode interface {
run(*System) (transition, error)
run(*system) (transition, error)
String() string
}

Expand Down Expand Up @@ -92,7 +92,7 @@ func (stopState) String() string {
return "stop"
}

func (stopState) run(s *System) (transition, error) {
func (stopState) run(s *system) (transition, error) {
return stop(), errors.New("Cannot run terminal state")
}

Expand All @@ -102,7 +102,7 @@ func (startState) String() string {
return "start"
}

func (startState) run(s *System) (transition, error) {
func (startState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
return start(), errors.Wrap(err, "Error starting a transaction")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (b buildState) String() string {
return fmt.Sprintf("buildFromCheckpoint(checkpointLedger=%d)", b.checkpointLedger)
}

func (b buildState) run(s *System) (transition, error) {
func (b buildState) run(s *system) (transition, error) {
if b.checkpointLedger == 0 {
return start(), errors.New("unexpected checkpointLedger value")
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func (r resumeState) String() string {
return fmt.Sprintf("resume(latestSuccessfullyProcessedLedger=%d)", r.latestSuccessfullyProcessedLedger)
}

func (r resumeState) run(s *System) (transition, error) {
func (r resumeState) run(s *system) (transition, error) {
if r.latestSuccessfullyProcessedLedger == 0 {
return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value")
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (r resumeState) run(s *System) (transition, error) {
}

duration := time.Since(startTime)
s.Metrics.LedgerIngestionTimer.Update(duration)
s.Metrics().LedgerIngestionTimer.Update(duration)
log.
WithFields(changeStats.Map()).
WithFields(ledgerTransactionStats.Map()).
Expand Down Expand Up @@ -436,7 +436,7 @@ func (h historyRangeState) String() string {
}

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *System) (transition, error) {
func (h historyRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
Expand Down Expand Up @@ -478,7 +478,7 @@ func (h historyRangeState) run(s *System) (transition, error) {
return start(), nil
}

func runTransactionProcessorsOnLedger(s *System, ledger uint32) error {
func runTransactionProcessorsOnLedger(s *system, ledger uint32) error {
log.WithFields(logpkg.F{
"sequence": ledger,
"state": false,
Expand Down Expand Up @@ -520,7 +520,7 @@ func (h reingestHistoryRangeState) String() string {
)
}

func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger uint32) error {
func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
}
Expand Down Expand Up @@ -549,7 +549,7 @@ func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger u
}

// reingestHistoryRangeState is used as a command to reingest historical data
func (h reingestHistoryRangeState) run(s *System) (transition, error) {
func (h reingestHistoryRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
Expand Down Expand Up @@ -634,7 +634,7 @@ func (waitForCheckpointState) String() string {
return "waitForCheckpoint"
}

func (waitForCheckpointState) run(*System) (transition, error) {
func (waitForCheckpointState) run(*system) (transition, error) {
log.Info("Waiting for the next checkpoint...")
time.Sleep(10 * time.Second)
return start(), nil
Expand All @@ -655,7 +655,7 @@ func (v verifyRangeState) String() string {
)
}

func (v verifyRangeState) run(s *System) (transition, error) {
func (v verifyRangeState) run(s *system) (transition, error) {
if v.fromLedger == 0 || v.toLedger == 0 ||
v.fromLedger > v.toLedger {
return stop(), errors.Errorf("invalid range: [%d, %d]", v.fromLedger, v.toLedger)
Expand Down Expand Up @@ -754,7 +754,7 @@ func (stressTestState) String() string {
return "stressTest"
}

func (stressTestState) run(s *System) (transition, error) {
func (stressTestState) run(s *system) (transition, error) {
if err := s.historyQ.Begin(); err != nil {
err = errors.Wrap(err, "Error starting a transaction")
return stop(), err
Expand Down Expand Up @@ -813,7 +813,7 @@ func (stressTestState) run(s *System) (transition, error) {
return stop(), nil
}

func (s *System) completeIngestion(ledger uint32) error {
func (s *system) completeIngestion(ledger uint32) error {
if ledger == 0 {
return errors.New("ledger must be positive")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ type IngestHistoryRangeStateTestSuite struct {
historyQ *mockDBQ
historyAdapter *adapters.MockHistoryArchiveAdapter
runner *mockProcessorsRunner
system *System
system *system
}

func (s *IngestHistoryRangeStateTestSuite) SetupTest() {
s.historyQ = &mockDBQ{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
s.runner = &mockProcessorsRunner{}
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down Expand Up @@ -168,15 +168,15 @@ type ReingestHistoryRangeStateTestSuite struct {
historyAdapter *adapters.MockHistoryArchiveAdapter
ledgerBackend *mockLedgerBackend
runner *mockProcessorsRunner
system *System
system *system
}

func (s *ReingestHistoryRangeStateTestSuite) SetupTest() {
s.historyQ = &mockDBQ{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
s.ledgerBackend = &mockLedgerBackend{}
s.runner = &mockProcessorsRunner{}
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/expingest/init_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type InitStateTestSuite struct {
suite.Suite
historyQ *mockDBQ
historyAdapter *adapters.MockHistoryArchiveAdapter
system *System
system *system
}

func (s *InitStateTestSuite) SetupTest() {
s.historyQ = &mockDBQ{}
s.historyAdapter = &adapters.MockHistoryArchiveAdapter{}
s.system = &System{
s.system = &system{
ctx: context.Background(),
historyQ: s.historyQ,
historyAdapter: s.historyAdapter,
Expand Down
Loading