Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/ingest/io: Skip storing live entries seen in the oldest bucket #2618

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions exp/ingest/io/single_ledger_state_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,25 @@ func (msr *SingleLedgerStateReader) streamBuckets() {
close(msr.readChan)
}()

var buckets []string
var buckets []historyarchive.Hash
for i := 0; i < len(msr.has.CurrentBuckets); i++ {
b := msr.has.CurrentBuckets[i]
buckets = append(buckets, b.Curr, b.Snap)
}
for _, hashString := range []string{b.Curr, b.Snap} {
hash, err := historyarchive.DecodeHash(hashString)
if err != nil {
msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash"))
return
}

for _, hashString := range buckets {
hash, err := historyarchive.DecodeHash(hashString)
if err != nil {
msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash"))
return
}
if hash.IsZero() {
continue
}

if hash.IsZero() {
continue
buckets = append(buckets, hash)
}
}

for i, hash := range buckets {
exists, err := msr.bucketExists(hash)
if err != nil {
msr.readChan <- msr.error(
Expand All @@ -192,7 +194,8 @@ func (msr *SingleLedgerStateReader) streamBuckets() {
return
}

if shouldContinue := msr.streamBucketContents(hash); !shouldContinue {
oldestBucket := i == len(buckets)-1
if shouldContinue := msr.streamBucketContents(hash, oldestBucket); !shouldContinue {
break
}
}
Expand Down Expand Up @@ -261,7 +264,7 @@ func (msr *SingleLedgerStateReader) newXDRStream(hash historyarchive.Hash) (
}

// streamBucketContents pushes value onto the read channel, returning false when the channel needs to be closed otherwise true
func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash) bool {
func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash, oldestBucket bool) bool {
rdr, e := msr.newXDRStream(hash)
if e != nil {
msr.readChan <- msr.error(
Expand Down Expand Up @@ -428,6 +431,13 @@ LoopBucketEntry:
// > that the (chronologically) preceding entry with the same ledger
// > key was DEADENTRY.
if entry.Type == xdr.BucketEntryTypeLiveentry {
// We skip adding entries from the last bucket to tempStore because:
// 1. Ledger keys are unique within a single bucket.
// 2. This is the last bucket we process so there's no need to track
// seen last entries in this bucket.
if oldestBucket {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can apply the same logic to the entry.Type == xdr.BucketEntryTypeDeadentry case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The oldest bucket does not contain any dead entries. @jonjove is it always true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to call msr.tempStore.Exist(h) on the last bucket?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because it's possible that live or dead entry for a given ledger key has been seen in one of the previous buckets.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bartekn It is always true that the oldest bucket does not contain any dead entries.

continue
}
err := msr.tempStore.Add(h)
if err != nil {
msr.readChan <- msr.error(errors.Wrap(err, "Error updating to tempStore"))
Expand Down
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
* Drop support for MuxedAccounts strkeys (spec'ed in [SEP23](https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0023.md)).
SEP23 is still a draft and we don't want to encourage storing strkeys which may not be definite.
* Replace `SequenceProvider` implementation with one which queries the Horizon DB for sequence numbers instead of the Stellar Core DB.
* Decreased a memory usage of initial state ingestion stage and state verifier ([#2618](https://github.com/stellar/go/pull/2618)).

## v1.3.0

Expand Down