Skip to content

Commit

Permalink
services/horizon: Make reaping batch sizes configurable via `--histor…
Browse files Browse the repository at this point in the history
…y-retention-reap-count`. (stellar#5272)

* Make reaper batch size configurable
* Add --history-retention-reap-count and change default to 50k
  • Loading branch information
Shaptic authored Apr 17, 2024
1 parent fd10794 commit 36d7a6c
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 17 deletions.
6 changes: 5 additions & 1 deletion services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,11 @@ func (a *App) init() error {
initSubmissionSystem(a)

// reaper
a.reaper = reap.New(a.config.HistoryRetentionCount, a.HorizonSession(), a.ledgerState)
a.reaper = reap.New(
a.config.HistoryRetentionCount,
a.config.HistoryRetentionReapCount,
a.HorizonSession(),
a.ledgerState)

// go metrics
initGoMetrics(a)
Expand Down
6 changes: 6 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ type Config struct {
// determining a "retention duration", each ledger roughly corresponds to 10
// seconds of real time.
HistoryRetentionCount uint
// HistoryRetentionReapCount is the number of ledgers' worth of history data
// to remove per reaping batch from the Horizon database (batches are paced
// at roughly one per second). It is intended to allow control over the
// amount of CPU and database load caused by reaping, especially if enabling
// reaping for the first time or in times of increased ledger load.
HistoryRetentionReapCount uint
// StaleThreshold represents the number of ledgers a history database may be
// out-of-date by before horizon begins to respond with an error to history
// requests.
Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestReapLookupTables(t *testing.T) {

db := tt.HorizonSession()

sys := reap.New(0, db, ledgerState)
sys := reap.New(0, 0, db, ledgerState)

var (
prevLedgers, curLedgers int
Expand All @@ -43,6 +43,7 @@ func TestReapLookupTables(t *testing.T) {

ledgerState.SetStatus(tt.LoadLedgerStatus())
sys.RetentionCount = 1
sys.RetentionBatch = 50
err := sys.DeleteUnretainedHistory(tt.Ctx)
tt.Require.NoError(err)

Expand Down
16 changes: 15 additions & 1 deletion services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,23 @@ func Flags() (*Config, support.ConfigOptions) {
ConfigKey: &config.HistoryRetentionCount,
OptType: types.Uint,
FlagDefault: uint(0),
Usage: "the minimum number of ledgers to maintain within horizon's history tables. 0 signifies an unlimited number of ledgers will be retained",
Usage: "the minimum number of ledgers to maintain within Horizon's history tables (0 = retain an unlimited number of ledgers)",
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: "history-retention-reap-count",
ConfigKey: &config.HistoryRetentionReapCount,
OptType: types.Uint,
FlagDefault: uint(50_000),
Usage: "the batch size (in ledgers) to remove per reap from the Horizon database",
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetUint(opt.Name); val <= 0 || val > 500_000 {
return fmt.Errorf("flag --history-retention-reap-count must be in range [1, 500,000]")
}
return nil
},
},
&support.ConfigOption{
Name: "history-stale-threshold",
ConfigKey: &config.StaleThreshold,
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/httpt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (ht *HTTPT) Post(
// setting the retention count to the provided number.
func (ht *HTTPT) ReapHistory(retention uint) {
ht.App.reaper.RetentionCount = retention
ht.App.reaper.RetentionBatch = 50_000
ht.App.reaper.HistoryQ = &history.Q{ht.HorizonSession()}
err := ht.App.DeleteUnretainedHistory(context.Background())
ht.Require.NoError(err)
Expand Down
11 changes: 7 additions & 4 deletions services/horizon/internal/reap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@ import (
type System struct {
HistoryQ *history.Q
RetentionCount uint
ledgerState *ledger.State
ctx context.Context
cancel context.CancelFunc
RetentionBatch uint

ledgerState *ledger.State
ctx context.Context
cancel context.CancelFunc
}

// New initializes the reaper, causing it to begin polling the stellar-core
// database for new ledgers and ingesting data into the horizon database.
func New(retention uint, dbSession db.SessionInterface, ledgerState *ledger.State) *System {
func New(retention, retentionBatchSize uint, dbSession db.SessionInterface, ledgerState *ledger.State) *System {
ctx, cancel := context.WithCancel(context.Background())

r := &System{
HistoryQ: &history.Q{dbSession.Clone()},
RetentionCount: retention,
RetentionBatch: retentionBatchSize,
ledgerState: ledgerState,
ctx: ctx,
cancel: cancel,
Expand Down
25 changes: 16 additions & 9 deletions services/horizon/internal/reap/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reap

import (
"context"
"fmt"
"time"

herrors "github.com/stellar/go/services/horizon/internal/errors"
Expand Down Expand Up @@ -70,25 +71,31 @@ func (r *System) runOnce(ctx context.Context) {
}
}

// Work backwards in 100k ledger blocks to prevent using all the CPU.
// Work backwards in 50k (by default, otherwise configurable via the CLI) ledger
// blocks to prevent using all the CPU.
//
// This runs every hour, so we need to make sure it doesn't
// run for longer than an hour.
// This runs every hour, so we need to make sure it doesn't run for longer than
// an hour.
//
// Current ledger at 2021-08-12 is 36,827,497, so 100k means 368 batches. At 1
// batch/second, that seems like a reasonable balance between running well
// under an hour, and slowing it down enough to leave some CPU for other
// processes.
var batchSize = int32(100_000)
// Current ledger at 2024-04-04 is 51,092,283, so 50k means 1021 batches. At 1
// batch/second, that seems like a reasonable balance between running under an
// hour, and slowing it down enough to leave some CPU for other processes.
var sleep = 1 * time.Second

func (r *System) clearBefore(ctx context.Context, startSeq, endSeq int32) error {
batchSize := int32(r.RetentionBatch)
if batchSize <= 0 {
return fmt.Errorf("invalid batch size for reaping (%d)", batchSize)
}

for batchEndSeq := endSeq - 1; batchEndSeq >= startSeq; batchEndSeq -= batchSize {
batchStartSeq := batchEndSeq - batchSize
if batchStartSeq < startSeq {
batchStartSeq = startSeq
}
log.WithField("start_ledger", batchStartSeq).WithField("end_ledger", batchEndSeq).Info("reaper: clearing")
log.WithField("start_ledger", batchStartSeq).
WithField("end_ledger", batchEndSeq).
Info("reaper: clearing")

batchStart, batchEnd, err := toid.LedgerRangeInclusive(batchStartSeq, batchEndSeq)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/reap/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestDeleteUnretainedHistory(t *testing.T) {

db := tt.HorizonSession()

sys := New(0, db, ledgerState)
sys := New(0, 50, db, ledgerState)

// Disable sleeps for this.
sleep = 0
Expand Down

0 comments on commit 36d7a6c

Please sign in to comment.