Skip to content

Commit

Permalink
services/horizon/cmd: Fix parsing of reingest range flags like --para…
Browse files Browse the repository at this point in the history
…llel-workers (#4127)

#4060 introduced a bug where command line flags such as --parallel-workers are not getting parsed when running the horizon db reingest range command. This commit fixes this bug so that all the reingest range command line flags are parsed correctly for both the reingest range command and the fill-gaps command.
  • Loading branch information
tamirms authored Dec 6, 2021
1 parent 070f2e8 commit 01d98e6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 62 deletions.
2 changes: 2 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Prevent duplicate errors related to liquidity pool tables during repeated reingestion of same range ([4114](https://github.com/stellar/go/pull/4114))

* In the 2.11.0 release there was a bug introduced which made the `horizon db reingest range` command ignore optional parameters like `--parallel-workers`. This bug is now fixed so all optional command line flags are parsed correctly ([4127](https://github.com/stellar/go/pull/4127))

## v2.11.0

### Changes
Expand Down
123 changes: 62 additions & 61 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,60 +228,64 @@ var (
retries uint
retryBackoffSeconds uint
)
var reingestRangeCmdOpts = []*support.ConfigOption{
{
Name: "force",
ConfigKey: &reingestForce,
OptType: types.Bool,
Required: false,
FlagDefault: false,
Usage: "[optional] if this flag is set, horizon will be blocked " +
"from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)",
},
{
Name: "parallel-workers",
ConfigKey: &parallelWorkers,
OptType: types.Uint,
Required: false,
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
OptType: types.Uint,
Required: false,
FlagDefault: uint(0),
Usage: "[optional] number of reingest retries",
},
{
Name: "retry-backoff-seconds",
ConfigKey: &retryBackoffSeconds,
OptType: types.Uint,
Required: false,
FlagDefault: uint(5),
Usage: "[optional] backoff seconds between reingest retries",
},

func ingestRangeCmdOpts() support.ConfigOptions {
return support.ConfigOptions{
{
Name: "force",
ConfigKey: &reingestForce,
OptType: types.Bool,
Required: false,
FlagDefault: false,
Usage: "[optional] if this flag is set, horizon will be blocked " +
"from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)",
},
{
Name: "parallel-workers",
ConfigKey: &parallelWorkers,
OptType: types.Uint,
Required: false,
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
OptType: types.Uint,
Required: false,
FlagDefault: uint(0),
Usage: "[optional] number of reingest retries",
},
{
Name: "retry-backoff-seconds",
ConfigKey: &retryBackoffSeconds,
OptType: types.Uint,
Required: false,
FlagDefault: uint(5),
Usage: "[optional] backoff seconds between reingest retries",
},
}
}

var dbReingestRangeCmdOpts = ingestRangeCmdOpts()
var dbReingestRangeCmd = &cobra.Command{
Use: "range [Start sequence number] [End sequence number]",
Short: "reingests ledgers within a range",
Long: "reingests ledgers between X and Y sequence number (closed intervals)",
RunE: func(cmd *cobra.Command, args []string) error {
for _, co := range reingestRangeCmdOpts {
if err := co.RequireE(); err != nil {
return err
}
co.SetValue()
if err := dbReingestRangeCmdOpts.RequireE(); err != nil {
return err
}
if err := dbReingestRangeCmdOpts.SetValues(); err != nil {
return err
}

if len(args) != 2 {
Expand Down Expand Up @@ -311,16 +315,17 @@ var dbReingestRangeCmd = &cobra.Command{
},
}

var dbFillGapsCmdOpts = ingestRangeCmdOpts()
var dbFillGapsCmd = &cobra.Command{
Use: "fill-gaps [Start sequence number] [End sequence number]",
Short: "Ingests any gaps found in the horizon db",
Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.",
RunE: func(cmd *cobra.Command, args []string) error {
for _, co := range reingestRangeCmdOpts {
if err := co.RequireE(); err != nil {
return err
}
co.SetValue()
if err := dbFillGapsCmdOpts.RequireE(); err != nil {
return err
}
if err := dbFillGapsCmdOpts.SetValues(); err != nil {
return err
}

if len(args) != 0 && len(args) != 2 {
Expand Down Expand Up @@ -486,15 +491,11 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history
}

func init() {
for _, co := range reingestRangeCmdOpts {
err := co.Init(dbReingestRangeCmd)
if err != nil {
log.Fatal(err.Error())
}
err = co.Init(dbFillGapsCmd)
if err != nil {
log.Fatal(err.Error())
}
if err := dbReingestRangeCmdOpts.Init(dbReingestRangeCmd); err != nil {
log.Fatal(err.Error())
}
if err := dbFillGapsCmdOpts.Init(dbFillGapsCmd); err != nil {
log.Fatal(err.Error())
}

viper.BindPFlags(dbReingestRangeCmd.PersistentFlags())
Expand Down
28 changes: 27 additions & 1 deletion services/horizon/internal/integration/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ func TestReingestDB(t *testing.T) {
_, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0)
tt.NoError(err)

t.Run("validate parallel range", func(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig,
"db",
"reingest",
"range",
"--parallel-workers=2",
"10",
"2",
))

assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to")
})

// cap reachedLedger to the nearest checkpoint ledger because reingest range cannot ingest past the most
// recent checkpoint ledger when using captive core
toLedger := uint32(reachedLedger)
Expand Down Expand Up @@ -205,6 +218,7 @@ func TestReingestDB(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db",
"reingest",
"range",
"--parallel-workers=1",
"1",
fmt.Sprintf("%d", toLedger),
))
Expand Down Expand Up @@ -263,6 +277,18 @@ func TestFillGaps(t *testing.T) {
})
tt.NoError(err)

t.Run("validate parallel range", func(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig,
"db",
"fill-gaps",
"--parallel-workers=2",
"10",
"2",
))

assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to")
})

// make sure a full checkpoint has elapsed otherwise there will be nothing to reingest
var latestCheckpoint uint32
publishedFirstCheckpoint := func() bool {
Expand Down Expand Up @@ -292,7 +318,7 @@ func TestFillGaps(t *testing.T) {
filepath.Dir(horizonConfig.CaptiveCoreConfigPath),
"captive-core-reingest-range-integration-tests.cfg",
)
horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db", "fill-gaps"))
horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db", "fill-gaps", "--parallel-workers=1"))
tt.NoError(horizoncmd.RootCmd.Execute())

tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger))
Expand Down

0 comments on commit 01d98e6

Please sign in to comment.