Skip to content

Commit

Permalink
services/horizon/internal: Improve horizon history reaper (#5331)
Browse files Browse the repository at this point in the history
Improve horizon history reaper
  • Loading branch information
tamirms authored Jun 24, 2024
1 parent 90c18e2 commit b5c5b62
Show file tree
Hide file tree
Showing 27 changed files with 853 additions and 316 deletions.
30 changes: 21 additions & 9 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"go/types"
"log"
"os"
"os/signal"
"strconv"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stellar/go/services/horizon/internal/db2/history"

horizon "github.com/stellar/go/services/horizon/internal"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/db2/schema"
"github.com/stellar/go/services/horizon/internal/ingest"
support "github.com/stellar/go/support/config"
Expand Down Expand Up @@ -220,17 +221,28 @@ var dbReapCmd = &cobra.Command{
Short: "reaps (i.e. removes) any reapable history data",
Long: "reap removes any historical data that is earlier than the configured retention cutoff",
RunE: func(cmd *cobra.Command, args []string) error {
app, err := horizon.NewAppFromFlags(globalConfig, globalFlags)

err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false})
if err != nil {
return err
}
defer func() {
app.Shutdown()
app.CloseDB()
}()
ctx := context.Background()
app.UpdateHorizonLedgerState(ctx)
return app.DeleteUnretainedHistory(ctx)

session, err := db.Open("postgres", globalConfig.DatabaseURL)
if err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}
defer session.Close()

reaper := ingest.NewReaper(
ingest.ReapConfig{
RetentionCount: uint32(globalConfig.HistoryRetentionCount),
BatchSize: uint32(globalConfig.HistoryRetentionReapCount),
},
session,
)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
return reaper.DeleteUnretainedHistory(ctx)
},
}

Expand Down
26 changes: 0 additions & 26 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/operationfeestats"
"github.com/stellar/go/services/horizon/internal/paths"
"github.com/stellar/go/services/horizon/internal/reap"
"github.com/stellar/go/services/horizon/internal/txsub"
"github.com/stellar/go/support/app"
"github.com/stellar/go/support/db"
Expand All @@ -47,7 +46,6 @@ type App struct {
submitter *txsub.System
paths paths.Finder
ingester ingest.System
reaper *reap.System
ticks *time.Ticker
ledgerState *ledger.State

Expand Down Expand Up @@ -107,14 +105,6 @@ func (a *App) Serve() error {
}()
}

if a.reaper != nil {
wg.Add(1)
go func() {
a.reaper.Run()
wg.Done()
}()
}

// configure shutdown signal handler
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
Expand Down Expand Up @@ -169,9 +159,6 @@ func (a *App) Shutdown() {
if a.ingester != nil {
a.ingester.Shutdown()
}
if a.reaper != nil {
a.reaper.Shutdown()
}
a.ticks.Stop()
}

Expand Down Expand Up @@ -441,12 +428,6 @@ func (a *App) UpdateStellarCoreInfo(ctx context.Context) error {
return nil
}

// DeleteUnretainedHistory forwards to the app's reaper. See
// `reap.DeleteUnretainedHistory` for details
func (a *App) DeleteUnretainedHistory(ctx context.Context) error {
return a.reaper.DeleteUnretainedHistory(ctx)
}

