From ccdc7da9d16aca519b23626dacba3bcf9626c9e9 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Fri, 22 May 2020 16:15:54 +0200 Subject: [PATCH 1/2] exp/ingest/io: Skip storing live entries seen in the oldest bucket --- exp/ingest/io/single_ledger_state_reader.go | 36 +++++++++++++-------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/exp/ingest/io/single_ledger_state_reader.go b/exp/ingest/io/single_ledger_state_reader.go index e9e432cc6c..03623c6d73 100644 --- a/exp/ingest/io/single_ledger_state_reader.go +++ b/exp/ingest/io/single_ledger_state_reader.go @@ -160,23 +160,25 @@ func (msr *SingleLedgerStateReader) streamBuckets() { close(msr.readChan) }() - var buckets []string + var buckets []historyarchive.Hash for i := 0; i < len(msr.has.CurrentBuckets); i++ { b := msr.has.CurrentBuckets[i] - buckets = append(buckets, b.Curr, b.Snap) - } + for _, hashString := range []string{b.Curr, b.Snap} { + hash, err := historyarchive.DecodeHash(hashString) + if err != nil { + msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash")) + return + } - for _, hashString := range buckets { - hash, err := historyarchive.DecodeHash(hashString) - if err != nil { - msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash")) - return - } + if hash.IsZero() { + continue + } - if hash.IsZero() { - continue + buckets = append(buckets, hash) } + } + for i, hash := range buckets { exists, err := msr.bucketExists(hash) if err != nil { msr.readChan <- msr.error( @@ -192,7 +194,8 @@ func (msr *SingleLedgerStateReader) streamBuckets() { return } - if shouldContinue := msr.streamBucketContents(hash); !shouldContinue { + oldestBucket := i == len(buckets)-1 + if shouldContinue := msr.streamBucketContents(hash, oldestBucket); !shouldContinue { break } } @@ -261,7 +264,7 @@ func (msr *SingleLedgerStateReader) newXDRStream(hash historyarchive.Hash) ( } // streamBucketContents pushes value onto the read channel, returning false when the channel needs 
to be closed otherwise true -func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash) bool { +func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash, oldestBucket bool) bool { rdr, e := msr.newXDRStream(hash) if e != nil { msr.readChan <- msr.error( @@ -428,6 +431,13 @@ LoopBucketEntry: // > that the (chronologically) preceding entry with the same ledger // > key was DEADENTRY. if entry.Type == xdr.BucketEntryTypeLiveentry { + // We skip adding entries from the last bucket to tempStore because: + // 1. Ledger keys are unique within a single bucket. + // 2. This is the last bucket we process so there's no need to track + // seen entries in this bucket. + if oldestBucket { + continue + } err := msr.tempStore.Add(h) if err != nil { msr.readChan <- msr.error(errors.Wrap(err, "Error updating to tempStore")) From e2a7a98d7320d4058de6d616a8d195eb0c0c098e Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Fri, 22 May 2020 16:44:17 +0200 Subject: [PATCH 2/2] Horizon changelog --- services/horizon/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 58b14ed9b9..1b66ec126e 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -6,6 +6,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased * Replace `SequenceProvider` implementation with one which queries the Horizon DB for sequence numbers instead of the Stellar Core DB. +* Decreased the memory usage of the initial state ingestion stage and the state verifier ([#2618](https://github.com/stellar/go/pull/2618)). ## v1.3.0