diff --git a/exp/ingest/io/single_ledger_state_reader.go b/exp/ingest/io/single_ledger_state_reader.go index e9e432cc6c..03623c6d73 100644 --- a/exp/ingest/io/single_ledger_state_reader.go +++ b/exp/ingest/io/single_ledger_state_reader.go @@ -160,23 +160,25 @@ func (msr *SingleLedgerStateReader) streamBuckets() { close(msr.readChan) }() - var buckets []string + var buckets []historyarchive.Hash for i := 0; i < len(msr.has.CurrentBuckets); i++ { b := msr.has.CurrentBuckets[i] - buckets = append(buckets, b.Curr, b.Snap) - } + for _, hashString := range []string{b.Curr, b.Snap} { + hash, err := historyarchive.DecodeHash(hashString) + if err != nil { + msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash")) + return + } - for _, hashString := range buckets { - hash, err := historyarchive.DecodeHash(hashString) - if err != nil { - msr.readChan <- msr.error(errors.Wrap(err, "Error decoding bucket hash")) - return - } + if hash.IsZero() { + continue + } - if hash.IsZero() { - continue + buckets = append(buckets, hash) } + } + for i, hash := range buckets { exists, err := msr.bucketExists(hash) if err != nil { msr.readChan <- msr.error( @@ -192,7 +194,8 @@ func (msr *SingleLedgerStateReader) streamBuckets() { return } - if shouldContinue := msr.streamBucketContents(hash); !shouldContinue { + oldestBucket := i == len(buckets)-1 + if shouldContinue := msr.streamBucketContents(hash, oldestBucket); !shouldContinue { break } } @@ -261,7 +264,7 @@ func (msr *SingleLedgerStateReader) newXDRStream(hash historyarchive.Hash) ( } // streamBucketContents pushes value onto the read channel, returning false when the channel needs to be closed otherwise true -func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash) bool { +func (msr *SingleLedgerStateReader) streamBucketContents(hash historyarchive.Hash, oldestBucket bool) bool { rdr, e := msr.newXDRStream(hash) if e != nil { msr.readChan <- msr.error( @@ -428,6 +431,13 @@ LoopBucketEntry: // > that the (chronologically) preceding entry with the same ledger // > key was DEADENTRY. if entry.Type == xdr.BucketEntryTypeLiveentry { + // We skip adding entries from the last bucket to tempStore because: + // 1. Ledger keys are unique within a single bucket. + // 2. This is the last bucket we process so there's no need to track + // seen last entries in this bucket. + if oldestBucket { + continue + } err := msr.tempStore.Add(h) if err != nil { msr.readChan <- msr.error(errors.Wrap(err, "Error updating to tempStore")) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index ff7c188bb2..4a1ab05727 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -8,6 +8,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). * Drop support for MuxedAccounts strkeys (spec'ed in [SEP23](https://github.com/stellar/stellar-protocol/blob/master/ecosystem/sep-0023.md)). SEP23 is still a draft and we don't want to encourage storing strkeys which may not be definite. * Replace `SequenceProvider` implementation with one which queries the Horizon DB for sequence numbers instead of the Stellar Core DB. +* Decreased a memory usage of initial state ingestion stage and state verifier ([#2618](https://github.com/stellar/go/pull/2618)). ## v1.3.0