diff --git a/ingest/lazy_transaction_reader.go b/ingest/lazy_transaction_reader.go deleted file mode 100644 index 262d7cf186..0000000000 --- a/ingest/lazy_transaction_reader.go +++ /dev/null @@ -1,133 +0,0 @@ -package ingest - -import ( - "io" - - "github.com/stellar/go/network" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -// LazyTransactionReader supports reading ranges of transactions from a raw -// LedgerCloseMeta instance in a "lazy" fashion, meaning the transaction -// structures are only created when you actually request a read for that -// particular index. -type LazyTransactionReader struct { - lcm xdr.LedgerCloseMeta - start int // read-only - passphrase string // read-only - - // we keep a mapping of hashes to envelope refs in the ledger meta, this is - // fine since both have the same scope and we don't want to unnecessarily - // keep two copies - envelopesByHash map[xdr.Hash]*xdr.TransactionEnvelope - transactions []LedgerTransaction // cached for Rewind() calls - lastRead int // cycles through ^ -} - -// NewLazyTransactionReader creates a new reader instance from raw -// xdr.LedgerCloseMeta starting at a particular transaction index (0-based). -// Note that LazyTransactionReader is not thread safe and should not be shared -// by multiple goroutines. -func NewLazyTransactionReader( - ledgerCloseMeta xdr.LedgerCloseMeta, - passphrase string, - start int, -) (*LazyTransactionReader, error) { - if start >= ledgerCloseMeta.CountTransactions() || start < 0 { - return nil, errors.New("'start' index exceeds ledger transaction count") - } - - if ledgerCloseMeta.ProtocolVersion() < 20 { - return nil, errors.New("LazyTransactionReader only works from Protocol 20 onward") - } - - lazy := &LazyTransactionReader{ - lcm: ledgerCloseMeta, - passphrase: passphrase, - start: start, - lastRead: -1, // haven't started yet - - envelopesByHash: make( - map[xdr.Hash]*xdr.TransactionEnvelope, - ledgerCloseMeta.CountTransactions(), - ), - } - - // See https://github.com/stellar/go/pull/2720: envelopes in the meta (which - // just come straight from the agreed-upon transaction set) are not in the - // same order as the actual list of metas (which are sorted by hash), so we - // need to hash the envelopes *first* to properly associate them with their - // metas. - for _, txEnv := range ledgerCloseMeta.TransactionEnvelopes() { - // we know that these are proper envelopes so errors aren't possible - hash, _ := network.HashTransactionInEnvelope(txEnv, passphrase) - lazy.envelopesByHash[xdr.Hash(hash)] = &txEnv - } - - return lazy, nil -} - -// GetSequence returns the sequence number of the ledger data stored by this object. -func (reader *LazyTransactionReader) GetSequence() uint32 { - return reader.lcm.LedgerSequence() -} - -// GetHeader returns the XDR Header data associated with the stored ledger. -func (reader *LazyTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return reader.lcm.LedgerHeaderHistoryEntry() -} - -// Read returns the next transaction in the ledger, ordered by tx number, each -// time it is called. When there are no more transactions to return, an EOF -// error is returned. Note that if you initialized the reader from a non-zero -// index, it will EOF when it cycles back to the start rather than when it -// reaches the end. -func (reader *LazyTransactionReader) Read() (LedgerTransaction, error) { - txCount := reader.lcm.CountTransactions() - - i := reader.start // assume first time - if reader.lastRead != -1 { - i = (reader.lastRead + 1) % txCount - if i == reader.start { // cycle, so rewind but mark as EOF - reader.Rewind() - return LedgerTransaction{}, io.EOF - } - } - - // if it doesn't exist we're in BIG trouble elsewhere anyway - envelope := reader.envelopesByHash[reader.lcm.TransactionHash(i)] - reader.lastRead = i - - // Caching begins from `start`, so we need to properly offset into the - // cached array to correlate the actual transaction index, which is by doing - // `i-start`. We also have to have +txCount to fix negative offsets: mod (%) - // in Go does not make it positive. - cachedIdx := (i - reader.start + txCount) % txCount - if cachedIdx < len(reader.transactions) { // cached? return immediately - return reader.transactions[cachedIdx], nil - } - - newTx := LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: *envelope, - Result: reader.lcm.TransactionResultPair(i), - UnsafeMeta: reader.lcm.TxApplyProcessing(i), - FeeChanges: reader.lcm.FeeProcessing(i), - LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), - } - reader.transactions = append(reader.transactions, newTx) - return newTx, nil -} - -// Rewind resets the reader back to the first transaction in the ledger, -// or to `start` if you did not initialize the instance with 0. -func (reader *LazyTransactionReader) Rewind() { - reader.lastRead = -1 -} - -// Close should be called when reading is finished to clean up memory. -func (reader *LazyTransactionReader) Close() { - reader.Rewind() - reader.transactions = nil -} diff --git a/ingest/lazy_transaction_reader_test.go b/ingest/lazy_transaction_reader_test.go deleted file mode 100644 index 596116b17a..0000000000 --- a/ingest/lazy_transaction_reader_test.go +++ /dev/null @@ -1,188 +0,0 @@ -package ingest - -import ( - "io" - "testing" - - "github.com/stellar/go/network" - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var ( - passphrase = network.TestNetworkPassphrase - // Test prep: - // - two different envelopes which resolve to two different hashes - // - two basically-empty metas that contain the corresponding hashes - // - a ledger that has 5 txs with metas corresponding to these two envs - // - specifically, in the order [first, first, second, second, second] - // - // This tests both hash <--> envelope mapping and indexed iteration. - txEnv1 = xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - Ext: xdr.TransactionExt{V: 0}, - SourceAccount: xdr.MustMuxedAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), - Operations: []xdr.Operation{}, - Fee: 123, - SeqNum: 0, - }, - Signatures: []xdr.DecoratedSignature{}, - }, - } - txEnv2 = xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{ - Ext: xdr.TransactionExt{V: 0}, - SourceAccount: xdr.MustMuxedAddress("GCO26ZSBD63TKYX45H2C7D2WOFWOUSG5BMTNC3BG4QMXM3PAYI6WHKVZ"), - Operations: []xdr.Operation{}, - Fee: 456, - SeqNum: 0, - }, - Signatures: []xdr.DecoratedSignature{}, - }, - } - txHash1, _ = network.HashTransactionInEnvelope(txEnv1, passphrase) - txHash2, _ = network.HashTransactionInEnvelope(txEnv2, passphrase) - txMeta1 = xdr.TransactionResultMeta{ - Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash1)}, - TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, - } - txMeta2 = xdr.TransactionResultMeta{ - Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash2)}, - TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, - } - // barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs - ledgerCloseMeta = xdr.LedgerCloseMeta{ - V: 1, - V1: &xdr.LedgerCloseMetaV1{ - Ext: xdr.ExtensionPoint{V: 0}, - TxProcessing: []xdr.TransactionResultMeta{ - txMeta1, - txMeta1, - txMeta2, - txMeta2, - txMeta2, - }, - TxSet: xdr.GeneralizedTransactionSet{ - V: 1, - V1TxSet: &xdr.TransactionSetV1{ - Phases: []xdr.TransactionPhase{{ - V: 0, - V0Components: &[]xdr.TxSetComponent{{ - TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ - Txs: []xdr.TransactionEnvelope{ - txEnv1, - txEnv1, - txEnv2, - txEnv2, - txEnv2, - }, - }, - }}, - }}, - }, - }, - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{LedgerVersion: 20}, - }, - }, - } -) - -func TestLazyTransactionReader(t *testing.T) { - require.NotEqual(t, - txHash1, txHash2, - "precondition of different hashes violated: env1=%+v, env2=%+v", - txEnv1, txEnv2) - - // simplest case: read from start - - fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 0) - require.NoError(t, err) - for i := 0; i < 5; i++ { - tx, ierr := fromZero.Read() - require.NoError(t, ierr) - assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) - - thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) - require.NoError(t, ierr) - if tx.Index >= 3 { - assert.Equal(t, txEnv2, tx.Envelope) - assert.Equal(t, txHash2, thisHash) - } else { - assert.Equal(t, txEnv1, tx.Envelope) - assert.Equal(t, txHash1, thisHash) - } - } - _, err = fromZero.Read() - require.ErrorIs(t, err, io.EOF) - - // start reading from the middle set of txs - - fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 2) - require.NoError(t, err) - for i := 0; i < 5; i++ { - tx, ierr := fromMiddle.Read() - require.NoError(t, ierr) - assert.EqualValues(t, - /* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */ - 1+(i+2)%5, - tx.Index, - "iteration i=%d", i) - - thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) - require.NoError(t, ierr) - if tx.Index >= 3 { - assert.Equal(t, txEnv2, tx.Envelope) - assert.Equal(t, txHash2, thisHash) - } else { - assert.Equal(t, txEnv1, tx.Envelope) - assert.Equal(t, txHash1, thisHash) - } - } - _, err = fromMiddle.Read() - require.ErrorIs(t, err, io.EOF) - - // edge case: start from the last tx - - fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 4) - require.NoError(t, err) - for i := 0; i < 5; i++ { - tx, ierr := fromEnd.Read() - require.NoError(t, ierr) - assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) - - thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) - require.NoError(t, ierr) - if tx.Index >= 3 { - assert.Equal(t, txEnv2, tx.Envelope) - assert.Equal(t, txHash2, thisHash) - } else { - assert.Equal(t, txEnv1, tx.Envelope) - assert.Equal(t, txHash1, thisHash) - } - } - _, err = fromEnd.Read() - require.ErrorIs(t, err, io.EOF) - - // ensure that rewinds work after EOF - - fromEnd.Rewind() - for i := 0; i < 5; i++ { - tx, ierr := fromEnd.Read() - require.NoError(t, ierr) - assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) - } - _, err = fromEnd.Read() - require.ErrorIs(t, err, io.EOF) - - // error case: too far or too close - for _, idx := range []int{-1, 5, 6} { - _, err = NewLazyTransactionReader(ledgerCloseMeta, passphrase, idx) - require.Error(t, err, "no error when trying start=%d", idx) - } -} diff --git a/ingest/ledger_change_reader.go b/ingest/ledger_change_reader.go index a282a04d87..496dc98b40 100644 --- a/ingest/ledger_change_reader.go +++ b/ingest/ledger_change_reader.go @@ -176,7 +176,7 @@ func (r *LedgerChangeReader) Read() (Change, error) { } return r.Read() case evictionChangesState: - entries, err := r.ledgerCloseMeta.EvictedPersistentLedgerEntries() + entries, err := r.lcm.EvictedPersistentLedgerEntries() if err != nil { return Change{}, err } @@ -196,9 +196,9 @@ func (r *LedgerChangeReader) Read() (Change, error) { return r.Read() case upgradeChangesState: // Get upgrade changes - if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()) { + if r.upgradeIndex < len(r.LedgerTransactionReader.lcm.UpgradesProcessing()) { changes := GetChangesFromLedgerEntryChanges( - r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()[r.upgradeIndex].Changes, + r.LedgerTransactionReader.lcm.UpgradesProcessing()[r.upgradeIndex].Changes, ) r.pending = append(r.pending, changes...) r.upgradeIndex++ diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go index 8199309944..67afde86c0 100644 --- a/ingest/ledger_transaction_reader.go +++ b/ingest/ledger_transaction_reader.go @@ -11,17 +11,25 @@ import ( "github.com/stellar/go/xdr" ) -// LedgerTransactionReader reads transactions for a given ledger sequence from a backend. -// Use NewTransactionReader to create a new instance. +// LedgerTransactionReader reads transactions for a given ledger sequence from a +// backend. Use NewTransactionReader to create a new instance. type LedgerTransactionReader struct { - ledgerCloseMeta xdr.LedgerCloseMeta - transactions []LedgerTransaction - readIdx int + lcm xdr.LedgerCloseMeta // read-only + envelopesByHash map[xdr.Hash]xdr.TransactionEnvelope // set once + + txByIdx map[int]LedgerTransaction // cache + readIdx int // tracks iteration & seeking } -// NewLedgerTransactionReader creates a new TransactionReader instance. -// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines. -func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) { +// NewLedgerTransactionReader creates a new TransactionReader instance. Note +// that TransactionReader is not thread safe and should not be shared by +// multiple goroutines. +func NewLedgerTransactionReader( + ctx context.Context, + backend ledgerbackend.LedgerBackend, + networkPassphrase string, + sequence uint32, +) (*LedgerTransactionReader, error) { ledgerCloseMeta, err := backend.GetLedger(ctx, sequence) if err != nil { return nil, errors.Wrap(err, "error getting ledger from the backend") @@ -30,11 +38,21 @@ func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.Ledge return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) } -// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader instance from xdr.LedgerCloseMeta. -// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines. -func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) { - reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta} - if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil { +// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader +// instance from xdr.LedgerCloseMeta. Note that TransactionReader is not thread +// safe and should not be shared by multiple goroutines. +func NewLedgerTransactionReaderFromLedgerCloseMeta( + networkPassphrase string, + ledgerCloseMeta xdr.LedgerCloseMeta, +) (*LedgerTransactionReader, error) { + reader := &LedgerTransactionReader{ + lcm: ledgerCloseMeta, + txByIdx: make(map[int]LedgerTransaction, ledgerCloseMeta.CountTransactions()), + envelopesByHash: make(map[xdr.Hash]xdr.TransactionEnvelope, ledgerCloseMeta.CountTransactions()), + readIdx: 0, + } + + if err := reader.storeTransactions(networkPassphrase); err != nil { return nil, errors.Wrap(err, "error extracting transactions from ledger close meta") } return reader, nil @@ -42,68 +60,87 @@ func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, led // GetSequence returns the sequence number of the ledger data stored by this object. func (reader *LedgerTransactionReader) GetSequence() uint32 { - return reader.ledgerCloseMeta.LedgerSequence() + return reader.lcm.LedgerSequence() } // GetHeader returns the XDR Header data associated with the stored ledger. func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return reader.ledgerCloseMeta.LedgerHeaderHistoryEntry() + return reader.lcm.LedgerHeaderHistoryEntry() } // Read returns the next transaction in the ledger, ordered by tx number, each time // it is called. When there are no more transactions to return, an EOF error is returned. func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { - if reader.readIdx < len(reader.transactions) { - reader.readIdx++ - return reader.transactions[reader.readIdx-1], nil + i := reader.readIdx + if i >= reader.lcm.CountTransactions() { + return LedgerTransaction{}, io.EOF + } + reader.readIdx++ + + hash := reader.lcm.TransactionHash(i) + envelope, ok := reader.envelopesByHash[hash] + if !ok { + hexHash := hex.EncodeToString(hash[:]) + return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash) + } + + // We check the version only if FeeProcessing are non empty because some backends + // (like HistoryArchiveBackend) do not return meta. + if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 && + len(reader.lcm.FeeProcessing(i)) > 0 { + return LedgerTransaction{}, errors.New( + "TransactionMeta.V=2 is required in protocol version older than version 10. " + + "Please process ledgers again using the latest stellar-core version.", + ) + } + + if ledgerTx, ok := reader.txByIdx[i]; ok { + return ledgerTx, nil + } + // generate and cache if not found + ledgerTx := LedgerTransaction{ + Index: uint32(i + 1), // Transactions start at '1' + Envelope: envelope, + Result: reader.lcm.TransactionResultPair(i), + UnsafeMeta: reader.lcm.TxApplyProcessing(i), + FeeChanges: reader.lcm.FeeProcessing(i), + LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), } - return LedgerTransaction{}, io.EOF + reader.txByIdx[i] = ledgerTx + return ledgerTx, nil } // Rewind resets the reader back to the first transaction in the ledger func (reader *LedgerTransactionReader) Rewind() { - reader.readIdx = 0 + reader.Seek(0) } -// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide -// a per-transaction view of the data when Read() is called. -func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error { - byHash := map[xdr.Hash]xdr.TransactionEnvelope{} - for i, tx := range lcm.TransactionEnvelopes() { +// Seek sets the reader back to a specific transaction in the ledger +func (reader *LedgerTransactionReader) Seek(index int) error { + if index >= reader.lcm.CountTransactions() || index < 0 { + return io.EOF + } + + reader.readIdx = index + return nil +} + +// storeHashes creates a mapping between hashes and envelopes in order to +// correctly provide a per-transaction view on-the-fly when Read() is called. +func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase string) error { + // See https://github.com/stellar/go/pull/2720: envelopes in the meta (which + // just come straight from the agreed-upon transaction set) are not in the + // same order as the actual list of metas (which are sorted by hash), so we + // need to hash the envelopes *first* to properly associate them with their + // metas. + for i, tx := range reader.lcm.TransactionEnvelopes() { hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase) if err != nil { return errors.Wrapf(err, "could not hash transaction %d in TxSet", i) } - byHash[hash] = tx + reader.envelopesByHash[xdr.Hash(hash)] = tx } - for i := 0; i < lcm.CountTransactions(); i++ { - hash := lcm.TransactionHash(i) - envelope, ok := byHash[hash] - if !ok { - hexHash := hex.EncodeToString(hash[:]) - return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash) - } - - // We check the version only if FeeProcessing are non empty because some backends - // (like HistoryArchiveBackend) do not return meta. - if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 && - len(lcm.FeeProcessing(i)) > 0 { - return errors.New( - "TransactionMeta.V=2 is required in protocol version older than version 10. " + - "Please process ledgers again using the latest stellar-core version.", - ) - } - - reader.transactions = append(reader.transactions, LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: envelope, - Result: lcm.TransactionResultPair(i), - UnsafeMeta: lcm.TxApplyProcessing(i), - FeeChanges: lcm.FeeProcessing(i), - LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), - }) - } return nil } @@ -111,6 +148,7 @@ func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta // helpful when there are still some transactions available so reader can stop // streaming them. func (reader *LedgerTransactionReader) Close() error { - reader.transactions = nil + reader.envelopesByHash = nil + reader.txByIdx = nil return nil }