Skip to content

Commit

Permalink
services/horizon: Protect 'currentState' variable using Mutex to prev…
Browse files Browse the repository at this point in the history
…ent race condition. (#4889)
  • Loading branch information
urvisavla authored Jun 5, 2023
1 parent 7a7b140 commit b060996
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
8 changes: 7 additions & 1 deletion services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ type system struct {

reapOffsets map[string]int64

currentState State
currentStateMutex sync.Mutex
currentState State
}

func NewSystem(config Config) (System, error) {
Expand Down Expand Up @@ -484,6 +485,8 @@ func (s *system) initMetrics() {
}

func (s *system) GetCurrentState() State {
s.currentStateMutex.Lock()
defer s.currentStateMutex.Unlock()
return s.currentState
}

Expand Down Expand Up @@ -652,7 +655,10 @@ func (s *system) runStateMachine(cur stateMachineNode) error {
panic("unexpected transaction")
}

s.currentStateMutex.Lock()
s.currentState = cur.GetState()
s.currentStateMutex.Unlock()

next, err := cur.run(s)
if err != nil {
logger := log.WithFields(logpkg.F{
Expand Down
41 changes: 41 additions & 0 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"database/sql"
"testing"
"time"

"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -242,6 +243,46 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) {
historyQ.AssertExpectations(t)
}

func TestCurrentStateRaceCondition(t *testing.T) {
historyQ := &mockDBQ{}
s := &system{
historyQ: historyQ,
ctx: context.Background(),
}

historyQ.On("GetTx").Return(nil)
historyQ.On("Begin").Return(nil)
historyQ.On("Rollback").Return(nil)
historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(1), nil)
historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil)

timer := time.NewTimer(2000 * time.Millisecond)
getCh := make(chan bool, 1)
doneCh := make(chan bool, 1)
go func() {
var state = buildState{checkpointLedger: 8,
skipChecks: true,
stop: true}
for range getCh {
_ = s.runStateMachine(state)
}
close(doneCh)
}()

loop:
for {
s.GetCurrentState()
select {
case <-timer.C:
break loop
default:
}
getCh <- true
}
close(getCh)
<-doneCh
}

type mockDBQ struct {
mock.Mock

Expand Down

0 comments on commit b060996

Please sign in to comment.