Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Add horizon db detect-gaps command #3672

Merged
merged 6 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Fix bug in `horizon db reingest range` command, which would throw a duplicate entry conflict error from the DB. ([3661](https://github.com/stellar/go/pull/3661)).
* Fix bug in DB metrics preventing Horizon from starting when read-only replica middleware is enabled. ([3668](https://github.com/stellar/go/pull/3668)).
* Add new command `horizon db detect-gaps`, which detects ingestion gaps in the database. The command prints out the `db reingest` commands to run in order to fill the gaps found.

## v2.4.0

Expand Down
38 changes: 38 additions & 0 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stellar/go/services/horizon/internal/db2/history"

horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/services/horizon/internal/db2/schema"
Expand Down Expand Up @@ -295,6 +296,42 @@ func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uin
)
}

var dbDetectGapsCmd = &cobra.Command{
Use: "detect-gaps",
Short: "detects ingestion gaps in Horizon's database",
Long: "detects ingestion gaps in Horizon's database and prints a list of reingest commands needed to fill the gaps",
Run: func(cmd *cobra.Command, args []string) {
requireAndSetFlag(horizon.DatabaseURLFlagName)
if len(args) != 0 {
cmd.Usage()
os.Exit(1)
}
gaps, err := runDBDetectGaps(*config)
if err != nil {
log.Fatal(err)
}
if len(gaps) == 0 {
hlog.Info("No gaps found")
return
}
fmt.Println("Horizon commands to run in order to fill in the gaps:")
cmdname := os.Args[0]
for _, g := range gaps {
fmt.Printf("%s db reingest %d %d\n", cmdname, g.StartSequence, g.EndSequence)
}
},
}

func runDBDetectGaps(config horizon.Config) ([]history.LedgerGap, error) {
fmt.Println(config.DatabaseURL)
2opremio marked this conversation as resolved.
Show resolved Hide resolved
horizonSession, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
return nil, err
}
q := &history.Q{horizonSession}
return q.GetLedgerGaps(context.Background())
}

func init() {
for _, co := range reingestRangeCmdOpts {
err := co.Init(dbReingestRangeCmd)
Expand All @@ -311,6 +348,7 @@ func init() {
dbMigrateCmd,
dbReapCmd,
dbReingestCmd,
dbDetectGapsCmd,
)
dbReingestCmd.AddCommand(dbReingestRangeCmd)
}
16 changes: 16 additions & 0 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ func (q *Q) InsertLedger(ctx context.Context,
return result.RowsAffected()
}

// GetLedgerGaps, obtains ingestion gaps in the history_ledgers table.
// Returns the gaps and error.
func (q *Q) GetLedgerGaps(ctx context.Context) ([]LedgerGap, error) {
var result []LedgerGap
err := q.SelectRaw(ctx, &result, `
SELECT sequence + 1 AS gap_start,
next_number - 1 AS gap_end
FROM (
SELECT sequence,
LEAD(sequence) OVER (ORDER BY sequence) AS next_number
FROM history_ledgers
) number
WHERE sequence + 1 <> next_number;`)
return result, err
}

func ledgerHeaderToMap(
ledger xdr.LedgerHeaderHistoryEntry,
successTxsCount int,
Expand Down
126 changes: 126 additions & 0 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package history

import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -148,3 +151,126 @@ func TestInsertLedger(t *testing.T) {
tt.Assert.True(exists)
tt.Assert.Equal(expectedLedger.LedgerHash, hash)
}

func insertLedgerWithSequence(tt *test.T, q *Q, seq uint32) {
// generate random hashes to avoid insert clashes due to UNIQUE constraints
var rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
ledgerHashHex := fmt.Sprintf("%064x", rnd.Uint32())
previousLedgerHashHex := fmt.Sprintf("%064x", rnd.Uint32())

expectedLedger := Ledger{
Sequence: int32(seq),
LedgerHash: ledgerHashHex,
PreviousLedgerHash: null.NewString(previousLedgerHashHex, true),
TotalOrderID: TotalOrderID{toid.New(int32(69859), 0, 0).ToInt64()},
ImporterVersion: 123,
TransactionCount: 12,
SuccessfulTransactionCount: new(int32),
FailedTransactionCount: new(int32),
TxSetOperationCount: new(int32),
OperationCount: 23,
TotalCoins: 23451,
FeePool: 213,
BaseReserve: 687,
MaxTxSetSize: 345,
ProtocolVersion: 12,
BaseFee: 100,
ClosedAt: time.Now().UTC().Truncate(time.Second),
}
*expectedLedger.SuccessfulTransactionCount = 12
*expectedLedger.FailedTransactionCount = 3
*expectedLedger.TxSetOperationCount = 26

var ledgerHash, previousLedgerHash xdr.Hash

written, err := hex.Decode(ledgerHash[:], []byte(expectedLedger.LedgerHash))
tt.Assert.NoError(err)
tt.Assert.Equal(len(ledgerHash), written)

written, err = hex.Decode(previousLedgerHash[:], []byte(expectedLedger.PreviousLedgerHash.String))
tt.Assert.NoError(err)
tt.Assert.Equal(len(previousLedgerHash), written)

ledgerEntry := xdr.LedgerHeaderHistoryEntry{
Hash: ledgerHash,
Header: xdr.LedgerHeader{
LedgerVersion: 12,
PreviousLedgerHash: previousLedgerHash,
LedgerSeq: xdr.Uint32(expectedLedger.Sequence),
TotalCoins: xdr.Int64(expectedLedger.TotalCoins),
FeePool: xdr.Int64(expectedLedger.FeePool),
BaseFee: xdr.Uint32(expectedLedger.BaseFee),
BaseReserve: xdr.Uint32(expectedLedger.BaseReserve),
MaxTxSetSize: xdr.Uint32(expectedLedger.MaxTxSetSize),
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(expectedLedger.ClosedAt.Unix()),
},
},
}
ledgerHeaderBase64, err := xdr.MarshalBase64(ledgerEntry.Header)
tt.Assert.NoError(err)
expectedLedger.LedgerHeaderXDR = null.NewString(ledgerHeaderBase64, true)
rowsAffected, err := q.InsertLedger(tt.Ctx,
ledgerEntry,
12,
3,
23,
26,
int(expectedLedger.ImporterVersion),
)
tt.Assert.NoError(err)
tt.Assert.Equal(rowsAffected, int64(1))
}

func TestGetLedgerGaps(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)

q := &Q{tt.HorizonSession()}

// The DB is empty, so there shouldn't be any gaps
gaps, err := q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
tt.Assert.Len(gaps, 0)

// Lets insert a few gaps and make sure they are detected incrementally
insertLedgerWithSequence(tt, q, 4)
insertLedgerWithSequence(tt, q, 5)
insertLedgerWithSequence(tt, q, 6)
insertLedgerWithSequence(tt, q, 7)

// since there is a single ledger cluster, there should still be no gaps
// (we don't start from ledger 0)
gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
tt.Assert.Len(gaps, 0)

var expectedGaps []LedgerGap

insertLedgerWithSequence(tt, q, 99)
insertLedgerWithSequence(tt, q, 100)
insertLedgerWithSequence(tt, q, 101)
insertLedgerWithSequence(tt, q, 102)

gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
expectedGaps = append(expectedGaps, LedgerGap{8, 98})
tt.Assert.Equal(expectedGaps, gaps)

// Yet another gap, this time to a single-ledger cluster
insertLedgerWithSequence(tt, q, 1000)

gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
expectedGaps = append(expectedGaps, LedgerGap{103, 999})
tt.Assert.Equal(expectedGaps, gaps)

// Yet another gap, this time the gap only contains a ledger
insertLedgerWithSequence(tt, q, 1002)
gaps, err = q.GetLedgerGaps(context.Background())
tt.Assert.NoError(err)
expectedGaps = append(expectedGaps, LedgerGap{1001, 1001})
tt.Assert.Equal(expectedGaps, gaps)

}
5 changes: 5 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,11 @@ type LedgerCache struct {
queued map[int32]struct{}
}

type LedgerGap struct {
StartSequence uint32 `db:"gap_start"`
EndSequence uint32 `db:"gap_end"`
}

// LedgersQ is a helper struct to aid in configuring queries that loads
// slices of Ledger structs.
type LedgersQ struct {
Expand Down