Skip to content

Commit

Permalink
ingest/ledgerbackend: Fix flaky Captive Core tests (#3213)
Browse files Browse the repository at this point in the history
This commit fixes two issues that cause Captive Core tests flaky:
* It was possible that `bufferedLedgerMetaReader` go routine was still
running after calling `CaptiveStellarCore.Close()` because it returned
only when getting an exit signal from `stellarCoreRunner` (could happen
after `CaptiveStellarCore.Close()`). To fix it a new `WaitForClose()`
method was created that blocks until go routine returns.
* `bufferedLedgerMetaReader.readLedgerMetaFromPipe()` was retuning error
on Stellar-Core process graceful exit. Now it returns an error only when
Stellar-Core was closed with an error.
  • Loading branch information
bartekn authored Nov 16, 2020
1 parent 21e076a commit f9ab742
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 60 deletions.
47 changes: 35 additions & 12 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ type bufferedLedgerMetaReader struct {
r *bufio.Reader
c chan metaResult
runner stellarCoreRunnerInterface
closed chan struct{}
}

// newBufferedLedgerMetaReader creates a new meta reader that will shutdown
// when stellar-core terminates.
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) bufferedLedgerMetaReader {
return bufferedLedgerMetaReader{
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLedgerMetaReader {
return &bufferedLedgerMetaReader{
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize),
runner: runner,
closed: make(chan struct{}),
}
}

Expand All @@ -87,7 +89,7 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
if err != nil {
select {
case <-b.runner.getProcessExitChan():
return nil, errors.New("stellar-core process not-running")
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, errors.Wrap(err, "error reading frame length")
}
Expand All @@ -97,7 +99,7 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
select {
case <-b.runner.getProcessExitChan():
return nil, errors.New("stellar-core process not-running")
return nil, wrapStellarCoreRunnerError(b.runner)
case <-time.After(time.Second):
}
}
Expand All @@ -112,24 +114,46 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet

select {
case <-b.runner.getProcessExitChan():
return nil, errors.New("stellar-core process not-running")
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, err
}
}
return &xlcm, nil
}

