4902: Add mutex for concurrent access in GetLatestLedgerSequence #4903

Merged 3 commits on Jun 13, 2023
Changes from 2 commits
18 changes: 18 additions & 0 deletions ingest/ledgerbackend/captive_core_backend.go
@@ -97,6 +97,12 @@ type CaptiveStellarCore struct {
// cachedMeta keeps the ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta

// ledgerSequenceLock mutex is used to protect the member variables used in the
// read-only GetLatestLedgerSequence method from concurrent write operations.
// This is required when GetLatestLedgerSequence is called from another goroutine,
// such as the one that writes the Prometheus metric captive_stellar_core_latest_ledger.
ledgerSequenceLock sync.RWMutex

prepared *Range // non-nil if any range is prepared
closed bool // False until the core is closed
nextLedger uint32 // next ledger expected, error w/ restart if not seen
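
The hunk above introduces the standard sync.RWMutex pattern: writers take the exclusive lock while mutating prepared, nextLedger, and lastLedger, and the read-only GetLatestLedgerSequence takes the shared lock. A minimal, runnable sketch of the same pattern (the sequenceTracker type and its fields are hypothetical, not part of this PR):

package main

import (
	"fmt"
	"sync"
)

// sequenceTracker is a hypothetical stand-in for the fields that
// CaptiveStellarCore guards with ledgerSequenceLock.
type sequenceTracker struct {
	mu         sync.RWMutex
	nextLedger uint32
}

// advance is the write path: it takes the exclusive lock.
func (s *sequenceTracker) advance(seq uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nextLedger = seq + 1
}

// latest is the read path: any number of readers may hold RLock at once.
func (s *sequenceTracker) latest() uint32 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.nextLedger
}

func main() {
	t := &sequenceTracker{}
	t.advance(63)
	fmt.Println(t.latest()) // prints 64
}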
@@ -307,9 +313,12 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
// The next ledger should be the first ledger of the checkpoint containing
// the requested ledger
ran := BoundedRange(from, to)
c.ledgerSequenceLock.Lock()
c.prepared = &ran
c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from)
c.lastLedger = &to
c.ledgerSequenceLock.Unlock()
Contributor:
I think we should use Go's defer to run the Unlock, to follow the idiomatic Go approach to achieving 'finally' semantics and to ensure the lock is not left inconsistent/deadlocked.
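
For illustration, here are the two shapes the reviewer is contrasting, reduced to a runnable toy (the core type and its methods are hypothetical, not the PR's code):

package main

import "sync"

// core is a hypothetical stand-in for CaptiveStellarCore.
type core struct {
	mu       sync.Mutex
	prepared *int
}

// setExplicit releases the mutex manually; any early return or panic
// added between Lock and Unlock would leave the mutex held forever.
func (c *core) setExplicit(v int) {
	c.mu.Lock()
	c.prepared = &v
	c.mu.Unlock()
}

// setDeferred releases the mutex on every exit path, including panics,
// which is Go's idiom for 'finally' semantics.
func (c *core) setDeferred(v int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.prepared = &v
}

func main() {
	c := &core{}
	c.setExplicit(1)
	c.setDeferred(2)
}

One trade-off worth noting: with defer the lock is held until the enclosing function returns, not just around the assignments, so the deferred form is sometimes wrapped in a small helper or closure to keep the critical section tight.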

Contributor Author:
updated.


c.previousLedgerHash = nil

return nil
@@ -330,10 +339,13 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, from uint32) error
// In the online mode we update nextLedger after streaming the first ledger.
// This is to support versions before and after/including v17.1.0 that
// introduced minimal persistent DB.
c.ledgerSequenceLock.Lock()
c.nextLedger = 0
ran := UnboundedRange(from)
c.prepared = &ran
c.lastLedger = nil
c.ledgerSequenceLock.Unlock()

c.previousLedgerHash = nil

return nil
@@ -647,7 +659,10 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaResult
)
}

c.ledgerSequenceLock.Lock()
c.nextLedger = result.LedgerSequence() + 1
c.ledgerSequenceLock.Unlock()

currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString()
c.previousLedgerHash = &currentLedgerHash

@@ -708,6 +723,9 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint32, error)
c.stellarCoreLock.RLock()
defer c.stellarCoreLock.RUnlock()

c.ledgerSequenceLock.RLock()
defer c.ledgerSequenceLock.RUnlock()

if c.closed {
return 0, errors.New("stellar-core is no longer usable")
}
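
The field comment above names the Prometheus metric captive_stellar_core_latest_ledger as the concurrent reader. A sketch of how such a collector might poll GetLatestLedgerSequence from the metrics goroutine; the registerLatestLedgerGauge helper and its wiring are illustrative assumptions, not the repository's actual metrics code:

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
)

// registerLatestLedgerGauge is a hypothetical helper: Prometheus invokes the
// callback below from its own goroutine on every scrape, which is exactly the
// cross-goroutine read that ledgerSequenceLock now makes safe.
func registerLatestLedgerGauge(backend *CaptiveStellarCore, reg prometheus.Registerer) {
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Name: "captive_stellar_core_latest_ledger",
			Help: "Latest ledger sequence known to the captive core backend.",
		},
		func() float64 {
			seq, err := backend.GetLatestLedgerSequence(context.Background())
			if err != nil {
				return 0
			}
			return float64(seq)
		},
	))
}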
66 changes: 66 additions & 0 deletions ingest/ledgerbackend/captive_core_backend_test.go
@@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"os"
"sync"
"testing"

"github.com/stretchr/testify/assert"
@@ -645,6 +646,71 @@ func TestGetLatestLedgerSequence(t *testing.T) {
mockRunner.AssertExpectations(t)
}

func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) {
var fromSeq uint32 = 64
var toSeq uint32 = 400
metaChan := make(chan metaResult, toSeq)

for i := fromSeq; i <= toSeq; i++ {
meta := buildLedgerCloseMeta(testLedgerHeader{sequence: i})
metaChan <- metaResult{
LedgerCloseMeta: &meta,
}
}
ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetRootHAS").
Return(historyarchive.HistoryArchiveState{
CurrentLedger: toSeq * 2,
}, nil)

mockArchive.
On("GetLedgerHeader", mock.Anything).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(10),
}

ledgerRange := UnboundedRange(fromSeq)
err := captiveBackend.PrepareRange(ctx, ledgerRange)
assert.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)

go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
_, _ = captiveBackend.GetLatestLedgerSequence(ctx)
}
}
}(ctx)

for i := fromSeq; i < toSeq; i++ {
_, err = captiveBackend.GetLedger(ctx, i)
assert.NoError(t, err)
}

cancel()

wg.Wait()
}
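
The new test only surfaces the underlying data race when Go's race detector is instrumenting the binary, so it is most useful run with the -race flag, for example:

go test -race -run TestGetLatestLedgerSequenceRaceCondition ./ingest/ledgerbackend/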

func TestCaptiveGetLedger(t *testing.T) {
tt := assert.New(t)
metaChan := make(chan metaResult, 300)