-
Notifications
You must be signed in to change notification settings - Fork 501
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingest/ledgerbackend: Decouple meta pipe buffering in CaptiveStellarCore backend #3187
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
package ledgerbackend | ||
|
||
import ( | ||
"bufio" | ||
"io" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/stellar/go/support/log" | ||
"github.com/stellar/go/xdr" | ||
) | ||
|
||
const ( | ||
// The constants below define sizes of metaPipeBufferSize (binary) and | ||
// ledgerReadAheadBufferSize (in ledgers). In general: | ||
// | ||
// metaPipeBufferSize >= | ||
// ledgerReadAheadBufferSize * max over networks (average ledger size in bytes) | ||
// | ||
// so that meta pipe buffer always have binary data that can be unmarshaled into | ||
// ledger buffer. | ||
// After checking a few latest ledgers in pubnet and testnet the average size | ||
// is: 100,000 and 5,000 bytes respectively. | ||
|
||
// metaPipeBufferSize defines the meta pipe buffer size. We need at least | ||
// a couple MB to ensure there are at least a few ledgers captive core can | ||
// unmarshal into read-ahead buffer while waiting for client to finish | ||
// processing previous ledgers. | ||
metaPipeBufferSize = 10 * 1024 * 1024 | ||
// ledgerReadAheadBufferSize defines the size (in ledgers) of read ahead | ||
// buffer that stores unmarshalled ledgers. This is especially important in | ||
// an online mode when GetLedger calls are not blocking. In such case, clients | ||
// usually wait for a specific time duration before checking if the ledger is | ||
// available. When catching up and small buffer this can increase the overall | ||
// time because ledgers are not available. | ||
ledgerReadAheadBufferSize = 20 | ||
) | ||
|
||
type metaResult struct { | ||
*xdr.LedgerCloseMeta | ||
err error | ||
} | ||
|
||
// bufferedLedgerMetaReader is responsible for buffering meta pipe data in a | ||
// fast and safe manner and unmarshaling it into XDR objects. | ||
// | ||
// It solves the following issues: | ||
// | ||
// * Decouples buffering from stellarCoreRunner so it can focus on running core. | ||
// * Decouples unmarshalling and buffering of LedgerCloseMeta's from CaptiveCore. | ||
// * By adding buffering it allows unmarshaling the ledgers available in Stellar-Core | ||
// while previous ledger are being processed. | ||
// * Limits memory usage in case of large ledgers are closed by the network. | ||
// | ||
// Internally, it keeps two buffers: bufio.Reader with binary ledger data and | ||
// buffered channel with unmarshaled xdr.LedgerCloseMeta objects ready for | ||
// processing. The first buffer removes overhead time connected to reading from | ||
// a file. The second buffer allows unmarshaling binary data into XDR objects | ||
// (which can be a bottleneck) while clients are processing previous ledgers. | ||
// | ||
// Finally, when a large ledger (larger than binary buffer) is closed it waits | ||
// until xdr.LedgerCloseMeta objects channel is empty. This prevents memory | ||
// exhaustion when network closes a series a large ledgers. | ||
type bufferedLedgerMetaReader struct { | ||
r *bufio.Reader | ||
c chan metaResult | ||
runner stellarCoreRunnerInterface | ||
} | ||
|
||
// newBufferedLedgerMetaReader creates a new meta reader that will shutdown | ||
// when stellar-core terminates. | ||
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) bufferedLedgerMetaReader { | ||
return bufferedLedgerMetaReader{ | ||
c: make(chan metaResult, ledgerReadAheadBufferSize), | ||
r: bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize), | ||
runner: runner, | ||
} | ||
} | ||
|
||
// readLedgerMetaFromPipe unmarshalls the next ledger from meta pipe. | ||
// It can block for two reasons: | ||
// * Meta pipe buffer is full so it will wait until it refills. | ||
// * The next ledger available in the buffer exceeds the meta pipe buffer size. | ||
// In such case the method will block until LedgerCloseMeta buffer is empty. | ||
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { | ||
frameLength, err := xdr.ReadFrameLength(b.r) | ||
if err != nil { | ||
select { | ||
case <-b.runner.getProcessExitChan(): | ||
return nil, errors.New("stellar-core process not-running") | ||
default: | ||
return nil, errors.Wrap(err, "error reading frame length") | ||
} | ||
} | ||
|
||
for frameLength > metaPipeBufferSize && len(b.c) > 0 { | ||
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage. | ||
select { | ||
case <-b.runner.getProcessExitChan(): | ||
return nil, errors.New("stellar-core process not-running") | ||
case <-time.After(time.Second): | ||
} | ||
} | ||
|
||
var xlcm xdr.LedgerCloseMeta | ||
_, err = xdr.Unmarshal(b.r, &xlcm) | ||
if err != nil { | ||
if err == io.EOF { | ||
err = errors.Wrap(err, "got EOF from subprocess") | ||
} | ||
err = errors.Wrap(err, "unmarshalling framed LedgerCloseMeta") | ||
|
||
select { | ||
case <-b.runner.getProcessExitChan(): | ||
return nil, errors.New("stellar-core process not-running") | ||
default: | ||
return nil, err | ||
} | ||
} | ||
return &xlcm, nil | ||
} | ||
|
||
func (b *bufferedLedgerMetaReader) GetChannel() <-chan metaResult { | ||
return b.c | ||
} | ||
|
||
func (b *bufferedLedgerMetaReader) Start(untilSequence uint32) { | ||
printBufferOccupation := time.NewTicker(5 * time.Second) | ||
defer printBufferOccupation.Stop() | ||
for { | ||
select { | ||
case <-b.runner.getProcessExitChan(): | ||
return | ||
case <-printBufferOccupation.C: | ||
log.Debug("captive core read-ahead buffer occupation:", len(b.c)) | ||
default: | ||
} | ||
|
||
meta, err := b.readLedgerMetaFromPipe() | ||
if err != nil { | ||
// When `GetLedger` sees the error it will close the backend. We shouldn't | ||
// close it now because there may be some ledgers in a buffer. | ||
select { | ||
case b.c <- metaResult{nil, err}: | ||
case <-b.runner.getProcessExitChan(): | ||
} | ||
return | ||
} | ||
select { | ||
case b.c <- metaResult{meta, nil}: | ||
case <-b.runner.getProcessExitChan(): | ||
return | ||
} | ||
|
||
if untilSequence != 0 { | ||
if meta.LedgerSequence() >= untilSequence { | ||
// we are done | ||
return | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can simplify the shutdown logic further by using contexts and passing them down to each component down to the
stellarCoreRunner
instance. WithinstellarCoreRunner
we can use https://golang.org/pkg/os/exec/#CommandContext to execute stellar core. Perhaps we can make an issue separate from this PR to investigate if using contexts would help.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked into using
CommandContext
and I think it won't help much. We still needclose()
method to do some cleanup and we can't do it with context alone. But we can definitely investigate how to write a shutdown code for more complicated object connections. For example, to figure out how to closeCaptiveCore
,bufferedLedgerMetaReader
andstellarCoreRunner
I drew a graph to understand the dependencies.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Issue: #3200.