ingest/ledgerbackend: Decouple meta pipe buffering in CaptiveStellarCore backend #3187

Merged (5 commits) on Nov 9, 2020
162 changes: 162 additions & 0 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
@@ -0,0 +1,162 @@
package ledgerbackend

import (
	"bufio"
	"io"
	"time"

	"github.com/pkg/errors"
	"github.com/stellar/go/support/log"
	"github.com/stellar/go/xdr"
)

const (
	// The constants below define the sizes of metaPipeBufferSize (in bytes) and
	// ledgerReadAheadBufferSize (in ledgers). In general:
	//
	//   metaPipeBufferSize >=
	//     ledgerReadAheadBufferSize * max over networks (average ledger size in bytes)
	//
	// so that the meta pipe buffer always has binary data that can be unmarshaled
	// into the ledger buffer.
	// After checking the latest few ledgers in pubnet and testnet, the average
	// sizes are 100,000 and 5,000 bytes respectively.

	// metaPipeBufferSize defines the meta pipe buffer size. We need at least
	// a couple MB to ensure there are at least a few ledgers captive core can
	// unmarshal into the read-ahead buffer while waiting for the client to finish
	// processing previous ledgers.
	metaPipeBufferSize = 10 * 1024 * 1024
	// ledgerReadAheadBufferSize defines the size (in ledgers) of the read-ahead
	// buffer that stores unmarshaled ledgers. This is especially important in
	// online mode, when GetLedger calls are not blocking. In that case, clients
	// usually wait for a specific time duration before checking if the ledger is
	// available. When catching up with a small buffer, this can increase the
	// overall time because ledgers are not available.
	ledgerReadAheadBufferSize = 100
@tamirms (Contributor) commented on Nov 5, 2020:
In the worst-case scenario we can have 1000 MB of ledgers in the queue (100 queued ledgers, each up to the 10 MB metaPipeBufferSize). That scenario is unlikely to happen. However, if you're interested, it should be possible to ensure the queued ledgers don't exceed a fixed threshold of memory usage by using a circular buffer:

diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader.go b/ingest/ledgerbackend/buffered_meta_pipe_reader.go
index 14707c76..0619e16e 100644
--- a/ingest/ledgerbackend/buffered_meta_pipe_reader.go
+++ b/ingest/ledgerbackend/buffered_meta_pipe_reader.go
@@ -2,6 +2,7 @@ package ledgerbackend
 
 import (
 	"bufio"
+	"fmt"
 	"io"
 	"time"
 
@@ -34,8 +35,60 @@ const (
 	// available. When catching up and small buffer this can increase the overall
 	// time because ledgers are not available.
 	ledgerReadAheadBufferSize = 100
+	maxBytesBuffered          = 100 * 1024 * 1024 // 100mb
 )
 
+type stream struct {
+	queue  []uint32
+	length int
+	sum    uint32
+	idx    int
+}
+
+func newStream(capacity int) *stream {
+	return &stream{
+		queue: make([]uint32, capacity),
+	}
+}
+
+func (s *stream) seek(num int) error {
+	if num > s.length {
+		return fmt.Errorf("seek %v exceeds queue length %v", num, s.length)
+	}
+
+	for num > 0 {
+		s.sum -= s.queue[s.idx]
+		s.length--
+		s.idx = (s.idx + 1) % len(s.queue)
+		num--
+	}
+	return nil
+}
+
+func (s *stream) add(val uint32) {
+	s.sum += val
+	tail := (s.idx + s.length) % len(s.queue)
+	if s.length == len(s.queue) {
+		s.sum -= s.queue[s.idx]
+		s.idx = (s.idx + 1) % len(s.queue)
+	} else {
+		s.length++
+	}
+	s.queue[tail] = val
+}
+
+func (s *stream) tailSum(n int) (uint32, error) {
+	if n > s.length {
+		return 0, fmt.Errorf("tail %v exceeds queue length %v", n, s.length)
+	}
+	if n < s.length {
+		if err := s.seek(s.length - n); err != nil {
+			return 0, err
+		}
+	}
+	return s.sum, nil
+}
+
 type metaResult struct {
 	*xdr.LedgerCloseMeta
 	err error
@@ -64,6 +117,7 @@ type metaResult struct {
 type bufferedLedgerMetaReader struct {
 	r      *bufio.Reader
 	c      chan metaResult
+	queue  *stream
 	runner stellarCoreRunnerInterface
 }
 
@@ -72,6 +126,7 @@ type bufferedLedgerMetaReader struct {
 func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) bufferedLedgerMetaReader {
 	return bufferedLedgerMetaReader{
 		c:      make(chan metaResult, ledgerReadAheadBufferSize),
+		queue:  newStream(ledgerReadAheadBufferSize),
 		r:      bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize),
 		runner: runner,
 	}
@@ -93,7 +148,14 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
 		}
 	}
 
-	for frameLength > metaPipeBufferSize && len(b.c) > 0 {
+	for {
+		totalBytes, err := b.queue.tailSum(len(b.c))
+		if err != nil {
+			return nil, errors.Wrap(err, "could not obtain buffer size")
+		}
+		if totalBytes == 0 || totalBytes+frameLength <= maxBytesBuffered {
+			break
+		}
 		// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
 		select {
 		case <-b.runner.getProcessExitChan():
@@ -117,6 +178,8 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
 			return nil, err
 		}
 	}
+
+	b.queue.add(frameLength)
 	return &xlcm, nil
 }
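
Purely for illustration (not part of the PR or its review), a minimal hypothetical test exercising the proposed stream type, assuming the diff above is applied in package ledgerbackend:

package ledgerbackend

import "testing"

// TestStreamTailSum is a hypothetical test, not from the PR. It walks the
// circular buffer through an eviction and a seek.
func TestStreamTailSum(t *testing.T) {
	s := newStream(3)
	s.add(10)
	s.add(20)
	s.add(30)
	s.add(40) // buffer full: 10 is evicted, queue holds {20, 30, 40}, sum = 90

	// tailSum(2) seeks past the oldest remaining entry (20) and sums the newest two.
	sum, err := s.tailSum(2)
	if err != nil {
		t.Fatal(err)
	}
	if sum != 70 {
		t.Fatalf("expected 70, got %d", sum)
	}
}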

The PR author (Contributor) replied:
It makes sense, but because this is unlikely I'd vote for simply making ledgerReadAheadBufferSize smaller, say 20 (so the maximum RAM usage is 200 MB). We can open an issue to do it in the future.

The PR author (Contributor) replied:
Changed in ee3226c.
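
Relatedly, a back-of-envelope check of the sizing rule quoted in the const comment above (a hypothetical calculation using the stated pubnet average, not code from the PR):

package main

import "fmt"

func main() {
	// Hypothetical check: does metaPipeBufferSize cover the read-ahead
	// buffer at the quoted pubnet average ledger size?
	const (
		metaPipeBufferSize        = 10 * 1024 * 1024 // 10,485,760 bytes
		ledgerReadAheadBufferSize = 100
		avgPubnetLedgerBytes      = 100000
	)
	need := ledgerReadAheadBufferSize * avgPubnetLedgerBytes // 10,000,000 bytes
	fmt.Println(metaPipeBufferSize >= need)                  // true: the invariant holds
}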

)

type metaResult struct {
	*xdr.LedgerCloseMeta
	err error
}

// bufferedLedgerMetaReader is responsible for buffering meta pipe data in a
// fast and safe manner and unmarshaling it into XDR objects.
//
// It solves the following issues:
//
// * Decouples buffering from stellarCoreRunner so it can focus on running core.
// * Decouples unmarshaling and buffering of LedgerCloseMeta's from CaptiveCore.
// * By adding buffering it allows unmarshaling the ledgers available in
//   Stellar-Core while previous ledgers are being processed.
// * Limits memory usage when large ledgers are closed by the network.
//
// Internally, it keeps two buffers: a bufio.Reader with binary ledger data and
// a buffered channel with unmarshaled xdr.LedgerCloseMeta objects ready for
// processing. The first buffer removes the overhead of reading from a file.
// The second buffer allows unmarshaling binary data into XDR objects (which
// can be a bottleneck) while clients are processing previous ledgers.
//
// Finally, when a ledger larger than the binary buffer is closed, it waits
// until the xdr.LedgerCloseMeta channel is empty. This prevents memory
// exhaustion when the network closes a series of large ledgers.
type bufferedLedgerMetaReader struct {
	r      *bufio.Reader
	c      chan metaResult
	runner stellarCoreRunnerInterface
}

// newBufferedLedgerMetaReader creates a new meta reader that will shut down
// when stellar-core terminates.
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) bufferedLedgerMetaReader {
	return bufferedLedgerMetaReader{
		c:      make(chan metaResult, ledgerReadAheadBufferSize),
		r:      bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize),
		runner: runner,
	}
}

// readLedgerMetaFromPipe unmarshals the next ledger from the meta pipe.
// It can block for two reasons:
// * The meta pipe buffer is full, so it will wait until it refills.
// * The next ledger available in the buffer exceeds the meta pipe buffer size.
//   In such a case the method will block until the LedgerCloseMeta buffer is empty.
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) {
	frameLength, err := xdr.ReadFrameLength(b.r)
	if err != nil {
		select {
		case <-b.runner.getProcessExitChan():
			return nil, errors.New("stellar-core process not running")
		default:
			return nil, errors.Wrap(err, "error reading frame length")
		}
	}

	for frameLength > metaPipeBufferSize && len(b.c) > 0 {
		// Wait for the LedgerCloseMeta buffer to be cleared to minimize memory usage.
		select {
		case <-b.runner.getProcessExitChan():
			return nil, errors.New("stellar-core process not running")
		case <-time.After(time.Second):
		}
	}

	var xlcm xdr.LedgerCloseMeta
	_, err = xdr.Unmarshal(b.r, &xlcm)
	if err != nil {
		if err == io.EOF {
			err = errors.Wrap(err, "got EOF from subprocess")
		}
		err = errors.Wrap(err, "unmarshaling framed LedgerCloseMeta")

		select {
		case <-b.runner.getProcessExitChan():
			return nil, errors.New("stellar-core process not running")
		default:
			return nil, err
		}
	}
	return &xlcm, nil
}

func (b *bufferedLedgerMetaReader) GetChannel() <-chan metaResult {
	return b.c
}

func (b *bufferedLedgerMetaReader) Start(untilSequence uint32) {
	printBufferOccupation := time.NewTicker(5 * time.Second)
	defer printBufferOccupation.Stop()
	for {
		select {
		case <-b.runner.getProcessExitChan():
			return
		case <-printBufferOccupation.C:
			log.Debug("captive core read-ahead buffer occupation:", len(b.c))
		default:
		}

		meta, err := b.readLedgerMetaFromPipe()
		if err != nil {
			// When `GetLedger` sees the error it will close the backend. We shouldn't
			// close it now because there may still be some ledgers in the buffer.
			select {
			case b.c <- metaResult{nil, err}:
			case <-b.runner.getProcessExitChan():
A reviewer (Contributor) commented:
I wonder if we can simplify the shutdown logic further by using contexts and passing them down through each component to the stellarCoreRunner instance. Within stellarCoreRunner we could use https://golang.org/pkg/os/exec/#CommandContext to execute stellar-core. Perhaps we can open an issue separate from this PR to investigate whether using contexts would help.

The PR author (Contributor) replied:
I looked into using CommandContext and I think it won't help much. We still need the close() method to do some cleanup, and we can't do that with a context alone. But we can definitely investigate how to write shutdown code for more complicated object connections. For example, to figure out how to close CaptiveCore, bufferedLedgerMetaReader and stellarCoreRunner, I drew a graph to understand the dependencies.

The PR author (Contributor) replied:
Issue: #3200.
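
As a reference for the discussion above, a minimal, self-contained sketch of the os/exec CommandContext pattern; the stellar-core invocation and timing are hypothetical, and this is not the PR's shutdown code:

package main

import (
	"context"
	"os/exec"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// exec.CommandContext kills the child process once ctx is cancelled.
	cmd := exec.CommandContext(ctx, "stellar-core", "run")
	if err := cmd.Start(); err != nil {
		panic(err)
	}

	// Cancelling the context only signals the process; any extra cleanup
	// (closing the meta pipe, removing temp files) still needs an explicit
	// close() step, which is why a context alone is not enough here.
	go func() {
		time.Sleep(5 * time.Second)
		cancel()
	}()

	_ = cmd.Wait() // returns once the process exits or is killed
}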

			}
			return
		}
		select {
		case b.c <- metaResult{meta, nil}:
		case <-b.runner.getProcessExitChan():
			return
		}

		if untilSequence != 0 {
			if meta.LedgerSequence() >= untilSequence {
				// we are done
				return
			}
		}
	}
}
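
To show how the pieces above fit together, a hedged sketch of a caller-side loop; consumeLedgers and process are hypothetical names, and runner is any stellarCoreRunnerInterface implementation:

// consumeLedgers is a hypothetical caller-side loop, not code from the PR.
// It starts the reader in a goroutine and drains the read-ahead channel.
func consumeLedgers(runner stellarCoreRunnerInterface, process func(*xdr.LedgerCloseMeta)) error {
	reader := newBufferedLedgerMetaReader(runner)
	go reader.Start(0) // 0 = keep reading until the process exits

	for {
		select {
		case result := <-reader.GetChannel():
			if result.err != nil {
				return result.err // e.g. stellar-core exited mid-stream
			}
			process(result.LedgerCloseMeta)
		case <-runner.getProcessExitChan():
			return nil
		}
	}
}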