Skip to content

Commit

Permalink
Add comments plus hash-based envelope lookup mapping
Browse files Browse the repository at this point in the history
Halve memory usage by keeping refs to envelopes, instead
  • Loading branch information
Shaptic committed Apr 15, 2024
1 parent 7140855 commit 19d5f27
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 80 deletions.
88 changes: 60 additions & 28 deletions ingest/lazy_transaction_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingest
import (
"io"

"github.com/stellar/go/network"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand All @@ -12,18 +13,27 @@ import (
// structures are only created when you actually request a read for that
// particular index.
type LazyTransactionReader struct {
lcm xdr.LedgerCloseMeta
start int // read-only

transactions []LedgerTransaction // cached for Rewind() calls
lastRead int // cycles through ^
lcm xdr.LedgerCloseMeta
start int // read-only
passphrase string // read-only

// we keep a mapping of hashes to envelope refs in the ledger meta, this is
// fine since both have the same scope and we don't want to unnecessarily
// keep two copies
envelopesByHash map[xdr.Hash]*xdr.TransactionEnvelope
transactions []LedgerTransaction // cached for Rewind() calls
lastRead int // cycles through ^
}

// NewLazyTransactionReader creates a new reader instance from raw
// xdr.LedgerCloseMeta starting at a particular transaction index. Note that
// LazyTransactionReader is not thread safe and should not be shared by multiple
// goroutines.
func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) (*LazyTransactionReader, error) {
// xdr.LedgerCloseMeta starting at a particular transaction index (0-based).
// Note that LazyTransactionReader is not thread safe and should not be shared
// by multiple goroutines.
func NewLazyTransactionReader(
ledgerCloseMeta xdr.LedgerCloseMeta,
passphrase string,
start int,
) (*LazyTransactionReader, error) {
if start >= ledgerCloseMeta.CountTransactions() || start < 0 {
return nil, errors.New("'start' index exceeds ledger transaction count")
}
Expand All @@ -32,11 +42,30 @@ func NewLazyTransactionReader(ledgerCloseMeta xdr.LedgerCloseMeta, start int) (*
return nil, errors.New("LazyTransactionReader only works from Protocol 20 onward")
}

return &LazyTransactionReader{
lcm: ledgerCloseMeta,
start: start,
lastRead: -1, // haven't started yet
}, nil
lazy := &LazyTransactionReader{
lcm: ledgerCloseMeta,
passphrase: passphrase,
start: start,
lastRead: -1, // haven't started yet

envelopesByHash: make(
map[xdr.Hash]*xdr.TransactionEnvelope,
ledgerCloseMeta.CountTransactions(),
),
}

// See https://github.com/stellar/go/pull/2720: envelopes in the meta (which
// just come straight from the agreed-upon transaction set) are not in the
// same order as the actual list of metas (which are sorted by hash), so we
// need to hash the envelopes *first* to properly associate them with their
// metas.
for _, txEnv := range ledgerCloseMeta.TransactionEnvelopes() {
// we know that these are proper envelopes so errors aren't possible
hash, _ := network.HashTransactionInEnvelope(txEnv, passphrase)
lazy.envelopesByHash[xdr.Hash(hash)] = &txEnv
}

return lazy, nil
}

// GetSequence returns the sequence number of the ledger data stored by this object.
Expand Down Expand Up @@ -66,36 +95,39 @@ func (reader *LazyTransactionReader) Read() (LedgerTransaction, error) {
}
}

lcm := reader.lcm
// if it doesn't exist we're in BIG trouble elsewhere anyway
envelope := reader.envelopesByHash[reader.lcm.TransactionHash(i)]
reader.lastRead = i
envelope := lcm.TransactionEnvelopes()[i]
cachedIdx := (i - reader.start + txCount /* to fix negatives */) % txCount

// Caching begins from `start`, so we need to properly offset into the
// cached array to correlate the actual transaction index, which is by doing
// `i-start`. We also have to have +txCount to fix negative offsets: mod (%)
// in Go does not make it positive.
cachedIdx := (i - reader.start + txCount) % txCount
if cachedIdx < len(reader.transactions) { // cached? return immediately
return reader.transactions[cachedIdx], nil
}

newTx := LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: lcm.TransactionResultPair(i),
UnsafeMeta: lcm.TxApplyProcessing(i),
FeeChanges: lcm.FeeProcessing(i),
LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
Envelope: *envelope,
Result: reader.lcm.TransactionResultPair(i),
UnsafeMeta: reader.lcm.TxApplyProcessing(i),
FeeChanges: reader.lcm.FeeProcessing(i),
LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
}
reader.transactions = append(reader.transactions, newTx)
return newTx, nil
}

