Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/ingest: verifyState improvements #3251

Merged
merged 2 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) {
logged := done()

// it logs "State verification finished" twice, but no errors
assert.Len(t, logged, 2)
assert.Len(t, logged, 0)

historyQ.AssertExpectations(t)
}
Expand Down
88 changes: 52 additions & 36 deletions services/horizon/internal/ingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
}
s.stateVerificationRunning = true
s.stateVerificationMutex.Unlock()
defer func() {
s.stateVerificationMutex.Lock()
s.stateVerificationRunning = false
s.stateVerificationMutex.Unlock()
}()

updateMetrics := false

Expand All @@ -53,24 +58,8 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
return nil
}

startTime := time.Now()
historyQ := s.historyQ.CloneIngestionQ()

defer func() {
duration := time.Since(startTime).Seconds()
if updateMetrics {
// Don't update metrics if context cancelled.
if s.ctx.Err() != context.Canceled {
s.Metrics().StateVerifyDuration.Observe(float64(duration))
}
}
log.WithField("duration", duration).Info("State verification finished")
historyQ.Rollback()
s.stateVerificationMutex.Lock()
s.stateVerificationRunning = false
s.stateVerificationMutex.Unlock()
}()

defer historyQ.Rollback()
err := historyQ.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
Expand All @@ -95,31 +84,58 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
return nil
}

localLog.Info("Starting state verification")

if verifyAgainstLatestCheckpoint {
// Get root HAS to check if we're checking one of the latest ledgers or
// Horizon is catching up. It doesn't make sense to verify old ledgers as
// we want to check the latest state.
var historyLatestSequence uint32
historyLatestSequence, err = s.historyAdapter.GetLatestLedgerSequence()
if err != nil {
return errors.Wrap(err, "Error getting the latest ledger sequence")
}
retries := 0
for {
// Get root HAS to check if we're checking one of the latest ledgers or
// Horizon is catching up. It doesn't make sense to verify old ledgers as
// we want to check the latest state.
var historyLatestSequence uint32
historyLatestSequence, err = s.historyAdapter.GetLatestLedgerSequence()
if err != nil {
return errors.Wrap(err, "Error getting the latest ledger sequence")
}

if ledgerSequence < historyLatestSequence {
localLog.Info("Current ledger is old. Cancelling...")
return nil
}
if ledgerSequence < historyLatestSequence {
localLog.Info("Current ledger is old. Cancelling...")
return nil
}

localLog.Info("Starting state verification. Waiting 40 seconds for stellar-core to publish HAS...")
select {
case <-s.ctx.Done():
localLog.Info("State verifier shut down...")
return nil
case <-time.After(40 * time.Second):
// Wait for stellar-core to publish HAS
if ledgerSequence == historyLatestSequence {
break
}

localLog.Info("Waiting for stellar-core to publish HAS...")
select {
case <-s.ctx.Done():
localLog.Info("State verifier shut down...")
return nil
case <-time.After(5 * time.Second):
// Wait for stellar-core to publish HAS
retries++
if retries == 12 {
localLog.Info("Checkpoint not published. Cancelling...")
return nil
}
}
}
}

startTime := time.Now()
defer func() {
duration := time.Since(startTime).Seconds()
if updateMetrics {
// Don't update metrics if context cancelled.
if s.ctx.Err() != context.Canceled {
s.Metrics().StateVerifyDuration.Observe(float64(duration))
}
}
log.WithField("duration", duration).Info("State verification finished")

}()

localLog.Info("Creating state reader...")

stateReader, err := s.historyAdapter.GetState(s.ctx, ledgerSequence)
Expand Down