Skip to content

Commit

Permalink
ingest: Make LedgerTransactionReader seekable and lazy (#5274)
Browse files Browse the repository at this point in the history
* Adapt LedgerTransactionReader to support seeking and on-the-fly reads
* Add a test suite
* Update changelog accordingly
  • Loading branch information
Shaptic authored Apr 16, 2024
1 parent 927ab6b commit 73233da
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 59 deletions.
19 changes: 16 additions & 3 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,27 @@ All notable changes to this project will be documented in this file. This projec

## Unreleased

* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050)

### New Features
* **Performance improvement**: the Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.
* Support for Soroban and Protocol 20!
* The `LedgerTransactionReader` now has a `Seek(index int)` method to provide reading from arbitrary parts of the ledger [5274](https://github.com/stellar/go/pull/5274).
* `Change` now has a canonical stringification and a set of them is deterministically sortable.
* `NewCompactingChangeReader` will give you a wrapped `ChangeReader` that compacts the changes.
* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050).

### Performance Improvements
* The Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.
* There have been miscallaneous memory and processing speed improvements.

### Bug Fixes
* The Stellar Core runner now parses logs from its underlying subprocess better [#3746](https://github.com/stellar/go/pull/3746).
* Ensures that the underlying Stellar Core is terminated before restarting.
* Backends will now connect with a user agent.
* Better handling of various error and restart scenarios.

### Breaking Changes
* **Captive Core is now the only available backend.**
* The Captive Core configuration should be provided via a TOML file.
* `Change.AccountSignersChanged` has been removed.

## v2.0.0

Expand Down
6 changes: 3 additions & 3 deletions ingest/ledger_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (r *LedgerChangeReader) Read() (Change, error) {
}
return r.Read()
case evictionChangesState:
entries, err := r.ledgerCloseMeta.EvictedPersistentLedgerEntries()
entries, err := r.lcm.EvictedPersistentLedgerEntries()
if err != nil {
return Change{}, err
}
Expand All @@ -196,9 +196,9 @@ func (r *LedgerChangeReader) Read() (Change, error) {
return r.Read()
case upgradeChangesState:
// Get upgrade changes
if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()) {
if r.upgradeIndex < len(r.LedgerTransactionReader.lcm.UpgradesProcessing()) {
changes := GetChangesFromLedgerEntryChanges(
r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()[r.upgradeIndex].Changes,
r.LedgerTransactionReader.lcm.UpgradesProcessing()[r.upgradeIndex].Changes,
)
r.pending = append(r.pending, changes...)
r.upgradeIndex++
Expand Down
140 changes: 87 additions & 53 deletions ingest/ledger_transaction_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,29 @@ import (
"github.com/stellar/go/xdr"
)

// LedgerTransactionReader reads transactions for a given ledger sequence from a backend.
// Use NewTransactionReader to create a new instance.
var badMetaVersionErr = errors.New(
"TransactionMeta.V=2 is required in protocol version older than version 10. " +
"Please process ledgers again using the latest stellar-core version.",
)

// LedgerTransactionReader reads transactions for a given ledger sequence from a
// backend. Use NewTransactionReader to create a new instance.
type LedgerTransactionReader struct {
ledgerCloseMeta xdr.LedgerCloseMeta
transactions []LedgerTransaction
readIdx int
lcm xdr.LedgerCloseMeta // read-only
envelopesByHash map[xdr.Hash]xdr.TransactionEnvelope // set once

readIdx int // tracks iteration & seeking
}

// NewLedgerTransactionReader creates a new TransactionReader instance.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) {
// NewLedgerTransactionReader creates a new TransactionReader instance. Note
// that TransactionReader is not thread safe and should not be shared by
// multiple goroutines.
func NewLedgerTransactionReader(
ctx context.Context,
backend ledgerbackend.LedgerBackend,
networkPassphrase string,
sequence uint32,
) (*LedgerTransactionReader, error) {
ledgerCloseMeta, err := backend.GetLedger(ctx, sequence)
if err != nil {
return nil, errors.Wrap(err, "error getting ledger from the backend")
Expand All @@ -30,87 +42,109 @@ func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.Ledge
return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta)
}

// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader instance from xdr.LedgerCloseMeta.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta}
if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil {
// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader
// instance from xdr.LedgerCloseMeta. Note that TransactionReader is not thread
// safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(
networkPassphrase string,
ledgerCloseMeta xdr.LedgerCloseMeta,
) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{
lcm: ledgerCloseMeta,
envelopesByHash: make(map[xdr.Hash]xdr.TransactionEnvelope, ledgerCloseMeta.CountTransactions()),
readIdx: 0,
}

if err := reader.storeTransactions(networkPassphrase); err != nil {
return nil, errors.Wrap(err, "error extracting transactions from ledger close meta")
}
return reader, nil
}

// GetSequence returns the sequence number of the ledger data stored by this object.
func (reader *LedgerTransactionReader) GetSequence() uint32 {
return reader.ledgerCloseMeta.LedgerSequence()
return reader.lcm.LedgerSequence()
}

// GetHeader returns the XDR Header data associated with the stored ledger.
func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry {
return reader.ledgerCloseMeta.LedgerHeaderHistoryEntry()
return reader.lcm.LedgerHeaderHistoryEntry()
}

// Read returns the next transaction in the ledger, ordered by tx number, each time
// it is called. When there are no more transactions to return, an EOF error is returned.
func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) {
if reader.readIdx < len(reader.transactions) {
reader.readIdx++
return reader.transactions[reader.readIdx-1], nil
if reader.readIdx >= reader.lcm.CountTransactions() {
return LedgerTransaction{}, io.EOF
}
return LedgerTransaction{}, io.EOF
i := reader.readIdx
reader.readIdx++ // next read will advance even on error

hash := reader.lcm.TransactionHash(i)
envelope, ok := reader.envelopesByHash[hash]
if !ok {
hexHash := hex.EncodeToString(hash[:])
return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
}

return LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: reader.lcm.TransactionResultPair(i),
UnsafeMeta: reader.lcm.TxApplyProcessing(i),
FeeChanges: reader.lcm.FeeProcessing(i),
LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
}, nil
}

// Rewind resets the reader back to the first transaction in the ledger
func (reader *LedgerTransactionReader) Rewind() {
reader.readIdx = 0
reader.Seek(0)
}

// Seek sets the reader back to a specific transaction in the ledger
func (reader *LedgerTransactionReader) Seek(index int) error {
if index >= reader.lcm.CountTransactions() || index < 0 {
return io.EOF
}

reader.readIdx = index
return nil
}

// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide
// a per-transaction view of the data when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error {
byHash := map[xdr.Hash]xdr.TransactionEnvelope{}
for i, tx := range lcm.TransactionEnvelopes() {
// storeHashes creates a mapping between hashes and envelopes in order to
// correctly provide a per-transaction view on-the-fly when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase string) error {
// See https://github.com/stellar/go/pull/2720: envelopes in the meta (which
// just come straight from the agreed-upon transaction set) are not in the
// same order as the actual list of metas (which are sorted by hash), so we
// need to hash the envelopes *first* to properly associate them with their
// metas.
for i, tx := range reader.lcm.TransactionEnvelopes() {
hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase)
if err != nil {
return errors.Wrapf(err, "could not hash transaction %d in TxSet", i)
}
byHash[hash] = tx
}
reader.envelopesByHash[xdr.Hash(hash)] = tx

for i := 0; i < lcm.CountTransactions(); i++ {
hash := lcm.TransactionHash(i)
envelope, ok := byHash[hash]
if !ok {
hexHash := hex.EncodeToString(hash[:])
return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
// We check the version only if FeeProcessing is non-empty, because some
// backends (like HistoryArchiveBackend) do not return meta.
//
// Note that the ordering differences are irrelevant here because all we
// care about is checking every meta for this condition.
if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 &&
len(reader.lcm.FeeProcessing(i)) > 0 {
return badMetaVersionErr
}

// We check the version only if FeeProcessing are non empty because some backends
// (like HistoryArchiveBackend) do not return meta.
if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 &&
len(lcm.FeeProcessing(i)) > 0 {
return errors.New(
"TransactionMeta.V=2 is required in protocol version older than version 10. " +
"Please process ledgers again using the latest stellar-core version.",
)
}

reader.transactions = append(reader.transactions, LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: lcm.TransactionResultPair(i),
UnsafeMeta: lcm.TxApplyProcessing(i),
FeeChanges: lcm.FeeProcessing(i),
LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
})
}

return nil
}

