From 73233da7c30e3058cec8e7958a93bd5dd419ba85 Mon Sep 17 00:00:00 2001 From: George Date: Tue, 16 Apr 2024 09:23:39 -0700 Subject: [PATCH] ingest: Make `LedgerTransactionReader` seekable and lazy (#5274) * Adapt LedgerTransactionReader to support seeking and on-the-fly reads * Add a test suite * Update changelog accordingly --- ingest/CHANGELOG.md | 19 ++- ingest/ledger_change_reader.go | 6 +- ingest/ledger_transaction_reader.go | 140 +++++++++++++--------- ingest/ledger_transaction_reader_test.go | 143 +++++++++++++++++++++++ 4 files changed, 249 insertions(+), 59 deletions(-) create mode 100644 ingest/ledger_transaction_reader_test.go diff --git a/ingest/CHANGELOG.md b/ingest/CHANGELOG.md index edd428a62f..1aab4ae542 100644 --- a/ingest/CHANGELOG.md +++ b/ingest/CHANGELOG.md @@ -5,14 +5,27 @@ All notable changes to this project will be documented in this file. This projec ## Unreleased -* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050) - ### New Features -* **Performance improvement**: the Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later. +* Support for Soroban and Protocol 20! +* The `LedgerTransactionReader` now has a `Seek(index int)` method to provide reading from arbitrary parts of the ledger [5274](https://github.com/stellar/go/pull/5274). +* `Change` now has a canonical stringification and a set of them is deterministically sortable. +* `NewCompactingChangeReader` will give you a wrapped `ChangeReader` that compacts the changes. +* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050). + +### Performance Improvements +* The Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later. +* There have been miscallaneous memory and processing speed improvements. ### Bug Fixes * The Stellar Core runner now parses logs from its underlying subprocess better [#3746](https://github.com/stellar/go/pull/3746). +* Ensures that the underlying Stellar Core is terminated before restarting. +* Backends will now connect with a user agent. +* Better handling of various error and restart scenarios. +### Breaking Changes +* **Captive Core is now the only available backend.** +* The Captive Core configuration should be provided via a TOML file. +* `Change.AccountSignersChanged` has been removed. ## v2.0.0 diff --git a/ingest/ledger_change_reader.go b/ingest/ledger_change_reader.go index a282a04d87..496dc98b40 100644 --- a/ingest/ledger_change_reader.go +++ b/ingest/ledger_change_reader.go @@ -176,7 +176,7 @@ func (r *LedgerChangeReader) Read() (Change, error) { } return r.Read() case evictionChangesState: - entries, err := r.ledgerCloseMeta.EvictedPersistentLedgerEntries() + entries, err := r.lcm.EvictedPersistentLedgerEntries() if err != nil { return Change{}, err } @@ -196,9 +196,9 @@ func (r *LedgerChangeReader) Read() (Change, error) { return r.Read() case upgradeChangesState: // Get upgrade changes - if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()) { + if r.upgradeIndex < len(r.LedgerTransactionReader.lcm.UpgradesProcessing()) { changes := GetChangesFromLedgerEntryChanges( - r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()[r.upgradeIndex].Changes, + r.LedgerTransactionReader.lcm.UpgradesProcessing()[r.upgradeIndex].Changes, ) r.pending = append(r.pending, changes...) r.upgradeIndex++ diff --git a/ingest/ledger_transaction_reader.go b/ingest/ledger_transaction_reader.go index 8199309944..5d2ad1d237 100644 --- a/ingest/ledger_transaction_reader.go +++ b/ingest/ledger_transaction_reader.go @@ -11,17 +11,29 @@ import ( "github.com/stellar/go/xdr" ) -// LedgerTransactionReader reads transactions for a given ledger sequence from a backend. -// Use NewTransactionReader to create a new instance. +var badMetaVersionErr = errors.New( + "TransactionMeta.V=2 is required in protocol version older than version 10. " + + "Please process ledgers again using the latest stellar-core version.", +) + +// LedgerTransactionReader reads transactions for a given ledger sequence from a +// backend. Use NewTransactionReader to create a new instance. type LedgerTransactionReader struct { - ledgerCloseMeta xdr.LedgerCloseMeta - transactions []LedgerTransaction - readIdx int + lcm xdr.LedgerCloseMeta // read-only + envelopesByHash map[xdr.Hash]xdr.TransactionEnvelope // set once + + readIdx int // tracks iteration & seeking } -// NewLedgerTransactionReader creates a new TransactionReader instance. -// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines. -func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) { +// NewLedgerTransactionReader creates a new TransactionReader instance. Note +// that TransactionReader is not thread safe and should not be shared by +// multiple goroutines. +func NewLedgerTransactionReader( + ctx context.Context, + backend ledgerbackend.LedgerBackend, + networkPassphrase string, + sequence uint32, +) (*LedgerTransactionReader, error) { ledgerCloseMeta, err := backend.GetLedger(ctx, sequence) if err != nil { return nil, errors.Wrap(err, "error getting ledger from the backend") @@ -30,11 +42,20 @@ func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.Ledge return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) } -// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader instance from xdr.LedgerCloseMeta. -// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines. -func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) { - reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta} - if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil { +// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader +// instance from xdr.LedgerCloseMeta. Note that TransactionReader is not thread +// safe and should not be shared by multiple goroutines. +func NewLedgerTransactionReaderFromLedgerCloseMeta( + networkPassphrase string, + ledgerCloseMeta xdr.LedgerCloseMeta, +) (*LedgerTransactionReader, error) { + reader := &LedgerTransactionReader{ + lcm: ledgerCloseMeta, + envelopesByHash: make(map[xdr.Hash]xdr.TransactionEnvelope, ledgerCloseMeta.CountTransactions()), + readIdx: 0, + } + + if err := reader.storeTransactions(networkPassphrase); err != nil { return nil, errors.Wrap(err, "error extracting transactions from ledger close meta") } return reader, nil @@ -42,68 +63,81 @@ func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, led // GetSequence returns the sequence number of the ledger data stored by this object. func (reader *LedgerTransactionReader) GetSequence() uint32 { - return reader.ledgerCloseMeta.LedgerSequence() + return reader.lcm.LedgerSequence() } // GetHeader returns the XDR Header data associated with the stored ledger. func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return reader.ledgerCloseMeta.LedgerHeaderHistoryEntry() + return reader.lcm.LedgerHeaderHistoryEntry() } // Read returns the next transaction in the ledger, ordered by tx number, each time // it is called. When there are no more transactions to return, an EOF error is returned. func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { - if reader.readIdx < len(reader.transactions) { - reader.readIdx++ - return reader.transactions[reader.readIdx-1], nil + if reader.readIdx >= reader.lcm.CountTransactions() { + return LedgerTransaction{}, io.EOF } - return LedgerTransaction{}, io.EOF + i := reader.readIdx + reader.readIdx++ // next read will advance even on error + + hash := reader.lcm.TransactionHash(i) + envelope, ok := reader.envelopesByHash[hash] + if !ok { + hexHash := hex.EncodeToString(hash[:]) + return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash) + } + + return LedgerTransaction{ + Index: uint32(i + 1), // Transactions start at '1' + Envelope: envelope, + Result: reader.lcm.TransactionResultPair(i), + UnsafeMeta: reader.lcm.TxApplyProcessing(i), + FeeChanges: reader.lcm.FeeProcessing(i), + LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), + }, nil } // Rewind resets the reader back to the first transaction in the ledger func (reader *LedgerTransactionReader) Rewind() { - reader.readIdx = 0 + reader.Seek(0) +} + +// Seek sets the reader back to a specific transaction in the ledger +func (reader *LedgerTransactionReader) Seek(index int) error { + if index >= reader.lcm.CountTransactions() || index < 0 { + return io.EOF + } + + reader.readIdx = index + return nil } -// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide -// a per-transaction view of the data when Read() is called. -func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error { - byHash := map[xdr.Hash]xdr.TransactionEnvelope{} - for i, tx := range lcm.TransactionEnvelopes() { +// storeHashes creates a mapping between hashes and envelopes in order to +// correctly provide a per-transaction view on-the-fly when Read() is called. +func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase string) error { + // See https://github.com/stellar/go/pull/2720: envelopes in the meta (which + // just come straight from the agreed-upon transaction set) are not in the + // same order as the actual list of metas (which are sorted by hash), so we + // need to hash the envelopes *first* to properly associate them with their + // metas. + for i, tx := range reader.lcm.TransactionEnvelopes() { hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase) if err != nil { return errors.Wrapf(err, "could not hash transaction %d in TxSet", i) } - byHash[hash] = tx - } + reader.envelopesByHash[xdr.Hash(hash)] = tx - for i := 0; i < lcm.CountTransactions(); i++ { - hash := lcm.TransactionHash(i) - envelope, ok := byHash[hash] - if !ok { - hexHash := hex.EncodeToString(hash[:]) - return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash) + // We check the version only if FeeProcessing is non-empty, because some + // backends (like HistoryArchiveBackend) do not return meta. + // + // Note that the ordering differences are irrelevant here because all we + // care about is checking every meta for this condition. + if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 && + len(reader.lcm.FeeProcessing(i)) > 0 { + return badMetaVersionErr } - - // We check the version only if FeeProcessing are non empty because some backends - // (like HistoryArchiveBackend) do not return meta. - if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 && - len(lcm.FeeProcessing(i)) > 0 { - return errors.New( - "TransactionMeta.V=2 is required in protocol version older than version 10. " + - "Please process ledgers again using the latest stellar-core version.", - ) - } - - reader.transactions = append(reader.transactions, LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: envelope, - Result: lcm.TransactionResultPair(i), - UnsafeMeta: lcm.TxApplyProcessing(i), - FeeChanges: lcm.FeeProcessing(i), - LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), - }) } + return nil } @@ -111,6 +145,6 @@ func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta // helpful when there are still some transactions available so reader can stop // streaming them. func (reader *LedgerTransactionReader) Close() error { - reader.transactions = nil + reader.envelopesByHash = nil return nil } diff --git a/ingest/ledger_transaction_reader_test.go b/ingest/ledger_transaction_reader_test.go new file mode 100644 index 0000000000..041f6a9aae --- /dev/null +++ b/ingest/ledger_transaction_reader_test.go @@ -0,0 +1,143 @@ +package ingest + +import ( + "io" + "testing" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/network" + "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + passphrase = network.TestNetworkPassphrase + // Test prep: + // - two different envelopes which resolve to two different hashes + // - two basically-empty metas that contain the corresponding hashes + // - a ledger that has 5 txs with metas corresponding to these two envs + // - specifically, in the order [first, first, second, second, second] + // + // This tests both hash <--> envelope mapping and indexed iteration. + txEnvs, txHashes, txMetas = makeTransactions(5) + // barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs + ledgerCloseMeta = xdr.LedgerCloseMeta{V: 1, + V1: &xdr.LedgerCloseMetaV1{ + TxProcessing: txMetas, + TxSet: xdr.GeneralizedTransactionSet{V: 1, + V1TxSet: &xdr.TransactionSetV1{ + Phases: []xdr.TransactionPhase{{V: 0, + V0Components: &[]xdr.TxSetComponent{{ + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: txEnvs, + }}, + }, + }}, + }, + }, + }, + } +) + +func TestTransactionReader(t *testing.T) { + s := set.NewSet[xdr.Hash](5) + for _, hash := range txHashes { + s.Add(hash) + } + require.Lenf(t, s, len(txHashes), "precondition: hashes aren't unique, envs: %+v", txEnvs) + + // simplest case: read from start + + reader, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) + require.NoError(t, err) + + for i := 0; i < 5; i++ { + tx, ierr := reader.Read() + require.NoError(t, ierr) + assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope) + assert.Equal(t, txHashes[tx.Index-1], thisHash) + } + _, err = reader.Read() + require.ErrorIs(t, err, io.EOF) + + // start reading from the middle set of txs + + require.NoError(t, reader.Seek(2)) + for i := 0; i < 3; i++ { + tx, ierr := reader.Read() + require.NoError(t, ierr) + assert.EqualValues(t, + /* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */ + 1+(i+2)%5, + tx.Index, + "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope) + assert.Equal(t, txHashes[tx.Index-1], thisHash) + } + _, err = reader.Read() + require.ErrorIs(t, err, io.EOF) + + // edge case: start from the last tx + require.NoError(t, reader.Seek(4)) + tx, ierr := reader.Read() + require.NoError(t, ierr) + assert.EqualValues(t, 5, tx.Index) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + assert.Equal(t, txEnvs[4], tx.Envelope) + assert.Equal(t, txHashes[4], thisHash) + _, err = reader.Read() + require.ErrorIs(t, err, io.EOF) + + // error case: too far or too close + for _, idx := range []int{-1, 5, 6} { + rdr, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta) + require.NoError(t, err) + require.Error(t, rdr.Seek(idx), "no error when trying seek=%d", idx) + } +} + +func makeTransactions(count int) ( + envs []xdr.TransactionEnvelope, + hashes [][32]byte, + metas []xdr.TransactionResultMeta, +) { + seqNum := 123_456 + for i := 0; i < count; i++ { + txEnv := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Ext: xdr.TransactionExt{V: 0}, + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Operations: []xdr.Operation{}, + Fee: xdr.Uint32(seqNum + i), + SeqNum: xdr.SequenceNumber(seqNum + i), + }, + Signatures: []xdr.DecoratedSignature{}, + }, + } + + txHash, _ := network.HashTransactionInEnvelope(txEnv, passphrase) + txMeta := xdr.TransactionResultMeta{ + Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash)}, + TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, + } + + envs = append(envs, txEnv) + hashes = append(hashes, txHash) + metas = append(metas, txMeta) + } + + return +}