diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go
index d1365740c6..1dbbed2a9e 100644
--- a/ingest/checkpoint_change_reader.go
+++ b/ingest/checkpoint_change_reader.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/stellar/go/historyarchive"
+	"github.com/stellar/go/support/collections/set"
 	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/xdr"
 )
@@ -21,15 +22,15 @@ type readResult struct {
 // snapshot. The Changes produced by a CheckpointChangeReader reflect the state of the Stellar
 // network at a particular checkpoint ledger sequence.
 type CheckpointChangeReader struct {
-	ctx        context.Context
-	has        *historyarchive.HistoryArchiveState
-	archive    historyarchive.ArchiveInterface
-	tempStore  tempSet
-	sequence   uint32
-	readChan   chan readResult
-	streamOnce sync.Once
-	closeOnce  sync.Once
-	done       chan bool
+	ctx               context.Context
+	has               *historyarchive.HistoryArchiveState
+	archive           historyarchive.ArchiveInterface
+	visitedLedgerKeys set.Set[string]
+	sequence          uint32
+	readChan          chan readResult
+	streamOnce        sync.Once
+	closeOnce         sync.Once
+	done              chan bool
 
 	readBytesMutex sync.RWMutex
 	totalRead      int64
@@ -45,32 +46,11 @@ type CheckpointChangeReader struct {
 // Ensure CheckpointChangeReader implements ChangeReader
 var _ ChangeReader = &CheckpointChangeReader{}
 
-// tempSet is an interface that must be implemented by stores that
-// hold temporary set of objects for state reader. The implementation
-// does not need to be thread-safe.
-type tempSet interface {
-	Open() error
-	// Preload batch-loads keys into internal cache (if a store has any) to
-	// improve execution time by removing many round-trips.
-	Preload(keys []string) error
-	// Add adds key to the store.
-	Add(key string) error
-	// Exist returns value true if the value is found in the store.
-	// If the value has not been set, it should return false.
-	Exist(key string) (bool, error)
-	Close() error
-}
-
 const (
 	// maxStreamRetries defines how many times should we retry when there are errors in
 	// the xdr stream returned by GetXdrStreamForHash().
 	maxStreamRetries = 3
 	msrBufferSize    = 50000
-
-	// preloadedEntries defines a number of bucket entries to preload from a
-	// bucket in a single run. This is done to allow preloading keys from
-	// temp set.
-	preloadedEntries = 20000
 )
 
 // NewCheckpointChangeReader constructs a new CheckpointChangeReader instance.
@@ -102,24 +82,18 @@ func NewCheckpointChangeReader(
 		return nil, errors.Wrapf(err, "unable to get checkpoint HAS at ledger sequence %d", sequence)
 	}
 
-	tempStore := &memoryTempSet{}
-	err = tempStore.Open()
-	if err != nil {
-		return nil, errors.Wrap(err, "unable to get open temp store")
-	}
-
 	return &CheckpointChangeReader{
-		ctx:            ctx,
-		has:            &has,
-		archive:        archive,
-		tempStore:      tempStore,
-		sequence:       sequence,
-		readChan:       make(chan readResult, msrBufferSize),
-		streamOnce:     sync.Once{},
-		closeOnce:      sync.Once{},
-		done:           make(chan bool),
-		encodingBuffer: xdr.NewEncodingBuffer(),
-		sleep:          time.Sleep,
+		ctx:               ctx,
+		has:               &has,
+		archive:           archive,
+		visitedLedgerKeys: set.Set[string]{},
+		sequence:          sequence,
+		readChan:          make(chan readResult, msrBufferSize),
+		streamOnce:        sync.Once{},
+		closeOnce:         sync.Once{},
+		done:              make(chan bool),
+		encodingBuffer:    xdr.NewEncodingBuffer(),
+		sleep:             time.Sleep,
 	}, nil
 }
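Editor's note: `set.Set[string]` replaces the `tempSet` interface wholesale, which is why every error check around `Open`, `Add`, `Exist`, and `Close` disappears in this diff — none of those operations can fail on a plain map. Below is a minimal sketch of what the code assumes from `support/collections/set`; it is an illustrative stand-in, not that package's actual source:

```go
package set

// Set is a map-backed generic set covering exactly the operations the reader
// uses: the set.Set[string]{} composite literal, Add, Contains, Remove, the
// built-in len, and release via assignment to nil.
type Set[T comparable] map[T]struct{}

// Add inserts item into the set; re-adding an existing item is a no-op.
func (s Set[T]) Add(item T) { s[item] = struct{}{} }

// Contains reports whether item is present in the set.
func (s Set[T]) Contains(item T) bool {
	_, ok := s[item]
	return ok
}

// Remove deletes item from the set; removing a missing item is a no-op.
func (s Set[T]) Remove(item T) { delete(s, item) }
```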
@@ -141,21 +115,18 @@ func (r *CheckpointChangeReader) bucketExists(hash historyarchive.Hash) (bool, e
 // However, we can modify this algorithm to work from newest to oldest ledgers:
 //
 // 1. For each `INITENTRY`/`LIVEENTRY` we check if we've seen the key before
-// (stored in `tempStore`). If the key hasn't been seen, we write that bucket
-// entry to the stream and add it to the `tempStore` (we don't mark `INITENTRY`,
+// (stored in `visitedLedgerKeys`). If the key hasn't been seen, we write that bucket
+// entry to the stream and add it to `visitedLedgerKeys` (we don't mark `INITENTRY`,
 // see the inline comment or CAP-20).
 // 2. For each `DEADENTRY` we keep track of removed bucket entries in
-// `tempStore` map.
+// the `visitedLedgerKeys` set.
 //
 // In such algorithm we just need to store a set of keys that require much less space.
 // The memory requirements will be lowered when CAP-0020 is live and older buckets are
 // rewritten. Then, we will only need to keep track of `DEADENTRY`.
 func (r *CheckpointChangeReader) streamBuckets() {
 	defer func() {
-		err := r.tempStore.Close()
-		if err != nil {
-			r.readChan <- r.error(errors.New("Error closing tempStore"))
-		}
+		r.visitedLedgerKeys = nil
 
 		r.closeOnce.Do(r.close)
 		close(r.readChan)
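Aside: the comment above is the whole algorithm, so a compact illustration may help. This self-contained sketch runs the newest-to-oldest rule over invented keys, folding `INITENTRY` into the live case and omitting the CAP-20 refinement for brevity:

```go
package main

import "fmt"

type entryKind int

const (
	liveOrInit entryKind = iota // LIVEENTRY / INITENTRY
	dead                        // DEADENTRY
)

type bucketEntry struct {
	kind entryKind
	key  string
}

// stream walks entries from the newest bucket to the oldest and emits the
// most recent live value of each key, unless a newer DEADENTRY already
// removed that key.
func stream(entries []bucketEntry) []string {
	visited := map[string]struct{}{} // stands in for visitedLedgerKeys
	var out []string
	for _, e := range entries {
		if _, seen := visited[e.key]; seen {
			continue // a newer bucket already decided this key's fate
		}
		visited[e.key] = struct{}{}
		if e.kind == liveOrInit {
			out = append(out, e.key)
		}
		// dead entries emit nothing; recording the key suppresses older state
	}
	return out
}

func main() {
	// Newest first: "b" was deleted in a newer bucket, so its older live
	// entry is suppressed; "a" is emitted once, with its newest state.
	fmt.Println(stream([]bucketEntry{
		{liveOrInit, "a"}, {dead, "b"}, {liveOrInit, "b"}, {liveOrInit, "a"},
	})) // prints [a]
}
```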
@@ -305,96 +276,21 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash,
 	// No METAENTRY means that bucket originates from before protocol version 11.
 	bucketProtocolVersion := uint32(0)
 
-	n := -1
-	var batch []xdr.BucketEntry
-	lastBatch := false
-
-	preloadKeys := make([]string, 0, preloadedEntries)
-
-LoopBucketEntry:
-	for {
-		// Preload entries for faster retrieve from temp store.
-		if len(batch) == 0 {
-			if lastBatch {
+	for n := 0; ; n++ {
+		var entry xdr.BucketEntry
+		entry, e = r.readBucketEntry(rdr, hash)
+		if e != nil {
+			if e == io.EOF {
+				// Reached the end of the bucket, nothing more to process.
 				return true
 			}
-			batch = make([]xdr.BucketEntry, 0, preloadedEntries)
-
-			// reset the content of the preloadKeys
-			preloadKeys = preloadKeys[:0]
-
-			for i := 0; i < preloadedEntries; i++ {
-				var entry xdr.BucketEntry
-				entry, e = r.readBucketEntry(rdr, hash)
-				if e != nil {
-					if e == io.EOF {
-						if len(batch) == 0 {
-							// No entries loaded for this batch, nothing more to process
-							return true
-						}
-						lastBatch = true
-						break
-					}
-					r.readChan <- r.error(
-						errors.Wrapf(e, "Error on XDR record %d of hash '%s'", n, hash.String()),
-					)
-					return false
-				}
-
-				batch = append(batch, entry)
-
-				// Generate a key
-				var key xdr.LedgerKey
-				var err error
-
-				switch entry.Type {
-				case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
-					liveEntry := entry.MustLiveEntry()
-					key, err = liveEntry.LedgerKey()
-					if err != nil {
-						r.readChan <- r.error(
-							errors.Wrapf(err, "Error generating ledger key for XDR record %d of hash '%s'", n, hash.String()),
-						)
-						return false
-					}
-				case xdr.BucketEntryTypeDeadentry:
-					key = entry.MustDeadEntry()
-				default:
-					// No ledger key associated with this entry, continue to the next one.
-					continue
-				}
-
-				// We're using compressed keys here
-				// safe, since we are converting to string right away
-				keyBytes, e := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
-				if e != nil {
-					r.readChan <- r.error(
-						errors.Wrapf(e, "Error marshaling XDR record %d of hash '%s'", n, hash.String()),
-					)
-					return false
-				}
-
-				h := string(keyBytes)
-				preloadKeys = append(preloadKeys, h)
-			}
-
-			err := r.tempStore.Preload(preloadKeys)
-			if err != nil {
-				r.readChan <- r.error(errors.Wrap(err, "Error preloading keys"))
-				return false
-			}
+			r.readChan <- r.error(
+				errors.Wrapf(e, "Error on XDR record %d of hash '%s'", n, hash.String()),
+			)
+			return false
 		}
 
-		var entry xdr.BucketEntry
-		entry, batch = batch[0], batch[1:]
-
-		n++
-
-		var key xdr.LedgerKey
-		var err error
-
-		switch entry.Type {
-		case xdr.BucketEntryTypeMetaentry:
+		if entry.Type == xdr.BucketEntryTypeMetaentry {
 			if n != 0 {
 				r.readChan <- r.error(
 					errors.Errorf(
@@ -407,7 +303,13 @@ LoopBucketEntry:
 			// We can't use MustMetaEntry() here. Check:
 			// https://github.com/golang/go/issues/32560
 			bucketProtocolVersion = uint32(entry.MetaEntry.LedgerVersion)
-			continue LoopBucketEntry
+			continue
+		}
+
+		var key xdr.LedgerKey
+		var err error
+
+		switch entry.Type {
 		case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
 			liveEntry := entry.MustLiveEntry()
 			key, err = liveEntry.LedgerKey()
@@ -439,6 +341,12 @@ LoopBucketEntry:
 		}
 
 		h := string(keyBytes)
+		// Claimable balances and offers have unique ids: once a claimable
+		// balance or offer is created, we can assume that its id can never be
+		// recreated, unlike, for example, trustlines, which can be deleted
+		// and then recreated.
+		unique := key.Type == xdr.LedgerEntryTypeClaimableBalance ||
+			key.Type == xdr.LedgerEntryTypeOffer
 
 		switch entry.Type {
 		case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
@@ -449,13 +357,7 @@ LoopBucketEntry:
 				return false
 			}
 
-			seen, err := r.tempStore.Exist(h)
-			if err != nil {
-				r.readChan <- r.error(errors.Wrap(err, "Error reading from tempStore"))
-				return false
-			}
-
-			if !seen {
+			if !r.visitedLedgerKeys.Contains(h) {
 				// Return LEDGER_ENTRY_STATE changes only now.
 				liveEntry := entry.MustLiveEntry()
 				entryChange := xdr.LedgerEntryChange{
@@ -464,32 +366,28 @@ LoopBucketEntry:
 				}
 				r.readChan <- readResult{entryChange, nil}
 
-				// We don't update `tempStore` for INITENTRY because CAP-20 says:
+				// We don't update `visitedLedgerKeys` for INITENTRY because CAP-20 says:
 				// > a bucket entry marked INITENTRY implies that either no entry
 				// > with the same ledger key exists in an older bucket, or else
 				// > that the (chronologically) preceding entry with the same ledger
 				// > key was DEADENTRY.
 				if entry.Type == xdr.BucketEntryTypeLiveentry {
-					// We skip adding entries from the last bucket to tempStore because:
+					// We skip adding entries from the last bucket to visitedLedgerKeys because:
 					// 1. Ledger keys are unique within a single bucket.
 					// 2. This is the last bucket we process so there's no need to track
 					//    seen last entries in this bucket.
 					if oldestBucket {
 						continue
 					}
-					err := r.tempStore.Add(h)
-					if err != nil {
-						r.readChan <- r.error(errors.Wrap(err, "Error updating to tempStore"))
-						return false
-					}
+					r.visitedLedgerKeys.Add(h)
 				}
+			} else if entry.Type == xdr.BucketEntryTypeInitentry && unique {
+				// we can remove the ledger key because we know that it's unique in the ledger
+				// and cannot be recreated
+				r.visitedLedgerKeys.Remove(h)
 			}
 		case xdr.BucketEntryTypeDeadentry:
-			err := r.tempStore.Add(h)
-			if err != nil {
-				r.readChan <- r.error(errors.Wrap(err, "Error writing to tempStore"))
-				return false
-			}
+			r.visitedLedgerKeys.Add(h)
 		default:
 			r.readChan <- r.error(
 				errors.Errorf("Unexpected entry type %d: %d@%s", entry.Type, n, hash.String()),
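Note: the new `unique` flag is where the memory win comes from. Previously a deleted claimable balance or offer key stayed in the set until the entire checkpoint had been streamed; now, the moment the `INITENTRY` that created it is reached, the key is evicted, because no older bucket can mention an ID that can never be reused. A toy walkthrough of that eviction rule, with a hypothetical key:

```go
package main

import "fmt"

func main() {
	visited := map[string]struct{}{}

	// Newest bucket: a claimable balance was deleted, so its key is recorded
	// to suppress any older LIVEENTRY/INITENTRY for the same ledger key.
	visited["cb:0x010203"] = struct{}{}
	fmt.Println(len(visited)) // 1

	// Older bucket: we reach the INITENTRY that created that claimable
	// balance. Its ID can never be reused (unlike e.g. a trustline), so no
	// even older bucket can mention it — the key can be evicted right away.
	key, unique := "cb:0x010203", true
	if _, seen := visited[key]; seen && unique {
		delete(visited, key)
	}
	fmt.Println(len(visited)) // 0: memory is reclaimed mid-stream
}
```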
diff --git a/ingest/checkpoint_change_reader_test.go b/ingest/checkpoint_change_reader_test.go
index 958f3199ac..35ec2bb076 100644
--- a/ingest/checkpoint_change_reader_test.go
+++ b/ingest/checkpoint_change_reader_test.go
@@ -10,11 +10,12 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+
 	"github.com/stellar/go/historyarchive"
 	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/xdr"
-	"github.com/stretchr/testify/mock"
-	"github.com/stretchr/testify/suite"
 )
 
 func TestSingleLedgerStateReaderTestSuite(t *testing.T) {
@@ -220,6 +221,141 @@ func (s *SingleLedgerStateReaderTestSuite) TestEnsureLatestLiveEntry() {
 	s.Require().Equal(err, io.EOF)
 }
 
+func (s *SingleLedgerStateReaderTestSuite) TestUniqueInitEntryOptimization() {
+	curr1 := createXdrStream(
+		metaEntry(20),
+		entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1),
+		entryCB(xdr.BucketEntryTypeDeadentry, xdr.Hash{1, 2, 3}, 100),
+		entryOffer(xdr.BucketEntryTypeDeadentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 20),
+	)
+
+	snap1 := createXdrStream(
+		metaEntry(20),
+		entryAccount(xdr.BucketEntryTypeInitentry, "GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB", 1),
+		entryAccount(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1),
+		entryAccount(xdr.BucketEntryTypeInitentry, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", 1),
+		entryCB(xdr.BucketEntryTypeInitentry, xdr.Hash{1, 2, 3}, 100),
+		entryOffer(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 20),
+		entryAccount(xdr.BucketEntryTypeInitentry, "GAP2KHWUMOHY7IO37UJY7SEBIITJIDZS5DRIIQRPEUT4VUKHZQGIRWS4", 1),
+		entryAccount(xdr.BucketEntryTypeInitentry, "GAIH3ULLFQ4DGSECF2AR555KZ4KNDGEKN4AFI4SU2M7B43MGK3QJZNSR", 1),
+	)
+
+	nextBucket := s.getNextBucketChannel()
+
+	// Return curr1 and snap1 streams for the first two buckets...
+	s.mockArchive.
+		On("GetXdrStreamForHash", <-nextBucket).
+		Return(curr1, nil).Once()
+
+	s.mockArchive.
+		On("GetXdrStreamForHash", <-nextBucket).
+		Return(snap1, nil).Once()
+
+	// ...and empty streams for the rest of the buckets.
+	for hash := range nextBucket {
+		s.mockArchive.
+			On("GetXdrStreamForHash", hash).
+			Return(createXdrStream(), nil).Once()
+	}
+
+	// Replace readChan with an unbuffered channel so we can observe when items
+	// are added to / removed from visitedLedgerKeys.
+	s.reader.readChan = make(chan readResult, 0)
+
+	change, err := s.reader.Read()
+	s.Require().NoError(err)
+	key, err := change.Post.Data.LedgerKey()
+	s.Require().NoError(err)
+	s.Require().True(
+		key.Equals(xdr.LedgerKey{
+			Type:    xdr.LedgerEntryTypeAccount,
+			Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")},
+		}),
+	)
+
+	change, err = s.reader.Read()
+	s.Require().NoError(err)
+	key, err = change.Post.Data.LedgerKey()
+	s.Require().NoError(err)
+	s.Require().True(
+		key.Equals(xdr.LedgerKey{
+			Type:    xdr.LedgerEntryTypeAccount,
+			Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB")},
+		}),
+	)
+
+	s.Require().Equal(len(s.reader.visitedLedgerKeys), 3)
+	s.assertVisitedLedgerKeysContains(xdr.LedgerKey{
+		Type:    xdr.LedgerEntryTypeAccount,
+		Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")},
+	})
+	s.assertVisitedLedgerKeysContains(xdr.LedgerKey{
+		Type: xdr.LedgerEntryTypeOffer,
+		Offer: &xdr.LedgerKeyOffer{
+			SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
+			OfferId:  20,
+		},
+	})
+	s.assertVisitedLedgerKeysContains(xdr.LedgerKey{
+		Type: xdr.LedgerEntryTypeClaimableBalance,
+		ClaimableBalance: &xdr.LedgerKeyClaimableBalance{
+			BalanceId: xdr.ClaimableBalanceId{
+				Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
+				V0:   &xdr.Hash{1, 2, 3},
+			},
+		},
+	})
+
+	change, err = s.reader.Read()
+	s.Require().NoError(err)
+	key, err = change.Post.Data.LedgerKey()
+	s.Require().NoError(err)
+	s.Require().True(
+		key.Equals(xdr.LedgerKey{
+			Type:    xdr.LedgerEntryTypeAccount,
+			Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")},
+		}),
+	)
+
+	change, err = s.reader.Read()
+	s.Require().NoError(err)
+	key, err = change.Post.Data.LedgerKey()
+	s.Require().NoError(err)
+	s.Require().True(
+		key.Equals(xdr.LedgerKey{
+			Type:    xdr.LedgerEntryTypeAccount,
+			Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GAP2KHWUMOHY7IO37UJY7SEBIITJIDZS5DRIIQRPEUT4VUKHZQGIRWS4")},
+		}),
+	)
+
+	// The offer and claimable balance ledger keys should now be removed from
+	// visitedLedgerKeys because we encountered their init entries in the bucket.
+	s.Require().Equal(len(s.reader.visitedLedgerKeys), 1)
+	s.assertVisitedLedgerKeysContains(xdr.LedgerKey{
+		Type:    xdr.LedgerEntryTypeAccount,
+		Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")},
+	})
+
+	change, err = s.reader.Read()
+	s.Require().NoError(err)
+	key, err = change.Post.Data.LedgerKey()
+	s.Require().NoError(err)
+	s.Require().True(
+		key.Equals(xdr.LedgerKey{
+			Type:    xdr.LedgerEntryTypeAccount,
+			Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GAIH3ULLFQ4DGSECF2AR555KZ4KNDGEKN4AFI4SU2M7B43MGK3QJZNSR")},
+		}),
+	)
+
+	_, err = s.reader.Read()
+	s.Require().Equal(err, io.EOF)
+}
+
+func (s *SingleLedgerStateReaderTestSuite) assertVisitedLedgerKeysContains(key xdr.LedgerKey) {
+	encodingBuffer := xdr.NewEncodingBuffer()
+	keyBytes, err := encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
+	s.Require().NoError(err)
+	s.Require().True(s.reader.visitedLedgerKeys.Contains(string(keyBytes)))
+}
+
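Note: the `len(visitedLedgerKeys)` assertions above are only safe because `readChan` was made unbuffered — the streaming goroutine can run at most one send past the last entry the test consumed, and the buckets are laid out so that the in-flight entry never mutates the set. A stripped-down illustration of that producer/consumer property:

```go
package main

import "fmt"

func main() {
	ch := make(chan int) // unbuffered: each send blocks until received

	go func() {
		for i := 0; i < 5; i++ {
			// With an unbuffered channel the producer parks here until the
			// consumer is ready, so it is never more than one item ahead of
			// whatever the consumer has already observed.
			ch <- i
		}
		close(ch)
	}()

	for v := range ch {
		fmt.Println("consumed", v) // producer state is nearly in lockstep
	}
}
```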
 // TestMalformedProtocol11Bucket tests a buggy protocol 11 bucket (meta not the first entry)
 func (s *SingleLedgerStateReaderTestSuite) TestMalformedProtocol11Bucket() {
 	curr1 := createXdrStream(
@@ -733,6 +869,78 @@ func entryAccount(t xdr.BucketEntryType, id string, balance uint32) xdr.BucketEn
 	}
 }
 
+func entryCB(t xdr.BucketEntryType, id xdr.Hash, balance xdr.Int64) xdr.BucketEntry {
+	switch t {
+	case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
+		return xdr.BucketEntry{
+			Type: t,
+			LiveEntry: &xdr.LedgerEntry{
+				Data: xdr.LedgerEntryData{
+					Type: xdr.LedgerEntryTypeClaimableBalance,
+					ClaimableBalance: &xdr.ClaimableBalanceEntry{
+						BalanceId: xdr.ClaimableBalanceId{
+							Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
+							V0:   &id,
+						},
+						Asset:  xdr.MustNewNativeAsset(),
+						Amount: balance,
+					},
+				},
+			},
+		}
+	case xdr.BucketEntryTypeDeadentry:
+		return xdr.BucketEntry{
+			Type: xdr.BucketEntryTypeDeadentry,
+			DeadEntry: &xdr.LedgerKey{
+				Type: xdr.LedgerEntryTypeClaimableBalance,
+				ClaimableBalance: &xdr.LedgerKeyClaimableBalance{
+					BalanceId: xdr.ClaimableBalanceId{
+						Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
+						V0:   &id,
+					},
+				},
+			},
+		}
+	default:
+		panic("Unknown entry type")
+	}
+}
+
+func entryOffer(t xdr.BucketEntryType, seller string, id xdr.Int64) xdr.BucketEntry {
+	switch t {
+	case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
+		return xdr.BucketEntry{
+			Type: t,
+			LiveEntry: &xdr.LedgerEntry{
+				Data: xdr.LedgerEntryData{
+					Type: xdr.LedgerEntryTypeOffer,
+					Offer: &xdr.OfferEntry{
+						OfferId:  id,
+						SellerId: xdr.MustAddress(seller),
+						Selling:  xdr.MustNewNativeAsset(),
+						Buying:   xdr.MustNewCreditAsset("USD", "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7"),
+						Amount:   100,
+						Price:    xdr.Price{1, 1},
+					},
+				},
+			},
+		}
+	case xdr.BucketEntryTypeDeadentry:
+		return xdr.BucketEntry{
+			Type: xdr.BucketEntryTypeDeadentry,
+			DeadEntry: &xdr.LedgerKey{
+				Type: xdr.LedgerEntryTypeOffer,
+				Offer: &xdr.LedgerKeyOffer{
+					OfferId:  id,
+					SellerId: xdr.MustAddress(seller),
+				},
+			},
+		}
+	default:
+		panic("Unknown entry type")
+	}
+}
+
 type errCloser struct {
 	io.Reader
 	err error
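Aside: the two file deletions below remove the last traces of the old abstraction, and the call-site translation is mechanical — every old `tempSet` operation returned an error that could never actually occur in the in-memory implementation. A hedged before/after sketch using a plain map in place of `set.Set[string]`:

```go
package main

import "fmt"

func main() {
	// Old tempSet flow (see the deleted memoryTempSet below) and its
	// replacement — each mapping is infallible, so the error plumbing goes:
	//
	//   store.Open()            ->  visited := map[string]struct{}{}
	//   store.Preload(keys)     ->  (dropped: map lookups need no batching)
	//   ok, _ := store.Exist(h) ->  _, ok := visited[h]
	//   store.Add(h)            ->  visited[h] = struct{}{}
	//   store.Close()           ->  visited = nil
	visited := map[string]struct{}{}
	visited["key-1"] = struct{}{}
	_, ok := visited["key-1"]
	fmt.Println(ok, len(visited)) // true 1
	visited = nil                 // drop the reference so GC can reclaim it
}
```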
diff --git a/ingest/memory_temp_set.go b/ingest/memory_temp_set.go
deleted file mode 100644
index 26096e57e6..0000000000
--- a/ingest/memory_temp_set.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package ingest
-
-// memoryTempSet is an in-memory implementation of TempSet interface.
-// As of July 2019 this requires up to ~4GB of memory for pubnet ledger
-// state processing. The internal structure is dereferenced after the
-// store is closed.
-type memoryTempSet struct {
-	m map[string]bool
-}
-
-// Open initialize internals data structure.
-func (s *memoryTempSet) Open() error {
-	s.m = make(map[string]bool)
-	return nil
-}
-
-// Add adds a key to TempSet.
-func (s *memoryTempSet) Add(key string) error {
-	s.m[key] = true
-	return nil
-}
-
-// Preload does not do anything. This TempSet keeps everything in memory
-// so no preloading needed.
-func (s *memoryTempSet) Preload(keys []string) error {
-	return nil
-}
-
-// Exist check if the key exists in a TempSet.
-func (s *memoryTempSet) Exist(key string) (bool, error) {
-	return s.m[key], nil
-}
-
-// Close removes reference to internal data structure.
-func (s *memoryTempSet) Close() error {
-	s.m = nil
-	return nil
-}
diff --git a/ingest/memory_temp_set_test.go b/ingest/memory_temp_set_test.go
deleted file mode 100644
index 8f56c0f86f..0000000000
--- a/ingest/memory_temp_set_test.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package ingest
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestMemoryTempSet(t *testing.T) {
-	s := memoryTempSet{}
-	assert.Nil(t, s.m)
-	err := s.Open()
-	assert.NoError(t, err)
-	assert.NotNil(t, s.m)
-
-	err = s.Add("a")
-	assert.NoError(t, err)
-
-	err = s.Add("b")
-	assert.NoError(t, err)
-
-	v, err := s.Exist("a")
-	assert.NoError(t, err)
-	assert.True(t, v)
-
-	v, err = s.Exist("b")
-	assert.NoError(t, err)
-	assert.True(t, v)
-
-	// Get for not-set key should return false
-	v, err = s.Exist("c")
-	assert.NoError(t, err)
-	assert.False(t, v)
-
-	err = s.Close()
-	assert.NoError(t, err)
-	assert.Nil(t, s.m)
-}
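Closing note: nothing in this diff changes the reader's public behavior — callers still construct it and drain changes until `io.EOF`. A usage sketch under that assumption (`streamCheckpoint` is a hypothetical helper of ours; `NewCheckpointChangeReader`, `Read`, and `Close` are the APIs exercised by the tests above):

```go
package main

import (
	"context"
	"fmt"
	"io"

	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/ingest"
)

// streamCheckpoint drains every ledger entry change in the checkpoint at
// sequence seq from the given history archive and returns how many entries
// were streamed.
func streamCheckpoint(ctx context.Context, archive historyarchive.ArchiveInterface, seq uint32) (int, error) {
	reader, err := ingest.NewCheckpointChangeReader(ctx, archive, seq)
	if err != nil {
		return 0, err
	}
	defer reader.Close()

	count := 0
	for {
		change, err := reader.Read()
		if err == io.EOF {
			return count, nil // checkpoint fully streamed
		}
		if err != nil {
			return count, err
		}
		// For a checkpoint reader every change is a LEDGER_ENTRY_STATE
		// change, so Post carries the entry's current value, as asserted in
		// TestUniqueInitEntryOptimization above.
		_ = change.Post
		count++
	}
}

func main() {
	fmt.Println("wire streamCheckpoint to a historyarchive archive to use it")
}
```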