From 6a8c2f9f25c45469cd7bb3c1112378cb60a93fe5 Mon Sep 17 00:00:00 2001 From: tamirms Date: Sat, 30 Mar 2024 17:17:41 +0000 Subject: [PATCH 1/8] improve checkpoint change reader --- ingest/checkpoint_change_reader.go | 118 +++++++---------------------- ingest/memory_temp_set.go | 12 +-- 2 files changed, 34 insertions(+), 96 deletions(-) diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index d1365740c6..ba2ccabe9c 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -50,11 +50,10 @@ var _ ChangeReader = &CheckpointChangeReader{} // does not need to be thread-safe. type tempSet interface { Open() error - // Preload batch-loads keys into internal cache (if a store has any) to - // improve execution time by removing many round-trips. - Preload(keys []string) error // Add adds key to the store. Add(key string) error + // Remove removes a key from the store + Remove(key string) error // Exist returns value true if the value is found in the store. // If the value has not been set, it should return false. Exist(key string) (bool, error) @@ -305,96 +304,21 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash, // No METAENTRY means that bucket originates from before protocol version 11. bucketProtocolVersion := uint32(0) - n := -1 - var batch []xdr.BucketEntry - lastBatch := false - - preloadKeys := make([]string, 0, preloadedEntries) - -LoopBucketEntry: - for { - // Preload entries for faster retrieve from temp store. - if len(batch) == 0 { - if lastBatch { + for n := 0; ; n++ { + var entry xdr.BucketEntry + entry, e = r.readBucketEntry(rdr, hash) + if e != nil { + if e == io.EOF { + // No entries loaded for this batch, nothing more to process return true } - batch = make([]xdr.BucketEntry, 0, preloadedEntries) - - // reset the content of the preloadKeys - preloadKeys = preloadKeys[:0] - - for i := 0; i < preloadedEntries; i++ { - var entry xdr.BucketEntry - entry, e = r.readBucketEntry(rdr, hash) - if e != nil { - if e == io.EOF { - if len(batch) == 0 { - // No entries loaded for this batch, nothing more to process - return true - } - lastBatch = true - break - } - r.readChan <- r.error( - errors.Wrapf(e, "Error on XDR record %d of hash '%s'", n, hash.String()), - ) - return false - } - - batch = append(batch, entry) - - // Generate a key - var key xdr.LedgerKey - var err error - - switch entry.Type { - case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry: - liveEntry := entry.MustLiveEntry() - key, err = liveEntry.LedgerKey() - if err != nil { - r.readChan <- r.error( - errors.Wrapf(err, "Error generating ledger key for XDR record %d of hash '%s'", n, hash.String()), - ) - return false - } - case xdr.BucketEntryTypeDeadentry: - key = entry.MustDeadEntry() - default: - // No ledger key associated with this entry, continue to the next one. - continue - } - - // We're using compressed keys here - // safe, since we are converting to string right away - keyBytes, e := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key) - if e != nil { - r.readChan <- r.error( - errors.Wrapf(e, "Error marshaling XDR record %d of hash '%s'", n, hash.String()), - ) - return false - } - - h := string(keyBytes) - preloadKeys = append(preloadKeys, h) - } - - err := r.tempStore.Preload(preloadKeys) - if err != nil { - r.readChan <- r.error(errors.Wrap(err, "Error preloading keys")) - return false - } + r.readChan <- r.error( + errors.Wrapf(e, "Error on XDR record %d of hash '%s'", n, hash.String()), + ) + return false } - var entry xdr.BucketEntry - entry, batch = batch[0], batch[1:] - - n++ - - var key xdr.LedgerKey - var err error - - switch entry.Type { - case xdr.BucketEntryTypeMetaentry: + if entry.Type == xdr.BucketEntryTypeMetaentry { if n != 0 { r.readChan <- r.error( errors.Errorf( @@ -407,7 +331,13 @@ LoopBucketEntry: // We can't use MustMetaEntry() here. Check: // https://github.com/golang/go/issues/32560 bucketProtocolVersion = uint32(entry.MetaEntry.LedgerVersion) - continue LoopBucketEntry + continue + } + + var key xdr.LedgerKey + var err error + + switch entry.Type { case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry: liveEntry := entry.MustLiveEntry() key, err = liveEntry.LedgerKey() @@ -439,6 +369,8 @@ LoopBucketEntry: } h := string(keyBytes) + unique := key.Type == xdr.LedgerEntryTypeClaimableBalance || + key.Type == xdr.LedgerEntryTypeOffer switch entry.Type { case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry: @@ -483,6 +415,12 @@ LoopBucketEntry: return false } } + } else if entry.Type == xdr.BucketEntryTypeInitentry && unique { + err := r.tempStore.Remove(h) + if err != nil { + r.readChan <- r.error(errors.Wrap(err, "Error removing key from tempStore")) + return false + } } case xdr.BucketEntryTypeDeadentry: err := r.tempStore.Add(h) diff --git a/ingest/memory_temp_set.go b/ingest/memory_temp_set.go index 26096e57e6..bb65b30933 100644 --- a/ingest/memory_temp_set.go +++ b/ingest/memory_temp_set.go @@ -20,17 +20,17 @@ func (s *memoryTempSet) Add(key string) error { return nil } -// Preload does not do anything. This TempSet keeps everything in memory -// so no preloading needed. -func (s *memoryTempSet) Preload(keys []string) error { - return nil -} - // Exist check if the key exists in a TempSet. func (s *memoryTempSet) Exist(key string) (bool, error) { return s.m[key], nil } +// Remove removes a key from the TempSet +func (s *memoryTempSet) Remove(key string) error { + delete(s.m, key) + return nil +} + // Close removes reference to internal data structure. func (s *memoryTempSet) Close() error { s.m = nil From ba2a0bcba6ad574b5a47bab133d0399a82fc3e9d Mon Sep 17 00:00:00 2001 From: tamirms Date: Sun, 31 Mar 2024 08:06:21 +0100 Subject: [PATCH 2/8] use map[string]struct{} to save more space --- ingest/memory_temp_set.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ingest/memory_temp_set.go b/ingest/memory_temp_set.go index bb65b30933..005253bb52 100644 --- a/ingest/memory_temp_set.go +++ b/ingest/memory_temp_set.go @@ -5,24 +5,25 @@ package ingest // state processing. The internal structure is dereferenced after the // store is closed. type memoryTempSet struct { - m map[string]bool + m map[string]struct{} } // Open initialize internals data structure. func (s *memoryTempSet) Open() error { - s.m = make(map[string]bool) + s.m = make(map[string]struct{}) return nil } // Add adds a key to TempSet. func (s *memoryTempSet) Add(key string) error { - s.m[key] = true + s.m[key] = struct{}{} return nil } // Exist check if the key exists in a TempSet. func (s *memoryTempSet) Exist(key string) (bool, error) { - return s.m[key], nil + _, exists := s.m[key] + return exists, nil } // Remove removes a key from the TempSet From 7eb20474d0001676abe6a4d808029d78e3c6df92 Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 15 Apr 2024 14:54:53 +0100 Subject: [PATCH 3/8] remove unused preloadedEntries const --- ingest/checkpoint_change_reader.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index ba2ccabe9c..de104825c8 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -65,11 +65,6 @@ const ( // the xdr stream returned by GetXdrStreamForHash(). maxStreamRetries = 3 msrBufferSize = 50000 - - // preloadedEntries defines a number of bucket entries to preload from a - // bucket in a single run. This is done to allow preloading keys from - // temp set. - preloadedEntries = 20000 ) // NewCheckpointChangeReader constructs a new CheckpointChangeReader instance. From 1d0dcaf6230e57c8d828c607d13f749bb31b17ac Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 15 Apr 2024 14:59:21 +0100 Subject: [PATCH 4/8] Add comment --- ingest/checkpoint_change_reader.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index de104825c8..0898b1a5a8 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -364,6 +364,10 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash, } h := string(keyBytes) + // claimable balances and offers have unique ids + // once a claimable balance or offer is created we can assume that + // the id can never be recreated again, unlike, for example, trustlines + // which can be deleted and then recreated unique := key.Type == xdr.LedgerEntryTypeClaimableBalance || key.Type == xdr.LedgerEntryTypeOffer @@ -411,6 +415,8 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash, } } } else if entry.Type == xdr.BucketEntryTypeInitentry && unique { + // we can remove the ledger key because we know that it's unique in the ledger + // and cannot be recreated err := r.tempStore.Remove(h) if err != nil { r.readChan <- r.error(errors.Wrap(err, "Error removing key from tempStore")) From 38e59d0c523af6ff9cadcc8f2827e14deac95c77 Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 15 Apr 2024 21:34:44 +0100 Subject: [PATCH 5/8] use set.Set --- ingest/checkpoint_change_reader.go | 103 +++++++++-------------------- ingest/memory_temp_set.go | 39 ----------- ingest/memory_temp_set_test.go | 38 ----------- 3 files changed, 31 insertions(+), 149 deletions(-) delete mode 100644 ingest/memory_temp_set.go delete mode 100644 ingest/memory_temp_set_test.go diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index 0898b1a5a8..1dbbed2a9e 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stellar/go/historyarchive" + "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -21,15 +22,15 @@ type readResult struct { // snapshot. The Changes produced by a CheckpointChangeReader reflect the state of the Stellar // network at a particular checkpoint ledger sequence. type CheckpointChangeReader struct { - ctx context.Context - has *historyarchive.HistoryArchiveState - archive historyarchive.ArchiveInterface - tempStore tempSet - sequence uint32 - readChan chan readResult - streamOnce sync.Once - closeOnce sync.Once - done chan bool + ctx context.Context + has *historyarchive.HistoryArchiveState + archive historyarchive.ArchiveInterface + visitedLedgerKeys set.Set[string] + sequence uint32 + readChan chan readResult + streamOnce sync.Once + closeOnce sync.Once + done chan bool readBytesMutex sync.RWMutex totalRead int64 @@ -45,21 +46,6 @@ type CheckpointChangeReader struct { // Ensure CheckpointChangeReader implements ChangeReader var _ ChangeReader = &CheckpointChangeReader{} -// tempSet is an interface that must be implemented by stores that -// hold temporary set of objects for state reader. The implementation -// does not need to be thread-safe. -type tempSet interface { - Open() error - // Add adds key to the store. - Add(key string) error - // Remove removes a key from the store - Remove(key string) error - // Exist returns value true if the value is found in the store. - // If the value has not been set, it should return false. - Exist(key string) (bool, error) - Close() error -} - const ( // maxStreamRetries defines how many times should we retry when there are errors in // the xdr stream returned by GetXdrStreamForHash(). @@ -96,24 +82,18 @@ func NewCheckpointChangeReader( return nil, errors.Wrapf(err, "unable to get checkpoint HAS at ledger sequence %d", sequence) } - tempStore := &memoryTempSet{} - err = tempStore.Open() - if err != nil { - return nil, errors.Wrap(err, "unable to get open temp store") - } - return &CheckpointChangeReader{ - ctx: ctx, - has: &has, - archive: archive, - tempStore: tempStore, - sequence: sequence, - readChan: make(chan readResult, msrBufferSize), - streamOnce: sync.Once{}, - closeOnce: sync.Once{}, - done: make(chan bool), - encodingBuffer: xdr.NewEncodingBuffer(), - sleep: time.Sleep, + ctx: ctx, + has: &has, + archive: archive, + visitedLedgerKeys: set.Set[string]{}, + sequence: sequence, + readChan: make(chan readResult, msrBufferSize), + streamOnce: sync.Once{}, + closeOnce: sync.Once{}, + done: make(chan bool), + encodingBuffer: xdr.NewEncodingBuffer(), + sleep: time.Sleep, }, nil } @@ -135,21 +115,18 @@ func (r *CheckpointChangeReader) bucketExists(hash historyarchive.Hash) (bool, e // However, we can modify this algorithm to work from newest to oldest ledgers: // // 1. For each `INITENTRY`/`LIVEENTRY` we check if we've seen the key before -// (stored in `tempStore`). If the key hasn't been seen, we write that bucket -// entry to the stream and add it to the `tempStore` (we don't mark `INITENTRY`, +// (stored in `visitedLedgerKeys`). If the key hasn't been seen, we write that bucket +// entry to the stream and add it to the `visitedLedgerKeys` (we don't mark `INITENTRY`, // see the inline comment or CAP-20). // 2. For each `DEADENTRY` we keep track of removed bucket entries in -// `tempStore` map. +// `visitedLedgerKeys` map. // // In such algorithm we just need to store a set of keys that require much less space. // The memory requirements will be lowered when CAP-0020 is live and older buckets are // rewritten. Then, we will only need to keep track of `DEADENTRY`. func (r *CheckpointChangeReader) streamBuckets() { defer func() { - err := r.tempStore.Close() - if err != nil { - r.readChan <- r.error(errors.New("Error closing tempStore")) - } + r.visitedLedgerKeys = nil r.closeOnce.Do(r.close) close(r.readChan) @@ -380,13 +357,7 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash, return false } - seen, err := r.tempStore.Exist(h) - if err != nil { - r.readChan <- r.error(errors.Wrap(err, "Error reading from tempStore")) - return false - } - - if !seen { + if !r.visitedLedgerKeys.Contains(h) { // Return LEDGER_ENTRY_STATE changes only now. liveEntry := entry.MustLiveEntry() entryChange := xdr.LedgerEntryChange{ @@ -395,40 +366,28 @@ func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash, } r.readChan <- readResult{entryChange, nil} - // We don't update `tempStore` for INITENTRY because CAP-20 says: + // We don't update `visitedLedgerKeys` for INITENTRY because CAP-20 says: // > a bucket entry marked INITENTRY implies that either no entry // > with the same ledger key exists in an older bucket, or else // > that the (chronologically) preceding entry with the same ledger // > key was DEADENTRY. if entry.Type == xdr.BucketEntryTypeLiveentry { - // We skip adding entries from the last bucket to tempStore because: + // We skip adding entries from the last bucket to visitedLedgerKeys because: // 1. Ledger keys are unique within a single bucket. // 2. This is the last bucket we process so there's no need to track // seen last entries in this bucket. if oldestBucket { continue } - err := r.tempStore.Add(h) - if err != nil { - r.readChan <- r.error(errors.Wrap(err, "Error updating to tempStore")) - return false - } + r.visitedLedgerKeys.Add(h) } } else if entry.Type == xdr.BucketEntryTypeInitentry && unique { // we can remove the ledger key because we know that it's unique in the ledger // and cannot be recreated - err := r.tempStore.Remove(h) - if err != nil { - r.readChan <- r.error(errors.Wrap(err, "Error removing key from tempStore")) - return false - } + r.visitedLedgerKeys.Remove(h) } case xdr.BucketEntryTypeDeadentry: - err := r.tempStore.Add(h) - if err != nil { - r.readChan <- r.error(errors.Wrap(err, "Error writing to tempStore")) - return false - } + r.visitedLedgerKeys.Add(h) default: r.readChan <- r.error( errors.Errorf("Unexpected entry type %d: %d@%s", entry.Type, n, hash.String()), diff --git a/ingest/memory_temp_set.go b/ingest/memory_temp_set.go deleted file mode 100644 index 005253bb52..0000000000 --- a/ingest/memory_temp_set.go +++ /dev/null @@ -1,39 +0,0 @@ -package ingest - -// memoryTempSet is an in-memory implementation of TempSet interface. -// As of July 2019 this requires up to ~4GB of memory for pubnet ledger -// state processing. The internal structure is dereferenced after the -// store is closed. -type memoryTempSet struct { - m map[string]struct{} -} - -// Open initialize internals data structure. -func (s *memoryTempSet) Open() error { - s.m = make(map[string]struct{}) - return nil -} - -// Add adds a key to TempSet. -func (s *memoryTempSet) Add(key string) error { - s.m[key] = struct{}{} - return nil -} - -// Exist check if the key exists in a TempSet. -func (s *memoryTempSet) Exist(key string) (bool, error) { - _, exists := s.m[key] - return exists, nil -} - -// Remove removes a key from the TempSet -func (s *memoryTempSet) Remove(key string) error { - delete(s.m, key) - return nil -} - -// Close removes reference to internal data structure. -func (s *memoryTempSet) Close() error { - s.m = nil - return nil -} diff --git a/ingest/memory_temp_set_test.go b/ingest/memory_temp_set_test.go deleted file mode 100644 index 8f56c0f86f..0000000000 --- a/ingest/memory_temp_set_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package ingest - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMemoryTempSet(t *testing.T) { - s := memoryTempSet{} - assert.Nil(t, s.m) - err := s.Open() - assert.NoError(t, err) - assert.NotNil(t, s.m) - - err = s.Add("a") - assert.NoError(t, err) - - err = s.Add("b") - assert.NoError(t, err) - - v, err := s.Exist("a") - assert.NoError(t, err) - assert.True(t, v) - - v, err = s.Exist("b") - assert.NoError(t, err) - assert.True(t, v) - - // Get for not-set key should return false - v, err = s.Exist("c") - assert.NoError(t, err) - assert.False(t, v) - - err = s.Close() - assert.NoError(t, err) - assert.Nil(t, s.m) -} From a47e706d4c137bd3b7fb99a304a27650041bc07c Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 15 Apr 2024 23:17:26 +0100 Subject: [PATCH 6/8] add unit test --- ingest/checkpoint_change_reader_test.go | 188 +++++++++++++++++++++++- 1 file changed, 186 insertions(+), 2 deletions(-) diff --git a/ingest/checkpoint_change_reader_test.go b/ingest/checkpoint_change_reader_test.go index 958f3199ac..6aa9f10034 100644 --- a/ingest/checkpoint_change_reader_test.go +++ b/ingest/checkpoint_change_reader_test.go @@ -10,11 +10,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/stellar/go/historyarchive" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" ) func TestSingleLedgerStateReaderTestSuite(t *testing.T) { @@ -220,6 +221,117 @@ func (s *SingleLedgerStateReaderTestSuite) TestEnsureLatestLiveEntry() { s.Require().Equal(err, io.EOF) } +func (s *SingleLedgerStateReaderTestSuite) TestUniqueInitEntryOptimization() { + curr1 := createXdrStream( + metaEntry(20), + entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + entryCB(xdr.BucketEntryTypeDeadentry, xdr.Hash{1, 2, 3}, 100), + entryOffer(xdr.BucketEntryTypeDeadentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 20), + ) + + snap1 := createXdrStream( + metaEntry(20), + entryAccount(xdr.BucketEntryTypeInitentry, "GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB", 1), + entryAccount(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + entryCB(xdr.BucketEntryTypeInitentry, xdr.Hash{1, 2, 3}, 100), + entryOffer(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 20), + entryAccount(xdr.BucketEntryTypeInitentry, "GAP2KHWUMOHY7IO37UJY7SEBIITJIDZS5DRIIQRPEUT4VUKHZQGIRWS4", 1), + ) + + nextBucket := s.getNextBucketChannel() + + // Return curr1 and snap1 stream for the first two bucket... + s.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(curr1, nil).Once() + + s.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(snap1, nil).Once() + + // ...and empty streams for the rest of the buckets. + for hash := range nextBucket { + s.mockArchive. + On("GetXdrStreamForHash", hash). + Return(createXdrStream(), nil).Once() + } + + // replace readChan with an unbuffered channel so we can test behavior of when items are added / removed + // from visitedLedgerKeys + s.reader.readChan = make(chan readResult, 0) + + change, err := s.reader.Read() + s.Require().NoError(err) + key, err := change.Post.Data.LedgerKey() + s.Require().NoError(err) + s.Require().True( + key.Equals(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")}, + }), + ) + + change, err = s.reader.Read() + s.Require().NoError(err) + key, err = change.Post.Data.LedgerKey() + s.Require().NoError(err) + s.Require().True( + key.Equals(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB")}, + }), + ) + s.Require().Equal(len(s.reader.visitedLedgerKeys), 3) + s.assertVisitedLedgerKeysContains(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")}, + }) + s.assertVisitedLedgerKeysContains(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.LedgerKeyOffer{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: 20, + }, + }) + s.assertVisitedLedgerKeysContains(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &xdr.LedgerKeyClaimableBalance{ + BalanceId: xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &xdr.Hash{1, 2, 3}, + }, + }, + }) + + change, err = s.reader.Read() + s.Require().NoError(err) + key, err = change.Post.Data.LedgerKey() + s.Require().NoError(err) + s.Require().True( + key.Equals(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GAP2KHWUMOHY7IO37UJY7SEBIITJIDZS5DRIIQRPEUT4VUKHZQGIRWS4")}, + }), + ) + // the offer and cb ledger keys should now be removed from visitedLedgerKeys + // because we encountered the init entries in the bucket + s.Require().Equal(len(s.reader.visitedLedgerKeys), 1) + s.assertVisitedLedgerKeysContains(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")}, + }) + + _, err = s.reader.Read() + s.Require().Equal(err, io.EOF) +} + +func (s *SingleLedgerStateReaderTestSuite) assertVisitedLedgerKeysContains(key xdr.LedgerKey) { + encodingBuffer := xdr.NewEncodingBuffer() + keyBytes, err := encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key) + s.Require().NoError(err) + s.Require().True(s.reader.visitedLedgerKeys.Contains(string(keyBytes))) +} + // TestMalformedProtocol11Bucket tests a buggy protocol 11 bucket (meta not the first entry) func (s *SingleLedgerStateReaderTestSuite) TestMalformedProtocol11Bucket() { curr1 := createXdrStream( @@ -733,6 +845,78 @@ func entryAccount(t xdr.BucketEntryType, id string, balance uint32) xdr.BucketEn } } +func entryCB(t xdr.BucketEntryType, id xdr.Hash, balance xdr.Int64) xdr.BucketEntry { + switch t { + case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry: + return xdr.BucketEntry{ + Type: t, + LiveEntry: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &xdr.ClaimableBalanceEntry{ + BalanceId: xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &id, + }, + Asset: xdr.MustNewNativeAsset(), + Amount: balance, + }, + }, + }, + } + case xdr.BucketEntryTypeDeadentry: + return xdr.BucketEntry{ + Type: xdr.BucketEntryTypeDeadentry, + DeadEntry: &xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &xdr.LedgerKeyClaimableBalance{ + BalanceId: xdr.ClaimableBalanceId{ + Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, + V0: &id, + }, + }, + }, + } + default: + panic("Unknown entry type") + } +} + +func entryOffer(t xdr.BucketEntryType, seller string, id xdr.Int64) xdr.BucketEntry { + switch t { + case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry: + return xdr.BucketEntry{ + Type: t, + LiveEntry: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + OfferId: id, + SellerId: xdr.MustAddress(seller), + Selling: xdr.MustNewNativeAsset(), + Buying: xdr.MustNewCreditAsset("USD", "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7"), + Amount: 100, + Price: xdr.Price{1, 1}, + }, + }, + }, + } + case xdr.BucketEntryTypeDeadentry: + return xdr.BucketEntry{ + Type: xdr.BucketEntryTypeDeadentry, + DeadEntry: &xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.LedgerKeyOffer{ + OfferId: id, + SellerId: xdr.MustAddress(seller), + }, + }, + } + default: + panic("Unknown entry type") + } +} + type errCloser struct { io.Reader err error From c8e87b34a6e630dd1b0a10f71541ca408d1d3e91 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 16 Apr 2024 00:03:30 +0100 Subject: [PATCH 7/8] fix data race in test --- ingest/checkpoint_change_reader_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/ingest/checkpoint_change_reader_test.go b/ingest/checkpoint_change_reader_test.go index 6aa9f10034..367f168941 100644 --- a/ingest/checkpoint_change_reader_test.go +++ b/ingest/checkpoint_change_reader_test.go @@ -233,9 +233,11 @@ func (s *SingleLedgerStateReaderTestSuite) TestUniqueInitEntryOptimization() { metaEntry(20), entryAccount(xdr.BucketEntryTypeInitentry, "GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB", 1), entryAccount(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + entryAccount(xdr.BucketEntryTypeInitentry, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", 1), entryCB(xdr.BucketEntryTypeInitentry, xdr.Hash{1, 2, 3}, 100), entryOffer(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 20), entryAccount(xdr.BucketEntryTypeInitentry, "GAP2KHWUMOHY7IO37UJY7SEBIITJIDZS5DRIIQRPEUT4VUKHZQGIRWS4", 1), + entryAccount(xdr.BucketEntryTypeInitentry, "GAIH3ULLFQ4DGSECF2AR555KZ4KNDGEKN4AFI4SU2M7B43MGK3QJZNSR", 1), ) nextBucket := s.getNextBucketChannel() @@ -303,6 +305,17 @@ func (s *SingleLedgerStateReaderTestSuite) TestUniqueInitEntryOptimization() { }, }) + change, err = s.reader.Read() + s.Require().NoError(err) + key, err = change.Post.Data.LedgerKey() + s.Require().NoError(err) + s.Require().True( + key.Equals(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")}, + }), + ) + change, err = s.reader.Read() s.Require().NoError(err) key, err = change.Post.Data.LedgerKey() @@ -321,6 +334,17 @@ func (s *SingleLedgerStateReaderTestSuite) TestUniqueInitEntryOptimization() { Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML")}, }) + change, err = s.reader.Read() + s.Require().NoError(err) + key, err = change.Post.Data.LedgerKey() + s.Require().NoError(err) + s.Require().True( + key.Equals(xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GAIH3ULLFQ4DGSECF2AR555KZ4KNDGEKN4AFI4SU2M7B43MGK3QJZNSRcomm")}, + }), + ) + _, err = s.reader.Read() s.Require().Equal(err, io.EOF) } From b245ed2dadf5aa59d30c9b1e4bd39a3e9ae779fb Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 16 Apr 2024 00:33:24 +0100 Subject: [PATCH 8/8] fix typo --- ingest/checkpoint_change_reader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest/checkpoint_change_reader_test.go b/ingest/checkpoint_change_reader_test.go index 367f168941..35ec2bb076 100644 --- a/ingest/checkpoint_change_reader_test.go +++ b/ingest/checkpoint_change_reader_test.go @@ -341,7 +341,7 @@ func (s *SingleLedgerStateReaderTestSuite) TestUniqueInitEntryOptimization() { s.Require().True( key.Equals(xdr.LedgerKey{ Type: xdr.LedgerEntryTypeAccount, - Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GAIH3ULLFQ4DGSECF2AR555KZ4KNDGEKN4AFI4SU2M7B43MGK3QJZNSRcomm")}, + Account: &xdr.LedgerKeyAccount{AccountId: xdr.MustAddress("GAIH3ULLFQ4DGSECF2AR555KZ4KNDGEKN4AFI4SU2M7B43MGK3QJZNSR")}, }), )