// Close should be called when reading is finished. This is especially
// helpful when there are still some transactions available so reader can stop
// streaming them.
func (reader *LedgerTransactionReader) Close() error {
reader.transactions = nil
reader.envelopesByHash = nil
return nil
}
143 changes: 143 additions & 0 deletions ingest/ledger_transaction_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package ingest

import (
"io"
"testing"

"github.com/stellar/go/keypair"
"github.com/stellar/go/network"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
passphrase = network.TestNetworkPassphrase
// Test prep:
// - two different envelopes which resolve to two different hashes
// - two basically-empty metas that contain the corresponding hashes
// - a ledger that has 5 txs with metas corresponding to these two envs
// - specifically, in the order [first, first, second, second, second]
//
// This tests both hash <--> envelope mapping and indexed iteration.
txEnvs, txHashes, txMetas = makeTransactions(5)
// barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs
ledgerCloseMeta = xdr.LedgerCloseMeta{V: 1,
V1: &xdr.LedgerCloseMetaV1{
TxProcessing: txMetas,
TxSet: xdr.GeneralizedTransactionSet{V: 1,
V1TxSet: &xdr.TransactionSetV1{
Phases: []xdr.TransactionPhase{{V: 0,
V0Components: &[]xdr.TxSetComponent{{
TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{
Txs: txEnvs,
}},
},
}},
},
},
},
}
)

