diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 8ae9762a35..048383f46a 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -221,7 +221,7 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { logged := done() // it logs "State verification finished" twice, but no errors - assert.Len(t, logged, 2) + assert.Len(t, logged, 0) historyQ.AssertExpectations(t) } diff --git a/services/horizon/internal/ingest/verify.go b/services/horizon/internal/ingest/verify.go index 56b274f706..7226118e31 100644 --- a/services/horizon/internal/ingest/verify.go +++ b/services/horizon/internal/ingest/verify.go @@ -41,6 +41,11 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { } s.stateVerificationRunning = true s.stateVerificationMutex.Unlock() + defer func() { + s.stateVerificationMutex.Lock() + s.stateVerificationRunning = false + s.stateVerificationMutex.Unlock() + }() updateMetrics := false @@ -53,24 +58,8 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { return nil } - startTime := time.Now() historyQ := s.historyQ.CloneIngestionQ() - - defer func() { - duration := time.Since(startTime).Seconds() - if updateMetrics { - // Don't update metrics if context cancelled. - if s.ctx.Err() != context.Canceled { - s.Metrics().StateVerifyDuration.Observe(float64(duration)) - } - } - log.WithField("duration", duration).Info("State verification finished") - historyQ.Rollback() - s.stateVerificationMutex.Lock() - s.stateVerificationRunning = false - s.stateVerificationMutex.Unlock() - }() - + defer historyQ.Rollback() err := historyQ.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, @@ -95,31 +84,58 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error { return nil } + localLog.Info("Starting state verification") + if verifyAgainstLatestCheckpoint { - // Get root HAS to check if we're checking one of the latest ledgers or - // Horizon is catching up. It doesn't make sense to verify old ledgers as - // we want to check the latest state. - var historyLatestSequence uint32 - historyLatestSequence, err = s.historyAdapter.GetLatestLedgerSequence() - if err != nil { - return errors.Wrap(err, "Error getting the latest ledger sequence") - } + retries := 0 + for { + // Get root HAS to check if we're checking one of the latest ledgers or + // Horizon is catching up. It doesn't make sense to verify old ledgers as + // we want to check the latest state. + var historyLatestSequence uint32 + historyLatestSequence, err = s.historyAdapter.GetLatestLedgerSequence() + if err != nil { + return errors.Wrap(err, "Error getting the latest ledger sequence") + } - if ledgerSequence < historyLatestSequence { - localLog.Info("Current ledger is old. Cancelling...") - return nil - } + if ledgerSequence < historyLatestSequence { + localLog.Info("Current ledger is old. Cancelling...") + return nil + } - localLog.Info("Starting state verification. Waiting 40 seconds for stellar-core to publish HAS...") - select { - case <-s.ctx.Done(): - localLog.Info("State verifier shut down...") - return nil - case <-time.After(40 * time.Second): - // Wait for stellar-core to publish HAS + if ledgerSequence == historyLatestSequence { + break + } + + localLog.Info("Waiting for stellar-core to publish HAS...") + select { + case <-s.ctx.Done(): + localLog.Info("State verifier shut down...") + return nil + case <-time.After(5 * time.Second): + // Wait for stellar-core to publish HAS + retries++ + if retries == 12 { + localLog.Info("Checkpoint not published. Cancelling...") + return nil + } + } } } + startTime := time.Now() + defer func() { + duration := time.Since(startTime).Seconds() + if updateMetrics { + // Don't update metrics if context cancelled. + if s.ctx.Err() != context.Canceled { + s.Metrics().StateVerifyDuration.Observe(float64(duration)) + } + } + log.WithField("duration", duration).Info("State verification finished") + + }() + localLog.Info("Creating state reader...") stateReader, err := s.historyAdapter.GetState(s.ctx, ledgerSequence)