Skip to content

Commit

Permalink
Implement horizon db fill-gaps command
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Nov 9, 2021
1 parent 80cfe44 commit 3c820f7
Show file tree
Hide file tree
Showing 10 changed files with 490 additions and 131 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Add a new horizon flag `--max-assets-per-path-request` (`15` by default) that sets the number of assets to consider for strict-send and strict-recieve ([4046](https://github.com/stellar/go/pull/4046))
* Add an endpoint that allows querying for which liquidity pools an account is participating in [4043](https://github.com/stellar/go/pull/4043)
* Add a new horizon command `horizon db fill-gaps` which fills any gaps in history in the horizon db. The command takes optional start and end ledger parameters. If the start and end ledger is provided then horizon will only fill the gaps found within the given ledger range [4060](https://github.com/stellar/go/pull/4060)

## v2.10.0

Expand Down
110 changes: 90 additions & 20 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ var dbReingestRangeCmd = &cobra.Command{
for i, arg := range args {
if seq, err := strconv.ParseUint(arg, 10, 32); err != nil {
cmd.Usage()
return fmt.Errorf(`Invalid sequence number "%s"`, arg)
return fmt.Errorf(`invalid sequence number "%s"`, arg)
} else {
argsUInt32[i] = uint32(seq)
}
Expand All @@ -302,26 +302,72 @@ var dbReingestRangeCmd = &cobra.Command{
if err != nil {
return err
}
err = runDBReingestRange(argsUInt32[0], argsUInt32[1], reingestForce, parallelWorkers, *config)
if err != nil {
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
It is not possible to run the reingest command on this range in parallel with
Horizon's ingestion system.
Either reduce the range so that it doesn't overlap with Horizon's ingestion system,
or, use the force flag to ensure that Horizon's ingestion system is blocked until
the reingest command completes.`)
return runDBReingestRange([][2]uint32{{argsUInt32[0], argsUInt32[1]}}, reingestForce, parallelWorkers, *config)
},
}

var dbFillGapsCmd = &cobra.Command{
Use: "fill-gaps [Start sequence number] [End sequence number]",
Short: "Ingests any gaps found in the horizon db",
Long: "Ingests any gaps found in the horizon db. The command takes an optional start and end parameters which restrict the range of ledgers ingested.",
RunE: func(cmd *cobra.Command, args []string) error {
for _, co := range reingestRangeCmdOpts {
if err := co.RequireE(); err != nil {
return err
}
co.SetValue()
}

if len(args) != 0 && len(args) != 2 {
hlog.Errorf("Expected either 0 arguments or 2 but found %v arguments", len(args))
return ErrUsage{cmd}
}

var start, end uint64
var withRange bool
if len(args) == 2 {
var err error
start, err = strconv.ParseUint(args[0], 10, 32)
if err != nil {
cmd.Usage()
return fmt.Errorf(`invalid sequence number "%s"`, args[0])
}
end, err = strconv.ParseUint(args[1], 10, 32)
if err != nil {
cmd.Usage()
return fmt.Errorf(`invalid sequence number "%s"`, args[1])
}
withRange = true
}

err := horizon.ApplyFlags(config, flags, horizon.ApplyOptions{RequireCaptiveCoreConfig: false, AlwaysIngest: true})
if err != nil {
return err
}
var gaps []history.LedgerGap
if withRange {
gaps, err = runDBDetectGapsInRange(*config, uint32(start), uint32(end))
if err != nil {
return err
}
hlog.Infof("found gaps %v within range [%v, %v]", gaps, start, end)
} else {
gaps, err = runDBDetectGaps(*config)
if err != nil {
return err
}
hlog.Infof("found gaps %v", gaps)
}

hlog.Info("Range run successfully!")
return nil
var ledgerRanges [][2]uint32
for _, gap := range gaps {
ledgerRanges = append(ledgerRanges, [2]uint32{gap.StartSequence, gap.EndSequence})
}
return runDBReingestRange(ledgerRanges, reingestForce, parallelWorkers, *config)
},
}

func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uint, config horizon.Config) error {
func runDBReingestRange(ledgerRanges [][2]uint32, reingestForce bool, parallelWorkers uint, config horizon.Config) error {
if reingestForce && parallelWorkers > 1 {
return errors.New("--force is incompatible with --parallel-workers > 1")
}
Expand Down Expand Up @@ -364,8 +410,7 @@ func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uin
}

return system.ReingestRange(
from,
to,
ledgerRanges,
parallelJobSize,
)
}
Expand All @@ -375,11 +420,21 @@ func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uin
return systemErr
}

return system.ReingestRange(
from,
to,
reingestForce,
)
err = system.ReingestRange(ledgerRanges, reingestForce)
if err != nil {
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
It is not possible to run the reingest command on this range in parallel with
Horizon's ingestion system.
Either reduce the range so that it doesn't overlap with Horizon's ingestion system,
or, use the force flag to ensure that Horizon's ingestion system is blocked until
the reingest command completes.`)
}

return err
}
hlog.Info("Range run successfully!")
return nil
}

var dbDetectGapsCmd = &cobra.Command{
Expand Down Expand Up @@ -420,15 +475,29 @@ func runDBDetectGaps(config horizon.Config) ([]history.LedgerGap, error) {
return q.GetLedgerGaps(context.Background())
}

func runDBDetectGapsInRange(config horizon.Config, start, end uint32) ([]history.LedgerGap, error) {
horizonSession, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
return nil, err
}
q := &history.Q{horizonSession}
return q.GetLedgerGapsInRange(context.Background(), start, end)
}

func init() {
for _, co := range reingestRangeCmdOpts {
err := co.Init(dbReingestRangeCmd)
if err != nil {
log.Fatal(err.Error())
}
err = co.Init(dbFillGapsCmd)
if err != nil {
log.Fatal(err.Error())
}
}

viper.BindPFlags(dbReingestRangeCmd.PersistentFlags())
viper.BindPFlags(dbFillGapsCmd.PersistentFlags())

RootCmd.AddCommand(dbCmd)
dbCmd.AddCommand(
Expand All @@ -437,6 +506,7 @@ func init() {
dbReapCmd,
dbReingestCmd,
dbDetectGapsCmd,
dbFillGapsCmd,
)
dbMigrateCmd.AddCommand(
dbMigrateDownCmd,
Expand Down
81 changes: 76 additions & 5 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,91 @@ func (q *Q) InsertLedger(ctx context.Context,
return result.RowsAffected()
}

// GetLedgerGaps, obtains ingestion gaps in the history_ledgers table.
// GetLedgerGaps obtains ingestion gaps in the history_ledgers table.
// Returns the gaps and error.
func (q *Q) GetLedgerGaps(ctx context.Context) ([]LedgerGap, error) {
var result []LedgerGap
err := q.SelectRaw(ctx, &result, `
var gaps []LedgerGap
query := `
SELECT sequence + 1 AS gap_start,
next_number - 1 AS gap_end
FROM (
SELECT sequence,
LEAD(sequence) OVER (ORDER BY sequence) AS next_number
FROM history_ledgers
) number
WHERE sequence + 1 <> next_number;`)
return result, err
WHERE sequence + 1 <> next_number;`
if err := q.SelectRaw(ctx, &gaps, query); err != nil {
return nil, err
}
return gaps, nil
}

func max(a, b uint32) uint32 {
if a > b {
return a
}
return b
}

func min(a, b uint32) uint32 {
if a > b {
return b
}
return a
}

// GetLedgerGapsInRange obtains ingestion gaps in the history_ledgers table within the given range.
// Returns the gaps and error.
func (q *Q) GetLedgerGapsInRange(ctx context.Context, start, end uint32) ([]LedgerGap, error) {
var result []LedgerGap
var oldestLedger, latestLedger uint32

if err := q.ElderLedger(ctx, &oldestLedger); err != nil {
return nil, errors.Wrap(err, "Could not query elder ledger")
} else if oldestLedger == 0 {
return []LedgerGap{{
StartSequence: start,
EndSequence: end,
}}, nil
}

if err := q.LatestLedger(ctx, &latestLedger); err != nil {
return nil, errors.Wrap(err, "Could not query latest ledger")
}

if start < oldestLedger {
result = append(result, LedgerGap{
StartSequence: start,
EndSequence: min(end, oldestLedger-1),
})
}

gaps, err := q.GetLedgerGaps(ctx)
if err != nil {
return nil, err
}

for _, gap := range gaps {
if gap.EndSequence < start {
continue
}
if gap.StartSequence > end {
break
}
result = append(result, LedgerGap{
StartSequence: max(gap.StartSequence, start),
EndSequence: min(gap.EndSequence, end),
})
}

if latestLedger < end {
result = append(result, LedgerGap{
StartSequence: max(latestLedger+1, start),
EndSequence: end,
})
}

return result, nil
}

func ledgerHeaderToMap(
Expand Down
65 changes: 64 additions & 1 deletion services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func TestGetLedgerGaps(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(gaps, 0)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 1, 100)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 1, EndSequence: 100}}, gaps)

// Lets insert a few gaps and make sure they are detected incrementally
insertLedgerWithSequence(tt, q, 4)
insertLedgerWithSequence(tt, q, 5)
Expand All @@ -246,6 +250,42 @@ func TestGetLedgerGaps(t *testing.T) {
tt.Assert.NoError(err)
tt.Assert.Len(gaps, 0)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 1, 2)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 1, EndSequence: 2}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 1, 3)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 1, EndSequence: 3}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 1, 6)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 1, EndSequence: 3}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 3, 5)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 3, EndSequence: 3}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 4, 6)
tt.Assert.NoError(err)
tt.Assert.Len(gaps, 0)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 4, 8)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 8, EndSequence: 8}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 4, 10)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 8, EndSequence: 10}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 8, 10)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 8, EndSequence: 10}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 9, 11)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 9, EndSequence: 11}}, gaps)