func TestTransactionReader(t *testing.T) {
s := set.NewSet[xdr.Hash](5)
for _, hash := range txHashes {
s.Add(hash)
}
require.Lenf(t, s, len(txHashes), "precondition: hashes aren't unique, envs: %+v", txEnvs)

// simplest case: read from start

reader, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta)
require.NoError(t, err)

for i := 0; i < 5; i++ {
tx, ierr := reader.Read()
require.NoError(t, ierr)
assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope)
assert.Equal(t, txHashes[tx.Index-1], thisHash)
}
_, err = reader.Read()
require.ErrorIs(t, err, io.EOF)

// start reading from the middle set of txs

require.NoError(t, reader.Seek(2))
for i := 0; i < 3; i++ {
tx, ierr := reader.Read()
require.NoError(t, ierr)
assert.EqualValues(t,
/* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */
1+(i+2)%5,
tx.Index,
"iteration i=%d", i)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope)
assert.Equal(t, txHashes[tx.Index-1], thisHash)
}
_, err = reader.Read()
require.ErrorIs(t, err, io.EOF)

// edge case: start from the last tx
require.NoError(t, reader.Seek(4))
tx, ierr := reader.Read()
require.NoError(t, ierr)
assert.EqualValues(t, 5, tx.Index)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
assert.Equal(t, txEnvs[4], tx.Envelope)
assert.Equal(t, txHashes[4], thisHash)
_, err = reader.Read()
require.ErrorIs(t, err, io.EOF)

// error case: too far or too close
for _, idx := range []int{-1, 5, 6} {
rdr, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta)
require.NoError(t, err)
require.Error(t, rdr.Seek(idx), "no error when trying seek=%d", idx)
}
}

func makeTransactions(count int) (
envs []xdr.TransactionEnvelope,
hashes [][32]byte,
metas []xdr.TransactionResultMeta,
) {
seqNum := 123_456
for i := 0; i < count; i++ {
txEnv := xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
Ext: xdr.TransactionExt{V: 0},
SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()),
Operations: []xdr.Operation{},
Fee: xdr.Uint32(seqNum + i),
SeqNum: xdr.SequenceNumber(seqNum + i),
},
Signatures: []xdr.DecoratedSignature{},
},
}

txHash, _ := network.HashTransactionInEnvelope(txEnv, passphrase)
txMeta := xdr.TransactionResultMeta{
Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash)},
TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}},
}

envs = append(envs, txEnv)
hashes = append(hashes, txHash)
metas = append(metas, txMeta)
}

return
}

0 comments on commit 73233da

Please sign in to comment.