// Rewind resets the reader back to the first transaction in the ledger
// Rewind resets the reader back to the first transaction in the ledger,
// or to `start` if you did not initialize the instance with 0.
func (reader *LazyTransactionReader) Rewind() {
reader.lastRead = -1
}

// Close should be called when reading is finished. This is especially
// helpful when there are still some transactions available so reader can stop
// streaming them.
func (reader *LazyTransactionReader) Close() error {
// Close should be called when reading is finished to clean up memory.
func (reader *LazyTransactionReader) Close() {
reader.Rewind()
reader.transactions = nil
return nil
}
175 changes: 123 additions & 52 deletions ingest/lazy_transaction_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,96 +4,167 @@ import (
"io"
"testing"

"github.com/stellar/go/network"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var txMeta = xdr.TransactionResultMeta{
TxApplyProcessing: xdr.TransactionMeta{
V: 3,
V3: &xdr.TransactionMetaV3{},
},
}
var txEnv = xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{},
},
}

// barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs
var ledgerCloseMeta = xdr.LedgerCloseMeta{
V: 1,
V1: &xdr.LedgerCloseMetaV1{
Ext: xdr.ExtensionPoint{V: 0},
TxProcessing: []xdr.TransactionResultMeta{
txMeta,
txMeta,
txMeta,
txMeta,
txMeta,
var (
passphrase = network.TestNetworkPassphrase
// Test prep:
// - two different envelopes which resolve to two different hashes
// - two basically-empty metas that contain the corresponding hashes
// - a ledger that has 5 txs with metas corresponding to these two envs
// - specifically, in the order [first, first, second, second, second]
//
// This tests both hash <--> envelope mapping and indexed iteration.
txEnv1 = xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
Ext: xdr.TransactionExt{V: 0},
SourceAccount: xdr.MustMuxedAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Operations: []xdr.Operation{},
Fee: 123,
SeqNum: 0,
},
Signatures: []xdr.DecoratedSignature{},
},
TxSet: xdr.GeneralizedTransactionSet{
V: 1,
V1TxSet: &xdr.TransactionSetV1{
Phases: []xdr.TransactionPhase{{
V: 0,
V0Components: &[]xdr.TxSetComponent{{
TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{
Txs: []xdr.TransactionEnvelope{
txEnv,
txEnv,
txEnv,
txEnv,
txEnv,
}
txEnv2 = xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
Ext: xdr.TransactionExt{V: 0},
SourceAccount: xdr.MustMuxedAddress("GCO26ZSBD63TKYX45H2C7D2WOFWOUSG5BMTNC3BG4QMXM3PAYI6WHKVZ"),
Operations: []xdr.Operation{},
Fee: 456,
SeqNum: 0,
},
Signatures: []xdr.DecoratedSignature{},
},
}
txHash1, _ = network.HashTransactionInEnvelope(txEnv1, passphrase)
txHash2, _ = network.HashTransactionInEnvelope(txEnv2, passphrase)
txMeta1 = xdr.TransactionResultMeta{
Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash1)},
TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}},
}
txMeta2 = xdr.TransactionResultMeta{
Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash2)},
TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}},
}
// barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs
ledgerCloseMeta = xdr.LedgerCloseMeta{
V: 1,
V1: &xdr.LedgerCloseMetaV1{
Ext: xdr.ExtensionPoint{V: 0},
TxProcessing: []xdr.TransactionResultMeta{
txMeta1,
txMeta1,
txMeta2,
txMeta2,
txMeta2,
},
TxSet: xdr.GeneralizedTransactionSet{
V: 1,
V1TxSet: &xdr.TransactionSetV1{
Phases: []xdr.TransactionPhase{{
V: 0,
V0Components: &[]xdr.TxSetComponent{{
TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{
Txs: []xdr.TransactionEnvelope{
txEnv1,
txEnv1,
txEnv2,
txEnv2,
txEnv2,
},
},
},
}},
}},
}},
},
},
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{LedgerVersion: 20},
},
},
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{LedgerVersion: 20},
},
},
}
}
)

