Skip to content

Commit

Permalink
services/horizon: Implement horizon db fill-gaps command (#4060)
Browse files Browse the repository at this point in the history
`horizon db fill-gaps` can be called with no parameters, in which case it queries for any existing gaps in the horizon db and then proceeds to ingest history to fill the gaps.

The command can also be called with start and end ledger parameters, in which case the command will only query for gaps within the provided range.

horizon db fill-gaps 1 1000 will fill any gaps occurring within the range 1-1000
  • Loading branch information
tamirms authored Nov 10, 2021
1 parent a216798 commit 371f727
Show file tree
Hide file tree
Showing 11 changed files with 519 additions and 163 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Add a new horizon flag `--max-assets-per-path-request` (`15` by default) that sets the number of assets to consider for strict-send and strict-receive ([4046](https://github.com/stellar/go/pull/4046))
* Add an endpoint that allows querying for which liquidity pools an account is participating in [4043](https://github.com/stellar/go/pull/4043)
* Add a new horizon command `horizon db fill-gaps` which fills any gaps in history in the horizon db. The command takes optional start and end ledger parameters. If the start and end ledgers are provided, then horizon will only fill the gaps found within the given ledger range [4060](https://github.com/stellar/go/pull/4060)

## v2.10.0

Expand Down
113 changes: 92 additions & 21 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ var dbReingestRangeCmd = &cobra.Command{
for i, arg := range args {
if seq, err := strconv.ParseUint(arg, 10, 32); err != nil {
cmd.Usage()
return fmt.Errorf(`Invalid sequence number "%s"`, arg)
return fmt.Errorf(`invalid sequence number "%s"`, arg)
} else {
argsUInt32[i] = uint32(seq)
}
Expand All @@ -302,26 +302,73 @@ var dbReingestRangeCmd = &cobra.Command{
if err != nil {
return err
}
err = runDBReingestRange(argsUInt32[0], argsUInt32[1], reingestForce, parallelWorkers, *config)
if err != nil {
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
It is not possible to run the reingest command on this range in parallel with
Horizon's ingestion system.
Either reduce the range so that it doesn't overlap with Horizon's ingestion system,
or, use the force flag to ensure that Horizon's ingestion system is blocked until
the reingest command completes.`)
return runDBReingestRange(
[]history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}},
reingestForce,
parallelWorkers,
*config,
)
},
}

// dbFillGapsCmd is the "fill-gaps" db sub-command. It detects gaps in the
// horizon db, optionally restricted to a [start, end] ledger range given as two
// positional arguments, and hands the detected gaps to runDBReingestRange.
var dbFillGapsCmd = &cobra.Command{
Use: "fill-gaps [Start sequence number] [End sequence number]",
Short: "Ingests any gaps found in the horizon db",
Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.",
RunE: func(cmd *cobra.Command, args []string) error {
// reingestRangeCmdOpts is the same option set used by the reingest range
// command; any error reported by RequireE aborts the command.
for _, co := range reingestRangeCmdOpts {
if err := co.RequireE(); err != nil {
return err
}
co.SetValue()
}

// Only two invocation forms are accepted: no positional arguments, or
// exactly a start and an end sequence number.
if len(args) != 0 && len(args) != 2 {
hlog.Errorf("Expected either 0 arguments or 2 but found %v arguments", len(args))
return ErrUsage{cmd}
}

var start, end uint64
// withRange records whether an explicit [start, end] range was supplied.
var withRange bool
if len(args) == 2 {
var err error
// bitSize 32 makes ParseUint reject values that do not fit in a uint32,
// so the uint32 conversions further down cannot truncate.
start, err = strconv.ParseUint(args[0], 10, 32)
if err != nil {
cmd.Usage()
return fmt.Errorf(`invalid sequence number "%s"`, args[0])
}
end, err = strconv.ParseUint(args[1], 10, 32)
if err != nil {
cmd.Usage()
return fmt.Errorf(`invalid sequence number "%s"`, args[1])
}
withRange = true
}

// Flags are applied with AlwaysIngest set and without requiring a captive
// core config.
err := horizon.ApplyFlags(config, flags, horizon.ApplyOptions{RequireCaptiveCoreConfig: false, AlwaysIngest: true})
if err != nil {
return err
}
var gaps []history.LedgerRange
if withRange {
// Only look for gaps inside the requested range.
gaps, err = runDBDetectGapsInRange(*config, uint32(start), uint32(end))
if err != nil {
return err
}
hlog.Infof("found gaps %v within range [%v, %v]", gaps, start, end)
} else {
// No range given: look for every gap in the horizon db.
gaps, err = runDBDetectGaps(*config)
if err != nil {
return err
}
hlog.Infof("found gaps %v", gaps)
}

// NOTE(review): the next two statements look like removed-line residue from
// the diff view this text was captured from (they belong to the old
// dbReingestRangeCmd body). Read literally they make the final return
// unreachable — confirm against the real file.
hlog.Info("Range run successfully!")
return nil
// Reingest the detected gaps using the shared force / parallel-workers settings.
return runDBReingestRange(gaps, reingestForce, parallelWorkers, *config)
},
}

func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uint, config horizon.Config) error {
func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config) error {
if reingestForce && parallelWorkers > 1 {
return errors.New("--force is incompatible with --parallel-workers > 1")
}
Expand Down Expand Up @@ -364,8 +411,7 @@ func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uin
}

return system.ReingestRange(
from,
to,
ledgerRanges,
parallelJobSize,
)
}
Expand All @@ -375,11 +421,21 @@ func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uin
return systemErr
}

return system.ReingestRange(
from,
to,
reingestForce,
)
err = system.ReingestRange(ledgerRanges, reingestForce)
if err != nil {
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
It is not possible to run the reingest command on this range in parallel with
Horizon's ingestion system.
Either reduce the range so that it doesn't overlap with Horizon's ingestion system,
or, use the force flag to ensure that Horizon's ingestion system is blocked until
the reingest command completes.`)
}

return err
}
hlog.Info("Range run successfully!")
return nil
}

var dbDetectGapsCmd = &cobra.Command{
Expand Down Expand Up @@ -411,7 +467,7 @@ var dbDetectGapsCmd = &cobra.Command{
},
}

func runDBDetectGaps(config horizon.Config) ([]history.LedgerGap, error) {
func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) {
horizonSession, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
return nil, err
Expand All @@ -420,15 +476,29 @@ func runDBDetectGaps(config horizon.Config) ([]history.LedgerGap, error) {
return q.GetLedgerGaps(context.Background())
}

// runDBDetectGapsInRange opens a postgres session using config.DatabaseURL and
// returns whatever history.Q.GetLedgerGapsInRange reports for [start, end].
// A failure to open the session is returned unchanged.
//
// NOTE(review): the session opened here is never closed inside this function —
// confirm whether that is acceptable for a short-lived CLI command.
func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history.LedgerRange, error) {
	session, openErr := db.Open("postgres", config.DatabaseURL)
	if openErr != nil {
		return nil, openErr
	}

	ctx := context.Background()
	historyQ := &history.Q{session}
	return historyQ.GetLedgerGapsInRange(ctx, start, end)
}

func init() {
for _, co := range reingestRangeCmdOpts {
err := co.Init(dbReingestRangeCmd)
if err != nil {
log.Fatal(err.Error())
}
err = co.Init(dbFillGapsCmd)
if err != nil {
log.Fatal(err.Error())
}
}

viper.BindPFlags(dbReingestRangeCmd.PersistentFlags())
viper.BindPFlags(dbFillGapsCmd.PersistentFlags())

RootCmd.AddCommand(dbCmd)
dbCmd.AddCommand(
Expand All @@ -437,6 +507,7 @@ func init() {
dbReapCmd,
dbReingestCmd,
dbDetectGapsCmd,
dbFillGapsCmd,
)
dbMigrateCmd.AddCommand(
dbMigrateDownCmd,
Expand Down
94 changes: 86 additions & 8 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"sort"
"time"

sq "github.com/Masterminds/squirrel"
Expand Down Expand Up @@ -130,20 +131,97 @@ func (q *Q) InsertLedger(ctx context.Context,
return result.RowsAffected()
}

// GetLedgerGaps, obtains ingestion gaps in the history_ledgers table.
// GetLedgerGaps obtains ingestion gaps in the history_ledgers table.
// Returns the gaps and error.
func (q *Q) GetLedgerGaps(ctx context.Context) ([]LedgerGap, error) {
var result []LedgerGap
err := q.SelectRaw(ctx, &result, `
SELECT sequence + 1 AS gap_start,
next_number - 1 AS gap_end
func (q *Q) GetLedgerGaps(ctx context.Context) ([]LedgerRange, error) {
var gaps []LedgerRange
query := `
SELECT sequence + 1 AS start,
next_number - 1 AS end
FROM (
SELECT sequence,
LEAD(sequence) OVER (ORDER BY sequence) AS next_number
FROM history_ledgers
) number
WHERE sequence + 1 <> next_number;`)
return result, err
WHERE sequence + 1 <> next_number;`
if err := q.SelectRaw(ctx, &gaps, query); err != nil {
return nil, err
}
sort.Slice(gaps, func(i, j int) bool {
return gaps[i].StartSequence < gaps[j].StartSequence
})
return gaps, nil
}

// max returns the larger of the two uint32 values a and b.
func max(a, b uint32) uint32 {
	if b < a {
		return a
	}
	return b
}

// min returns the smaller of the two uint32 values a and b.
func min(a, b uint32) uint32 {
	if b < a {
		return b
	}
	return a
}

// GetLedgerGapsInRange returns the ledger ranges inside [start, end] that are
// missing from the history_ledgers table, or an error if one of the underlying
// queries fails.
//
// The result is built from up to three parts: the part of the requested range
// that lies below the oldest ledger, the gaps reported by GetLedgerGaps clipped
// to [start, end], and the part of the requested range that lies above the
// latest ledger.
func (q *Q) GetLedgerGapsInRange(ctx context.Context, start, end uint32) ([]LedgerRange, error) {
	var oldestLedger uint32
	if err := q.ElderLedger(ctx, &oldestLedger); err != nil {
		return nil, errors.Wrap(err, "Could not query elder ledger")
	}
	if oldestLedger == 0 {
		// No oldest ledger was reported (presumably history_ledgers is empty —
		// confirm against ElderLedger), so the whole requested range is one gap.
		return []LedgerRange{{StartSequence: start, EndSequence: end}}, nil
	}

	var latestLedger uint32
	if err := q.LatestLedger(ctx, &latestLedger); err != nil {
		return nil, errors.Wrap(err, "Could not query latest ledger")
	}

	var missing []LedgerRange

	// Everything requested below the oldest ledger is missing.
	if start < oldestLedger {
		head := LedgerRange{
			StartSequence: start,
			EndSequence:   min(end, oldestLedger-1),
		}
		missing = append(missing, head)
	}
	// The requested range ends at or before the oldest ledger: nothing more to add.
	if end <= oldestLedger {
		return missing, nil
	}

	gaps, err := q.GetLedgerGaps(ctx)
	if err != nil {
		return nil, err
	}

	for i := range gaps {
		gapStart, gapEnd := gaps[i].StartSequence, gaps[i].EndSequence
		if gapEnd < start {
			// Gap lies entirely before the requested range.
			continue
		}
		if gapStart > end {
			// Gap lies entirely after the requested range; stopping here relies on
			// gaps being ordered by StartSequence.
			break
		}
		// Clip the gap to the requested range.
		missing = append(missing, LedgerRange{
			StartSequence: max(gapStart, start),
			EndSequence:   min(gapEnd, end),
		})
	}

	// Everything requested above the latest ledger is missing as well.
	if end > latestLedger {
		tail := LedgerRange{
			StartSequence: max(latestLedger+1, start),
			EndSequence:   end,
		}
		missing = append(missing, tail)
	}

	return missing, nil
}

func ledgerHeaderToMap(
Expand Down
Loading

0 comments on commit 371f727

Please sign in to comment.