diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index fb3d2d181d..b95fe31c33 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -416,6 +416,14 @@ func (r resumeState) run(s *system) (transition, error) { log.WithField("ingestLedger", ingestLedger). WithField("lastIngestedLedger", lastIngestedLedger). Info("bumping ingest ledger to next ledger after ingested ledger in db") + + // Update cursor if there's more than one ingesting instance: either + // Captive-Core or DB ingestion connected to another Stellar-Core. + if err = s.updateCursor(lastIngestedLedger); err != nil { + // Don't return updateCursor error. + log.WithError(err).Warn("error updating stellar-core cursor") + } + return retryResume(resumeState{ latestSuccessfullyProcessedLedger: lastIngestedLedger, }), nil diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index d3b79a13d7..863c425094 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -304,6 +304,13 @@ func (s *ResumeTestTestSuite) TestBumpIngestLedger() { s.historyQ.On("Begin").Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(101), nil).Once() + s.stellarCoreClient.On( + "SetCursor", + mock.AnythingOfType("*context.timerCtx"), + defaultCoreCursorName, + int32(101), + ).Return(errors.New("my error")).Once() + next, err := resumeState{latestSuccessfullyProcessedLedger: 99}.run(s.system) s.Assert().NoError(err) s.Assert().Equal(