func TestLazyTransactionReader(t *testing.T) {
require.True(t, true)
require.NotEqual(t,
txHash1, txHash2,
"precondition of different hashes violated: env1=%+v, env2=%+v",
txEnv1, txEnv2)

// simplest case: read from start

fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, 0)
fromZero, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 0)
require.NoError(t, err)
for i := 0; i < 5; i++ {
tx, ierr := fromZero.Read()
require.NoError(t, ierr)
assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
if tx.Index >= 3 {
assert.Equal(t, txEnv2, tx.Envelope)
assert.Equal(t, txHash2, thisHash)
} else {
assert.Equal(t, txEnv1, tx.Envelope)
assert.Equal(t, txHash1, thisHash)
}
}
_, err = fromZero.Read()
require.ErrorIs(t, err, io.EOF)

// start reading from the middle set of txs

fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, 2)
fromMiddle, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 2)
require.NoError(t, err)
for i := 0; i < 5; i++ {
tx, ierr := fromMiddle.Read()
require.NoError(t, ierr)
assert.EqualValues(t, 1+((i+2)%5), tx.Index, "iteration i=%d", i)
assert.EqualValues(t,
/* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */
1+(i+2)%5,
tx.Index,
"iteration i=%d", i)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
if tx.Index >= 3 {
assert.Equal(t, txEnv2, tx.Envelope)
assert.Equal(t, txHash2, thisHash)
} else {
assert.Equal(t, txEnv1, tx.Envelope)
assert.Equal(t, txHash1, thisHash)
}
}
_, err = fromMiddle.Read()
require.ErrorIs(t, err, io.EOF)

// edge case: start from the last tx

fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, 4)
fromEnd, err := NewLazyTransactionReader(ledgerCloseMeta, passphrase, 4)
require.NoError(t, err)
for i := 0; i < 5; i++ {
tx, ierr := fromEnd.Read()
require.NoError(t, ierr)
assert.EqualValues(t, 1+((i+4)%5), tx.Index, "iteration i=%d", i)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
if tx.Index >= 3 {
assert.Equal(t, txEnv2, tx.Envelope)
assert.Equal(t, txHash2, thisHash)
} else {
assert.Equal(t, txEnv1, tx.Envelope)
assert.Equal(t, txHash1, thisHash)
}
}
_, err = fromEnd.Read()
require.ErrorIs(t, err, io.EOF)
Expand All @@ -110,8 +181,8 @@ func TestLazyTransactionReader(t *testing.T) {
require.ErrorIs(t, err, io.EOF)

// error case: too far or too close
for i := range []int{-1, 5, 6} {
_, err = NewLazyTransactionReader(ledgerCloseMeta, i)
require.Error(t, err)
for _, idx := range []int{-1, 5, 6} {
_, err = NewLazyTransactionReader(ledgerCloseMeta, passphrase, idx)
require.Error(t, err, "no error when trying start=%d", idx)
}
}

0 comments on commit 19d5f27

Please sign in to comment.