services/horizon/ingest: historyRange and reingestHistoryRange states send batches of ledgers to tx processors #5117

Merged: 12 commits, Nov 28, 2023
10 changes: 10 additions & 0 deletions services/horizon/cmd/db.go
@@ -243,6 +243,7 @@
var (
reingestForce bool
parallelWorkers uint
maxLedgersPerFlush uint32
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
@@ -275,6 +276,14 @@
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "ledgers-per-flush",
ConfigKey: &maxLedgersPerFlush,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(10),
Usage: "[optional] size of ledgers batch for tx processors to retain in memory first, before flushing once to the workers ongoing db transaction, default is 10, 0 disables batching, effectively flush to db tx per each ledger.",

Check failure on line 285 in services/horizon/cmd/db.go (GitHub Actions / golangci): line is 236 characters (lll)
},
{
Name: "retries",
ConfigKey: &retries,
@@ -415,6 +424,7 @@
StellarCoreURL: config.StellarCoreURL,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
MaxLedgerPerFlush: maxLedgersPerFlush,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
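The new `ledgers-per-flush` flag above feeds `MaxLedgerPerFlush` in the ingest config. Per its usage text, 0 disables batching, which amounts to flushing the db transaction after every single ledger. A minimal sketch of that normalization, assuming a hypothetical helper name that is not part of this PR:

```go
package main

import "fmt"

// effectiveFlushSize illustrates the documented flag semantics:
// 0 disables batching, which is equivalent to flushing after each ledger.
func effectiveFlushSize(maxLedgersPerFlush uint32) uint32 {
	if maxLedgersPerFlush == 0 {
		return 1
	}
	return maxLedgersPerFlush
}

func main() {
	fmt.Println(effectiveFlushSize(0))  // 1: batching disabled
	fmt.Println(effectiveFlushSize(10)) // 10: the flag default
}
```
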
283 changes: 0 additions & 283 deletions services/horizon/internal/ingest/fsm.go
@@ -12,7 +12,6 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
)

@@ -585,288 +584,6 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

type historyRangeState struct {
fromLedger uint32
toLedger uint32
}

func (h historyRangeState) String() string {
return fmt.Sprintf(
"historyRange(fromLedger=%d, toLedger=%d)",
h.fromLedger,
h.toLedger,
)
}

func (historyRangeState) GetState() State {
return HistoryRange
}

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
}

err := s.maybePrepareRange(s.ctx, h.fromLedger)
if err != nil {
return start(), err
}

if err = s.historyQ.Begin(s.ctx); err != nil {
return start(), errors.Wrap(err, "Error starting a transaction")
}
defer s.historyQ.Rollback()

// acquire distributed lock so no one else can perform ingestion operations.
if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
return start(), errors.Wrap(err, getLastIngestedErrMsg)
}

lastHistoryLedger, err := s.historyQ.GetLatestHistoryLedger(s.ctx)
if err != nil {
return start(), errors.Wrap(err, "could not get latest history ledger")
}

// We should be ingesting the ledger which occurs after
// lastHistoryLedger. Otherwise, some other horizon node has
// already completed the ingest history range operation and
// we should go back to the init state
if lastHistoryLedger != h.fromLedger-1 {
return start(), nil
}

for cur := h.fromLedger; cur <= h.toLedger; cur++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...")
startTime := time.Now()

ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur)
if err != nil {
// Commit finished work in case of ledger backend error.
commitErr := s.historyQ.Commit()
if commitErr != nil {
log.WithError(commitErr).Error("Error committing partial range results")
} else {
log.Info("Committed partial range results")
}
return start(), errors.Wrap(err, "error getting ledger")
}

log.WithFields(logpkg.F{
"sequence": cur,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil {
return start(), err
}
}

if err = s.historyQ.Commit(); err != nil {
return start(), errors.Wrap(err, commitErrMsg)
}

return start(), nil
}
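
The removed loop above calls the tx processors once per ledger; the PR title says these states now send batches of ledgers to the processors instead. A hypothetical sketch of that batching pattern, assuming a batch-aware callback (`processLedgerBatch` is illustrative, not the API added by this PR):

```go
package ingest

import (
	"context"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/errors"
	"github.com/stellar/go/xdr"
)

// ingestRangeBatched is a hypothetical sketch, not the code added by this PR:
// it accumulates up to maxLedgersPerFlush ledgers from the backend and hands
// them to a batch-aware processor in one call, instead of one call per ledger.
// With maxLedgersPerFlush == 0 the batch is flushed after every ledger.
func ingestRangeBatched(
	ctx context.Context,
	backend ledgerbackend.LedgerBackend,
	from, to, maxLedgersPerFlush uint32,
	processLedgerBatch func([]xdr.LedgerCloseMeta) error, // assumed callback
) error {
	batch := make([]xdr.LedgerCloseMeta, 0, maxLedgersPerFlush)
	for cur := from; cur <= to; cur++ {
		lcm, err := backend.GetLedger(ctx, cur)
		if err != nil {
			return errors.Wrap(err, "error getting ledger")
		}
		batch = append(batch, lcm)

		// Flush once the batch is full or the end of the range is reached.
		if uint32(len(batch)) >= maxLedgersPerFlush || cur == to {
			if err := processLedgerBatch(batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
	}
	return nil
}
```
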

func runTransactionProcessorsOnLedger(s *system, ledger xdr.LedgerCloseMeta) error {
log.WithFields(logpkg.F{
"sequence": ledger.LedgerSequence(),
"state": false,
"ledger": true,
"commit": false,
}).Info("Processing ledger")
startTime := time.Now()

ledgerTransactionStats, _, tradeStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger.LedgerSequence()))
}

log.
WithFields(ledgerTransactionStats.Map()).
WithFields(tradeStats.Map()).
WithFields(logpkg.F{
"sequence": ledger.LedgerSequence(),
"duration": time.Since(startTime).Seconds(),
"state": false,
"ledger": true,
"commit": false,
}).
Info("Processed ledger")
return nil
}

type reingestHistoryRangeState struct {
fromLedger uint32
toLedger uint32
force bool
}

func (h reingestHistoryRangeState) String() string {
return fmt.Sprintf(
"reingestHistoryRange(fromLedger=%d, toLedger=%d, force=%t)",
h.fromLedger,
h.toLedger,
h.force,
)
}

func (reingestHistoryRangeState) GetState() State {
return ReingestHistoryRange
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
}

// Clear history data before ingesting - used in `reingest range` command.
start, end, err := toid.LedgerRangeInclusive(
int32(fromLedger),
int32(toLedger),
)
if err != nil {
return errors.Wrap(err, "Invalid range")
}

err = s.historyQ.DeleteRangeAll(s.ctx, start, end)
if err != nil {
return errors.Wrap(err, "error in DeleteRangeAll")
}

for cur := fromLedger; cur <= toLedger; cur++ {
var ledgerCloseMeta xdr.LedgerCloseMeta
ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur)
if err != nil {
return errors.Wrap(err, "error getting ledger")
}

if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil {
return err
}
}

return nil
}
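
`ingestRange` clears previously ingested history with `toid.LedgerRangeInclusive`, which maps an inclusive ledger range to the TOID bounds handed to `DeleteRangeAll`. A small standalone example of that call (the ledger numbers are arbitrary):

```go
package main

import (
	"fmt"

	"github.com/stellar/go/toid"
)

func main() {
	// TOID bounds for ledgers 100 through 105, as passed to DeleteRangeAll
	// when clearing previously ingested history before a reingest.
	start, end, err := toid.LedgerRangeInclusive(100, 105)
	if err != nil {
		panic(err)
	}
	fmt.Printf("start=%d end=%d\n", start, end)
}
```
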

func (h reingestHistoryRangeState) prepareRange(s *system) (transition, error) {
log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
}).Info("Preparing ledger backend to retrieve range")
startTime := time.Now()

err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger))
if err != nil {
return stop(), errors.Wrap(err, "error preparing range")
}

log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
"duration": time.Since(startTime).Seconds(),
}).Info("Range ready")

return transition{}, nil
}

// reingestHistoryRangeState is used as a command to reingest historical data
func (h reingestHistoryRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
}

if h.fromLedger == 1 {
log.Warn("Ledger 1 is pregenerated and not available, starting from ledger 2.")
h.fromLedger = 2
}

var startTime time.Time

if h.force {
if t, err := h.prepareRange(s); err != nil {
return t, err
}
startTime = time.Now()

if err := s.historyQ.Begin(s.ctx); err != nil {
return stop(), errors.Wrap(err, "Error starting a transaction")
}
defer s.historyQ.Rollback()

// acquire distributed lock so no one else can perform ingestion operations.
if _, err := s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
return stop(), errors.Wrap(err, getLastIngestedErrMsg)
}

if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil {
return stop(), err
}

if err := s.historyQ.Commit(); err != nil {
return stop(), errors.Wrap(err, commitErrMsg)
}
} else {
lastIngestedLedger, err := s.historyQ.GetLastLedgerIngestNonBlocking(s.ctx)
if err != nil {
return stop(), errors.Wrap(err, getLastIngestedErrMsg)
}

if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger {
return stop(), ErrReingestRangeConflict{lastIngestedLedger}
}

// Only prepare the range after checking the bounds to enable an early error return
var t transition
if t, err = h.prepareRange(s); err != nil {
return t, err
}
startTime = time.Now()

for cur := h.fromLedger; cur <= h.toLedger; cur++ {
err = func(ledger uint32) error {
if e := s.historyQ.Begin(s.ctx); e != nil {
return errors.Wrap(e, "Error starting a transaction")
}
defer s.historyQ.Rollback()

// ingest each ledger in a separate transaction to prevent deadlocks
// when acquiring ShareLocks from multiple parallel reingest range processes
if e := h.ingestRange(s, ledger, ledger); e != nil {
return e
}

if e := s.historyQ.Commit(); e != nil {
return errors.Wrap(e, commitErrMsg)
}

return nil
}(cur)
if err != nil {
return stop(), err
}
}
}

err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter)
if err != nil {
return stop(), errors.Wrap(err, "Error rebuilding trade aggregations")
}

log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
"duration": time.Since(startTime).Seconds(),
}).Info("Reingestion done")

return stop(), nil
}
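
In the non-force path above, each ledger gets its own db transaction to avoid ShareLock deadlocks between parallel reingest workers. A batched variant would split the range into sub-ranges first; this is a hypothetical sketch of such a splitter, not code from this PR:

```go
package main

import "fmt"

// ledgerRange is a hypothetical [from, to] inclusive sub-range.
type ledgerRange struct {
	from, to uint32
}

// splitRange breaks [from, to] into sub-ranges of at most batchSize ledgers,
// so each sub-range can be ingested inside its own db transaction.
func splitRange(from, to, batchSize uint32) []ledgerRange {
	if batchSize == 0 {
		batchSize = 1 // 0 disables batching: one ledger per transaction
	}
	var out []ledgerRange
	for cur := from; cur <= to; cur += batchSize {
		end := cur + batchSize - 1
		if end > to {
			end = to
		}
		out = append(out, ledgerRange{from: cur, to: end})
	}
	return out
}

func main() {
	// e.g. reingest ledgers 2..25 with batches of 10 ledgers per flush.
	fmt.Println(splitRange(2, 25, 10)) // [{2 11} {12 21} {22 25}]
}
```
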

type waitForCheckpointState struct{}

func (waitForCheckpointState) String() string {