From 19d5f27e864b7ebc9b81568c5ca991a77486d2e5 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 11 Apr 2024 12:51:01 -0700 Subject: [PATCH] Add comments plus hash-based envelope lookup mapping Halve memory usage by keeping refs to envelopes, instead --- ingest/lazy_transaction_reader.go | 88 +++++++++---- ingest/lazy_transaction_reader_test.go | 175 +++++++++++++++++-------- 2 files changed, 183 insertions(+), 80 deletions(-) diff --git a/ingest/lazy_transaction_reader.go b/ingest/lazy_transaction_reader.go index 201785b3dd..262d7cf186 100644 --- a/ingest/lazy_transaction_reader.go +++ b/ingest/lazy_transaction_reader.go @@ -3,6 +3,7 @@ package ingest import ( "io" + "github.com/stellar/go/network" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -12,18 +13,27 @@ import ( // structures are only created when you actually request a read for that // particular index. type LazyTransactionReader struct { - lcm xdr.LedgerCloseMeta - start int // read-only - - transactions []LedgerTransaction // cached for Rewind() calls - lastRead int // cycles through ^ + lcm xdr.LedgerCloseMeta + start int // read-only + passphrase string // read-only + + // we keep a mapping of hashes to envelope refs in the ledger meta, this is + // fine since both have the same scope and we don't want to unnecessarily + // keep two copies + envelopesByHash map[xdr.Hash]*xdr.TransactionEnvelope + transactions []LedgerTransaction // cached for Rewind() calls + lastRead int // cycles through ^ } // NewLazyTransactionReader creates a new reader instance from raw -// xdr.LedgerCloseMeta starting at a particular transaction index. Note that -// LazyTransactionReader is not thread safe and should not be shared by multiple -// goroutines. -func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) (*LazyTransactionReader, error) { +// xdr.LedgerCloseMeta starting at a particular transaction index (0-based). +// Note that LazyTransactionReader is not thread safe and should not be shared +// by multiple goroutines. +func NewLazyTransactionReader( + ledgerCloseMeta xdr.LedgerCloseMeta, + passphrase string, + start int, +) (*LazyTransactionReader, error) { if start >= ledgerCloseMeta.CountTransactions() || start < 0 { return nil, errors.New("'start' index exceeds ledger transaction count") } @@ -32,11 +42,30 @@ func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) (* return nil, errors.New("LazyTransactionReader only works from Protocol 20 onward") } - return &LazyTransactionReader{ - lcm: ledgerCloseMeta, - start: start, - lastRead: -1, // haven't started yet - }, nil + lazy := &LazyTransactionReader{ + lcm: ledgerCloseMeta, + passphrase: passphrase, + start: start, + lastRead: -1, // haven't started yet + + envelopesByHash: make( + map[xdr.Hash]*xdr.TransactionEnvelope, + ledgerCloseMeta.CountTransactions(), + ), + } + + // See https://github.com/stellar/go/pull/2720: envelopes in the meta (which + // just come straight from the agreed-upon transaction set) are not in the + // same order as the actual list of metas (which are sorted by hash), so we + // need to hash the envelopes *first* to properly associate them with their + // metas. + for _, txEnv := range ledgerCloseMeta.TransactionEnvelopes() { + // we know that these are proper envelopes so errors aren't possible + hash, _ := network.HashTransactionInEnvelope(txEnv, passphrase) + lazy.envelopesByHash[xdr.Hash(hash)] = &txEnv + } + + return lazy, nil } // GetSequence returns the sequence number of the ledger data stored by this object. @@ -66,36 +95,39 @@ func (reader *LazyTransactionReader) Read() (LedgerTransaction, error) { } } - lcm := reader.lcm + // if it doesn't exist we're in BIG trouble elsewhere anyway + envelope := reader.envelopesByHash[reader.lcm.TransactionHash(i)] reader.lastRead = i - envelope := lcm.TransactionEnvelopes()[i] - cachedIdx := (i - reader.start + txCount /* to fix negatives */) % txCount + // Caching begins from `start`, so we need to properly offset into the + // cached array to correlate the actual transaction index, which is by doing + // `i-start`. We also have to have +txCount to fix negative offsets: mod (%) + // in Go does not make it positive. + cachedIdx := (i - reader.start + txCount) % txCount if cachedIdx < len(reader.transactions) { // cached? return immediately return reader.transactions[cachedIdx], nil } newTx := LedgerTransaction{ Index: uint32(i + 1), // Transactions start at '1' - Envelope: envelope, - Result: lcm.TransactionResultPair(i), - UnsafeMeta: lcm.TxApplyProcessing(i), - FeeChanges: lcm.FeeProcessing(i), - LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), + Envelope: *envelope, + Result: reader.lcm.TransactionResultPair(i), + UnsafeMeta: reader.lcm.TxApplyProcessing(i), + FeeChanges: reader.lcm.FeeProcessing(i), + LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion), } reader.transactions = append(reader.transactions, newTx) return newTx, nil } -// Rewind resets the reader back to the first transaction in the ledger +// Rewind resets the reader back to the first transaction in the ledger, +// or to `start` if you did not initialize the instance with 0. func (reader *LazyTransactionReader) Rewind() { reader.lastRead = -1 } -// Close should be called when reading is finished. This is especially -// helpful when there are still some transactions available so reader can stop -// streaming them. -func (reader *LazyTransactionReader) Close() error { +// Close should be called when reading is finished to clean up memory. +func (reader *LazyTransactionReader) Close() { + reader.Rewind() reader.transactions = nil - return nil } diff --git a/ingest/lazy_transaction_reader_test.go b/ingest/lazy_transaction_reader_test.go index 3b8c488814..596116b17a 100644 --- a/ingest/lazy_transaction_reader_test.go +++ b/ingest/lazy_transaction_reader_test.go @@ -4,96 +4,167 @@ import ( "io" "testing" + "github.com/stellar/go/network" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -var txMeta = xdr.TransactionResultMeta{ - TxApplyProcessing: xdr.TransactionMeta{ - V: 3, - V3: &xdr.TransactionMetaV3{}, - }, -} -var txEnv = xdr.TransactionEnvelope{ - Type: xdr.EnvelopeTypeEnvelopeTypeTx, - V1: &xdr.TransactionV1Envelope{ - Tx: xdr.Transaction{}, - }, -} - -// barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs -var ledgerCloseMeta = xdr.LedgerCloseMeta{ - V: 1, - V1: &xdr.LedgerCloseMetaV1{ - Ext: xdr.ExtensionPoint{V: 0}, - TxProcessing: []xdr.TransactionResultMeta{ - txMeta, - txMeta, - txMeta, - txMeta, - txMeta, +var ( + passphrase = network.TestNetworkPassphrase + // Test prep: + // - two different envelopes which resolve to two different hashes + // - two basically-empty metas that contain the corresponding hashes + // - a ledger that has 5 txs with metas corresponding to these two envs + // - specifically, in the order [first, first, second, second, second] + // + // This tests both hash <--> envelope mapping and indexed iteration. + txEnv1 = xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Ext: xdr.TransactionExt{V: 0}, + SourceAccount: xdr.MustMuxedAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Operations: []xdr.Operation{}, + Fee: 123, + SeqNum: 0, + }, + Signatures: []xdr.DecoratedSignature{}, }, - TxSet: xdr.GeneralizedTransactionSet{ - V: 1, - V1TxSet: &xdr.TransactionSetV1{ - Phases: []xdr.TransactionPhase{{ - V: 0, - V0Components: &[]xdr.TxSetComponent{{ - TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ - Txs: []xdr.TransactionEnvelope{ - txEnv, - txEnv, - txEnv, - txEnv, - txEnv, + } + txEnv2 = xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Ext: xdr.TransactionExt{V: 0}, + SourceAccount: xdr.MustMuxedAddress("GCO26ZSBD63TKYX45H2C7D2WOFWOUSG5BMTNC3BG4QMXM3PAYI6WHKVZ"), + Operations: []xdr.Operation{}, + Fee: 456, + SeqNum: 0, + }, + Signatures: []xdr.DecoratedSignature{}, + }, + } + txHash1, _ = network.HashTransactionInEnvelope(txEnv1, passphrase) + txHash2, _ = network.HashTransactionInEnvelope(txEnv2, passphrase) + txMeta1 = xdr.TransactionResultMeta{ + Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash1)}, + TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, + } + txMeta2 = xdr.TransactionResultMeta{ + Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash2)}, + TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}}, + } + // barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs + ledgerCloseMeta = xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + Ext: xdr.ExtensionPoint{V: 0}, + TxProcessing: []xdr.TransactionResultMeta{ + txMeta1, + txMeta1, + txMeta2, + txMeta2, + txMeta2, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + Phases: []xdr.TransactionPhase{{ + V: 0, + V0Components: &[]xdr.TxSetComponent{{ + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{ + txEnv1, + txEnv1, + txEnv2, + txEnv2, + txEnv2, + }, }, - }, + }}, }}, - }}, + }, + }, + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{LedgerVersion: 20}, }, }, - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{LedgerVersion: 20}, - }, - }, -} + } +) func TestLazyTransactionReader(t *testing.T) { - require.True(t, true) + require.NotEqual(t, + txHash1, txHash2, + "precondition of different hashes violated: env1=%+v, env2=%+v", + txEnv1, txEnv2) // simplest case: read from start - fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, 0) + fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 0) require.NoError(t, err) for i := 0; i < 5; i++ { tx, ierr := fromZero.Read() require.NoError(t, ierr) assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + if tx.Index >= 3 { + assert.Equal(t, txEnv2, tx.Envelope) + assert.Equal(t, txHash2, thisHash) + } else { + assert.Equal(t, txEnv1, tx.Envelope) + assert.Equal(t, txHash1, thisHash) + } } _, err = fromZero.Read() require.ErrorIs(t, err, io.EOF) // start reading from the middle set of txs - fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, 2) + fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 2) require.NoError(t, err) for i := 0; i < 5; i++ { tx, ierr := fromMiddle.Read() require.NoError(t, ierr) - assert.EqualValues(t, 1+((i+2)%5), tx.Index, "iteration i=%d", i) + assert.EqualValues(t, + /* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */ + 1+(i+2)%5, + tx.Index, + "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + if tx.Index >= 3 { + assert.Equal(t, txEnv2, tx.Envelope) + assert.Equal(t, txHash2, thisHash) + } else { + assert.Equal(t, txEnv1, tx.Envelope) + assert.Equal(t, txHash1, thisHash) + } } _, err = fromMiddle.Read() require.ErrorIs(t, err, io.EOF) // edge case: start from the last tx - fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, 4) + fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 4) require.NoError(t, err) for i := 0; i < 5; i++ { tx, ierr := fromEnd.Read() require.NoError(t, ierr) assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i) + + thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase) + require.NoError(t, ierr) + if tx.Index >= 3 { + assert.Equal(t, txEnv2, tx.Envelope) + assert.Equal(t, txHash2, thisHash) + } else { + assert.Equal(t, txEnv1, tx.Envelope) + assert.Equal(t, txHash1, thisHash) + } } _, err = fromEnd.Read() require.ErrorIs(t, err, io.EOF) @@ -110,8 +181,8 @@ func TestLazyTransactionReader(t *testing.T) { require.ErrorIs(t, err, io.EOF) // error case: too far or too close - for i := range []int{-1, 5, 6} { - _, err = NewLazyTransactionReader(ledgerCloseMeta, i) - require.Error(t, err) + for _, idx := range []int{-1, 5, 6} { + _, err = NewLazyTransactionReader(ledgerCloseMeta, passphrase, idx) + require.Error(t, err, "no error when trying start=%d", idx) } }