Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Use context to handle termination and cleanup of captive core #3278

Merged
merged 21 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions exp/tools/captive-core-start-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ func main() {
func check(ledger uint32) bool {
c, err := ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
BinaryPath: "stellar-core",
ConfigAppendPath: "stellar-core-standalone2.cfg",
NetworkPassphrase: "Standalone Network ; February 2017",
HistoryArchiveURLs: []string{"http://localhost:1570"},
CheckpointFrequency: 64,
BinaryPath: "stellar-core",
ConfigAppendPath: "stellar-core-standalone2.cfg",
NetworkPassphrase: "Standalone Network ; February 2017",
HistoryArchiveURLs: []string{"http://localhost:1570"},
},
)
if err != nil {
Expand Down
9 changes: 4 additions & 5 deletions ingest/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,10 @@ func Example_changes() {
// Requires Stellar-Core 13.2.0+
backend, err := ledgerbackend.NewCaptive(
ledgerbackend.CaptiveCoreConfig{
BinaryPath: "/bin/stellar-core",
ConfigAppendPath: "/opt/stellar-core.cfg",
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: []string{archiveURL},
CheckpointFrequency: 64,
BinaryPath: "/bin/stellar-core",
ConfigAppendPath: "/opt/stellar-core.cfg",
NetworkPassphrase: networkPassphrase,
HistoryArchiveURLs: []string{archiveURL},
},
)
if err != nil {
Expand Down
92 changes: 21 additions & 71 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledgerbackend

import (
"bufio"
"context"
"io"
"time"

Expand Down Expand Up @@ -62,20 +63,18 @@ type metaResult struct {
// until xdr.LedgerCloseMeta objects channel is empty. This prevents memory
// exhaustion when network closes a series a large ledgers.
type bufferedLedgerMetaReader struct {
r *bufio.Reader
c chan metaResult
runner stellarCoreRunnerInterface
closed chan struct{}
r *bufio.Reader
c chan metaResult
ctx context.Context
}

// newBufferedLedgerMetaReader creates a new meta reader that will shutdown
// when stellar-core terminates.
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLedgerMetaReader {
func newBufferedLedgerMetaReader(ctx context.Context, reader io.ReadCloser) *bufferedLedgerMetaReader {
return &bufferedLedgerMetaReader{
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize),
runner: runner,
closed: make(chan struct{}),
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: bufio.NewReaderSize(reader, metaPipeBufferSize),
ctx: ctx,
tamirms marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -84,51 +83,25 @@ func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) *bufferedLed
// * Meta pipe buffer is full so it will wait until it refills.
// * The next ledger available in the buffer exceeds the meta pipe buffer size.
// In such case the method will block until LedgerCloseMeta buffer is empty.
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe(untilSequence uint32) (*xdr.LedgerCloseMeta, error) {
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) {
frameLength, err := xdr.ReadFrameLength(b.r)
if err != nil {
select {
case <-b.runner.getProcessExitChan():
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, errors.Wrap(err, "error reading frame length")
}
return nil, errors.Wrap(err, "error reading frame length")
}

for frameLength > metaPipeBufferSize && len(b.c) > 0 {
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
select {
case <-b.runner.getProcessExitChan():
if untilSequence != 0 {
// If untilSequence != 0 it's possible that Stellar-Core process
// exits but there are still ledgers in a buffer (catchup). In such
// case we ignore cases when Stellar-Core exited with no errors.
processErr := b.runner.getProcessExitError()
if processErr != nil {
return nil, errors.Wrap(processErr, "stellar-core process exited with an error")
}
time.Sleep(time.Second)
continue
}
return nil, wrapStellarCoreRunnerError(b.runner)
case <-b.ctx.Done():
return nil, b.ctx.Err()
case <-time.After(time.Second):
}
}

var xlcm xdr.LedgerCloseMeta
_, err = xdr.Unmarshal(b.r, &xlcm)
if err != nil {
if err == io.EOF {
err = errors.Wrap(err, "got EOF from subprocess")
}
err = errors.Wrap(err, "unmarshalling framed LedgerCloseMeta")

select {
case <-b.runner.getProcessExitChan():
return nil, wrapStellarCoreRunnerError(b.runner)
default:
return nil, err
}
return nil, errors.Wrap(err, "unmarshalling framed LedgerCloseMeta")
}
return &xlcm, nil
}
Expand All @@ -137,52 +110,29 @@ func (b *bufferedLedgerMetaReader) getChannel() <-chan metaResult {
return b.c
}

func (b *bufferedLedgerMetaReader) waitForClose() {
// If buffer is full, keep reading to make sure it receives
// a shutdown signal from stellarCoreRunner.
loop:
for {
select {
case <-b.c:
case <-b.closed:
break loop
}
}
}

// Start starts an internal go routine that reads binary ledger data into
// internal buffers. The go routine returns when Stellar-Core process exits
// however it won't happen instantly when data is read. A blocking method:
// waitForClose() can be used to block until go routine returns.
func (b *bufferedLedgerMetaReader) start(untilSequence uint32) {
// internal buffers. The go routine returns when it encounters an error (including io.EOF)
// or its context is terminated.
func (b *bufferedLedgerMetaReader) start() {
printBufferOccupation := time.NewTicker(5 * time.Second)
defer func() {
printBufferOccupation.Stop()
close(b.closed)
}()
defer printBufferOccupation.Stop()
defer close(b.c)

for {
select {
case <-printBufferOccupation.C:
log.Debug("captive core read-ahead buffer occupation:", len(b.c))
case <-b.ctx.Done():
return
default:
}

meta, err := b.readLedgerMetaFromPipe(untilSequence)
meta, err := b.readLedgerMetaFromPipe()
if err != nil {
// When `GetLedger` sees the error it will close the backend. We shouldn't
// close it now because there may be some ledgers in a buffer.
b.c <- metaResult{nil, err}
return
}

b.c <- metaResult{meta, nil}

if untilSequence != 0 {
if meta.LedgerSequence() >= untilSequence {
// we are done
return
}
}
}
}
Loading