Skip to content

Commit

Permalink
4902 Add mutex for concurrent access in GetLatestLedgerSequence (#4903)
Browse files Browse the repository at this point in the history
  • Loading branch information
urvisavla authored Jun 13, 2023
1 parent 9ff77c2 commit e20e8bc
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
18 changes: 18 additions & 0 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ type CaptiveStellarCore struct {
// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta

// ledgerSequenceLock mutex is used to protect the member variables used in the
// read-only GetLatestLedgerSequence method from concurrent write operations.
// This is required when GetLatestLedgerSequence is called from other goroutine
// such as writing Prometheus metric captive_stellar_core_latest_ledger.
ledgerSequenceLock sync.RWMutex

prepared *Range // non-nil if any range is prepared
closed bool // False until the core is closed
nextLedger uint32 // next ledger expected, error w/ restart if not seen
Expand Down Expand Up @@ -307,6 +313,9 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
// The next ledger should be the first ledger of the checkpoint containing
// the requested ledger
ran := BoundedRange(from, to)
c.ledgerSequenceLock.Lock()
defer c.ledgerSequenceLock.Unlock()

c.prepared = &ran
c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from)
c.lastLedger = &to
Expand All @@ -330,6 +339,9 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
// In the online mode we update nextLedger after streaming the first ledger.
// This is to support versions before and after/including v17.1.0 that
// introduced minimal persistent DB.
c.ledgerSequenceLock.Lock()
defer c.ledgerSequenceLock.Unlock()

c.nextLedger = 0
ran := UnboundedRange(from)
c.prepared = &ran
Expand Down Expand Up @@ -647,7 +659,10 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe
)
}

c.ledgerSequenceLock.Lock()
c.nextLedger = result.LedgerSequence() + 1
c.ledgerSequenceLock.Unlock()

currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString()
c.previousLedgerHash = &currentLedgerHash

Expand Down Expand Up @@ -708,6 +723,9 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
c.stellarCoreLock.RLock()
defer c.stellarCoreLock.RUnlock()

c.ledgerSequenceLock.RLock()
defer c.ledgerSequenceLock.RUnlock()

if c.closed {
return 0, errors.New("stellar-core is no longer usable")
}
Expand Down
66 changes: 66 additions & 0 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"os"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -645,6 +646,71 @@ func TestGetLatestLedgerSequence(t *testing.T) {
mockRunner.AssertExpectations(t)
}

func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) {
var fromSeq uint32 = 64
var toSeq uint32 = 400
metaChan := make(chan metaResult, toSeq)

for i := fromSeq; i <= toSeq; i++ {
meta := buildLedgerCloseMeta(testLedgerHeader{sequence: i})
metaChan <- metaResult{
LedgerCloseMeta: &meta,
}
}
ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetRootHAS").
Return(historyarchive.HistoryArchiveState{
CurrentLedger: toSeq * 2,
}, nil)

mockArchive.
On("GetLedgerHeader", mock.Anything).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(10),
}

ledgerRange := UnboundedRange(fromSeq)
err := captiveBackend.PrepareRange(ctx, ledgerRange)
assert.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)

go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
_, _ = captiveBackend.GetLatestLedgerSequence(ctx)
}
}
}(ctx)

for i := fromSeq; i < toSeq; i++ {
_, err = captiveBackend.GetLedger(ctx, i)
assert.NoError(t, err)
}

cancel()

wg.Wait()
}

func TestCaptiveGetLedger(t *testing.T) {
tt := assert.New(t)
metaChan := make(chan metaResult, 300)
Expand Down

0 comments on commit e20e8bc

Please sign in to comment.