diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index ab4184202f..dc6dc8dd46 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -230,7 +230,8 @@ type system struct { reapOffsets map[string]int64 - currentState State + currentStateMutex sync.Mutex + currentState State } func NewSystem(config Config) (System, error) { @@ -484,6 +485,8 @@ func (s *system) initMetrics() { } func (s *system) GetCurrentState() State { + s.currentStateMutex.Lock() + defer s.currentStateMutex.Unlock() return s.currentState } @@ -652,7 +655,10 @@ func (s *system) runStateMachine(cur stateMachineNode) error { panic("unexpected transaction") } + s.currentStateMutex.Lock() s.currentState = cur.GetState() + s.currentStateMutex.Unlock() + next, err := cur.run(s) if err != nil { logger := log.WithFields(logpkg.F{ diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 65c9f47e25..9dd902287c 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -5,6 +5,7 @@ import ( "context" "database/sql" "testing" + "time" "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" @@ -242,6 +243,46 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { historyQ.AssertExpectations(t) } +func TestCurrentStateRaceCondition(t *testing.T) { + historyQ := &mockDBQ{} + s := &system{ + historyQ: historyQ, + ctx: context.Background(), + } + + historyQ.On("GetTx").Return(nil) + historyQ.On("Begin").Return(nil) + historyQ.On("Rollback").Return(nil) + historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(1), nil) + historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil) + + timer := time.NewTimer(2000 * time.Millisecond) + getCh := make(chan bool, 1) + doneCh := make(chan bool, 1) + go func() { + var state = buildState{checkpointLedger: 8, + skipChecks: true, + stop: true} + for range getCh { + _ = s.runStateMachine(state) + } + close(doneCh) + }() + +loop: + for { + s.GetCurrentState() + select { + case <-timer.C: + break loop + default: + } + getCh <- true + } + close(getCh) + <-doneCh +} + type mockDBQ struct { mock.Mock