Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Remove returning error on Stellar-Core process exit during catchup #3260

Merged
merged 2 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLed
// * Meta pipe buffer is full so it will wait until it refills.
// * The next ledger available in the buffer exceeds the meta pipe buffer size.
// In such case the method will block until LedgerCloseMeta buffer is empty.
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) {
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32) (*xdr.LedgerCloseMeta, error) {
frameLength, err := xdr.ReadFrameLength(b.r)
if err != nil {
select {
Expand All @@ -99,6 +99,17 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
select {
case <-b.runner.getProcessExitChan():
if untilSequence != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also handle <-b.runner.getProcessExitChan a few lines below. Should we also have a untilSequence != 0 check in that block of code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like getProcessExitChan() is handled a few lines above as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only handle the other instances of getProcessExitChan if an error occurs (so it ignores cases when core is done but there are still ledgers in a buffer). However, I think there's a bug. In for frameLength > metaPipeBufferSize && len(b.c) > 0 we should return an error if core was closed with an error. Going to add it in a new commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK fixed in aecfd75.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need to worry about the case where len(b.c) > 0 but frameLength <= metaPipeBufferSize ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In such case (and on Core exit) we won't enter into a for loop but the xdr.Unmarshal below will error and we will handle the error there.

// If untilSequence != 0 it's possible that Stellar-Core process
// exits but there are still ledgers in a buffer (catchup). In such
// case we ignore cases when Stellar-Core exited with no errors.
processErr := b.runner.getProcessExitError()
if processErr != nil {
return nil, errors.Wrap(processErr, "stellar-core process exited with an error")
}
time.Sleep(time.Second)
continue
}
return nil, wrapStellarCoreRunnerError(b.runner)
case <-time.After(time.Second):
}
Expand Down Expand Up @@ -152,28 +163,20 @@ func (b *bufferedLedgerMetaReader) start(untilSequence uint32) {

for {
select {
case <-b.runner.getProcessExitChan():
b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)}
return
case <-printBufferOccupation.C:
log.Debug("captive core read-ahead buffer occupation:", len(b.c))
default:
}

meta, err := b.readLedgerMetaFromPipe()
meta, err := b.readLedgerMetaFromPipe(untilSequence)
if err != nil {
// When `GetLedger` sees the error it will close the backend. We shouldn't
// close it now because there may be some ledgers in a buffer.
b.c <- metaResult{nil, err}
return
}

select {
case b.c <- metaResult{meta, nil}:
case <-b.runner.getProcessExitChan():
b.c <- metaResult{nil, wrapStellarCoreRunnerError(b.runner)}
return
}
b.c <- metaResult{meta, nil}

if untilSequence != 0 {
if meta.LedgerSequence() >= untilSequence {
Expand Down
7 changes: 6 additions & 1 deletion ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,12 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {
for {
select {
case <-c.stellarCoreRunner.getProcessExitChan():
return wrapStellarCoreRunnerError(c.stellarCoreRunner)
// Return only in case of Stellar-Core process error. Normal exit
// is expected in catchup when all ledgers sent to a buffer.
processErr := c.stellarCoreRunner.getProcessExitError()
if processErr != nil {
return errors.Wrap(processErr, "stellar-core process exited with an error")
}
default:
}
// Wait for the first ledger or an error
Expand Down
3 changes: 1 addition & 2 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) {
}

err := captiveBackend.PrepareRange(BoundedRange(100, 200))
assert.Error(t, err)
assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error")
assert.NoError(t, err)
}

func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
// If there's a logger, we attempt to extract metadata about the log
// entry, then redirect it to the logger. Otherwise, we just use stdout.
if r.Log == nil {
fmt.Print(line)
fmt.Println(line)
continue
}

Expand Down