var expectedGaps []LedgerGap

insertLedgerWithSequence(tt, q, 99)
Expand All @@ -258,6 +298,30 @@ func TestGetLedgerGaps(t *testing.T) {
expectedGaps = append(expectedGaps, LedgerGap{8, 98})
tt.Assert.Equal(expectedGaps, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 10, 11)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 10, EndSequence: 11}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 4, 11)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 8, EndSequence: 11}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 1, 11)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 1, EndSequence: 3}, {StartSequence: 8, EndSequence: 11}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 10, 105)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 10, EndSequence: 98}, {StartSequence: 103, EndSequence: 105}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 100, 105)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 103, EndSequence: 105}}, gaps)

gaps, err = q.GetLedgerGapsInRange(context.Background(), 105, 110)
tt.Assert.NoError(err)
tt.Assert.Equal([]LedgerGap{{StartSequence: 105, EndSequence: 110}}, gaps)

// Yet another gap, this time to a single-ledger cluster
insertLedgerWithSequence(tt, q, 1000)

Expand All @@ -272,5 +336,4 @@ func TestGetLedgerGaps(t *testing.T) {
tt.Assert.NoError(err)
expectedGaps = append(expectedGaps, LedgerGap{1001, 1001})
tt.Assert.Equal(expectedGaps, gaps)

}
Loading

0 comments on commit 3c820f7

Please sign in to comment.