diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md
index 8c7df9552c..4283156ecc 100644
--- a/services/horizon/CHANGELOG.md
+++ b/services/horizon/CHANGELOG.md
@@ -12,6 +12,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
 * Fix bug in horizon reap system (used by `horizon db reap` command and when horizon is configured with `--history-retention-count`) which could lead to partial deletions. ([3754](https://github.com/stellar/go/pull/3754))
 * Log debug messages from captive core at the appropriate log level. ([3746](https://github.com/stellar/go/pull/3746))
 * Add a feature flag `--captive-core-reuse-storage-path`/`CAPTIVE_CORE_REUSE_STORAGE_PATH` that will reuse Captive Core's storage path for bucket files when applicable for better performance. ([3750](https://github.com/stellar/go/pull/3750))
+* Limit reap to 100k ledgers/second, to prevent excess CPU usage. ([3823](https://github.com/stellar/go/pull/3823))
 
 ## v2.6.1
 
diff --git a/services/horizon/internal/reap/system.go b/services/horizon/internal/reap/system.go
index 915aa5dee8..02a10945a2 100644
--- a/services/horizon/internal/reap/system.go
+++ b/services/horizon/internal/reap/system.go
@@ -26,7 +26,7 @@ func (r *System) DeleteUnretainedHistory(ctx context.Context) error {
 		return nil
 	}
 
-	err := r.clearBefore(ctx, targetElder)
+	err := r.clearBefore(ctx, latest.HistoryElder, targetElder)
 	if err != nil {
 		return err
 	}
@@ -70,28 +70,79 @@ func (r *System) runOnce(ctx context.Context) {
 	}
 }
 
-func (r *System) clearBefore(ctx context.Context, seq int32) error {
-	log.WithField("new_elder", seq).Info("reaper: clearing")
-
-	start, end, err := toid.LedgerRangeInclusive(1, seq-1)
-	if err != nil {
-		return err
-	}
-
-	err = r.HistoryQ.Begin()
-	if err != nil {
-		return errors.Wrap(err, "Error in begin")
-	}
-	defer r.HistoryQ.Rollback()
-
-	err = r.HistoryQ.DeleteRangeAll(ctx, start, end)
-	if err != nil {
-		return errors.Wrap(err, "Error in DeleteRangeAll")
-	}
-
-	err = r.HistoryQ.Commit()
-	if err != nil {
-		return errors.Wrap(err, "Error in commit")
+// Reaping is done in batches of batchSize ledgers, with a pause of sleep
+// between batches, to prevent using all the CPU.
+//
+// This runs every hour, so we need to make sure it doesn't
+// run for longer than an hour.
+//
+// Current ledger at 2021-08-12 is 36,827,497, so 100k means 368 batches. At 1
+// batch/second, that seems like a reasonable balance between running well
+// under an hour, and slowing it down enough to leave some CPU for other
+// processes.
+//
+// These are variables rather than constants so that tests can override them.
+var (
+	batchSize = int32(100_000)
+	sleep     = 1 * time.Second
+)
+
+// clearBatch deletes all history in the inclusive ledger range
+// [startSeq, endSeq] inside a single transaction.
+func (r *System) clearBatch(ctx context.Context, startSeq, endSeq int32) error {
+	start, end, err := toid.LedgerRangeInclusive(startSeq, endSeq)
+	if err != nil {
+		return err
+	}
+
+	err = r.HistoryQ.Begin()
+	if err != nil {
+		return errors.Wrap(err, "Error in begin")
+	}
+	// Deferred in this helper, not in the caller's loop, so that it runs at
+	// the end of each batch instead of piling up until the reap finishes.
+	defer r.HistoryQ.Rollback()
+
+	err = r.HistoryQ.DeleteRangeAll(ctx, start, end)
+	if err != nil {
+		return errors.Wrap(err, "Error in DeleteRangeAll")
+	}
+
+	err = r.HistoryQ.Commit()
+	if err != nil {
+		return errors.Wrap(err, "Error in commit")
+	}
+
+	return nil
+}
+
+// clearBefore deletes history for ledgers startSeq through endSeq-1 in
+// batches of at most batchSize ledgers, pausing for sleep between batches.
+// Batches are deleted oldest first so that an interrupted reap leaves no gap
+// in the remaining history. It stops with ctx.Err() if ctx is done while
+// pausing.
+func (r *System) clearBefore(ctx context.Context, startSeq, endSeq int32) error {
+	for batchStartSeq := startSeq; batchStartSeq < endSeq; {
+		batchEndSeq := endSeq - 1
+		if batchEndSeq-batchStartSeq >= batchSize {
+			batchEndSeq = batchStartSeq + batchSize - 1
+		}
+		log.WithField("start_ledger", batchStartSeq).WithField("end_ledger", batchEndSeq).Info("reaper: clearing")
+
+		if err := r.clearBatch(ctx, batchStartSeq, batchEndSeq); err != nil {
+			return err
+		}
+		batchStartSeq = batchEndSeq + 1
+		if batchStartSeq >= endSeq {
+			// Nothing left to reap, so there is nothing to throttle.
+			break
+		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(sleep):
+		}
 	}
 
 	return nil
diff --git a/services/horizon/internal/reap/system_test.go b/services/horizon/internal/reap/system_test.go
index 695a5ea3dc..7d5d5a70b2 100644
--- a/services/horizon/internal/reap/system_test.go
+++ b/services/horizon/internal/reap/system_test.go
@@ -17,6 +17,9 @@ func TestDeleteUnretainedHistory(t *testing.T) {
 
 	sys := New(0, db, ledgerState)
 
+	// Disable sleeps for this.
+	sleep = 0
+
 	var (
 		prev int
 		cur  int