From 2b4492308cd6cc75d5b03d497adbd16e77b5c98a Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 6 May 2024 22:09:39 -0700 Subject: [PATCH 1/4] #5256: added horizon_ingest_error_restarts metric output --- services/horizon/internal/ingest/fsm.go | 20 +++++- services/horizon/internal/ingest/main.go | 18 +++++ .../internal/ingest/resume_state_test.go | 71 ++++++++++++++++++- 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 5dce974e35..029cc449bd 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -3,6 +3,7 @@ package ingest import ( "context" "fmt" + "reflect" "strings" "time" @@ -405,7 +406,19 @@ func (resumeState) GetState() State { return Resume } -func (r resumeState) run(s *system) (transition, error) { +func (r resumeState) run(s *system) (transitionResult transition, errorResult error) { + defer func() { + if errorResult != nil { + // capture any restarts that are being triggered by the state + switch reflect.TypeOf(transitionResult.node) { + case (reflect.TypeFor[startState]()): + r.incrementRestartMetric(s, "start") + case (reflect.TypeFor[resumeState]()): + r.incrementRestartMetric(s, "retry") + } + } + }() + if r.latestSuccessfullyProcessedLedger == 0 { return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value") } @@ -574,6 +587,11 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string] } } +func (r resumeState) incrementRestartMetric(s *system, restartType string) { + s.Metrics().IngestionErrorRestartCounter. + With(prometheus.Labels{"type": restartType}).Inc() +} + func (r resumeState) addLoaderDurationsMetricFromMap(s *system, m map[string]time.Duration) { for loaderName, value := range m { s.Metrics().LoadersRunDurationSummary. diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index d88cc3a3ce..bdf413888f 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -173,6 +173,10 @@ type Metrics struct { // ArchiveRequestCounter counts how many http requests are sent to history server HistoryArchiveStatsCounter *prometheus.CounterVec + + // IngestionErrorRestartCounter counts the number of times the live/forward ingestion state machine + // initiates a restart or retry. + IngestionErrorRestartCounter *prometheus.CounterVec } type System interface { @@ -443,6 +447,19 @@ func (s *system) initMetrics() { }, []string{"source", "type"}, ) + + s.metrics.IngestionErrorRestartCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "error_restarts", + Help: "Counters of the number of times the live/forward ingestion state machine initiates a restart. " + + "when 'type' label is 'start' means some aspect of ledger order is out of sync between data from " + + "captive core meta pipe and horizon's db, restarting to see if condition resolves. " + + "when 'type' label is 'retry' means ingestion is getting an unexpected error while " + + "processing network ledger data which it can't resolve. If this metric is constantly increasing, " + + "it means ingestion is stuck in a retry loop on an error it can't resolve, effectively halted.", + }, + []string{"type"}, + ) } func (s *system) GetCurrentState() State { @@ -471,6 +488,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.LoadersStatsSummary) registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount) registry.MustRegister(s.metrics.HistoryArchiveStatsCounter) + registry.MustRegister(s.metrics.IngestionErrorRestartCounter) s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon") } diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 985391883f..0a364a12b1 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -6,6 +6,8 @@ import ( "context" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -28,6 +30,7 @@ type ResumeTestTestSuite struct { runner *mockProcessorsRunner stellarCoreClient *mockStellarCoreClient system *system + registry *prometheus.Registry } func (s *ResumeTestTestSuite) SetupTest() { @@ -37,6 +40,7 @@ func (s *ResumeTestTestSuite) SetupTest() { s.historyAdapter = &mockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} s.stellarCoreClient = &mockStellarCoreClient{} + s.registry = prometheus.NewRegistry() s.system = &system{ ctx: s.ctx, historyQ: s.historyQ, @@ -47,8 +51,8 @@ func (s *ResumeTestTestSuite) SetupTest() { runStateVerificationOnLedger: ledgerEligibleForStateVerification(64, 1), } s.system.initMetrics() - s.historyQ.On("Rollback").Return(nil).Once() + s.registry.Register(s.system.Metrics().IngestionErrorRestartCounter) s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once() @@ -72,6 +76,7 @@ func (s *ResumeTestTestSuite) TearDownTest() { s.historyAdapter.AssertExpectations(t) s.ledgerBackend.AssertExpectations(t) s.stellarCoreClient.AssertExpectations(t) + s.registry.Unregister(s.system.Metrics().IngestionErrorRestartCounter) } func (s *ResumeTestTestSuite) TestInvalidParam() { @@ -86,6 +91,7 @@ func (s *ResumeTestTestSuite) TestInvalidParam() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) + assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() { @@ -103,6 +109,7 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) + assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() { @@ -118,6 +125,7 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() s.Assert().Error(err) s.Assert().EqualError(err, "error getting ledger blocking: my error") s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) + assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestBeginReturnsError() { @@ -136,6 +144,7 @@ func (s *ResumeTestTestSuite) TestBeginReturnsError() { }, next, ) + assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestGetLastLedgerIngestReturnsError() { @@ -152,6 +161,7 @@ func (s *ResumeTestTestSuite) TestGetLastLedgerIngestReturnsError() { }, next, ) + assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestGetLatestLedgerLessThanCurrent() { @@ -165,6 +175,7 @@ func (s *ResumeTestTestSuite) TestGetLatestLedgerLessThanCurrent() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) + assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestGetIngestionVersionError() { @@ -182,6 +193,7 @@ func (s *ResumeTestTestSuite) TestGetIngestionVersionError() { }, next, ) + assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestIngestionVersionLessThanCurrentVersion() { @@ -195,6 +207,7 @@ func (s *ResumeTestTestSuite) TestIngestionVersionLessThanCurrentVersion() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestIngestionVersionGreaterThanCurrentVersion() { @@ -208,6 +221,7 @@ func (s *ResumeTestTestSuite) TestIngestionVersionGreaterThanCurrentVersion() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestGetLatestLedgerError() { @@ -226,6 +240,7 @@ func (s *ResumeTestTestSuite) TestGetLatestLedgerError() { }, next, ) + assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestLatestHistoryLedgerLessThanIngestLedger() { @@ -240,6 +255,7 @@ func (s *ResumeTestTestSuite) TestLatestHistoryLedgerLessThanIngestLedger() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestLatestHistoryLedgerGreaterThanIngestLedger() { @@ -254,6 +270,7 @@ func (s *ResumeTestTestSuite) TestLatestHistoryLedgerGreaterThanIngestLedger() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { @@ -313,6 +330,7 @@ func (s *ResumeTestTestSuite) TestBumpIngestLedger() { }, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestIngestAllMasterNode() { @@ -327,6 +345,7 @@ func (s *ResumeTestTestSuite) TestIngestAllMasterNode() { }, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() { @@ -357,6 +376,28 @@ func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() { }, next, ) + assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) +} + +func (s *ResumeTestTestSuite) TestRunAllProcessorsError() { + s.historyQ.On("Begin", s.ctx).Return(nil).Once() + s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() + s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() + s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil) + + s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")). + Return(ledgerStats{}, errors.New("processor error")).Once() + + next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) + s.Assert().ErrorContains(err, "processor error") + s.Assert().Equal( + transition{ + node: resumeState{latestSuccessfullyProcessedLedger: 100}, + sleepDuration: defaultSleep, + }, + next, + ) + assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { @@ -398,6 +439,7 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { }, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { @@ -448,4 +490,31 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { }, next, ) + assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) +} + +func assertErrorRestartMetrics(reg *prometheus.Registry, assertRestartType string, assertRestartCount float64, assert *assert.Assertions) { + + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "horizon_ingest_error_restarts" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), assertRestartCount) + var restartType = "" + for _, label := range metricFamily.GetMetric()[0].GetLabel() { + if label.GetName() == "type" { + restartType = label.GetValue() + } + } + + assert.Equal(restartType, assertRestartType) + return + } + } + + if assertRestartCount > 0.0 { + assert.Fail("horizon_ingest_restarts metrics were not correct") + } } From 3319801ce7c8370db5564811c431e49732fca702 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 8 May 2024 21:58:43 -0700 Subject: [PATCH 2/4] #5256: use existing ingestion state machine error trap to emit new error counting metrics, per review feedback --- go.mod | 4 +- go.sum | 8 +- services/horizon/internal/ingest/fsm.go | 44 +++++--- .../ingest/ingest_history_range_state_test.go | 1 + services/horizon/internal/ingest/main.go | 28 ++--- services/horizon/internal/ingest/main_test.go | 104 +++++++++++++++++- .../internal/ingest/resume_state_test.go | 71 +----------- 7 files changed, 151 insertions(+), 109 deletions(-) diff --git a/go.mod b/go.mod index fb02c94507..bb6a691f40 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/spf13/viper v1.17.0 github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 github.com/xdrpp/goxdr v0.1.1 google.golang.org/api v0.170.0 @@ -136,7 +136,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect github.com/spf13/cast v1.5.1 // indirect - github.com/stretchr/objx v0.5.1 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.34.0 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20151027082146-e0fe6f683076 // indirect diff --git a/go.sum b/go.sum index 6d8713de6d..6f4c2d37b2 100644 --- a/go.sum +++ b/go.sum @@ -419,8 +419,8 @@ github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/g github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0= -github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= @@ -428,9 +428,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 h1:g3yQGZK+G6dfF/mw/SOwsTMzUVkpT4hB8pHxpbTXkKw= diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 029cc449bd..a1fb743fd6 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -3,7 +3,6 @@ package ingest import ( "context" "fmt" - "reflect" "strings" "time" @@ -47,6 +46,32 @@ const ( ReingestHistoryRange ) +// provide a name represention for a state +func (state State) Name() string { + switch state { + case Start: + return "start" + case Stop: + return "stop" + case Build: + return "build" + case Resume: + return "resume" + case WaitForCheckpoint: + return "waitforcheckpoint" + case StressTest: + return "stresstest" + case VerifyRange: + return "verifyrange" + case HistoryRange: + return "historyrange" + case ReingestHistoryRange: + return "reingesthistoryrange" + default: + return "none" + } +} + type stateMachineNode interface { run(*system) (transition, error) String() string @@ -407,18 +432,6 @@ func (resumeState) GetState() State { } func (r resumeState) run(s *system) (transitionResult transition, errorResult error) { - defer func() { - if errorResult != nil { - // capture any restarts that are being triggered by the state - switch reflect.TypeOf(transitionResult.node) { - case (reflect.TypeFor[startState]()): - r.incrementRestartMetric(s, "start") - case (reflect.TypeFor[resumeState]()): - r.incrementRestartMetric(s, "retry") - } - } - }() - if r.latestSuccessfullyProcessedLedger == 0 { return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value") } @@ -587,11 +600,6 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string] } } -func (r resumeState) incrementRestartMetric(s *system, restartType string) { - s.Metrics().IngestionErrorRestartCounter. - With(prometheus.Labels{"type": restartType}).Inc() -} - func (r resumeState) addLoaderDurationsMetricFromMap(s *system, m map[string]time.Duration) { for loaderName, value := range m { s.Metrics().LoadersRunDurationSummary. diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index cf248a6a7d..cf89ce4ab2 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -291,6 +291,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { } s.historyQ.On("GetTx").Return(nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() + s.system.initMetrics() } func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() { diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index bdf413888f..49a59425e0 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -174,9 +174,9 @@ type Metrics struct { // ArchiveRequestCounter counts how many http requests are sent to history server HistoryArchiveStatsCounter *prometheus.CounterVec - // IngestionErrorRestartCounter counts the number of times the live/forward ingestion state machine - // initiates a restart or retry. - IngestionErrorRestartCounter *prometheus.CounterVec + // IngestionErrorCounter counts the number of times the live/forward ingestion state machine + // encounters an error condition. + IngestionErrorCounter *prometheus.CounterVec } type System interface { @@ -448,17 +448,14 @@ func (s *system) initMetrics() { []string{"source", "type"}, ) - s.metrics.IngestionErrorRestartCounter = prometheus.NewCounterVec( + s.metrics.IngestionErrorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "horizon", Subsystem: "ingest", Name: "error_restarts", - Help: "Counters of the number of times the live/forward ingestion state machine initiates a restart. " + - "when 'type' label is 'start' means some aspect of ledger order is out of sync between data from " + - "captive core meta pipe and horizon's db, restarting to see if condition resolves. " + - "when 'type' label is 'retry' means ingestion is getting an unexpected error while " + - "processing network ledger data which it can't resolve. If this metric is constantly increasing, " + - "it means ingestion is stuck in a retry loop on an error it can't resolve, effectively halted.", + Namespace: "horizon", Subsystem: "ingest", Name: "errors_total", + Help: "Counters of the number of times the live/forward ingestion state machine encountered an error. " + + "'current_state' label has the name of the state that error occurred. " + + "'next_state' label has the name of the next state requested from the current_state.", }, - []string{"type"}, + []string{"current_state", "next_state"}, ) } @@ -488,7 +485,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.LoadersStatsSummary) registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount) registry.MustRegister(s.metrics.HistoryArchiveStatsCounter) - registry.MustRegister(s.metrics.IngestionErrorRestartCounter) + registry.MustRegister(s.metrics.IngestionErrorCounter) s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon") } @@ -661,6 +658,11 @@ func (s *system) runStateMachine(cur stateMachineNode) error { // so we log these errors using the info log level logger.Info("Error in ingestion state machine") } else { + s.Metrics().IngestionErrorCounter. + With(prometheus.Labels{ + "current_state": cur.GetState().Name(), + "next_state": next.node.GetState().Name(), + }).Inc() logger.Error("Error in ingestion state machine") } } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 6f5d89bd6a..3c7c587aa2 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "database/sql" + "sync" "testing" "time" @@ -113,10 +114,13 @@ func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { historyQ: historyQ, ctx: context.Background(), } + reg := setupMetrics(system) historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() - assert.PanicsWithValue(t, "unexpected transaction", func() { + defer func() { + assertErrorRestartMetrics(reg, "", "", 0, t) + }() system.Run() }) } @@ -127,12 +131,17 @@ func TestStateMachineTransition(t *testing.T) { historyQ: historyQ, ctx: context.Background(), } + reg := setupMetrics(system) historyQ.On("GetTx").Return(nil).Once() historyQ.On("Begin", mock.Anything).Return(errors.New("my error")).Once() historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() assert.PanicsWithValue(t, "unexpected transaction", func() { + defer func() { + // the test triggers error in the first start state exec, so metric is added + assertErrorRestartMetrics(reg, "start", "start", 1, t) + }() system.Run() }) } @@ -144,12 +153,14 @@ func TestContextCancel(t *testing.T) { historyQ: historyQ, ctx: ctx, } + reg := setupMetrics(system) historyQ.On("GetTx").Return(nil).Once() - historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(errors.New("my error")).Once() + historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(context.Canceled).Once() cancel() assert.NoError(t, system.runStateMachine(startState{})) + assertErrorRestartMetrics(reg, "", "", 0, t) } // TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError checks if the @@ -162,12 +173,61 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing. ctx: context.Background(), historyQ: historyQ, } + reg := setupMetrics(system) historyQ.On("GetTx").Return(nil).Once() err := system.runStateMachine(verifyRangeState{}) assert.Error(t, err) assert.EqualError(t, err, "invalid range: [0, 0]") + assertErrorRestartMetrics(reg, "verifyrange", "stop", 1, t) +} + +func TestStateMachineRestartEmitsMetric(t *testing.T) { + historyQ := &mockDBQ{} + ledgerBackend := &mockLedgerBackend{} + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + system := &system{ + ctx: ctx, + historyQ: historyQ, + ledgerBackend: ledgerBackend, + } + + ledgerBackend.On("IsPrepared", system.ctx, ledgerbackend.UnboundedRange(101)).Return(true, nil) + ledgerBackend.On("GetLedger", system.ctx, uint32(101)).Return(xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: 101, + LedgerVersion: xdr.Uint32(MaxSupportedProtocolVersion), + BucketListHash: xdr.Hash{1, 2, 3}, + }, + }, + }, + }, nil) + + reg := setupMetrics(system) + + historyQ.On("GetTx").Return(nil) + historyQ.On("Begin", system.ctx).Return(errors.New("stop state machine")) + + wg.Add(1) + go func() { + defer wg.Done() + system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100}) + }() + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + // this checks every 50ms up to 10s total, for at least 3 fsm retries based on a db Begin error + // this condition should be met as the fsm retries every second. + assertErrorRestartMetrics(reg, "resume", "resume", 3, c) + }, 10*time.Second, 50*time.Millisecond, "horizon_ingest_errors_total metric was not incremented on a fsm error") } func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) { @@ -248,6 +308,7 @@ func TestCurrentStateRaceCondition(t *testing.T) { historyQ: historyQ, ctx: context.Background(), } + reg := setupMetrics(s) historyQ.On("GetTx").Return(nil) historyQ.On("Begin", s.ctx).Return(nil) @@ -280,6 +341,45 @@ loop: } close(getCh) <-doneCh + assertErrorRestartMetrics(reg, "", "", 0, t) +} + +func setupMetrics(system *system) *prometheus.Registry { + registry := prometheus.NewRegistry() + system.initMetrics() + registry.Register(system.Metrics().IngestionErrorCounter) + return registry +} + +func assertErrorRestartMetrics(reg *prometheus.Registry, assertCurrentState string, assertNextState string, assertRestartCount float64, t assert.TestingT) { + assert := assert.New(t) + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "horizon_ingest_errors_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), assertRestartCount) + var metricCurrentState = "" + var metricNextState = "" + for _, label := range metricFamily.GetMetric()[0].GetLabel() { + if label.GetName() == "current_state" { + metricCurrentState = label.GetValue() + } + if label.GetName() == "next_state" { + metricNextState = label.GetValue() + } + } + + assert.Equal(metricCurrentState, assertCurrentState) + assert.Equal(metricNextState, assertNextState) + return + } + } + + if assertRestartCount > 0.0 { + assert.Fail("horizon_ingest_errors_total metrics were not correct") + } } type mockDBQ struct { diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 0a364a12b1..985391883f 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -6,8 +6,6 @@ import ( "context" "testing" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -30,7 +28,6 @@ type ResumeTestTestSuite struct { runner *mockProcessorsRunner stellarCoreClient *mockStellarCoreClient system *system - registry *prometheus.Registry } func (s *ResumeTestTestSuite) SetupTest() { @@ -40,7 +37,6 @@ func (s *ResumeTestTestSuite) SetupTest() { s.historyAdapter = &mockHistoryArchiveAdapter{} s.runner = &mockProcessorsRunner{} s.stellarCoreClient = &mockStellarCoreClient{} - s.registry = prometheus.NewRegistry() s.system = &system{ ctx: s.ctx, historyQ: s.historyQ, @@ -51,8 +47,8 @@ func (s *ResumeTestTestSuite) SetupTest() { runStateVerificationOnLedger: ledgerEligibleForStateVerification(64, 1), } s.system.initMetrics() + s.historyQ.On("Rollback").Return(nil).Once() - s.registry.Register(s.system.Metrics().IngestionErrorRestartCounter) s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once() @@ -76,7 +72,6 @@ func (s *ResumeTestTestSuite) TearDownTest() { s.historyAdapter.AssertExpectations(t) s.ledgerBackend.AssertExpectations(t) s.stellarCoreClient.AssertExpectations(t) - s.registry.Unregister(s.system.Metrics().IngestionErrorRestartCounter) } func (s *ResumeTestTestSuite) TestInvalidParam() { @@ -91,7 +86,6 @@ func (s *ResumeTestTestSuite) TestInvalidParam() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) - assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() { @@ -109,7 +103,6 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedFailPrepare() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) - assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() { @@ -125,7 +118,6 @@ func (s *ResumeTestTestSuite) TestRangeNotPreparedSuccessPrepareGetLedgerFail() s.Assert().Error(err) s.Assert().EqualError(err, "error getting ledger blocking: my error") s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next) - assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestBeginReturnsError() { @@ -144,7 +136,6 @@ func (s *ResumeTestTestSuite) TestBeginReturnsError() { }, next, ) - assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestGetLastLedgerIngestReturnsError() { @@ -161,7 +152,6 @@ func (s *ResumeTestTestSuite) TestGetLastLedgerIngestReturnsError() { }, next, ) - assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestGetLatestLedgerLessThanCurrent() { @@ -175,7 +165,6 @@ func (s *ResumeTestTestSuite) TestGetLatestLedgerLessThanCurrent() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) - assertErrorRestartMetrics(s.registry, "start", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestGetIngestionVersionError() { @@ -193,7 +182,6 @@ func (s *ResumeTestTestSuite) TestGetIngestionVersionError() { }, next, ) - assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestIngestionVersionLessThanCurrentVersion() { @@ -207,7 +195,6 @@ func (s *ResumeTestTestSuite) TestIngestionVersionLessThanCurrentVersion() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestIngestionVersionGreaterThanCurrentVersion() { @@ -221,7 +208,6 @@ func (s *ResumeTestTestSuite) TestIngestionVersionGreaterThanCurrentVersion() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestGetLatestLedgerError() { @@ -240,7 +226,6 @@ func (s *ResumeTestTestSuite) TestGetLatestLedgerError() { }, next, ) - assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestLatestHistoryLedgerLessThanIngestLedger() { @@ -255,7 +240,6 @@ func (s *ResumeTestTestSuite) TestLatestHistoryLedgerLessThanIngestLedger() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestLatestHistoryLedgerGreaterThanIngestLedger() { @@ -270,7 +254,6 @@ func (s *ResumeTestTestSuite) TestLatestHistoryLedgerGreaterThanIngestLedger() { transition{node: startState{}, sleepDuration: defaultSleep}, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { @@ -330,7 +313,6 @@ func (s *ResumeTestTestSuite) TestBumpIngestLedger() { }, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestIngestAllMasterNode() { @@ -345,7 +327,6 @@ func (s *ResumeTestTestSuite) TestIngestAllMasterNode() { }, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() { @@ -376,28 +357,6 @@ func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() { }, next, ) - assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) -} - -func (s *ResumeTestTestSuite) TestRunAllProcessorsError() { - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() - s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() - s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil) - - s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")). - Return(ledgerStats{}, errors.New("processor error")).Once() - - next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) - s.Assert().ErrorContains(err, "processor error") - s.Assert().Equal( - transition{ - node: resumeState{latestSuccessfullyProcessedLedger: 100}, - sleepDuration: defaultSleep, - }, - next, - ) - assertErrorRestartMetrics(s.registry, "retry", 1, s.Assert()) } func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { @@ -439,7 +398,6 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { }, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) } func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { @@ -490,31 +448,4 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { }, next, ) - assertErrorRestartMetrics(s.registry, "", 0, s.Assert()) -} - -func assertErrorRestartMetrics(reg *prometheus.Registry, assertRestartType string, assertRestartCount float64, assert *assert.Assertions) { - - metrics, err := reg.Gather() - assert.NoError(err) - - for _, metricFamily := range metrics { - if metricFamily.GetName() == "horizon_ingest_error_restarts" { - assert.Len(metricFamily.GetMetric(), 1) - assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), assertRestartCount) - var restartType = "" - for _, label := range metricFamily.GetMetric()[0].GetLabel() { - if label.GetName() == "type" { - restartType = label.GetValue() - } - } - - assert.Equal(restartType, assertRestartType) - return - } - } - - if assertRestartCount > 0.0 { - assert.Fail("horizon_ingest_restarts metrics were not correct") - } } From f05e1fdf7d40dc1f52381d22277b678b8e0af4c0 Mon Sep 17 00:00:00 2001 From: shawn Date: Thu, 9 May 2024 16:42:56 -0700 Subject: [PATCH 3/4] update metric description, review feedback Co-authored-by: tamirms --- services/horizon/internal/ingest/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 49a59425e0..7e109c391a 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -452,7 +452,7 @@ func (s *system) initMetrics() { prometheus.CounterOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "errors_total", Help: "Counters of the number of times the live/forward ingestion state machine encountered an error. " + - "'current_state' label has the name of the state that error occurred. " + + "'current_state' label has the name of the state where the error occurred. " + "'next_state' label has the name of the next state requested from the current_state.", }, []string{"current_state", "next_state"}, From 110ab7d3ec20ba11b724a56dac801d395d3bc0f1 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 9 May 2024 16:45:13 -0700 Subject: [PATCH 4/4] #5256: removed unused return param names, review feedback --- services/horizon/internal/ingest/fsm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index a1fb743fd6..1b51518391 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -431,7 +431,7 @@ func (resumeState) GetState() State { return Resume } -func (r resumeState) run(s *system) (transitionResult transition, errorResult error) { +func (r resumeState) run(s *system) (transition, error) { if r.latestSuccessfullyProcessedLedger == 0 { return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value") }