func (b *bufferedLedgerMetaReader) GetChannel() <-chan metaResult {
func (b *bufferedLedgerMetaReader) getChannel() <-chan metaResult {
return b.c
}

func (b *bufferedLedgerMetaReader) Start(untilSequence uint32) {
func (b *bufferedLedgerMetaReader) waitForClose() {
// If buffer is full, keep reading to make sure it receives
// a shutdown signal from stellarCoreRunner.
loop:
for {
select {
case <-b.c:
case <-b.closed:
break loop
}
}
}

// Start starts an internal go routine that reads binary ledger data into
// internal buffers. The go routine returns when Stellar-Core process exits
// however it won't happen instantly when data is read. A blocking method:
// waitForClose() can be used to block until go routine returns.
func (b *bufferedLedgerMetaReader) start(untilSequence uint32) {
printBufferOccupation := time.NewTicker(5 * time.Second)
defer printBufferOccupation.Stop()
defer func() {
printBufferOccupation.Stop()
close(b.closed)
}()

for {
select {
case <-b.runner.getProcessExitChan():
b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)}
return
case <-printBufferOccupation.C:
log.Debug("captive core read-ahead buffer occupation:", len(b.c))
Expand All @@ -140,15 +164,14 @@ func (b *bufferedLedgerMetaReader) Start(untilSequence uint32) {
if err != nil {
// When `GetLedger` sees the error it will close the backend. We shouldn't
// close it now because there may be some ledgers in a buffer.
select {
case b.c <- metaResult{nil, err}:
case <-b.runner.getProcessExitChan():
}
b.c <- metaResult{nil, err}
return
}

select {
case b.c <- metaResult{meta, nil}:
case <-b.runner.getProcessExitChan():
b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)}
return
}

Expand Down
79 changes: 43 additions & 36 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,28 @@ type CaptiveStellarCore struct {
archive historyarchive.ArchiveInterface
ledgerHashStore TrustedLedgerHashStore

ledgerBuffer bufferedLedgerMetaReader
// Quick note on how shutdown works:
// If Stellar-Core exits, the exit signal is "catched" by bufferedLedgerMetaReader
// (which ends it's go routine) and later propagated via metaResult to
// CaptiveStellarCore.
// If user calls CaptiveStellarCore.Close(), it kills Stellar-Core process
// and the rest is handles by the process explained above.
ledgerBuffer *bufferedLedgerMetaReader
stellarCoreRunner stellarCoreRunnerInterface

// For testing
stellarCoreRunnerFactory func(configPath string) (stellarCoreRunnerInterface, error)

stellarCoreRunner stellarCoreRunnerInterface
cachedMeta *xdr.LedgerCloseMeta

// Defines if the blocking mode (off by default) is on or off. In blocking mode,
// calling GetLedger blocks until the requested ledger is available. This is useful
// for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them
// and using `time.Sleep` when ledger is not available can actually slow entire
// ingestion process.
blocking bool

// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta

nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online

Expand Down Expand Up @@ -203,7 +210,7 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error

// read-ahead buffer
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner)
go c.ledgerBuffer.Start(to)
go c.ledgerBuffer.start(to)
return nil
}

Expand Down Expand Up @@ -262,7 +269,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {

// read-ahead buffer
c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner)
go c.ledgerBuffer.Start(0)
go c.ledgerBuffer.start(0)

// if nextLedger is behind - fast-forward until expected ledger
if c.nextLedger < from {
Expand Down Expand Up @@ -365,25 +372,14 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {
return errors.Wrap(err, "opening subprocess")
}

metaPipe := c.stellarCoreRunner.getMetaPipe()
if metaPipe == nil {
return errors.New("missing metadata pipe")
}

for {
select {
case <-c.stellarCoreRunner.getProcessExitChan():
processErr := c.stellarCoreRunner.getProcessExitError()
if processErr != nil {
err = errors.Wrap(processErr, "stellar-core process exited with an error")
} else {
err = errors.New("stellar-core process exited unexpectedly without an error")
}
return err
return wrapStellarCoreRunnerError(c.stellarCoreRunner)
default:
}
// Wait for the first ledger or an error
if len(c.ledgerBuffer.GetChannel()) > 0 {
if len(c.ledgerBuffer.getChannel()) > 0 {
break
}
time.Sleep(c.waitIntervalPrepareRange)
Expand Down Expand Up @@ -449,26 +445,25 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe
)
}

if c.lastLedger != nil && sequence > *c.lastLedger {
return false, xdr.LedgerCloseMeta{}, errors.Errorf(
"reading past bounded range (requested sequence=%d, last ledger in range=%d)",
sequence,
*c.lastLedger,
)
}

// Now loop along the range until we find the ledger we want.
var errOut error
loop:
for {
if !c.blocking && len(c.ledgerBuffer.GetChannel()) == 0 {
if !c.blocking && len(c.ledgerBuffer.getChannel()) == 0 {
return false, xdr.LedgerCloseMeta{}, nil
}

var result metaResult
select {
case <-c.stellarCoreRunner.getProcessExitChan():
processErr := c.stellarCoreRunner.getProcessExitError()
if processErr != nil {
errOut = errors.Wrap(processErr, "stellar-core process exited with an error")
} else {
errOut = errors.New("stellar-core process exited unexpectedly without an error")
}
break loop
case result = <-c.ledgerBuffer.GetChannel():
}
// We don't have to handle getProcessExitChan() because this is handled
// in bufferedLedgerMetaReader (will send an error to the channel).
result := <-c.ledgerBuffer.getChannel()
if result.err != nil {
errOut = result.err
break loop
Expand Down Expand Up @@ -514,7 +509,7 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence() (uint32, error) {
}

if c.lastLedger == nil {
return c.nextLedger - 1 + uint32(len(c.ledgerBuffer.GetChannel())), nil
return c.nextLedger - 1 + uint32(len(c.ledgerBuffer.getChannel())), nil
}
return *c.lastLedger, nil
}
Expand All @@ -526,9 +521,6 @@ func (c *CaptiveStellarCore) isClosed() bool {
// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files.
func (c *CaptiveStellarCore) Close() error {
c.nextLedger = 0
c.lastLedger = nil

if c.stellarCoreRunner != nil {
// Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader
// because it's listening for getProcessExitChan().
Expand All @@ -537,7 +529,22 @@ func (c *CaptiveStellarCore) Close() error {
if err != nil {
return errors.Wrap(err, "error closing stellar-core subprocess")
}

// Wait for bufferedLedgerMetaReader go routine to return.
c.ledgerBuffer.waitForClose()
c.ledgerBuffer = nil
}

c.nextLedger = 0
c.lastLedger = nil

return nil
}

func wrapStellarCoreRunnerError(r stellarCoreRunnerInterface) error {
processErr := r.getProcessExitError()
if processErr != nil {
return errors.Wrap(processErr, "stellar-core process exited with an error")
}
return errors.New("stellar-core process exited unexpectedly without an error")
}
Loading

0 comments on commit f9ab742

Please sign in to comment.