Skip to content

Commit

Permalink
stellar#5099: changed historyRange and reingestHistoryRange states to…
Browse files Browse the repository at this point in the history
… send batches of ledgers to processors
  • Loading branch information
sreuland committed Nov 15, 2023
1 parent 4670509 commit fd46a14
Show file tree
Hide file tree
Showing 16 changed files with 854 additions and 559 deletions.
283 changes: 0 additions & 283 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -585,288 +584,6 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

type historyRangeState struct {
fromLedger uint32
toLedger uint32
}

func (h historyRangeState) String() string {
return fmt.Sprintf(
"historyRange(fromLedger=%d, toLedger=%d)",
h.fromLedger,
h.toLedger,
)
}

func (historyRangeState) GetState() State {
return HistoryRange
}

// historyRangeState is used when catching up history data
func (h historyRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return start(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
}

err := s.maybePrepareRange(s.ctx, h.fromLedger)
if err != nil {
return start(), err
}

if err = s.historyQ.Begin(s.ctx); err != nil {
return start(), errors.Wrap(err, "Error starting a transaction")
}
defer s.historyQ.Rollback()

// acquire distributed lock so no one else can perform ingestion operations.
if _, err = s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
return start(), errors.Wrap(err, getLastIngestedErrMsg)
}

lastHistoryLedger, err := s.historyQ.GetLatestHistoryLedger(s.ctx)
if err != nil {
return start(), errors.Wrap(err, "could not get latest history ledger")
}

// We should be ingesting the ledger which occurs after
// lastHistoryLedger. Otherwise, some other horizon node has
// already completed the ingest history range operation and
// we should go back to the init state
if lastHistoryLedger != h.fromLedger-1 {
return start(), nil
}

for cur := h.fromLedger; cur <= h.toLedger; cur++ {
var ledgerCloseMeta xdr.LedgerCloseMeta

log.WithField("sequence", cur).Info("Waiting for ledger to be available in the backend...")
startTime := time.Now()

ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur)
if err != nil {
// Commit finished work in case of ledger backend error.
commitErr := s.historyQ.Commit()
if commitErr != nil {
log.WithError(commitErr).Error("Error committing partial range results")
} else {
log.Info("Committed partial range results")
}
return start(), errors.Wrap(err, "error getting ledger")
}

log.WithFields(logpkg.F{
"sequence": cur,
"duration": time.Since(startTime).Seconds(),
}).Info("Ledger returned from the backend")

if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil {
return start(), err
}
}

if err = s.historyQ.Commit(); err != nil {
return start(), errors.Wrap(err, commitErrMsg)
}

return start(), nil
}

func runTransactionProcessorsOnLedger(s *system, ledger xdr.LedgerCloseMeta) error {
log.WithFields(logpkg.F{
"sequence": ledger.LedgerSequence(),
"state": false,
"ledger": true,
"commit": false,
}).Info("Processing ledger")
startTime := time.Now()

ledgerTransactionStats, _, tradeStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger.LedgerSequence()))
}

log.
WithFields(ledgerTransactionStats.Map()).
WithFields(tradeStats.Map()).
WithFields(logpkg.F{
"sequence": ledger.LedgerSequence(),
"duration": time.Since(startTime).Seconds(),
"state": false,
"ledger": true,
"commit": false,
}).
Info("Processed ledger")
return nil
}

type reingestHistoryRangeState struct {
fromLedger uint32
toLedger uint32
force bool
}

func (h reingestHistoryRangeState) String() string {
return fmt.Sprintf(
"reingestHistoryRange(fromLedger=%d, toLedger=%d, force=%t)",
h.fromLedger,
h.toLedger,
h.force,
)
}

func (reingestHistoryRangeState) GetState() State {
return ReingestHistoryRange
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
}

// Clear history data before ingesting - used in `reingest range` command.
start, end, err := toid.LedgerRangeInclusive(
int32(fromLedger),
int32(toLedger),
)
if err != nil {
return errors.Wrap(err, "Invalid range")
}

err = s.historyQ.DeleteRangeAll(s.ctx, start, end)
if err != nil {
return errors.Wrap(err, "error in DeleteRangeAll")
}

for cur := fromLedger; cur <= toLedger; cur++ {
var ledgerCloseMeta xdr.LedgerCloseMeta
ledgerCloseMeta, err = s.ledgerBackend.GetLedger(s.ctx, cur)
if err != nil {
return errors.Wrap(err, "error getting ledger")
}

if err = runTransactionProcessorsOnLedger(s, ledgerCloseMeta); err != nil {
return err
}
}

return nil
}

func (h reingestHistoryRangeState) prepareRange(s *system) (transition, error) {
log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
}).Info("Preparing ledger backend to retrieve range")
startTime := time.Now()

err := s.ledgerBackend.PrepareRange(s.ctx, ledgerbackend.BoundedRange(h.fromLedger, h.toLedger))
if err != nil {
return stop(), errors.Wrap(err, "error preparing range")
}

log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
"duration": time.Since(startTime).Seconds(),
}).Info("Range ready")

return transition{}, nil
}

// reingestHistoryRangeState is used as a command to reingest historical data
func (h reingestHistoryRangeState) run(s *system) (transition, error) {
if h.fromLedger == 0 || h.toLedger == 0 ||
h.fromLedger > h.toLedger {
return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger)
}

if h.fromLedger == 1 {
log.Warn("Ledger 1 is pregenerated and not available, starting from ledger 2.")
h.fromLedger = 2
}

var startTime time.Time

if h.force {
if t, err := h.prepareRange(s); err != nil {
return t, err
}
startTime = time.Now()

if err := s.historyQ.Begin(s.ctx); err != nil {
return stop(), errors.Wrap(err, "Error starting a transaction")
}
defer s.historyQ.Rollback()

// acquire distributed lock so no one else can perform ingestion operations.
if _, err := s.historyQ.GetLastLedgerIngest(s.ctx); err != nil {
return stop(), errors.Wrap(err, getLastIngestedErrMsg)
}

if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil {
return stop(), err
}

if err := s.historyQ.Commit(); err != nil {
return stop(), errors.Wrap(err, commitErrMsg)
}
} else {
lastIngestedLedger, err := s.historyQ.GetLastLedgerIngestNonBlocking(s.ctx)
if err != nil {
return stop(), errors.Wrap(err, getLastIngestedErrMsg)
}

if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger {
return stop(), ErrReingestRangeConflict{lastIngestedLedger}
}

// Only prepare the range after checking the bounds to enable an early error return
var t transition
if t, err = h.prepareRange(s); err != nil {
return t, err
}
startTime = time.Now()

for cur := h.fromLedger; cur <= h.toLedger; cur++ {
err = func(ledger uint32) error {
if e := s.historyQ.Begin(s.ctx); e != nil {
return errors.Wrap(e, "Error starting a transaction")
}
defer s.historyQ.Rollback()

// ingest each ledger in a separate transaction to prevent deadlocks
// when acquiring ShareLocks from multiple parallel reingest range processes
if e := h.ingestRange(s, ledger, ledger); e != nil {
return e
}

if e := s.historyQ.Commit(); e != nil {
return errors.Wrap(e, commitErrMsg)
}

return nil
}(cur)
if err != nil {
return stop(), err
}
}
}

err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter)
if err != nil {
return stop(), errors.Wrap(err, "Error rebuilding trade aggregations")
}

log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
"duration": time.Since(startTime).Seconds(),
}).Info("Reingestion done")

return stop(), nil
}

type waitForCheckpointState struct{}

func (waitForCheckpointState) String() string {
Expand Down
Loading

0 comments on commit fd46a14

Please sign in to comment.