From a8a87b9744e7af8818a7fc73372846ab7c2f2e44 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Wed, 10 Apr 2024 16:46:06 -0700 Subject: [PATCH 1/9] Add untested prototype of an indexed TransactionReader variant --- ingest/lazy_transaction_reader.go | 96 +++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 ingest/lazy_transaction_reader.go diff --git a/ingest/lazy_transaction_reader.go b/ingest/lazy_transaction_reader.go new file mode 100644 index 0000000000..d4a5dd14d0 --- /dev/null +++ b/ingest/lazy_transaction_reader.go @@ -0,0 +1,96 @@ +package ingest + +import ( + "io" + + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// LazyTransactionReader supports reading ranges of transactions from a raw +// LedgerCloseMeta instance in a "lazy" fashion, meaning the transaction +// structures are only created when you actually request a read for that +// particular index. +type LazyTransactionReader struct { + lcm xdr.LedgerCloseMeta + start int // read-only + + transactions []LedgerTransaction // cached for Rewind() calls + lastRead int // cycles through ^ +} + +// NewLazyTransactionReader creates a new reader instance from raw +// xdr.LedgerCloseMeta starting at a particular transaction index. Note that +// LazyTransactionReader is not thread safe and should not be shared by multiple +// goroutines. +func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) *LazyTransactionReader { + return &LazyTransactionReader{ + lcm: ledgerCloseMeta, + start: start, + lastRead: -1, // haven't started yet + } +} + +// GetSequence returns the sequence number of the ledger data stored by this object. +func (reader *LazyTransactionReader) GetSequence() uint32 { + return reader.lcm.LedgerSequence() +} + +// GetHeader returns the XDR Header data associated with the stored ledger. +func (reader *LazyTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { + return reader.lcm.LedgerHeaderHistoryEntry() +} + +// Read returns the next transaction in the ledger, ordered by tx number, each +// time it is called. When there are no more transactions to return, an EOF +// error is returned. Note that if you initialized the reader from a non-zero +// index, it will EOF when it cycles back to the start rather than when it +// reaches the end. +func (reader *LazyTransactionReader) Read() (LedgerTransaction, error) { + if reader.lcm.ProtocolVersion() < 20 { + return LedgerTransaction{}, errors.New("LazyTransactionReader only works from Protocol 20 onward.") + } + txCount := reader.lcm.CountTransactions() + + i := reader.start // assume first time + if reader.lastRead != -1 { + i = (reader.lastRead + 1) % txCount + if i == reader.start { // cycle, so rewind but mark as EOF + reader.Rewind() + return LedgerTransaction{}, io.EOF + } + } + + lcm := reader.lcm + reader.lastRead = i + envelope := lcm.TransactionEnvelopes()[i] + cachedIdx := (i - reader.start + txCount /* to fix negatives */) % txCount + + if cachedIdx < len(reader.transactions) { // cached? return immediately + return reader.transactions[cachedIdx], nil + } + + newTx := LedgerTransaction{ + Index: uint32(i + 1), // Transactions start at '1' + Envelope: envelope, + Result: lcm.TransactionResultPair(i), + UnsafeMeta: lcm.TxApplyProcessing(i), + FeeChanges: lcm.FeeProcessing(i), + LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), + } + reader.transactions = append(reader.transactions, newTx) + return newTx, nil +} + +// Rewind resets the reader back to the first transaction in the ledger +func (reader *LazyTransactionReader) Rewind() { + reader.lastRead = -1 +} + +// Close should be called when reading is finished. This is especially +// helpful when there are still some transactions available so reader can stop +// streaming them. +func (reader *LazyTransactionReader) Close() error { + reader.transactions = nil + return nil +} From 2e51ba2226c4e4b9ff07d09fedeca250bcbbb5c0 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 11 Apr 2024 12:20:53 -0700 Subject: [PATCH 2/9] Add test suite --- ingest/lazy_transaction_reader.go | 15 ++-- ingest/lazy_transaction_reader_test.go | 117 +++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 ingest/lazy_transaction_reader_test.go diff --git a/ingest/lazy_transaction_reader.go b/ingest/lazy_transaction_reader.go index d4a5dd14d0..201785b3dd 100644 --- a/ingest/lazy_transaction_reader.go +++ b/ingest/lazy_transaction_reader.go @@ -23,12 +23,20 @@ type LazyTransactionReader struct { // xdr.LedgerCloseMeta starting at a particular transaction index. Note that // LazyTransactionReader is not thread safe and should not be shared by multiple // goroutines. -func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) *LazyTransactionReader { +func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) (*LazyTransactionReader, error) { + if start >= ledgerCloseMeta.CountTransactions() || start < 0 { + return nil, errors.New("'start' index exceeds ledger transaction count") + } + + if ledgerCloseMeta.ProtocolVersion() < 20 { + return nil, errors.New("LazyTransactionReader only works from Protocol 20 onward") + } + return &LazyTransactionReader{ lcm: ledgerCloseMeta, start: start, lastRead: -1, // haven't started yet - } + }, nil } // GetSequence returns the sequence number of the ledger data stored by this object. @@ -47,9 +55,6 @@ func (reader *LazyTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { // index, it will EOF when it cycles back to the start rather than when it // reaches the end. func (reader *LazyTransactionReader) Read() (LedgerTransaction, error) { - if reader.lcm.ProtocolVersion() < 20 { - return LedgerTransaction{}, errors.New("LazyTransactionReader only works from Protocol 20 onward.") - } txCount := reader.lcm.CountTransactions() i := reader.start // assume first time diff --git a/ingest/lazy_transaction_reader_test.go b/ingest/lazy_transaction_reader_test.go new file mode 100644 index 0000000000..3b8c488814 --- /dev/null +++ b/ingest/lazy_transaction_reader_test.go @@ -0,0 +1,117 @@ +package ingest + +import ( + "io" + "testing" + + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var txMeta = xdr.TransactionResultMeta{ + TxApplyProcessing: xdr.TransactionMeta{ + V: 3, + V3: &xdr.TransactionMetaV3{}, + }, +} +var txEnv = xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{}, + }, +} + +// barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs +var ledgerCloseMeta = xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + Ext: xdr.ExtensionPoint{V: 0}, + TxProcessing: []xdr.TransactionResultMeta{ + txMeta, + txMeta, + txMeta, + txMeta, + txMeta, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + Phases: []xdr.TransactionPhase{{ + V: 0, + V0Components: &[]xdr.TxSetComponent{{ + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{ + txEnv, + txEnv, + txEnv, + txEnv, + txEnv, + }, + }, + }}, + }}, + }, + }, + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{LedgerVersion: 20}, + }, + }, +} + +func TestLazyTransactionReader(t *testing.T) { + require.True(t, true) + + // simplest case: read from start + + fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, 0) + require.NoError(t, err) + for i := 0; i < 5; i++ { + tx, ierr := fromZero.Read() + require.NoError(t, ierr) + assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) + } + _, err = fromZero.Read() + require.ErrorIs(t, err, io.EOF) + + // start reading from the middle set of txs + + fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, 2) + require.NoError(t, err) + for i := 0; i < 5; i++ { + tx, ierr := fromMiddle.Read() + require.NoError(t, ierr) + assert.EqualValues(t, 1+((i+2)%5), tx.Index, "iteration i=%d", i) + } + _, err = fromMiddle.Read() + require.ErrorIs(t, err, io.EOF) + + // edge case: start from the last tx + + fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, 4) + require.NoError(t, err) + for i := 0; i < 5; i++ { + tx, ierr := fromEnd.Read() + require.NoError(t, ierr) + assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) + } + _, err = fromEnd.Read() + require.ErrorIs(t, err, io.EOF) + + // ensure that rewinds work after EOF + + fromEnd.Rewind() + for i := 0; i < 5; i++ { + tx, ierr := fromEnd.Read() + require.NoError(t, ierr) + assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) + } + _, err = fromEnd.Read() + require.ErrorIs(t, err, io.EOF) + + // error case: too far or too close + for i := range []int{-1, 5, 6} { + _, err = NewLazyTransactionReader(ledgerCloseMeta, i) + require.Error(t, err) + } +} From 71408554a742ed03cac72544abf60c7fc5433718 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 11 Apr 2024 12:39:07 -0700 Subject: [PATCH 3/9] Add details to changelog (we haven't released in ages lol) and readme --- ingest/CHANGELOG.md | 19 ++++++++++++++++--- ingest/README.md | 26 +++++++++++++------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index edd428a62f..8e960cea7a 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -5,14 +5,27 @@ All notable changes to this project will be documented in this file. This projec ## Unreleased -* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050) - ### New Features -* **Performance improvement**: the Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later. +* Support for Soroban and Protocol 20! +* There is now a `LazyTransactionReader` that provides transactions starting from any part of the ledger and will only transform the ones that are being used [5274](https://github.com/stellar/go/pull/5274). +* `Change` now has a canonical stringification and a set of them is deterministically sortable. +* `NewCompactingChangeReader` will give you a wrapped `ChangeReader` that compacts the changes. +* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050). + +### Performance Improvements +* The Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later. +* There have been miscallaneous memory and processing speed improvements. ### Bug Fixes * The Stellar Core runner now parses logs from its underlying subprocess better [#3746](https://github.com/stellar/go/pull/3746). +* Ensures that the underlying Stellar Core is terminated before restarting. +* Backends will now connect with a user agent. +* Better handling of various error and restart scenarios. +### Breaking Changes +* **Captive Core is now the only available backend.** +* The Captive Core configuration should be provided via a TOML file. +* `Change.AccountSignersChanged` has been removed. ## v2.0.0 diff --git a/ingest/README.md b/ingest/README.md index cf3d38f8da..f05dbc7ed0 100644 --- a/ingest/README.md +++ b/ingest/README.md @@ -10,22 +10,22 @@ From a high level, the ingestion library is broken down into a few modular compo ``` [ Processors ] - | - / \ - / \ - / \ - [Change] [Transaction] - | | - |---+---| | - Checkpoint Ledger Ledger - Change Change Transaction - Reader Reader Reader + _|_ + / \ + / \ + / \ + [Change] [Transaction] + | | + |---+---| |-------+----| + Checkpoint Ledger Ledger Lazy + Change Change Transaction Transaction + Reader Reader Reader Reader [ Ledger Backend ] | | - Captive - Core + Captive + Core ``` This is described in a little more detail in [`doc.go`](./doc.go), its accompanying examples, the documentation within this package, and the rest of this tutorial. @@ -290,7 +290,7 @@ First thing's first: we need to establish a connection to a history archive. ``` ## Tracking Changes -Each history archive contains the current cumulative state of the entire network. +Each history archive contains the current cumulative state of the entire network. Now we can use the history archive to actually read in all of the changes that have accumulated in the entire network by a particular checkpoint. From 19d5f27e864b7ebc9b81568c5ca991a77486d2e5 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 11 Apr 2024 12:51:01 -0700 Subject: [PATCH 4/9] Add comments plus hash-based envelope lookup mapping Halve memory usage by keeping refs to envelopes, instead --- ingest/lazy_transaction_reader.go | 88 +++++++++---- ingest/lazy_transaction_reader_test.go | 175 +++++++++++++++++-------- 2 files changed, 183 insertions(+), 80 deletions(-) diff --git a/ingest/lazy_transaction_reader.go b/ingest/lazy_transaction_reader.go index 201785b3dd..262d7cf186 100644 --- a/ingest/lazy_transaction_reader.go +++ b/ingest/lazy_transaction_reader.go @@ -3,6 +3,7 @@ package ingest import ( "io" + "github.com/stellar/go/network" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -12,18 +13,27 @@ import ( // structures are only created when you actually request a read for that // particular index. type LazyTransactionReader struct { - lcm xdr.LedgerCloseMeta - start int // read-only - - transactions []LedgerTransaction // cached for Rewind() calls - lastRead int // cycles through ^ + lcm xdr.LedgerCloseMeta + start int // read-only + passphrase string // read-only + + // we keep a mapping of hashes to envelope refs in the ledger meta, this is + // fine since both have the same scope and we don't want to unnecessarily + // keep two copies + envelopesByHash map[xdr.Hash]*xdr.TransactionEnvelope + transactions []LedgerTransaction // cached for Rewind() calls + lastRead int // cycles through ^ } // NewLazyTransactionReader creates a new reader instance from raw -// xdr.LedgerCloseMeta starting at a particular transaction index. Note that -// LazyTransactionReader is not thread safe and should not be shared by multiple -// goroutines. -func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) (*LazyTransactionReader, error) { +// xdr.LedgerCloseMeta starting at a particular transaction index (0-based). +// Note that LazyTransactionReader is not thread safe and should not be shared +// by multiple goroutines. +func NewLazyTransactionReader( + ledgerCloseMeta xdr.LedgerCloseMeta, + passphrase string, + start int, +) (*LazyTransactionReader, error) { if start >= ledgerCloseMeta.CountTransactions() || start < 0 { return nil, errors.New("'start' index exceeds ledger transaction count") } @@ -32,11 +42,30 @@ func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) (* return nil, errors.New("LazyTransactionReader only works from Protocol 20 onward") } - return &LazyTransactionReader{ - lcm: ledgerCloseMeta, - start: start, - lastRead: -1, // haven't started yet - }, nil + lazy := &LazyTransactionReader{ + lcm: ledgerCloseMeta, + passphrase: passphrase, + start: start, + lastRead: -1, // haven't started yet + + envelopesByHash: make( + map[xdr.Hash]*xdr.TransactionEnvelope, + ledgerCloseMeta.CountTransactions(), + ), + } + + // See https://github.com/stellar/go/pull/2720: envelopes in the meta (which + // just come straight from the agreed-upon transaction set) are not in the + // same order as the actual list of metas (which are sorted by hash), so we + // need to hash the envelopes *first* to properly associate them with their + // metas. + for _, txEnv := range ledgerCloseMeta.TransactionEnvelopes() { + // we know that these are proper envelopes so errors aren't possible + hash, _ := network.HashTransactionInEnvelope(txEnv, passphrase) + lazy.envelopesByHash[xdr.Hash(hash)] = &txEnv + } + + return lazy, nil } // GetSequence returns the sequence number of the ledger data stored by this object. @@ -66,36 +95,39 @@ func (reader *LazyTransactionReader) Read() (LedgerTransaction, error) { } } - lcm := reader.lcm + // if it doesn't exist we're in BIG trouble elsewhere anyway + envelope := reader.envelopesByHash[reader.lcm.TransactionHash(i)] reader.lastRead = i - envelope := lcm.TransactionEnvelopes()[i] - cachedIdx := (i - reader.start + txCount /* to fix negatives */) % txCount + // Caching begins from `start`, so we need to properly offset into the + // cached array to correlate the actual transaction index, which is by doing + // `i-start`. We also have to have +txCount to fix negative offsets: mod (%) + // in Go does not make it positive. + cachedIdx := (i - reader.start + txCount) % txCount if cachedIdx < len(reader.transactions) { // cached? return immediately return reader.transactions[cachedIdx], nil } newTx := LedgerTransaction{ Index: uint32(i + 1), // Transactions start at '1' - Envelope: envelope, - Result: lcm.TransactionResultPair(i), - UnsafeMeta: lcm.TxApplyProcessing(i), - FeeChanges: lcm.FeeProcessing(i), - LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), + Envelope: *envelope, + Result: reader.lcm.TransactionResultPair(i), + UnsafeMeta: reader.lcm.TxApplyProcessing(i), + FeeChanges: reader.lcm.FeeProcessing(i), + LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), } reader.transactions = append(reader.transactions, newTx) return newTx, nil } -// Rewind resets the reader back to the first transaction in the ledger +// Rewind resets the reader back to the first transaction in the ledger, +// or to `start` if you did not initialize the instance with 0. func (reader *LazyTransactionReader) Rewind() { reader.lastRead = -1 } -// Close should be called when reading is finished. This is especially -// helpful when there are still some transactions available so reader can stop -// streaming them. -func (reader *LazyTransactionReader) Close() error { +// Close should be called when reading is finished to clean up memory. +func (reader *LazyTransactionReader) Close() { + reader.Rewind() reader.transactions = nil - return nil } diff --git a/ingest/lazy_transaction_reader_test.go b/ingest/lazy_transaction_reader_test.go index 3b8c488814..596116b17a 100644 --- a/ingest/lazy_transaction_reader_test.go +++ b/ingest/lazy_transaction_reader_test.go @@ -4,96 +4,167 @@ import ( "io" "testing" + "github.com/stellar/go/network" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -var txMeta = xdr.TransactionResultMeta{ - TxApplyProcessing: xdr.TransactionMeta{ - V: 3, - V3: &xdr.TransactionMetaV3{}, - }, -} -var txEnv = xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{}, - }, -} - -// barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs -var ledgerCloseMeta = xdr.LedgerCloseMeta{ - V: 1, - V1: &xdr.LedgerCloseMetaV1{ - Ext: xdr.ExtensionPoint{V: 0}, - TxProcessing: []xdr.TransactionResultMeta{ - txMeta, - txMeta, - txMeta, - txMeta, - txMeta, +var ( + passphrase = network.TestNetworkPassphrase + // Test prep: + // - two different envelopes which resolve to two different hashes + // - two basically-empty metas that contain the corresponding hashes + // - a ledger that has 5 txs with metas corresponding to these two envs + // - specifically, in the order [first, first, second, second, second] + // + // This tests both hash <--> envelope mapping and indexed iteration. + txEnv1 = xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Ext: xdr.TransactionExt{V: 0}, + SourceAccount: xdr.MustMuxedAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Operations: []xdr.Operation{}, + Fee: 123, + SeqNum: 0, + }, + Signatures: []xdr.DecoratedSignature{}, }, - TxSet: xdr.GeneralizedTransactionSet{ - V: 1, - V1TxSet: &xdr.TransactionSetV1{ - Phases: []xdr.TransactionPhase{{ - V: 0, - V0Components: &[]xdr.TxSetComponent{{ - TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ - Txs: []xdr.TransactionEnvelope{ - txEnv, - txEnv, - txEnv, - txEnv, - txEnv, + } + txEnv2 = xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Ext: xdr.TransactionExt{V: 0}, + SourceAccount: xdr.MustMuxedAddress("GCO26ZSBD63TKYX45H2C7D2WOFWOUSG5BMTNC3BG4QMXM3PAYI6WHKVZ"), + Operations: []xdr.Operation{}, + Fee: 456, + SeqNum: 0, + }, + Signatures: []xdr.DecoratedSignature{}, + }, + } + txHash1, _ = network.HashTransactionInEnvelope(txEnv1, passphrase) + txHash2, _ = network.HashTransactionInEnvelope(txEnv2, passphrase) + txMeta1 = xdr.TransactionResultMeta{ + Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash1)}, + TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, + } + txMeta2 = xdr.TransactionResultMeta{ + Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash2)}, + TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, + } + // barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs + ledgerCloseMeta = xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + Ext: xdr.ExtensionPoint{V: 0}, + TxProcessing: []xdr.TransactionResultMeta{ + txMeta1, + txMeta1, + txMeta2, + txMeta2, + txMeta2, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + Phases: []xdr.TransactionPhase{{ + V: 0, + V0Components: &[]xdr.TxSetComponent{{ + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{ + txEnv1, + txEnv1, + txEnv2, + txEnv2, + txEnv2, + }, }, - }, + }}, }}, - }}, + }, + }, + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{LedgerVersion: 20}, }, }, - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{LedgerVersion: 20}, - }, - }, -} + } +) func TestLazyTransactionReader(t *testing.T) { - require.True(t, true) + require.NotEqual(t, + txHash1, txHash2, + "precondition of different hashes violated: env1=%+v, env2=%+v", + txEnv1, txEnv2) // simplest case: read from start - fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, 0) + fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 0) require.NoError(t, err) for i := 0; i < 5; i++ { tx, ierr := fromZero.Read() require.NoError(t, ierr) assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + if tx.Index >= 3 { + assert.Equal(t, txEnv2, tx.Envelope) + assert.Equal(t, txHash2, thisHash) + } else { + assert.Equal(t, txEnv1, tx.Envelope) + assert.Equal(t, txHash1, thisHash) + } } _, err = fromZero.Read() require.ErrorIs(t, err, io.EOF) // start reading from the middle set of txs - fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, 2) + fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 2) require.NoError(t, err) for i := 0; i < 5; i++ { tx, ierr := fromMiddle.Read() require.NoError(t, ierr) - assert.EqualValues(t, 1+((i+2)%5), tx.Index, "iteration i=%d", i) + assert.EqualValues(t, + /* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */ + 1+(i+2)%5, + tx.Index, + "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + if tx.Index >= 3 { + assert.Equal(t, txEnv2, tx.Envelope) + assert.Equal(t, txHash2, thisHash) + } else { + assert.Equal(t, txEnv1, tx.Envelope) + assert.Equal(t, txHash1, thisHash) + } } _, err = fromMiddle.Read() require.ErrorIs(t, err, io.EOF) // edge case: start from the last tx - fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, 4) + fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 4) require.NoError(t, err) for i := 0; i < 5; i++ { tx, ierr := fromEnd.Read() require.NoError(t, ierr) assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + if tx.Index >= 3 { + assert.Equal(t, txEnv2, tx.Envelope) + assert.Equal(t, txHash2, thisHash) + } else { + assert.Equal(t, txEnv1, tx.Envelope) + assert.Equal(t, txHash1, thisHash) + } } _, err = fromEnd.Read() require.ErrorIs(t, err, io.EOF) @@ -110,8 +181,8 @@ func TestLazyTransactionReader(t *testing.T) { require.ErrorIs(t, err, io.EOF) // error case: too far or too close - for i := range []int{-1, 5, 6} { - _, err = NewLazyTransactionReader(ledgerCloseMeta, i) - require.Error(t, err) + for _, idx := range []int{-1, 5, 6} { + _, err = NewLazyTransactionReader(ledgerCloseMeta, passphrase, idx) + require.Error(t, err, "no error when trying start=%d", idx) } } From 2cd0d688108b4274a94fbf6cc27f56aa9503aa95 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 15 Apr 2024 15:45:03 -0700 Subject: [PATCH 5/9] Adapt LedgerTransactionReader to support seeking and on-the-fly reads --- ingest/lazy_transaction_reader.go | 133 ----------------- ingest/lazy_transaction_reader_test.go | 188 ------------------------- ingest/ledger_change_reader.go | 6 +- ingest/ledger_transaction_reader.go | 146 ++++++++++++------- 4 files changed, 95 insertions(+), 378 deletions(-) delete mode 100644 ingest/lazy_transaction_reader.go delete mode 100644 ingest/lazy_transaction_reader_test.go diff --git a/ingest/lazy_transaction_reader.go b/ingest/lazy_transaction_reader.go deleted file mode 100644 index 262d7cf186..0000000000 --- a/ingest/lazy_transaction_reader.go +++ /dev/null @@ -1,133 +0,0 @@ -package ingest - -import ( - "io" - - "github.com/stellar/go/network" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -// LazyTransactionReader supports reading ranges of transactions from a raw -// LedgerCloseMeta instance in a "lazy" fashion, meaning the transaction -// structures are only created when you actually request a read for that -// particular index. -type LazyTransactionReader struct { - lcm xdr.LedgerCloseMeta - start int // read-only - passphrase string // read-only - - // we keep a mapping of hashes to envelope refs in the ledger meta, this is - // fine since both have the same scope and we don't want to unnecessarily - // keep two copies - envelopesByHash map[xdr.Hash]*xdr.TransactionEnvelope - transactions []LedgerTransaction // cached for Rewind() calls - lastRead int // cycles through ^ -} - -// NewLazyTransactionReader creates a new reader instance from raw -// xdr.LedgerCloseMeta starting at a particular transaction index (0-based). -// Note that LazyTransactionReader is not thread safe and should not be shared -// by multiple goroutines. -func NewLazyTransactionReader( - ledgerCloseMeta xdr.LedgerCloseMeta, - passphrase string, - start int, -) (*LazyTransactionReader, error) { - if start >= ledgerCloseMeta.CountTransactions() || start < 0 { - return nil, errors.New("'start' index exceeds ledger transaction count") - } - - if ledgerCloseMeta.ProtocolVersion() < 20 { - return nil, errors.New("LazyTransactionReader only works from Protocol 20 onward") - } - - lazy := &LazyTransactionReader{ - lcm: ledgerCloseMeta, - passphrase: passphrase, - start: start, - lastRead: -1, // haven't started yet - - envelopesByHash: make( - map[xdr.Hash]*xdr.TransactionEnvelope, - ledgerCloseMeta.CountTransactions(), - ), - } - - // See https://github.com/stellar/go/pull/2720: envelopes in the meta (which - // just come straight from the agreed-upon transaction set) are not in the - // same order as the actual list of metas (which are sorted by hash), so we - // need to hash the envelopes *first* to properly associate them with their - // metas. - for _, txEnv := range ledgerCloseMeta.TransactionEnvelopes() { - // we know that these are proper envelopes so errors aren't possible - hash, _ := network.HashTransactionInEnvelope(txEnv, passphrase) - lazy.envelopesByHash[xdr.Hash(hash)] = &txEnv - } - - return lazy, nil -} - -// GetSequence returns the sequence number of the ledger data stored by this object. -func (reader *LazyTransactionReader) GetSequence() uint32 { - return reader.lcm.LedgerSequence() -} - -// GetHeader returns the XDR Header data associated with the stored ledger. -func (reader *LazyTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return reader.lcm.LedgerHeaderHistoryEntry() -} - -// Read returns the next transaction in the ledger, ordered by tx number, each -// time it is called. When there are no more transactions to return, an EOF -// error is returned. Note that if you initialized the reader from a non-zero -// index, it will EOF when it cycles back to the start rather than when it -// reaches the end. -func (reader *LazyTransactionReader) Read() (LedgerTransaction, error) { - txCount := reader.lcm.CountTransactions() - - i := reader.start // assume first time - if reader.lastRead != -1 { - i = (reader.lastRead + 1) % txCount - if i == reader.start { // cycle, so rewind but mark as EOF - reader.Rewind() - return LedgerTransaction{}, io.EOF - } - } - - // if it doesn't exist we're in BIG trouble elsewhere anyway - envelope := reader.envelopesByHash[reader.lcm.TransactionHash(i)] - reader.lastRead = i - - // Caching begins from `start`, so we need to properly offset into the - // cached array to correlate the actual transaction index, which is by doing - // `i-start`. We also have to have +txCount to fix negative offsets: mod (%) - // in Go does not make it positive. - cachedIdx := (i - reader.start + txCount) % txCount - if cachedIdx < len(reader.transactions) { // cached? return immediately - return reader.transactions[cachedIdx], nil - } - - newTx := LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: *envelope, - Result: reader.lcm.TransactionResultPair(i), - UnsafeMeta: reader.lcm.TxApplyProcessing(i), - FeeChanges: reader.lcm.FeeProcessing(i), - LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), - } - reader.transactions = append(reader.transactions, newTx) - return newTx, nil -} - -// Rewind resets the reader back to the first transaction in the ledger, -// or to `start` if you did not initialize the instance with 0. -func (reader *LazyTransactionReader) Rewind() { - reader.lastRead = -1 -} - -// Close should be called when reading is finished to clean up memory. -func (reader *LazyTransactionReader) Close() { - reader.Rewind() - reader.transactions = nil -} diff --git a/ingest/lazy_transaction_reader_test.go b/ingest/lazy_transaction_reader_test.go deleted file mode 100644 index 596116b17a..0000000000 --- a/ingest/lazy_transaction_reader_test.go +++ /dev/null @@ -1,188 +0,0 @@ -package ingest - -import ( - "io" - "testing" - - "github.com/stellar/go/network" - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var ( - passphrase = network.TestNetworkPassphrase - // Test prep: - // - two different envelopes which resolve to two different hashes - // - two basically-empty metas that contain the corresponding hashes - // - a ledger that has 5 txs with metas corresponding to these two envs - // - specifically, in the order [first, first, second, second, second] - // - // This tests both hash <--> envelope mapping and indexed iteration. - txEnv1 = xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - Ext: xdr.TransactionExt{V: 0}, - SourceAccount: xdr.MustMuxedAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), - Operations: []xdr.Operation{}, - Fee: 123, - SeqNum: 0, - }, - Signatures: []xdr.DecoratedSignature{}, - }, - } - txEnv2 = xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - Ext: xdr.TransactionExt{V: 0}, - SourceAccount: xdr.MustMuxedAddress("GCO26ZSBD63TKYX45H2C7D2WOFWOUSG5BMTNC3BG4QMXM3PAYI6WHKVZ"), - Operations: []xdr.Operation{}, - Fee: 456, - SeqNum: 0, - }, - Signatures: []xdr.DecoratedSignature{}, - }, - } - txHash1, _ = network.HashTransactionInEnvelope(txEnv1, passphrase) - txHash2, _ = network.HashTransactionInEnvelope(txEnv2, passphrase) - txMeta1 = xdr.TransactionResultMeta{ - Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash1)}, - TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, - } - txMeta2 = xdr.TransactionResultMeta{ - Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash2)}, - TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, - } - // barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs - ledgerCloseMeta = xdr.LedgerCloseMeta{ - V: 1, - V1: &xdr.LedgerCloseMetaV1{ - Ext: xdr.ExtensionPoint{V: 0}, - TxProcessing: []xdr.TransactionResultMeta{ - txMeta1, - txMeta1, - txMeta2, - txMeta2, - txMeta2, - }, - TxSet: xdr.GeneralizedTransactionSet{ - V: 1, - V1TxSet: &xdr.TransactionSetV1{ - Phases: []xdr.TransactionPhase{{ - V: 0, - V0Components: &[]xdr.TxSetComponent{{ - TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ - Txs: []xdr.TransactionEnvelope{ - txEnv1, - txEnv1, - txEnv2, - txEnv2, - txEnv2, - }, - }, - }}, - }}, - }, - }, - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{LedgerVersion: 20}, - }, - }, - } -) - -func TestLazyTransactionReader(t *testing.T) { - require.NotEqual(t, - txHash1, txHash2, - "precondition of different hashes violated: env1=%+v, env2=%+v", - txEnv1, txEnv2) - - // simplest case: read from start - - fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 0) - require.NoError(t, err) - for i := 0; i < 5; i++ { - tx, ierr := fromZero.Read() - require.NoError(t, ierr) - assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) - - thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) - require.NoError(t, ierr) - if tx.Index >= 3 { - assert.Equal(t, txEnv2, tx.Envelope) - assert.Equal(t, txHash2, thisHash) - } else { - assert.Equal(t, txEnv1, tx.Envelope) - assert.Equal(t, txHash1, thisHash) - } - } - _, err = fromZero.Read() - require.ErrorIs(t, err, io.EOF) - - // start reading from the middle set of txs - - fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 2) - require.NoError(t, err) - for i := 0; i < 5; i++ { - tx, ierr := fromMiddle.Read() - require.NoError(t, ierr) - assert.EqualValues(t, - /* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */ - 1+(i+2)%5, - tx.Index, - "iteration i=%d", i) - - thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) - require.NoError(t, ierr) - if tx.Index >= 3 { - assert.Equal(t, txEnv2, tx.Envelope) - assert.Equal(t, txHash2, thisHash) - } else { - assert.Equal(t, txEnv1, tx.Envelope) - assert.Equal(t, txHash1, thisHash) - } - } - _, err = fromMiddle.Read() - require.ErrorIs(t, err, io.EOF) - - // edge case: start from the last tx - - fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 4) - require.NoError(t, err) - for i := 0; i < 5; i++ { - tx, ierr := fromEnd.Read() - require.NoError(t, ierr) - assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) - - thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) - require.NoError(t, ierr) - if tx.Index >= 3 { - assert.Equal(t, txEnv2, tx.Envelope) - assert.Equal(t, txHash2, thisHash) - } else { - assert.Equal(t, txEnv1, tx.Envelope) - assert.Equal(t, txHash1, thisHash) - } - } - _, err = fromEnd.Read() - require.ErrorIs(t, err, io.EOF) - - // ensure that rewinds work after EOF - - fromEnd.Rewind() - for i := 0; i < 5; i++ { - tx, ierr := fromEnd.Read() - require.NoError(t, ierr) - assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) - } - _, err = fromEnd.Read() - require.ErrorIs(t, err, io.EOF) - - // error case: too far or too close - for _, idx := range []int{-1, 5, 6} { - _, err = NewLazyTransactionReader(ledgerCloseMeta, passphrase, idx) - require.Error(t, err, "no error when trying start=%d", idx) - } -} diff --git a/ingest/ledger_change_reader.go b/ingest/ledger_change_reader.go index a282a04d87..496dc98b40 100644 --- a/ingest/ledger_change_reader.go +++ b/ingest/ledger_change_reader.go @@ -176,7 +176,7 @@ func (r *LedgerChangeReader) Read() (Change, error) { } return r.Read() case evictionChangesState: - entries, err := r.ledgerCloseMeta.EvictedPersistentLedgerEntries() + entries, err := r.lcm.EvictedPersistentLedgerEntries() if err != nil { return Change{}, err } @@ -196,9 +196,9 @@ func (r *LedgerChangeReader) Read() (Change, error) { return r.Read() case upgradeChangesState: // Get upgrade changes - if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()) { + if r.upgradeIndex < len(r.LedgerTransactionReader.lcm.UpgradesProcessing()) { changes := GetChangesFromLedgerEntryChanges( - r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()[r.upgradeIndex].Changes, + r.LedgerTransactionReader.lcm.UpgradesProcessing()[r.upgradeIndex].Changes, ) r.pending = append(r.pending, changes...) r.upgradeIndex++ diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go index 8199309944..67afde86c0 100644 --- a/ingest/ledger_transaction_reader.go +++ b/ingest/ledger_transaction_reader.go @@ -11,17 +11,25 @@ import ( "github.com/stellar/go/xdr" ) -// LedgerTransactionReader reads transactions for a given ledger sequence from a backend. -// Use NewTransactionReader to create a new instance. +// LedgerTransactionReader reads transactions for a given ledger sequence from a +// backend. Use NewTransactionReader to create a new instance. type LedgerTransactionReader struct { - ledgerCloseMeta xdr.LedgerCloseMeta - transactions []LedgerTransaction - readIdx int + lcm xdr.LedgerCloseMeta // read-only + envelopesByHash map[xdr.Hash]xdr.TransactionEnvelope // set once + + txByIdx map[int]LedgerTransaction // cache + readIdx int // tracks iteration & seeking } -// NewLedgerTransactionReader creates a new TransactionReader instance. -// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines. -func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) { +// NewLedgerTransactionReader creates a new TransactionReader instance. Note +// that TransactionReader is not thread safe and should not be shared by +// multiple goroutines. +func NewLedgerTransactionReader( + ctx context.Context, + backend ledgerbackend.LedgerBackend, + networkPassphrase string, + sequence uint32, +) (*LedgerTransactionReader, error) { ledgerCloseMeta, err := backend.GetLedger(ctx, sequence) if err != nil { return nil, errors.Wrap(err, "error getting ledger from the backend") @@ -30,11 +38,21 @@ func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.Ledge return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) } -// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader instance from xdr.LedgerCloseMeta. -// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines. -func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) { - reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta} - if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil { +// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader +// instance from xdr.LedgerCloseMeta. Note that TransactionReader is not thread +// safe and should not be shared by multiple goroutines. +func NewLedgerTransactionReaderFromLedgerCloseMeta( + networkPassphrase string, + ledgerCloseMeta xdr.LedgerCloseMeta, +) (*LedgerTransactionReader, error) { + reader := &LedgerTransactionReader{ + lcm: ledgerCloseMeta, + txByIdx: make(map[int]LedgerTransaction, ledgerCloseMeta.CountTransactions()), + envelopesByHash: make(map[xdr.Hash]xdr.TransactionEnvelope, ledgerCloseMeta.CountTransactions()), + readIdx: 0, + } + + if err := reader.storeTransactions(networkPassphrase); err != nil { return nil, errors.Wrap(err, "error extracting transactions from ledger close meta") } return reader, nil @@ -42,68 +60,87 @@ func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, led // GetSequence returns the sequence number of the ledger data stored by this object. func (reader *LedgerTransactionReader) GetSequence() uint32 { - return reader.ledgerCloseMeta.LedgerSequence() + return reader.lcm.LedgerSequence() } // GetHeader returns the XDR Header data associated with the stored ledger. func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return reader.ledgerCloseMeta.LedgerHeaderHistoryEntry() + return reader.lcm.LedgerHeaderHistoryEntry() } // Read returns the next transaction in the ledger, ordered by tx number, each time // it is called. When there are no more transactions to return, an EOF error is returned. func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { - if reader.readIdx < len(reader.transactions) { - reader.readIdx++ - return reader.transactions[reader.readIdx-1], nil + i := reader.readIdx + if i >= reader.lcm.CountTransactions() { + return LedgerTransaction{}, io.EOF + } + reader.readIdx++ + + hash := reader.lcm.TransactionHash(i) + envelope, ok := reader.envelopesByHash[hash] + if !ok { + hexHash := hex.EncodeToString(hash[:]) + return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash) + } + + // We check the version only if FeeProcessing are non empty because some backends + // (like HistoryArchiveBackend) do not return meta. + if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 && + len(reader.lcm.FeeProcessing(i)) > 0 { + return LedgerTransaction{}, errors.New( + "TransactionMeta.V=2 is required in protocol version older than version 10. " + + "Please process ledgers again using the latest stellar-core version.", + ) + } + + if ledgerTx, ok := reader.txByIdx[i]; ok { + return ledgerTx, nil + } + // generate and cache if not found + ledgerTx := LedgerTransaction{ + Index: uint32(i + 1), // Transactions start at '1' + Envelope: envelope, + Result: reader.lcm.TransactionResultPair(i), + UnsafeMeta: reader.lcm.TxApplyProcessing(i), + FeeChanges: reader.lcm.FeeProcessing(i), + LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), } - return LedgerTransaction{}, io.EOF + reader.txByIdx[i] = ledgerTx + return ledgerTx, nil } // Rewind resets the reader back to the first transaction in the ledger func (reader *LedgerTransactionReader) Rewind() { - reader.readIdx = 0 + reader.Seek(0) } -// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide -// a per-transaction view of the data when Read() is called. -func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error { - byHash := map[xdr.Hash]xdr.TransactionEnvelope{} - for i, tx := range lcm.TransactionEnvelopes() { +// Seek sets the reader back to a specific transaction in the ledger +func (reader *LedgerTransactionReader) Seek(index int) error { + if index >= reader.lcm.CountTransactions() || index < 0 { + return io.EOF + } + + reader.readIdx = index + return nil +} + +// storeHashes creates a mapping between hashes and envelopes in order to +// correctly provide a per-transaction view on-the-fly when Read() is called. +func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase string) error { + // See https://github.com/stellar/go/pull/2720: envelopes in the meta (which + // just come straight from the agreed-upon transaction set) are not in the + // same order as the actual list of metas (which are sorted by hash), so we + // need to hash the envelopes *first* to properly associate them with their + // metas. + for i, tx := range reader.lcm.TransactionEnvelopes() { hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase) if err != nil { return errors.Wrapf(err, "could not hash transaction %d in TxSet", i) } - byHash[hash] = tx + reader.envelopesByHash[xdr.Hash(hash)] = tx } - for i := 0; i < lcm.CountTransactions(); i++ { - hash := lcm.TransactionHash(i) - envelope, ok := byHash[hash] - if !ok { - hexHash := hex.EncodeToString(hash[:]) - return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash) - } - - // We check the version only if FeeProcessing are non empty because some backends - // (like HistoryArchiveBackend) do not return meta. - if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 && - len(lcm.FeeProcessing(i)) > 0 { - return errors.New( - "TransactionMeta.V=2 is required in protocol version older than version 10. " + - "Please process ledgers again using the latest stellar-core version.", - ) - } - - reader.transactions = append(reader.transactions, LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: envelope, - Result: lcm.TransactionResultPair(i), - UnsafeMeta: lcm.TxApplyProcessing(i), - FeeChanges: lcm.FeeProcessing(i), - LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), - }) - } return nil } @@ -111,6 +148,7 @@ func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta // helpful when there are still some transactions available so reader can stop // streaming them. func (reader *LedgerTransactionReader) Close() error { - reader.transactions = nil + reader.envelopesByHash = nil + reader.txByIdx = nil return nil } From 592f2ba14dc7da7cc75c78c8b9972ba8429e69a8 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 15 Apr 2024 15:54:20 -0700 Subject: [PATCH 6/9] Add back the test suite (missed it in git) --- ingest/ledger_transaction_reader_test.go | 159 +++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 ingest/ledger_transaction_reader_test.go diff --git a/ingest/ledger_transaction_reader_test.go b/ingest/ledger_transaction_reader_test.go new file mode 100644 index 0000000000..e391e015d9 --- /dev/null +++ b/ingest/ledger_transaction_reader_test.go @@ -0,0 +1,159 @@ +package ingest + +import ( + "io" + "testing" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/network" + "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + passphrase = network.TestNetworkPassphrase + // Test prep: + // - two different envelopes which resolve to two different hashes + // - two basically-empty metas that contain the corresponding hashes + // - a ledger that has 5 txs with metas corresponding to these two envs + // - specifically, in the order [first, first, second, second, second] + // + // This tests both hash <--> envelope mapping and indexed iteration. + txEnvs, txHashes, txMetas = makeTransactions(5) + // barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs + ledgerCloseMeta = xdr.LedgerCloseMeta{V: 1, + V1: &xdr.LedgerCloseMetaV1{ + TxProcessing: txMetas, + TxSet: xdr.GeneralizedTransactionSet{V: 1, + V1TxSet: &xdr.TransactionSetV1{ + Phases: []xdr.TransactionPhase{{V: 0, + V0Components: &[]xdr.TxSetComponent{{ + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: txEnvs, + }}, + }, + }}, + }, + }, + }, + } +) + +func TestTransactionReader(t *testing.T) { + s := set.NewSet[xdr.Hash](5) + for _, hash := range txHashes { + s.Add(hash) + } + require.Lenf(t, s, len(txHashes), "precondition: hashes aren't unique, envs: %+v", txEnvs) + + // simplest case: read from start + + fromZero, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) + require.NoError(t, err) + for i := 0; i < 5; i++ { + tx, ierr := fromZero.Read() + require.NoError(t, ierr) + assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope) + assert.Equal(t, txHashes[tx.Index-1], thisHash) + } + _, err = fromZero.Read() + require.ErrorIs(t, err, io.EOF) + + // start reading from the middle set of txs + + fromMiddle, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) + fromMiddle.Seek(2) + require.NoError(t, err) + for i := 0; i < 2; i++ { + tx, ierr := fromMiddle.Read() + require.NoError(t, ierr) + assert.EqualValues(t, + /* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */ + 1+(i+2)%5, + tx.Index, + "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope) + assert.Equal(t, txHashes[tx.Index-1], thisHash) + } + _, err = fromMiddle.Read() + require.ErrorIs(t, err, io.EOF) + + // edge case: start from the last tx + + fromEnd, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) + fromEnd.Seek(4) + require.NoError(t, err) + tx, ierr := fromEnd.Read() + require.NoError(t, ierr) + assert.EqualValues(t, 5, tx.Index) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + assert.Equal(t, txEnvs[4], tx.Envelope) + assert.Equal(t, txHashes[4], thisHash) + _, err = fromEnd.Read() + require.ErrorIs(t, err, io.EOF) + + // ensure that rewinds work after EOF + + fromEnd.Rewind() + for i := 0; i < 5; i++ { + tx, ierr := fromEnd.Read() + require.NoError(t, ierr) + assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) + assert.Equal(t, txEnvs[i], tx.Envelope) + } + _, err = fromEnd.Read() + require.ErrorIs(t, err, io.EOF) + + // error case: too far or too close + for _, idx := range []int{-1, 5, 6} { + rdr, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) + require.NoError(t, err) + require.Error(t, rdr.Seek(idx), "no error when trying seek=%d", idx) + } +} + +func makeTransactions(count int) ( + envs []xdr.TransactionEnvelope, + hashes []xdr.Hash, + metas []xdr.TransactionResultMeta, +) { + seqNum := 123_456 + for i := 0; i < count; i++ { + txEnv := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Ext: xdr.TransactionExt{V: 0}, + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Operations: []xdr.Operation{}, + Fee: xdr.Uint32(seqNum + i), + SeqNum: xdr.SequenceNumber(seqNum + i), + }, + Signatures: []xdr.DecoratedSignature{}, + }, + } + + txHash, _ := network.HashTransactionInEnvelope(txEnv, passphrase) + txMeta := xdr.TransactionResultMeta{ + Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash)}, + TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, + } + + envs = append(envs, txEnv) + hashes = append(hashes, txHash) + metas = append(metas, txMeta) + } + + return +} From 91d9e4d95fa1194fc84257fb959d253ccdd59b71 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 15 Apr 2024 17:11:24 -0700 Subject: [PATCH 7/9] Drop the on-the-fly conversion cache --- ingest/ledger_transaction_reader.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go index 67afde86c0..41903683a7 100644 --- a/ingest/ledger_transaction_reader.go +++ b/ingest/ledger_transaction_reader.go @@ -17,8 +17,7 @@ type LedgerTransactionReader struct { lcm xdr.LedgerCloseMeta // read-only envelopesByHash map[xdr.Hash]xdr.TransactionEnvelope // set once - txByIdx map[int]LedgerTransaction // cache - readIdx int // tracks iteration & seeking + readIdx int // tracks iteration & seeking } // NewLedgerTransactionReader creates a new TransactionReader instance. Note @@ -47,7 +46,6 @@ func NewLedgerTransactionReaderFromLedgerCloseMeta( ) (*LedgerTransactionReader, error) { reader := &LedgerTransactionReader{ lcm: ledgerCloseMeta, - txByIdx: make(map[int]LedgerTransaction, ledgerCloseMeta.CountTransactions()), envelopesByHash: make(map[xdr.Hash]xdr.TransactionEnvelope, ledgerCloseMeta.CountTransactions()), readIdx: 0, } @@ -71,11 +69,11 @@ func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry // Read returns the next transaction in the ledger, ordered by tx number, each time // it is called. When there are no more transactions to return, an EOF error is returned. func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { - i := reader.readIdx - if i >= reader.lcm.CountTransactions() { + if reader.readIdx >= reader.lcm.CountTransactions() { return LedgerTransaction{}, io.EOF } - reader.readIdx++ + i := reader.readIdx + reader.readIdx++ // next read will advance even on error hash := reader.lcm.TransactionHash(i) envelope, ok := reader.envelopesByHash[hash] @@ -94,20 +92,14 @@ func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { ) } - if ledgerTx, ok := reader.txByIdx[i]; ok { - return ledgerTx, nil - } - // generate and cache if not found - ledgerTx := LedgerTransaction{ + return LedgerTransaction{ Index: uint32(i + 1), // Transactions start at '1' Envelope: envelope, Result: reader.lcm.TransactionResultPair(i), UnsafeMeta: reader.lcm.TxApplyProcessing(i), FeeChanges: reader.lcm.FeeProcessing(i), LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), - } - reader.txByIdx[i] = ledgerTx - return ledgerTx, nil + }, nil } // Rewind resets the reader back to the first transaction in the ledger @@ -149,6 +141,5 @@ func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase strin // streaming them. func (reader *LedgerTransactionReader) Close() error { reader.envelopesByHash = nil - reader.txByIdx = nil return nil } From da8aafa42566f5c20b543769cc3b81865c78e30a Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 15 Apr 2024 17:29:38 -0700 Subject: [PATCH 8/9] Move error check earlier, align tests with new behavior --- ingest/ledger_transaction_reader.go | 25 +++++++++------ ingest/ledger_transaction_reader_test.go | 40 +++++++----------------- 2 files changed, 27 insertions(+), 38 deletions(-) diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go index 41903683a7..5d2ad1d237 100644 --- a/ingest/ledger_transaction_reader.go +++ b/ingest/ledger_transaction_reader.go @@ -11,6 +11,11 @@ import ( "github.com/stellar/go/xdr" ) +var badMetaVersionErr = errors.New( + "TransactionMeta.V=2 is required in protocol version older than version 10. " + + "Please process ledgers again using the latest stellar-core version.", +) + // LedgerTransactionReader reads transactions for a given ledger sequence from a // backend. Use NewTransactionReader to create a new instance. type LedgerTransactionReader struct { @@ -82,16 +87,6 @@ func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash) } - // We check the version only if FeeProcessing are non empty because some backends - // (like HistoryArchiveBackend) do not return meta. - if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 && - len(reader.lcm.FeeProcessing(i)) > 0 { - return LedgerTransaction{}, errors.New( - "TransactionMeta.V=2 is required in protocol version older than version 10. " + - "Please process ledgers again using the latest stellar-core version.", - ) - } - return LedgerTransaction{ Index: uint32(i + 1), // Transactions start at '1' Envelope: envelope, @@ -131,6 +126,16 @@ func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase strin return errors.Wrapf(err, "could not hash transaction %d in TxSet", i) } reader.envelopesByHash[xdr.Hash(hash)] = tx + + // We check the version only if FeeProcessing is non-empty, because some + // backends (like HistoryArchiveBackend) do not return meta. + // + // Note that the ordering differences are irrelevant here because all we + // care about is checking every meta for this condition. + if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 && + len(reader.lcm.FeeProcessing(i)) > 0 { + return badMetaVersionErr + } } return nil diff --git a/ingest/ledger_transaction_reader_test.go b/ingest/ledger_transaction_reader_test.go index e391e015d9..041f6a9aae 100644 --- a/ingest/ledger_transaction_reader_test.go +++ b/ingest/ledger_transaction_reader_test.go @@ -50,10 +50,11 @@ func TestTransactionReader(t *testing.T) { // simplest case: read from start - fromZero, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) + reader, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) require.NoError(t, err) + for i := 0; i < 5; i++ { - tx, ierr := fromZero.Read() + tx, ierr := reader.Read() require.NoError(t, ierr) assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) @@ -62,16 +63,14 @@ func TestTransactionReader(t *testing.T) { assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope) assert.Equal(t, txHashes[tx.Index-1], thisHash) } - _, err = fromZero.Read() + _, err = reader.Read() require.ErrorIs(t, err, io.EOF) // start reading from the middle set of txs - fromMiddle, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) - fromMiddle.Seek(2) - require.NoError(t, err) - for i := 0; i < 2; i++ { - tx, ierr := fromMiddle.Read() + require.NoError(t, reader.Seek(2)) + for i := 0; i < 3; i++ { + tx, ierr := reader.Read() require.NoError(t, ierr) assert.EqualValues(t, /* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */ @@ -84,15 +83,12 @@ func TestTransactionReader(t *testing.T) { assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope) assert.Equal(t, txHashes[tx.Index-1], thisHash) } - _, err = fromMiddle.Read() + _, err = reader.Read() require.ErrorIs(t, err, io.EOF) // edge case: start from the last tx - - fromEnd, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) - fromEnd.Seek(4) - require.NoError(t, err) - tx, ierr := fromEnd.Read() + require.NoError(t, reader.Seek(4)) + tx, ierr := reader.Read() require.NoError(t, ierr) assert.EqualValues(t, 5, tx.Index) @@ -100,19 +96,7 @@ func TestTransactionReader(t *testing.T) { require.NoError(t, ierr) assert.Equal(t, txEnvs[4], tx.Envelope) assert.Equal(t, txHashes[4], thisHash) - _, err = fromEnd.Read() - require.ErrorIs(t, err, io.EOF) - - // ensure that rewinds work after EOF - - fromEnd.Rewind() - for i := 0; i < 5; i++ { - tx, ierr := fromEnd.Read() - require.NoError(t, ierr) - assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) - assert.Equal(t, txEnvs[i], tx.Envelope) - } - _, err = fromEnd.Read() + _, err = reader.Read() require.ErrorIs(t, err, io.EOF) // error case: too far or too close @@ -125,7 +109,7 @@ func TestTransactionReader(t *testing.T) { func makeTransactions(count int) ( envs []xdr.TransactionEnvelope, - hashes []xdr.Hash, + hashes [][32]byte, metas []xdr.TransactionResultMeta, ) { seqNum := 123_456 From d74bb98faa8ff84351960a993e9f66bc704c62c8 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Tue, 16 Apr 2024 09:07:15 -0700 Subject: [PATCH 9/9] Revert certain doc changes related to the old LazyReader --- ingest/CHANGELOG.md | 2 +- ingest/README.md | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index 8e960cea7a..1aab4ae542 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -7,7 +7,7 @@ All notable changes to this project will be documented in this file. This projec ### New Features * Support for Soroban and Protocol 20! -* There is now a `LazyTransactionReader` that provides transactions starting from any part of the ledger and will only transform the ones that are being used [5274](https://github.com/stellar/go/pull/5274). +* The `LedgerTransactionReader` now has a `Seek(index int)` method to provide reading from arbitrary parts of the ledger [5274](https://github.com/stellar/go/pull/5274). * `Change` now has a canonical stringification and a set of them is deterministically sortable. * `NewCompactingChangeReader` will give you a wrapped `ChangeReader` that compacts the changes. * Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050). diff --git a/ingest/README.md b/ingest/README.md index f05dbc7ed0..cf3d38f8da 100644 --- a/ingest/README.md +++ b/ingest/README.md @@ -10,22 +10,22 @@ From a high level, the ingestion library is broken down into a few modular compo ``` [ Processors ] - _|_ - / \ - / \ - / \ - [Change] [Transaction] - | | - |---+---| |-------+----| - Checkpoint Ledger Ledger Lazy - Change Change Transaction Transaction - Reader Reader Reader Reader + | + / \ + / \ + / \ + [Change] [Transaction] + | | + |---+---| | + Checkpoint Ledger Ledger + Change Change Transaction + Reader Reader Reader [ Ledger Backend ] | | - Captive - Core + Captive + Core ``` This is described in a little more detail in [`doc.go`](./doc.go), its accompanying examples, the documentation within this package, and the rest of this tutorial. @@ -290,7 +290,7 @@ First thing's first: we need to establish a connection to a history archive. ``` ## Tracking Changes -Each history archive contains the current cumulative state of the entire network. +Each history archive contains the current cumulative state of the entire network. Now we can use the history archive to actually read in all of the changes that have accumulated in the entire network by a particular checkpoint.