Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: reduce memory consumption of CheckpointChangeReader #5270

Merged
merged 9 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 58 additions & 160 deletions ingest/checkpoint_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand All @@ -21,15 +22,15 @@ type readResult struct {
// snapshot. The Changes produced by a CheckpointChangeReader reflect the state of the Stellar
// network at a particular checkpoint ledger sequence.
type CheckpointChangeReader struct {
ctx context.Context
has *historyarchive.HistoryArchiveState
archive historyarchive.ArchiveInterface
tempStore tempSet
sequence uint32
readChan chan readResult
streamOnce sync.Once
closeOnce sync.Once
done chan bool
ctx context.Context
has *historyarchive.HistoryArchiveState
archive historyarchive.ArchiveInterface
visitedLedgerKeys set.Set[string]
sequence uint32
readChan chan readResult
streamOnce sync.Once
closeOnce sync.Once
done chan bool

readBytesMutex sync.RWMutex
totalRead int64
Expand All @@ -45,32 +46,11 @@ type CheckpointChangeReader struct {
// Ensure CheckpointChangeReader implements ChangeReader
var _ ChangeReader = &CheckpointChangeReader{}

// tempSet is an interface that must be implemented by stores that
// hold a temporary set of objects for the state reader. The implementation
// does not need to be thread-safe.
type tempSet interface {
// Open is called once by NewCheckpointChangeReader before the store is used.
Open() error
// Preload batch-loads keys into an internal cache (if the store has one) to
// improve execution time by removing many round-trips.
Preload(keys []string) error
// Add adds key to the store.
Add(key string) error
// Exist returns true if the key is found in the store.
// If the key has not been added, it should return false.
Exist(key string) (bool, error)
// Close is called from a deferred function when streamBuckets returns.
Close() error
}

const (
// maxStreamRetries defines how many times we should retry when there are errors in
// the xdr stream returned by GetXdrStreamForHash().
maxStreamRetries = 3
// msrBufferSize is the capacity of the buffered readChan channel created in
// NewCheckpointChangeReader.
msrBufferSize = 50000

// preloadedEntries defines the number of bucket entries to preload from a
// bucket in a single run. This is done to allow preloading keys from the
// temp set.
preloadedEntries = 20000
)

// NewCheckpointChangeReader constructs a new CheckpointChangeReader instance.
Expand Down Expand Up @@ -102,24 +82,18 @@ func NewCheckpointChangeReader(
return nil, errors.Wrapf(err, "unable to get checkpoint HAS at ledger sequence %d", sequence)
}

tempStore := &memoryTempSet{}
err = tempStore.Open()
if err != nil {
return nil, errors.Wrap(err, "unable to get open temp store")
}

return &CheckpointChangeReader{
ctx: ctx,
has: &has,
archive: archive,
tempStore: tempStore,
sequence: sequence,
readChan: make(chan readResult, msrBufferSize),
streamOnce: sync.Once{},
closeOnce: sync.Once{},
done: make(chan bool),
encodingBuffer: xdr.NewEncodingBuffer(),
sleep: time.Sleep,
ctx: ctx,
has: &has,
archive: archive,
visitedLedgerKeys: set.Set[string]{},
sequence: sequence,
readChan: make(chan readResult, msrBufferSize),
streamOnce: sync.Once{},
closeOnce: sync.Once{},
done: make(chan bool),
encodingBuffer: xdr.NewEncodingBuffer(),
sleep: time.Sleep,
}, nil
}

Expand All @@ -141,21 +115,18 @@ func (r *CheckpointChangeReader) bucketExists(hash historyarchive.Hash) (bool, e
// However, we can modify this algorithm to work from newest to oldest ledgers:
//
// 1. For each `INITENTRY`/`LIVEENTRY` we check if we've seen the key before
// (stored in `tempStore`). If the key hasn't been seen, we write that bucket
// entry to the stream and add it to the `tempStore` (we don't mark `INITENTRY`,
// (stored in `visitedLedgerKeys`). If the key hasn't been seen, we write that bucket
// entry to the stream and add it to the `visitedLedgerKeys` (we don't mark `INITENTRY`,
// see the inline comment or CAP-20).
// 2. For each `DEADENTRY` we keep track of removed bucket entries in
// `tempStore` map.
// `visitedLedgerKeys` set.
//
// In such an algorithm we just need to store a set of keys, which requires much less space.
// The memory requirements will be lowered when CAP-0020 is live and older buckets are
// rewritten. Then, we will only need to keep track of `DEADENTRY`.
func (r *CheckpointChangeReader) streamBuckets() {
defer func() {
err := r.tempStore.Close()
if err != nil {
r.readChan <- r.error(errors.New("Error closing tempStore"))
}
r.visitedLedgerKeys = nil

r.closeOnce.Do(r.close)
close(r.readChan)
Expand Down Expand Up @@ -305,96 +276,21 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash,
// No METAENTRY means that bucket originates from before protocol version 11.
bucketProtocolVersion := uint32(0)

n := -1
var batch []xdr.BucketEntry
lastBatch := false

preloadKeys := make([]string, 0, preloadedEntries)

LoopBucketEntry:
for {
// Preload entries for faster retrieve from temp store.
if len(batch) == 0 {
if lastBatch {
for n := 0; ; n++ {
var entry xdr.BucketEntry
entry, e = r.readBucketEntry(rdr, hash)
if e != nil {
if e == io.EOF {
// Reached the end of the bucket stream, nothing more to process
return true
}
batch = make([]xdr.BucketEntry, 0, preloadedEntries)

// reset the content of the preloadKeys
preloadKeys = preloadKeys[:0]

for i := 0; i < preloadedEntries; i++ {
var entry xdr.BucketEntry
entry, e = r.readBucketEntry(rdr, hash)
if e != nil {
if e == io.EOF {
if len(batch) == 0 {
// No entries loaded for this batch, nothing more to process
return true
}
lastBatch = true
break
}
r.readChan <- r.error(
errors.Wrapf(e, "Error on XDR record %d of hash '%s'", n, hash.String()),
)
return false
}

batch = append(batch, entry)

// Generate a key
var key xdr.LedgerKey
var err error

switch entry.Type {
case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
liveEntry := entry.MustLiveEntry()
key, err = liveEntry.LedgerKey()
if err != nil {
r.readChan <- r.error(
errors.Wrapf(err, "Error generating ledger key for XDR record %d of hash '%s'", n, hash.String()),
)
return false
}
case xdr.BucketEntryTypeDeadentry:
key = entry.MustDeadEntry()
default:
// No ledger key associated with this entry, continue to the next one.
continue
}

// We're using compressed keys here
// safe, since we are converting to string right away
keyBytes, e := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
if e != nil {
r.readChan <- r.error(
errors.Wrapf(e, "Error marshaling XDR record %d of hash '%s'", n, hash.String()),
)
return false
}

h := string(keyBytes)
preloadKeys = append(preloadKeys, h)
}

err := r.tempStore.Preload(preloadKeys)
if err != nil {
r.readChan <- r.error(errors.Wrap(err, "Error preloading keys"))
return false
}
r.readChan <- r.error(
errors.Wrapf(e, "Error on XDR record %d of hash '%s'", n, hash.String()),
)
return false
}

var entry xdr.BucketEntry
entry, batch = batch[0], batch[1:]

n++

var key xdr.LedgerKey
var err error

switch entry.Type {
case xdr.BucketEntryTypeMetaentry:
if entry.Type == xdr.BucketEntryTypeMetaentry {
if n != 0 {
r.readChan <- r.error(
errors.Errorf(
Expand All @@ -407,7 +303,13 @@ LoopBucketEntry:
// We can't use MustMetaEntry() here. Check:
// https://github.com/golang/go/issues/32560
bucketProtocolVersion = uint32(entry.MetaEntry.LedgerVersion)
continue LoopBucketEntry
continue
}

var key xdr.LedgerKey
var err error

switch entry.Type {
case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
liveEntry := entry.MustLiveEntry()
key, err = liveEntry.LedgerKey()
Expand Down Expand Up @@ -439,6 +341,12 @@ LoopBucketEntry:
}

h := string(keyBytes)
// claimable balances and offers have unique ids:
// once a claimable balance or offer is created we can assume that
// its id can never be reused, unlike, for example, trustlines,
// which can be deleted and then recreated
unique := key.Type == xdr.LedgerEntryTypeClaimableBalance ||
key.Type == xdr.LedgerEntryTypeOffer

switch entry.Type {
case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
Expand All @@ -449,13 +357,7 @@ LoopBucketEntry:
return false
}

seen, err := r.tempStore.Exist(h)
if err != nil {
r.readChan <- r.error(errors.Wrap(err, "Error reading from tempStore"))
return false
}

if !seen {
if !r.visitedLedgerKeys.Contains(h) {
// Return LEDGER_ENTRY_STATE changes only now.
liveEntry := entry.MustLiveEntry()
entryChange := xdr.LedgerEntryChange{
Expand All @@ -464,32 +366,28 @@ LoopBucketEntry:
}
r.readChan <- readResult{entryChange, nil}

// We don't update `tempStore` for INITENTRY because CAP-20 says:
// We don't update `visitedLedgerKeys` for INITENTRY because CAP-20 says:
// > a bucket entry marked INITENTRY implies that either no entry
// > with the same ledger key exists in an older bucket, or else
// > that the (chronologically) preceding entry with the same ledger
// > key was DEADENTRY.
if entry.Type == xdr.BucketEntryTypeLiveentry {
// We skip adding entries from the last bucket to tempStore because:
// We skip adding entries from the last bucket to visitedLedgerKeys because:
// 1. Ledger keys are unique within a single bucket.
// 2. This is the last bucket we process so there's no need to track
// the entries seen in this bucket.
if oldestBucket {
continue
}
err := r.tempStore.Add(h)
if err != nil {
r.readChan <- r.error(errors.Wrap(err, "Error updating to tempStore"))
return false
}
r.visitedLedgerKeys.Add(h)
}
} else if entry.Type == xdr.BucketEntryTypeInitentry && unique {
tamirms marked this conversation as resolved.
Show resolved Hide resolved
// we can remove the ledger key because we know that it's unique in the ledger
// and cannot be recreated
r.visitedLedgerKeys.Remove(h)
}
case xdr.BucketEntryTypeDeadentry:
err := r.tempStore.Add(h)
if err != nil {
r.readChan <- r.error(errors.Wrap(err, "Error writing to tempStore"))
return false
}
r.visitedLedgerKeys.Add(h)
default:
r.readChan <- r.error(
errors.Errorf("Unexpected entry type %d: %d@%s", entry.Type, n, hash.String()),
Expand Down
Loading
Loading