Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: Make LedgerTransactionReader seekable and lazy #5274

Merged
merged 11 commits into from
Apr 16, 2024
19 changes: 16 additions & 3 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,27 @@ All notable changes to this project will be documented in this file. This projec

## Unreleased

* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050)

### New Features
* **Performance improvement**: the Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.
* Support for Soroban and Protocol 20!
* There is now a `LazyTransactionReader` that provides transactions starting from any part of the ledger and will only transform the ones that are being used [5274](https://github.com/stellar/go/pull/5274).
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
* `Change` now has a canonical stringification and a set of them is deterministically sortable.
* `NewCompactingChangeReader` will give you a wrapped `ChangeReader` that compacts the changes.
* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050).

### Performance Improvements
* The Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.
* There have been miscallaneous memory and processing speed improvements.

### Bug Fixes
* The Stellar Core runner now parses logs from its underlying subprocess better [#3746](https://github.com/stellar/go/pull/3746).
* Ensures that the underlying Stellar Core is terminated before restarting.
* Backends will now connect with a user agent.
* Better handling of various error and restart scenarios.

### Breaking Changes
* **Captive Core is now the only available backend.**
* The Captive Core configuration should be provided via a TOML file.
* `Change.AccountSignersChanged` has been removed.

## v2.0.0

Expand Down
26 changes: 13 additions & 13 deletions ingest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ From a high level, the ingestion library is broken down into a few modular compo

```
[ Processors ]
|
/ \
/ \
/ \
[Change] [Transaction]
| |
|---+---| |
Checkpoint Ledger Ledger
Change Change Transaction
Reader Reader Reader
_|_
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
/ \
/ \
/ \
[Change] [Transaction]
| |
|---+---| |-------+----|
Checkpoint Ledger Ledger Lazy
Change Change Transaction Transaction
Reader Reader Reader Reader

[ Ledger Backend ]
|
|
Captive
Core
Captive
Core
```

This is described in a little more detail in [`doc.go`](./doc.go), its accompanying examples, the documentation within this package, and the rest of this tutorial.
Expand Down Expand Up @@ -290,7 +290,7 @@ First thing's first: we need to establish a connection to a history archive.
```

## Tracking Changes
Each history archive contains the current cumulative state of the entire network.
Each history archive contains the current cumulative state of the entire network.

Now we can use the history archive to actually read in all of the changes that have accumulated in the entire network by a particular checkpoint.

Expand Down
6 changes: 3 additions & 3 deletions ingest/ledger_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (r *LedgerChangeReader) Read() (Change, error) {
}
return r.Read()
case evictionChangesState:
entries, err := r.ledgerCloseMeta.EvictedPersistentLedgerEntries()
entries, err := r.lcm.EvictedPersistentLedgerEntries()
if err != nil {
return Change{}, err
}
Expand All @@ -196,9 +196,9 @@ func (r *LedgerChangeReader) Read() (Change, error) {
return r.Read()
case upgradeChangesState:
// Get upgrade changes
if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()) {
if r.upgradeIndex < len(r.LedgerTransactionReader.lcm.UpgradesProcessing()) {
changes := GetChangesFromLedgerEntryChanges(
r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()[r.upgradeIndex].Changes,
r.LedgerTransactionReader.lcm.UpgradesProcessing()[r.upgradeIndex].Changes,
)
r.pending = append(r.pending, changes...)
r.upgradeIndex++
Expand Down
140 changes: 87 additions & 53 deletions ingest/ledger_transaction_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,29 @@ import (
"github.com/stellar/go/xdr"
)

// LedgerTransactionReader reads transactions for a given ledger sequence from a backend.
// Use NewTransactionReader to create a new instance.
var badMetaVersionErr = errors.New(
"TransactionMeta.V=2 is required in protocol version older than version 10. " +
"Please process ledgers again using the latest stellar-core version.",
)

// LedgerTransactionReader reads transactions for a given ledger sequence from a
// backend. Use NewTransactionReader to create a new instance.
type LedgerTransactionReader struct {
ledgerCloseMeta xdr.LedgerCloseMeta
transactions []LedgerTransaction
readIdx int
lcm xdr.LedgerCloseMeta // read-only
envelopesByHash map[xdr.Hash]xdr.TransactionEnvelope // set once

readIdx int // tracks iteration & seeking
}

// NewLedgerTransactionReader creates a new TransactionReader instance.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) {
// NewLedgerTransactionReader creates a new TransactionReader instance. Note
// that TransactionReader is not thread safe and should not be shared by
// multiple goroutines.
func NewLedgerTransactionReader(
ctx context.Context,
backend ledgerbackend.LedgerBackend,
networkPassphrase string,
sequence uint32,
) (*LedgerTransactionReader, error) {
ledgerCloseMeta, err := backend.GetLedger(ctx, sequence)
if err != nil {
return nil, errors.Wrap(err, "error getting ledger from the backend")
Expand All @@ -30,87 +42,109 @@ func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.Ledge
return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta)
}

// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader instance from xdr.LedgerCloseMeta.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta}
if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil {
// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader
// instance from xdr.LedgerCloseMeta. Note that TransactionReader is not thread
// safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(
networkPassphrase string,
ledgerCloseMeta xdr.LedgerCloseMeta,
) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{
lcm: ledgerCloseMeta,
envelopesByHash: make(map[xdr.Hash]xdr.TransactionEnvelope, ledgerCloseMeta.CountTransactions()),
readIdx: 0,
}

if err := reader.storeTransactions(networkPassphrase); err != nil {
return nil, errors.Wrap(err, "error extracting transactions from ledger close meta")
}
return reader, nil
}

// GetSequence returns the sequence number of the ledger data stored by this object.
func (reader *LedgerTransactionReader) GetSequence() uint32 {
return reader.ledgerCloseMeta.LedgerSequence()
return reader.lcm.LedgerSequence()
}

// GetHeader returns the XDR Header data associated with the stored ledger.
func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry {
return reader.ledgerCloseMeta.LedgerHeaderHistoryEntry()
return reader.lcm.LedgerHeaderHistoryEntry()
}

// Read returns the next transaction in the ledger, ordered by tx number, each time
// it is called. When there are no more transactions to return, an EOF error is returned.
func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) {
if reader.readIdx < len(reader.transactions) {
reader.readIdx++
return reader.transactions[reader.readIdx-1], nil
if reader.readIdx >= reader.lcm.CountTransactions() {
return LedgerTransaction{}, io.EOF
}
return LedgerTransaction{}, io.EOF
i := reader.readIdx
reader.readIdx++ // next read will advance even on error

hash := reader.lcm.TransactionHash(i)
envelope, ok := reader.envelopesByHash[hash]
if !ok {
hexHash := hex.EncodeToString(hash[:])
return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
}

return LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: reader.lcm.TransactionResultPair(i),
UnsafeMeta: reader.lcm.TxApplyProcessing(i),
FeeChanges: reader.lcm.FeeProcessing(i),
LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
}, nil
}

// Rewind resets the reader back to the first transaction in the ledger
func (reader *LedgerTransactionReader) Rewind() {
reader.readIdx = 0
reader.Seek(0)
}

// Seek sets the reader back to a specific transaction in the ledger
func (reader *LedgerTransactionReader) Seek(index int) error {
if index >= reader.lcm.CountTransactions() || index < 0 {
return io.EOF
}

reader.readIdx = index
return nil
}

// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide
// a per-transaction view of the data when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error {
byHash := map[xdr.Hash]xdr.TransactionEnvelope{}
for i, tx := range lcm.TransactionEnvelopes() {
// storeHashes creates a mapping between hashes and envelopes in order to
// correctly provide a per-transaction view on-the-fly when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase string) error {
// See https://github.com/stellar/go/pull/2720: envelopes in the meta (which
// just come straight from the agreed-upon transaction set) are not in the
// same order as the actual list of metas (which are sorted by hash), so we
// need to hash the envelopes *first* to properly associate them with their
// metas.
for i, tx := range reader.lcm.TransactionEnvelopes() {
hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase)
if err != nil {
return errors.Wrapf(err, "could not hash transaction %d in TxSet", i)
}
byHash[hash] = tx
}
reader.envelopesByHash[xdr.Hash(hash)] = tx

for i := 0; i < lcm.CountTransactions(); i++ {
hash := lcm.TransactionHash(i)
envelope, ok := byHash[hash]
if !ok {
hexHash := hex.EncodeToString(hash[:])
return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
// We check the version only if FeeProcessing is non-empty, because some
// backends (like HistoryArchiveBackend) do not return meta.
//
// Note that the ordering differences are irrelevant here because all we
// care about is checking every meta for this condition.
if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 &&
len(reader.lcm.FeeProcessing(i)) > 0 {
return badMetaVersionErr
}

// We check the version only if FeeProcessing are non empty because some backends
// (like HistoryArchiveBackend) do not return meta.
if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 &&
len(lcm.FeeProcessing(i)) > 0 {
return errors.New(
"TransactionMeta.V=2 is required in protocol version older than version 10. " +
"Please process ledgers again using the latest stellar-core version.",
)
}

reader.transactions = append(reader.transactions, LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: lcm.TransactionResultPair(i),
UnsafeMeta: lcm.TxApplyProcessing(i),
FeeChanges: lcm.FeeProcessing(i),
LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
})
}

return nil
}

// Close should be called when reading is finished. This is especially
// helpful when there are still some transactions available so reader can stop
// streaming them.
func (reader *LedgerTransactionReader) Close() error {
reader.transactions = nil
reader.envelopesByHash = nil
return nil
}
Loading
Loading