From 7f05c44ed136ddda765097d5c575f830a1c3994a Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 8 Jun 2021 12:47:08 +0200 Subject: [PATCH 1/6] Intial implementation of the `db detect-gaps` command --- services/horizon/cmd/db.go | 62 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 7cc5b3d1c9..49ef70d522 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -295,6 +295,67 @@ func runDBReingestRange(from, to uint32, reingestForce bool, parallelWorkers uin ) } +var dbDetectGapsCmd = &cobra.Command{ + Use: "detect-gaps", + Short: "detects ingestion gaps in Horizon's database", + Long: "detects ingestion gaps in Horizon's database and prints a list of reingest commands needed to fill the gaps", + Run: func(cmd *cobra.Command, args []string) { + requireAndSetFlag(horizon.DatabaseURLFlagName) + if len(args) != 0 { + cmd.Usage() + os.Exit(1) + } + gaps, err := runDBDetectGaps(*config) + if err != nil { + log.Fatal(err) + } + if len(gaps) == 0 { + hlog.Info("No gaps found") + return + } + cmdname := os.Args[0] + for _, g := range gaps { + fmt.Printf("%s db reingest %d %d\n", cmdname, g.GapStartSequence, g.GapEndSequence) + } + }, +} + +type gap struct { + GapStartSequence uint32 + GapEndSequence uint32 +} + +func runDBDetectGaps(config horizon.Config) ([]gap, error) { + fmt.Println(config.DatabaseURL) + db, err := sql.Open("postgres", config.DatabaseURL) + if err != nil { + return nil, err + } + query := `SELECT sequence + 1 AS gap_start, + next_number - 1 AS gap_end + FROM ( + SELECT sequence, + LEAD(sequence) OVER (ORDER BY sequence) AS next_number + FROM history_ledgers + ) number + WHERE sequence + 1 <> next_number;` + var result []gap + rows, err := db.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var g gap + err := rows.Scan(&g.GapStartSequence, &g.GapEndSequence) + if err != nil { + return nil, err + } + result = append(result, g) + } + return result, nil +} + func init() { for _, co := range reingestRangeCmdOpts { err := co.Init(dbReingestRangeCmd) @@ -311,6 +372,7 @@ func init() { dbMigrateCmd, dbReapCmd, dbReingestCmd, + dbDetectGapsCmd, ) dbReingestCmd.AddCommand(dbReingestRangeCmd) } From 39a5787c30c098432b5464ff68a48de7cc19b50e Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 8 Jun 2021 13:09:58 +0200 Subject: [PATCH 2/6] Move db querying to the history package --- services/horizon/cmd/db.go | 37 +++---------------- .../horizon/internal/db2/history/ledger.go | 16 ++++++++ services/horizon/internal/db2/history/main.go | 5 +++ 3 files changed, 27 insertions(+), 31 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 49ef70d522..cbb3f1d831 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -11,6 +11,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/stellar/go/services/horizon/internal/db2/history" horizon "github.com/stellar/go/services/horizon/internal" "github.com/stellar/go/services/horizon/internal/db2/schema" @@ -315,45 +316,19 @@ var dbDetectGapsCmd = &cobra.Command{ } cmdname := os.Args[0] for _, g := range gaps { - fmt.Printf("%s db reingest %d %d\n", cmdname, g.GapStartSequence, g.GapEndSequence) + fmt.Printf("%s db reingest %d %d\n", cmdname, g.StartSequence, g.EndSequence) } }, } -type gap struct { - GapStartSequence uint32 - GapEndSequence uint32 -} - -func runDBDetectGaps(config horizon.Config) ([]gap, error) { +func runDBDetectGaps(config horizon.Config) ([]history.LedgerGap, error) { fmt.Println(config.DatabaseURL) - db, err := sql.Open("postgres", config.DatabaseURL) - if err != nil { - return nil, err - } - query := `SELECT sequence + 1 AS gap_start, - next_number - 1 AS gap_end - FROM ( - SELECT sequence, - LEAD(sequence) OVER (ORDER BY sequence) AS next_number - FROM history_ledgers - ) number - WHERE sequence + 1 <> next_number;` - var result []gap - rows, err := db.Query(query) + horizonSession, err := db.Open("postgres", config.DatabaseURL) if err != nil { return nil, err } - defer rows.Close() - for rows.Next() { - var g gap - err := rows.Scan(&g.GapStartSequence, &g.GapEndSequence) - if err != nil { - return nil, err - } - result = append(result, g) - } - return result, nil + q := &history.Q{horizonSession} + return q.GetLedgerGaps(context.Background()) } func init() { diff --git a/services/horizon/internal/db2/history/ledger.go b/services/horizon/internal/db2/history/ledger.go index 927295bd0d..b405f32623 100644 --- a/services/horizon/internal/db2/history/ledger.go +++ b/services/horizon/internal/db2/history/ledger.go @@ -130,6 +130,22 @@ func (q *Q) InsertLedger(ctx context.Context, return result.RowsAffected() } +// GetLedgerGaps, obtains ingestion gaps in the history_ledgers table. +// Returns the gaps and error. +func (q *Q) GetLedgerGaps(ctx context.Context) ([]LedgerGap, error) { + var result []LedgerGap + err := q.SelectRaw(ctx, &result, ` + SELECT sequence + 1 AS gap_start, + next_number - 1 AS gap_end + FROM ( + SELECT sequence, + LEAD(sequence) OVER (ORDER BY sequence) AS next_number + FROM history_ledgers + ) number + WHERE sequence + 1 <> next_number;`) + return result, err +} + func ledgerHeaderToMap( ledger xdr.LedgerHeaderHistoryEntry, successTxsCount int, diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 6dc183d232..dff3e6d397 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -537,6 +537,11 @@ type LedgerCache struct { queued map[int32]struct{} } +type LedgerGap struct { + StartSequence uint32 `db:"gap_start"` + EndSequence uint32 `db:"gap_end"` +} + // LedgersQ is a helper struct to aid in configuring queries that loads // slices of Ledger structs. type LedgersQ struct { From 5afc2700828830536781d769c8ecd3796539b93c Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 8 Jun 2021 13:46:13 +0200 Subject: [PATCH 3/6] Add test --- .../internal/db2/history/ledger_test.go | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index 6101b99bd5..c0beb391d3 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -1,8 +1,11 @@ package history import ( + "context" "database/sql" "encoding/hex" + "fmt" + "math/rand" "testing" "time" @@ -148,3 +151,126 @@ func TestInsertLedger(t *testing.T) { tt.Assert.True(exists) tt.Assert.Equal(expectedLedger.LedgerHash, hash) } + +func insertLedgerWithSequence(tt *test.T, q *Q, seq uint32) { + // generate random hashes to avoid insert clashes due to UNIQUE constraints + var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) + ledgerHashHex := fmt.Sprintf("%064x", rnd.Uint32()) + previousLedgerHashHex := fmt.Sprintf("%064x", rnd.Uint32()) + + expectedLedger := Ledger{ + Sequence: int32(seq), + LedgerHash: ledgerHashHex, + PreviousLedgerHash: null.NewString(previousLedgerHashHex, true), + TotalOrderID: TotalOrderID{toid.New(int32(69859), 0, 0).ToInt64()}, + ImporterVersion: 123, + TransactionCount: 12, + SuccessfulTransactionCount: new(int32), + FailedTransactionCount: new(int32), + TxSetOperationCount: new(int32), + OperationCount: 23, + TotalCoins: 23451, + FeePool: 213, + BaseReserve: 687, + MaxTxSetSize: 345, + ProtocolVersion: 12, + BaseFee: 100, + ClosedAt: time.Now().UTC().Truncate(time.Second), + } + *expectedLedger.SuccessfulTransactionCount = 12 + *expectedLedger.FailedTransactionCount = 3 + *expectedLedger.TxSetOperationCount = 26 + + var ledgerHash, previousLedgerHash xdr.Hash + + written, err := hex.Decode(ledgerHash[:], []byte(expectedLedger.LedgerHash)) + tt.Assert.NoError(err) + tt.Assert.Equal(len(ledgerHash), written) + + written, err = hex.Decode(previousLedgerHash[:], []byte(expectedLedger.PreviousLedgerHash.String)) + tt.Assert.NoError(err) + tt.Assert.Equal(len(previousLedgerHash), written) + + ledgerEntry := xdr.LedgerHeaderHistoryEntry{ + Hash: ledgerHash, + Header: xdr.LedgerHeader{ + LedgerVersion: 12, + PreviousLedgerHash: previousLedgerHash, + LedgerSeq: xdr.Uint32(expectedLedger.Sequence), + TotalCoins: xdr.Int64(expectedLedger.TotalCoins), + FeePool: xdr.Int64(expectedLedger.FeePool), + BaseFee: xdr.Uint32(expectedLedger.BaseFee), + BaseReserve: xdr.Uint32(expectedLedger.BaseReserve), + MaxTxSetSize: xdr.Uint32(expectedLedger.MaxTxSetSize), + ScpValue: xdr.StellarValue{ + CloseTime: xdr.TimePoint(expectedLedger.ClosedAt.Unix()), + }, + }, + } + ledgerHeaderBase64, err := xdr.MarshalBase64(ledgerEntry.Header) + tt.Assert.NoError(err) + expectedLedger.LedgerHeaderXDR = null.NewString(ledgerHeaderBase64, true) + rowsAffected, err := q.InsertLedger(tt.Ctx, + ledgerEntry, + 12, + 3, + 23, + 26, + int(expectedLedger.ImporterVersion), + ) + tt.Assert.NoError(err) + tt.Assert.Equal(rowsAffected, int64(1)) +} + +func TestGetLedgerGaps(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + // The DB is empty, so there shouldn't be any gaps + gaps, err := q.GetLedgerGaps(context.Background()) + tt.Assert.NoError(err) + tt.Assert.Len(gaps, 0) + + // Lets insert a few gaps and make sure they are detected incrementally + insertLedgerWithSequence(tt, q, 4) + insertLedgerWithSequence(tt, q, 5) + insertLedgerWithSequence(tt, q, 6) + insertLedgerWithSequence(tt, q, 7) + + // since there is a single ledger cluster, there should still be no gaps + // (we don't start from ledger 0) + gaps, err = q.GetLedgerGaps(context.Background()) + tt.Assert.NoError(err) + tt.Assert.Len(gaps, 0) + + var expectedGaps []LedgerGap + + insertLedgerWithSequence(tt, q, 99) + insertLedgerWithSequence(tt, q, 100) + insertLedgerWithSequence(tt, q, 101) + insertLedgerWithSequence(tt, q, 102) + + gaps, err = q.GetLedgerGaps(context.Background()) + tt.Assert.NoError(err) + expectedGaps = append(expectedGaps, LedgerGap{8, 98}) + tt.Assert.Equal(expectedGaps, gaps) + + // Yet another gap, this time to a single-ledger cluster + insertLedgerWithSequence(tt, q, 1000) + + gaps, err = q.GetLedgerGaps(context.Background()) + tt.Assert.NoError(err) + expectedGaps = append(expectedGaps, LedgerGap{103, 999}) + tt.Assert.Equal(expectedGaps, gaps) + + // Yet another gap, this time the gap only contains a ledger + insertLedgerWithSequence(tt, q, 1002) + gaps, err = q.GetLedgerGaps(context.Background()) + tt.Assert.NoError(err) + expectedGaps = append(expectedGaps, LedgerGap{1001, 1001}) + tt.Assert.Equal(expectedGaps, gaps) + +} From f29449d3e59d5ed72db4ca3dca65212634812f56 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 8 Jun 2021 13:49:40 +0200 Subject: [PATCH 4/6] Refine printed message --- services/horizon/cmd/db.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index cbb3f1d831..f554dff2a3 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -314,6 +314,7 @@ var dbDetectGapsCmd = &cobra.Command{ hlog.Info("No gaps found") return } + fmt.Println("Horizon commands to run in order to fill in the gaps:") cmdname := os.Args[0] for _, g := range gaps { fmt.Printf("%s db reingest %d %d\n", cmdname, g.StartSequence, g.EndSequence) From 315e77644ef32e1f692c2489c07b76bbdad44a7b Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 8 Jun 2021 13:51:17 +0200 Subject: [PATCH 5/6] Add CHANGELOG entry --- services/horizon/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 9724e52dc0..0157ab3650 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -9,6 +9,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). * Fix bug in `horizon db reingest range` command, which would throw a duplicate entry conflict error from the DB. ([3661](https://github.com/stellar/go/pull/3661)). * Fix bug in DB metrics preventing Horizon from starting when read-only replica middleware is enabled. ([3668](https://github.com/stellar/go/pull/3668)). +* Add new command `horizon db detect-gaps`, which detects ingestion gaps in the database. The command prints out the `db reingest` commands to run in order to fill the gaps found. ## v2.4.0 From 19c347300d10475555d11627c2b85e3569b91591 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 8 Jun 2021 17:13:06 +0200 Subject: [PATCH 6/6] Update services/horizon/cmd/db.go Co-authored-by: Bartek Nowotarski --- services/horizon/cmd/db.go | 1 - 1 file changed, 1 deletion(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index f554dff2a3..3d3f1dbdb3 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -323,7 +323,6 @@ var dbDetectGapsCmd = &cobra.Command{ } func runDBDetectGaps(config horizon.Config) ([]history.LedgerGap, error) { - fmt.Println(config.DatabaseURL) horizonSession, err := db.Open("postgres", config.DatabaseURL) if err != nil { return nil, err