diff --git a/go.mod b/go.mod index fb02c94507..bb6a691f40 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/spf13/viper v1.17.0 github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 github.com/xdrpp/goxdr v0.1.1 google.golang.org/api v0.170.0 @@ -136,7 +136,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect github.com/spf13/cast v1.5.1 // indirect - github.com/stretchr/objx v0.5.1 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.34.0 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20151027082146-e0fe6f683076 // indirect diff --git a/go.sum b/go.sum index 6d8713de6d..6f4c2d37b2 100644 --- a/go.sum +++ b/go.sum @@ -419,8 +419,8 @@ github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/g github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0= -github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= @@ -428,9 +428,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 h1:g3yQGZK+G6dfF/mw/SOwsTMzUVkpT4hB8pHxpbTXkKw= diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 5dce974e35..1b51518391 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -46,6 +46,32 @@ const ( ReingestHistoryRange ) +// Name provides a name representation for a state +func (state State) Name() string { + switch state { + case Start: + return "start" + case Stop: + return "stop" + case Build: + return "build" + case Resume: + return "resume" + case WaitForCheckpoint: + return "waitforcheckpoint" + case StressTest: + return "stresstest" + case VerifyRange: + return "verifyrange" + case HistoryRange: + return "historyrange" + case ReingestHistoryRange: + return "reingesthistoryrange" + default: + return "none" + } +} + type stateMachineNode interface { run(*system) (transition, error) String() 
string diff --git a/services/horizon/internal/ingest/ingest_history_range_state_test.go b/services/horizon/internal/ingest/ingest_history_range_state_test.go index cf248a6a7d..cf89ce4ab2 100644 --- a/services/horizon/internal/ingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/ingest/ingest_history_range_state_test.go @@ -291,6 +291,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { } s.historyQ.On("GetTx").Return(nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once() + s.system.initMetrics() } func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() { diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index d88cc3a3ce..7e109c391a 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -173,6 +173,10 @@ type Metrics struct { // ArchiveRequestCounter counts how many http requests are sent to history server HistoryArchiveStatsCounter *prometheus.CounterVec + + // IngestionErrorCounter counts the number of times the live/forward ingestion state machine + // encounters an error condition. + IngestionErrorCounter *prometheus.CounterVec } type System interface { @@ -443,6 +447,16 @@ func (s *system) initMetrics() { }, []string{"source", "type"}, ) + + s.metrics.IngestionErrorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "errors_total", + Help: "Counters of the number of times the live/forward ingestion state machine encountered an error. " + + "'current_state' label has the name of the state where the error occurred. 
" + + "'next_state' label has the name of the next state requested from the current_state.", + }, + []string{"current_state", "next_state"}, + ) } func (s *system) GetCurrentState() State { @@ -471,6 +485,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.LoadersStatsSummary) registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount) registry.MustRegister(s.metrics.HistoryArchiveStatsCounter) + registry.MustRegister(s.metrics.IngestionErrorCounter) s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon") } @@ -643,6 +658,11 @@ func (s *system) runStateMachine(cur stateMachineNode) error { // so we log these errors using the info log level logger.Info("Error in ingestion state machine") } else { + s.Metrics().IngestionErrorCounter. + With(prometheus.Labels{ + "current_state": cur.GetState().Name(), + "next_state": next.node.GetState().Name(), + }).Inc() logger.Error("Error in ingestion state machine") } } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 6f5d89bd6a..3c7c587aa2 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "database/sql" + "sync" "testing" "time" @@ -113,10 +114,13 @@ func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) { historyQ: historyQ, ctx: context.Background(), } + reg := setupMetrics(system) historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() - assert.PanicsWithValue(t, "unexpected transaction", func() { + defer func() { + assertErrorRestartMetrics(reg, "", "", 0, t) + }() system.Run() }) } @@ -127,12 +131,17 @@ func TestStateMachineTransition(t *testing.T) { historyQ: historyQ, ctx: context.Background(), } + reg := setupMetrics(system) historyQ.On("GetTx").Return(nil).Once() historyQ.On("Begin", mock.Anything).Return(errors.New("my error")).Once() 
historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() assert.PanicsWithValue(t, "unexpected transaction", func() { + defer func() { + // the test triggers error in the first start state exec, so metric is added + assertErrorRestartMetrics(reg, "start", "start", 1, t) + }() system.Run() }) } @@ -144,12 +153,14 @@ func TestContextCancel(t *testing.T) { historyQ: historyQ, ctx: ctx, } + reg := setupMetrics(system) historyQ.On("GetTx").Return(nil).Once() - historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(errors.New("my error")).Once() + historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(context.Canceled).Once() cancel() assert.NoError(t, system.runStateMachine(startState{})) + assertErrorRestartMetrics(reg, "", "", 0, t) } // TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError checks if the @@ -162,12 +173,61 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing. ctx: context.Background(), historyQ: historyQ, } + reg := setupMetrics(system) historyQ.On("GetTx").Return(nil).Once() err := system.runStateMachine(verifyRangeState{}) assert.Error(t, err) assert.EqualError(t, err, "invalid range: [0, 0]") + assertErrorRestartMetrics(reg, "verifyrange", "stop", 1, t) +} + +func TestStateMachineRestartEmitsMetric(t *testing.T) { + historyQ := &mockDBQ{} + ledgerBackend := &mockLedgerBackend{} + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + system := &system{ + ctx: ctx, + historyQ: historyQ, + ledgerBackend: ledgerBackend, + } + + ledgerBackend.On("IsPrepared", system.ctx, ledgerbackend.UnboundedRange(101)).Return(true, nil) + ledgerBackend.On("GetLedger", system.ctx, uint32(101)).Return(xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: 101, + LedgerVersion: xdr.Uint32(MaxSupportedProtocolVersion), + BucketListHash: xdr.Hash{1, 2, 
3}, + }, + }, + }, + }, nil) + + reg := setupMetrics(system) + + historyQ.On("GetTx").Return(nil) + historyQ.On("Begin", system.ctx).Return(errors.New("stop state machine")) + + wg.Add(1) + go func() { + defer wg.Done() + system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100}) + }() + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + // this checks every 50ms up to 10s total, for at least 3 fsm retries based on a db Begin error + // this condition should be met as the fsm retries every second. + assertErrorRestartMetrics(reg, "resume", "resume", 3, c) + }, 10*time.Second, 50*time.Millisecond, "horizon_ingest_errors_total metric was not incremented on a fsm error") } func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) { @@ -248,6 +308,7 @@ func TestCurrentStateRaceCondition(t *testing.T) { historyQ: historyQ, ctx: context.Background(), } + reg := setupMetrics(s) historyQ.On("GetTx").Return(nil) historyQ.On("Begin", s.ctx).Return(nil) @@ -280,6 +341,45 @@ loop: } close(getCh) <-doneCh + assertErrorRestartMetrics(reg, "", "", 0, t) +} + +func setupMetrics(system *system) *prometheus.Registry { + registry := prometheus.NewRegistry() + system.initMetrics() + registry.Register(system.Metrics().IngestionErrorCounter) + return registry +} + +func assertErrorRestartMetrics(reg *prometheus.Registry, assertCurrentState string, assertNextState string, assertRestartCount float64, t assert.TestingT) { + assert := assert.New(t) + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "horizon_ingest_errors_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), assertRestartCount) + var metricCurrentState = "" + var metricNextState = "" + for _, label := range metricFamily.GetMetric()[0].GetLabel() { + if label.GetName() == "current_state" { + metricCurrentState = label.GetValue() + } + if label.GetName() == 
"next_state" { + metricNextState = label.GetValue() + } + } + + assert.Equal(metricCurrentState, assertCurrentState) + assert.Equal(metricNextState, assertNextState) + return + } + } + + if assertRestartCount > 0.0 { + assert.Fail("horizon_ingest_errors_total metrics were not correct") + } } type mockDBQ struct {