// Tick triggers horizon to update all of it's background processes such as
// transaction submission, metrics, ingestion and reaping.
func (a *App) Tick(ctx context.Context) error {
Expand Down Expand Up @@ -511,13 +492,6 @@ func (a *App) init() error {
// txsub
initSubmissionSystem(a)

// reaper
a.reaper = reap.New(
a.config.HistoryRetentionCount,
a.config.HistoryRetentionReapCount,
a.HorizonSession(),
a.ledgerState)

// go metrics
initGoMetrics(a)

Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ type Config struct {
// especially if enabling reaping for the first time or in times of
// increased ledger load.
HistoryRetentionReapCount uint
// ReapFrequency configures how often (in units of ledgers) history is reaped.
// If ReapFrequency is set to 1 history is reaped after ingesting every ledger.
// If ReapFrequency is set to 2 history is reaped after ingesting every two ledgers.
// etc...
ReapFrequency uint
// StaleThreshold represents the number of ledgers a history database may be
// out-of-date by before horizon begins to respond with an error to history
// requests.
Expand Down
14 changes: 14 additions & 0 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package history

import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"sort"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
Expand Down Expand Up @@ -150,6 +152,18 @@ func (i *ledgerBatchInsertBuilder) Exec(ctx context.Context, session db.SessionI
return i.builder.Exec(ctx, session, i.table)
}

func (q *Q) GetNextLedgerSequence(ctx context.Context, start uint32) (uint32, bool, error) {
var value uint32
err := q.GetRaw(ctx, &value, `SELECT sequence FROM history_ledgers WHERE sequence > ?`, start)
if err == sql.ErrNoRows {
return 0, false, nil
}
if err != nil {
return 0, false, err
}
return value, true, nil
}

// GetLedgerGaps obtains ingestion gaps in the history_ledgers table.
// Returns the gaps and error.
func (q *Q) GetLedgerGaps(ctx context.Context) ([]LedgerRange, error) {
Expand Down
51 changes: 51 additions & 0 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/guregu/null"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -337,3 +338,53 @@ func TestGetLedgerGaps(t *testing.T) {
expectedGaps = append(expectedGaps, LedgerRange{1001, 1001})
tt.Assert.Equal(expectedGaps, gaps)
}

func TestGetNextLedgerSequence(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)

q := &Q{tt.HorizonSession()}

_, ok, err := q.GetNextLedgerSequence(context.Background(), 0)
tt.Assert.NoError(err)
tt.Assert.False(ok)

insertLedgerWithSequence(tt, q, 4)
insertLedgerWithSequence(tt, q, 5)
insertLedgerWithSequence(tt, q, 6)
insertLedgerWithSequence(tt, q, 7)

insertLedgerWithSequence(tt, q, 99)
insertLedgerWithSequence(tt, q, 100)
insertLedgerWithSequence(tt, q, 101)
insertLedgerWithSequence(tt, q, 102)

seq, ok, err := q.GetNextLedgerSequence(context.Background(), 0)
tt.Assert.NoError(err)
tt.Assert.True(ok)
tt.Assert.Equal(uint32(4), seq)

seq, ok, err = q.GetNextLedgerSequence(context.Background(), 4)
tt.Assert.NoError(err)
tt.Assert.True(ok)
tt.Assert.Equal(uint32(5), seq)

seq, ok, err = q.GetNextLedgerSequence(context.Background(), 10)
tt.Assert.NoError(err)
tt.Assert.True(ok)
tt.Assert.Equal(uint32(99), seq)

seq, ok, err = q.GetNextLedgerSequence(context.Background(), 101)
tt.Assert.NoError(err)
tt.Assert.True(ok)
tt.Assert.Equal(uint32(102), seq)

_, ok, err = q.GetNextLedgerSequence(context.Background(), 102)
tt.Assert.NoError(err)
tt.Assert.False(ok)

_, ok, err = q.GetNextLedgerSequence(context.Background(), 110)
tt.Assert.NoError(err)
tt.Assert.False(ok)
}
17 changes: 11 additions & 6 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,12 @@ type IngestionQ interface {
GetOfferCompactionSequence(context.Context) (uint32, error)
GetLiquidityPoolCompactionSequence(context.Context) (uint32, error)
TruncateIngestStateTables(context.Context) error
DeleteRangeAll(ctx context.Context, start, end int64) error
DeleteRangeAll(ctx context.Context, start, end int64) (int64, error)
DeleteTransactionsFilteredTmpOlderThan(ctx context.Context, howOldInSeconds uint64) (int64, error)
TryStateVerificationLock(ctx context.Context) (bool, error)
GetNextLedgerSequence(context.Context, uint32) (uint32, bool, error)
TryStateVerificationLock(context.Context) (bool, error)
TryReaperLock(context.Context) (bool, error)
ElderLedger(context.Context, interface{}) error
}

// QAccounts defines account related queries.
Expand Down Expand Up @@ -1154,7 +1157,8 @@ func constructReapLookupTablesQuery(table string, historyTables []tableObjectFie

// DeleteRangeAll deletes a range of rows from all history tables between
// `start` and `end` (exclusive).
func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) (int64, error) {
var total int64
for table, column := range map[string]string{
"history_effects": "history_operation_id",
"history_ledgers": "id",
Expand All @@ -1169,12 +1173,13 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
"history_transaction_liquidity_pools": "history_transaction_id",
"history_transactions": "id",
} {
err := q.DeleteRange(ctx, start, end, table, column)
count, err := q.DeleteRange(ctx, start, end, table, column)
if err != nil {
return errors.Wrapf(err, "Error clearing %s", table)
return 0, errors.Wrapf(err, "Error clearing %s", table)
}
total += count
}
return nil
return total, nil
}

// upsertRows builds and executes an upsert query that allows very fast upserts
Expand Down
20 changes: 10 additions & 10 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@ import (
"testing"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/reap"
"github.com/stellar/go/services/horizon/internal/ingest"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestReapLookupTables(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
ledgerState := &ledger.State{}
ledgerState.SetStatus(tt.Scenario("kahuna"))
tt.Scenario("kahuna")

db := tt.HorizonSession()

sys := reap.New(0, 0, db, ledgerState)
reaper := ingest.NewReaper(
ingest.ReapConfig{
RetentionCount: 1,
BatchSize: 50,
},
db,
)

var (
prevLedgers, curLedgers int
Expand All @@ -41,10 +44,7 @@ func TestReapLookupTables(t *testing.T) {
tt.Require.NoError(err)
}

ledgerState.SetStatus(tt.LoadLedgerStatus())
sys.RetentionCount = 1
sys.RetentionBatch = 50
err := sys.DeleteUnretainedHistory(tt.Ctx)
err := reaper.DeleteUnretainedHistory(tt.Ctx)
tt.Require.NoError(err)

q := &history.Q{tt.HorizonSession()}
Expand Down
28 changes: 23 additions & 5 deletions services/horizon/internal/db2/history/verify_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,34 @@ import (
"github.com/stellar/go/support/errors"
)

// stateVerificationLockId is the objid for the advisory lock acquired during
// state verification. The value is arbitrary. The only requirement is that
// all ingesting nodes use the same value which is why it's hard coded here.
const stateVerificationLockId = 73897213
const (
// stateVerificationLockId is the objid for the advisory lock acquired during
// state verification. The value is arbitrary. The only requirement is that
// all ingesting nodes use the same value which is why it's hard coded here.`1
stateVerificationLockId = 73897213
// reaperLockId is the objid for the advisory lock acquired during
// reaping. The value is arbitrary. The only requirement is that
// all ingesting nodes use the same value which is why it's hard coded here.
reaperLockId = 944670730
)

// TryStateVerificationLock attempts to acquire the state verification lock
// which gives the ingesting node exclusive access to perform state verification.
// TryStateVerificationLock returns true if the lock was acquired or false if the
// lock could not be acquired because it is held by another node.
func (q *Q) TryStateVerificationLock(ctx context.Context) (bool, error) {
return q.tryAdvisoryLock(ctx, stateVerificationLockId)
}

// TryReaperLock attempts to acquire the reaper lock
// which gives the ingesting node exclusive access to perform reaping.
// TryReaperLock returns true if the lock was acquired or false if the
// lock could not be acquired because it is held by another node.
func (q *Q) TryReaperLock(ctx context.Context) (bool, error) {
return q.tryAdvisoryLock(ctx, reaperLockId)
}

func (q *Q) tryAdvisoryLock(ctx context.Context, lockId int) (bool, error) {
if tx := q.GetTx(); tx == nil {
return false, errors.New("cannot be called outside of a transaction")
}
Expand All @@ -26,7 +44,7 @@ func (q *Q) TryStateVerificationLock(ctx context.Context) (bool, error) {
context.WithValue(ctx, &db.QueryTypeContextKey, db.AdvisoryLockQueryType),
&acquired,
"SELECT pg_try_advisory_xact_lock(?)",
stateVerificationLockId,
lockId,
)
if err != nil {
return false, errors.Wrap(err, "error acquiring advisory lock for state verification")
Expand Down
18 changes: 18 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,24 @@ func Flags() (*Config, support.ConfigOptions) {
return nil
},
},
&support.ConfigOption{
Name: "reap-frequency",
ConfigKey: &config.ReapFrequency,
OptType: types.Uint,
FlagDefault: uint(720),
Usage: "the frequency in units of ledgers for how often history is reaped. " +
"A value of 1 implies history is trimmed after every ledger. " +
"A value of 2 implies history is trimmed on every second ledger.",
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
val := viper.GetUint(opt.Name)
if val <= 0 {
return fmt.Errorf("flag --reap-frequency must be positive")
}
*(opt.ConfigKey.(*uint)) = val
return nil
},
},
&support.ConfigOption{
Name: "history-stale-threshold",
ConfigKey: &config.StaleThreshold,
Expand Down
Loading

0 comments on commit b5c5b62

Please sign in to comment.