Skip to content

Commit

Permalink
services/horizon/ingest: verifyState improvements (#3251)
Browse files Browse the repository at this point in the history
This commit adds improvements to `system.verifyState`:

* Improves the waiting time for checkpoint publication. Before this commit, it
always waited for 40 seconds. The new code checks every 5 seconds, for up to
a minute, whether the checkpoint has been published.
* The state verifier duration is now measured from the moment the checkpoint
is read instead of from when waiting starts.
* Splits the deferred function into separate `defer` functions and moves each
one next to the code it relates to.
  • Loading branch information
bartekn authored Nov 25, 2020
1 parent dd97043 commit 64145c6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 37 deletions.
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) {
logged := done()

// it logs "State verification finished" twice, but no errors
assert.Len(t, logged, 2)
assert.Len(t, logged, 0)

historyQ.AssertExpectations(t)
}
Expand Down
88 changes: 52 additions & 36 deletions services/horizon/internal/ingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
}
s.stateVerificationRunning = true
s.stateVerificationMutex.Unlock()
defer func() {
s.stateVerificationMutex.Lock()
s.stateVerificationRunning = false
s.stateVerificationMutex.Unlock()
}()

updateMetrics := false

Expand All @@ -53,24 +58,8 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
return nil
}

startTime := time.Now()
historyQ := s.historyQ.CloneIngestionQ()

defer func() {
duration := time.Since(startTime).Seconds()
if updateMetrics {
// Don't update metrics if context cancelled.
if s.ctx.Err() != context.Canceled {
s.Metrics().StateVerifyDuration.Observe(float64(duration))
}
}
log.WithField("duration", duration).Info("State verification finished")
historyQ.Rollback()
s.stateVerificationMutex.Lock()
s.stateVerificationRunning = false
s.stateVerificationMutex.Unlock()
}()

defer historyQ.Rollback()
err := historyQ.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
Expand All @@ -95,31 +84,58 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool) error {
return nil
}

localLog.Info("Starting state verification")

if verifyAgainstLatestCheckpoint {
// Get root HAS to check if we're checking one of the latest ledgers or
// Horizon is catching up. It doesn't make sense to verify old ledgers as
// we want to check the latest state.
var historyLatestSequence uint32
historyLatestSequence, err = s.historyAdapter.GetLatestLedgerSequence()
if err != nil {
return errors.Wrap(err, "Error getting the latest ledger sequence")
}
retries := 0
for {
// Get root HAS to check if we're checking one of the latest ledgers or
// Horizon is catching up. It doesn't make sense to verify old ledgers as
// we want to check the latest state.
var historyLatestSequence uint32
historyLatestSequence, err = s.historyAdapter.GetLatestLedgerSequence()
if err != nil {
return errors.Wrap(err, "Error getting the latest ledger sequence")
}

if ledgerSequence < historyLatestSequence {
localLog.Info("Current ledger is old. Cancelling...")
return nil
}
if ledgerSequence < historyLatestSequence {
localLog.Info("Current ledger is old. Cancelling...")
return nil
}

localLog.Info("Starting state verification. Waiting 40 seconds for stellar-core to publish HAS...")
select {
case <-s.ctx.Done():
localLog.Info("State verifier shut down...")
return nil
case <-time.After(40 * time.Second):
// Wait for stellar-core to publish HAS
if ledgerSequence == historyLatestSequence {
break
}

localLog.Info("Waiting for stellar-core to publish HAS...")
select {
case <-s.ctx.Done():
localLog.Info("State verifier shut down...")
return nil
case <-time.After(5 * time.Second):
// Wait for stellar-core to publish HAS
retries++
if retries == 12 {
localLog.Info("Checkpoint not published. Cancelling...")
return nil
}
}
}
}

startTime := time.Now()
defer func() {
duration := time.Since(startTime).Seconds()
if updateMetrics {
// Don't update metrics if context cancelled.
if s.ctx.Err() != context.Canceled {
s.Metrics().StateVerifyDuration.Observe(float64(duration))
}
}
log.WithField("duration", duration).Info("State verification finished")

}()

localLog.Info("Creating state reader...")

stateReader, err := s.historyAdapter.GetState(s.ctx, ledgerSequence)
Expand Down

0 comments on commit 64145c6

Please sign in to comment.