Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/cmd: Fix parsing of reingest range flags like --parallel-workers #4127

Merged
merged 2 commits into from
Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Prevent duplicate errors related to liquidity pool tables during repeated reingestion of same range ([4114](https://github.com/stellar/go/pull/4114))

* In the 2.11.0 release there was a bug introduced which made the `horizon db reingest range` command ignore optional parameters like `--parallel-workers`. This bug is now fixed so all optional command line flags are parsed correctly ([4127](https://github.com/stellar/go/pull/4127))

## v2.11.0

### Changes
Expand Down
123 changes: 62 additions & 61 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,60 +228,64 @@ var (
retries uint
retryBackoffSeconds uint
)
var reingestRangeCmdOpts = []*support.ConfigOption{
{
Name: "force",
ConfigKey: &reingestForce,
OptType: types.Bool,
Required: false,
FlagDefault: false,
Usage: "[optional] if this flag is set, horizon will be blocked " +
"from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)",
},
{
Name: "parallel-workers",
ConfigKey: &parallelWorkers,
OptType: types.Uint,
Required: false,
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
OptType: types.Uint,
Required: false,
FlagDefault: uint(0),
Usage: "[optional] number of reingest retries",
},
{
Name: "retry-backoff-seconds",
ConfigKey: &retryBackoffSeconds,
OptType: types.Uint,
Required: false,
FlagDefault: uint(5),
Usage: "[optional] backoff seconds between reingest retries",
},

func ingestRangeCmdOpts() support.ConfigOptions {
return support.ConfigOptions{
{
Name: "force",
ConfigKey: &reingestForce,
OptType: types.Bool,
Required: false,
FlagDefault: false,
Usage: "[optional] if this flag is set, horizon will be blocked " +
"from ingesting until the reingestion command completes (incompatible with --parallel-workers > 1)",
},
{
Name: "parallel-workers",
ConfigKey: &parallelWorkers,
OptType: types.Uint,
Required: false,
FlagDefault: uint(1),
Usage: "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
},
{
Name: "parallel-job-size",
ConfigKey: &parallelJobSize,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "retries",
ConfigKey: &retries,
OptType: types.Uint,
Required: false,
FlagDefault: uint(0),
Usage: "[optional] number of reingest retries",
},
{
Name: "retry-backoff-seconds",
ConfigKey: &retryBackoffSeconds,
OptType: types.Uint,
Required: false,
FlagDefault: uint(5),
Usage: "[optional] backoff seconds between reingest retries",
},
}
}

var dbReingestRangeCmdOpts = ingestRangeCmdOpts()
var dbReingestRangeCmd = &cobra.Command{
Use: "range [Start sequence number] [End sequence number]",
Short: "reingests ledgers within a range",
Long: "reingests ledgers between X and Y sequence number (closed intervals)",
RunE: func(cmd *cobra.Command, args []string) error {
for _, co := range reingestRangeCmdOpts {
if err := co.RequireE(); err != nil {
return err
}
co.SetValue()
if err := dbReingestRangeCmdOpts.RequireE(); err != nil {
return err
}
if err := dbReingestRangeCmdOpts.SetValues(); err != nil {
return err
}

if len(args) != 2 {
Expand Down Expand Up @@ -311,16 +315,17 @@ var dbReingestRangeCmd = &cobra.Command{
},
}

var dbFillGapsCmdOpts = ingestRangeCmdOpts()
var dbFillGapsCmd = &cobra.Command{
Use: "fill-gaps [Start sequence number] [End sequence number]",
Short: "Ingests any gaps found in the horizon db",
Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.",
RunE: func(cmd *cobra.Command, args []string) error {
for _, co := range reingestRangeCmdOpts {
if err := co.RequireE(); err != nil {
return err
}
co.SetValue()
if err := dbFillGapsCmdOpts.RequireE(); err != nil {
return err
}
if err := dbFillGapsCmdOpts.SetValues(); err != nil {
return err
}

if len(args) != 0 && len(args) != 2 {
Expand Down Expand Up @@ -486,15 +491,11 @@ func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history
}

func init() {
for _, co := range reingestRangeCmdOpts {
err := co.Init(dbReingestRangeCmd)
if err != nil {
log.Fatal(err.Error())
}
err = co.Init(dbFillGapsCmd)
if err != nil {
log.Fatal(err.Error())
}
if err := dbReingestRangeCmdOpts.Init(dbReingestRangeCmd); err != nil {
log.Fatal(err.Error())
}
if err := dbFillGapsCmdOpts.Init(dbFillGapsCmd); err != nil {
log.Fatal(err.Error())
}

viper.BindPFlags(dbReingestRangeCmd.PersistentFlags())
Expand Down
28 changes: 27 additions & 1 deletion services/horizon/internal/integration/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,19 @@ func TestReingestDB(t *testing.T) {
_, err = schema.Migrate(dbConn.DB.DB, schema.MigrateUp, 0)
tt.NoError(err)

t.Run("validate parallel range", func(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig,
"db",
"reingest",
"range",
"--parallel-workers=2",
"10",
"2",
))

assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to")
})

// cap reachedLedger to the nearest checkpoint ledger because reingest range cannot ingest past the most
// recent checkpoint ledger when using captive core
toLedger := uint32(reachedLedger)
Expand Down Expand Up @@ -205,6 +218,7 @@ func TestReingestDB(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db",
"reingest",
"range",
"--parallel-workers=1",
"1",
fmt.Sprintf("%d", toLedger),
))
Expand Down Expand Up @@ -263,6 +277,18 @@ func TestFillGaps(t *testing.T) {
})
tt.NoError(err)

t.Run("validate parallel range", func(t *testing.T) {
horizoncmd.RootCmd.SetArgs(command(horizonConfig,
"db",
"fill-gaps",
"--parallel-workers=2",
"10",
"2",
))

assert.EqualError(t, horizoncmd.RootCmd.Execute(), "Invalid range: {10 2} from > to")
})

// make sure a full checkpoint has elapsed otherwise there will be nothing to reingest
var latestCheckpoint uint32
publishedFirstCheckpoint := func() bool {
Expand Down Expand Up @@ -292,7 +318,7 @@ func TestFillGaps(t *testing.T) {
filepath.Dir(horizonConfig.CaptiveCoreConfigPath),
"captive-core-reingest-range-integration-tests.cfg",
)
horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db", "fill-gaps"))
horizoncmd.RootCmd.SetArgs(command(horizonConfig, "db", "fill-gaps", "--parallel-workers=1"))
tt.NoError(horizoncmd.RootCmd.Execute())

tt.NoError(historyQ.LatestLedger(context.Background(), &latestLedger))
Expand Down