diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader.go b/ingest/ledgerbackend/buffered_meta_pipe_reader.go index dff51eeae8..1964767d70 100644 --- a/ingest/ledgerbackend/buffered_meta_pipe_reader.go +++ b/ingest/ledgerbackend/buffered_meta_pipe_reader.go @@ -84,7 +84,7 @@ func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLed // * Meta pipe buffer is full so it will wait until it refills. // * The next ledger available in the buffer exceeds the meta pipe buffer size. // In such case the method will block until LedgerCloseMeta buffer is empty. -func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { +func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32) (*xdr.LedgerCloseMeta, error) { frameLength, err := xdr.ReadFrameLength(b.r) if err != nil { select { @@ -99,6 +99,17 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet // Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage. select { case <-b.runner.getProcessExitChan(): + if untilSequence != 0 { + // If untilSequence != 0 it's possible that Stellar-Core process + // exits but there are still ledgers in a buffer (catchup). In such + // case we ignore cases when Stellar-Core exited with no errors. + processErr := b.runner.getProcessExitError() + if processErr != nil { + return nil, errors.Wrap(processErr, "stellar-core process exited with an error") + } + time.Sleep(time.Second) + continue + } return nil, wrapStellarCoreRunnerError(b.runner) case <-time.After(time.Second): } @@ -152,15 +163,12 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) { for { select { - case <-b.runner.getProcessExitChan(): - b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)} - return case <-printBufferOccupation.C: log.Debug("captive core read-ahead buffer occupation:", len(b.c)) default: } - meta, err := b.readLedgerMetaFromPipe() + meta, err := b.readLedgerMetaFromPipe(untilSequence) if err != nil { // When `GetLedger` sees the error it will close the backend. We shouldn't // close it now because there may be some ledgers in a buffer. @@ -168,12 +176,7 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) { return } - select { - case b.c <- metaResult{meta, nil}: - case <-b.runner.getProcessExitChan(): - b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)} - return - } + b.c <- metaResult{meta, nil} if untilSequence != 0 { if meta.LedgerSequence() >= untilSequence { diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 917aa487a7..567de34112 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -388,7 +388,12 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { for { select { case <-c.stellarCoreRunner.getProcessExitChan(): - return wrapStellarCoreRunnerError(c.stellarCoreRunner) + // Return only in case of Stellar-Core process error. Normal exit + // is expected in catchup when all ledgers sent to a buffer. + processErr := c.stellarCoreRunner.getProcessExitError() + if processErr != nil { + return errors.Wrap(processErr, "stellar-core process exited with an error") + } default: } // Wait for the first ledger or an error diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index ff4729d8d6..2d364fbedd 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -264,8 +264,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { } err := captiveBackend.PrepareRange(BoundedRange(100, 200)) - assert.Error(t, err) - assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error") + assert.NoError(t, err) } func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 689230979f..de99c5e571 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -140,7 +140,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer { // If there's a logger, we attempt to extract metadata about the log // entry, then redirect it to the logger. Otherwise, we just use stdout. if r.Log == nil { - fmt.Print(line) + fmt.Println(line) continue }