diff --git a/exp/ingest/io/change.go b/exp/ingest/io/change.go index 5a87301a4f..6d65cd7e74 100644 --- a/exp/ingest/io/change.go +++ b/exp/ingest/io/change.go @@ -20,6 +20,54 @@ type Change struct { Post *xdr.LedgerEntry } +// GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. +// Each `update` and `removed` is preceded with `state` and `create` changes +// are alone, without `state`. The transformation we're doing is to move each +// change (state/update, state/removed or create) to an array of pre/post pairs. +// Then: +// - for create, pre is null and post is a new entry, +// - for update, pre is previous state and post is the current state, +// - for removed, pre is previous state and post is null. +// +// stellar-core source: +// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582 +func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change { + changes := []Change{} + + for i, entryChange := range ledgerEntryChanges { + switch entryChange.Type { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + created := entryChange.MustCreated() + changes = append(changes, Change{ + Type: created.Data.Type, + Pre: nil, + Post: &created, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + state := ledgerEntryChanges[i-1].MustState() + updated := entryChange.MustUpdated() + changes = append(changes, Change{ + Type: state.Data.Type, + Pre: &state, + Post: &updated, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + state := ledgerEntryChanges[i-1].MustState() + changes = append(changes, Change{ + Type: state.Data.Type, + Pre: &state, + Post: nil, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryState: + continue + default: + panic("Invalid LedgerEntryChangeType") + } + } + + return changes +} + // LedgerEntryChangeType returns type in terms of LedgerEntryChangeType. func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { switch { diff --git a/exp/ingest/io/change_reader.go b/exp/ingest/io/change_reader.go deleted file mode 100644 index 065df2989e..0000000000 --- a/exp/ingest/io/change_reader.go +++ /dev/null @@ -1,165 +0,0 @@ -package io - -import ( - "context" - "io" - - "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/xdr" -) - -// ChangeReader provides convenient, streaming access to a sequence of Changes -type ChangeReader interface { - // Read should return the next `Change` in the leader. If there are no more - // changes left it should return an `io.EOF` error. - Read() (Change, error) - // Close should be called when reading is finished. This is especially - // helpful when there are still some changes available so reader can stop - // streaming them. - Close() error -} - -// LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core -// for a single ledger -type LedgerChangeReader struct { - dbReader DBLedgerReader - streamedFeeChanges bool - streamedMetaChanges bool - streamedUpgradeChanges bool - pending []Change - pendingIndex int -} - -// Ensure LedgerChangeReader implements ChangeReader -var _ ChangeReader = (*LedgerChangeReader)(nil) - -// NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. -// Note that the returned LedgerChangeReader is not thread safe and should not be shared -// by multiple goroutines. -func NewLedgerChangeReader( - ctx context.Context, sequence uint32, backend ledgerbackend.LedgerBackend, -) (*LedgerChangeReader, error) { - reader, err := NewDBLedgerReader(ctx, sequence, backend) - if err != nil { - return nil, err - } - - return &LedgerChangeReader{dbReader: *reader}, nil -} - -// GetHeader returns the ledger header for the reader -func (r *LedgerChangeReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return r.dbReader.GetHeader() -} - -func (r *LedgerChangeReader) getNextFeeChange() (Change, error) { - if r.streamedFeeChanges { - return Change{}, io.EOF - } - - // Remember that it's possible that transaction can remove a preauth - // tx signer even when it's a failed transaction so we need to check - // failed transactions too. - for { - transaction, err := r.dbReader.Read() - if err != nil { - if err == io.EOF { - r.dbReader.rewind() - r.streamedFeeChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - changes := transaction.GetFeeChanges() - if len(changes) >= 1 { - r.pending = append(r.pending, changes[1:]...) - return changes[0], nil - } - } -} - -func (r *LedgerChangeReader) getNextMetaChange() (Change, error) { - if r.streamedMetaChanges { - return Change{}, io.EOF - } - - for { - transaction, err := r.dbReader.Read() - if err != nil { - if err == io.EOF { - r.streamedMetaChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - changes, err := transaction.GetChanges() - if err != nil { - return Change{}, err - } - if len(changes) >= 1 { - r.pending = append(r.pending, changes[1:]...) - return changes[0], nil - } - } -} - -func (r *LedgerChangeReader) getNextUpgradeChange() (Change, error) { - if r.streamedUpgradeChanges { - return Change{}, io.EOF - } - - change, err := r.dbReader.readUpgradeChange() - if err != nil { - if err == io.EOF { - r.streamedUpgradeChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - return change, nil -} - -// Read returns the next change in the stream. -// If there are no changes remaining io.EOF is returned -// as an error. -func (r *LedgerChangeReader) Read() (Change, error) { - if err := r.dbReader.ctx.Err(); err != nil { - return Change{}, err - } - - if r.pendingIndex < len(r.pending) { - next := r.pending[r.pendingIndex] - r.pendingIndex++ - if r.pendingIndex == len(r.pending) { - r.pendingIndex = 0 - r.pending = r.pending[:0] - } - return next, nil - } - - change, err := r.getNextFeeChange() - if err == nil || err != io.EOF { - return change, err - } - - change, err = r.getNextMetaChange() - if err == nil || err != io.EOF { - return change, err - } - - return r.getNextUpgradeChange() -} - -func (r *LedgerChangeReader) Close() error { - r.pending = nil - r.streamedFeeChanges = true - r.streamedMetaChanges = true - r.streamedUpgradeChanges = true - return r.dbReader.Close() -} diff --git a/exp/ingest/io/ledger_change_reader.go b/exp/ingest/io/ledger_change_reader.go new file mode 100644 index 0000000000..3879efaebe --- /dev/null +++ b/exp/ingest/io/ledger_change_reader.go @@ -0,0 +1,127 @@ +package io + +import ( + "io" + + "github.com/stellar/go/exp/ingest/ledgerbackend" +) + +// ChangeReader provides convenient, streaming access to a sequence of Changes. +type ChangeReader interface { + // Read should return the next `Change` in the leader. If there are no more + // changes left it should return an `io.EOF` error. + Read() (Change, error) + // Close should be called when reading is finished. This is especially + // helpful when there are still some changes available so reader can stop + // streaming them. + Close() error +} + +// ledgerChangeReaderState defines possible states of LedgerChangeReader. +type ledgerChangeReaderState int + +const ( + // feeChangesState is active when LedgerChangeReader is reading fee changes. + feeChangesState ledgerChangeReaderState = iota + // feeChangesState is active when LedgerChangeReader is reading transaction meta changes. + metaChangesState + // feeChangesState is active when LedgerChangeReader is reading upgrade changes. + upgradeChangesState +) + +// LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core +// for a single ledger +type LedgerChangeReader struct { + *LedgerTransactionReader + state ledgerChangeReaderState + pending []Change + pendingIndex int + upgradeIndex int +} + +// Ensure LedgerChangeReader implements ChangeReader +var _ ChangeReader = (*LedgerChangeReader)(nil) + +// NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. +// Note that the returned LedgerChangeReader is not thread safe and should not be shared +// by multiple goroutines. +func NewLedgerChangeReader(backend ledgerbackend.LedgerBackend, sequence uint32) (*LedgerChangeReader, error) { + transactionReader, err := NewLedgerTransactionReader(backend, sequence) + if err != nil { + return nil, err + } + + return &LedgerChangeReader{ + LedgerTransactionReader: transactionReader, + state: feeChangesState, + }, nil +} + +// Read returns the next change in the stream. +// If there are no changes remaining io.EOF is returned as an error. +func (r *LedgerChangeReader) Read() (Change, error) { + // Changes within a ledger should be read in the following order: + // - fee changes of all transactions, + // - transaction meta changes of all transactions, + // - upgrade changes. + // Because a single transaction can introduce many changes we read all the + // changes from a single transaction and save them in r.pending. + // When Read() is called we stream pending changes first. We also call Read() + // recursively after adding some changes (what will return them from r.pending) + // to not duplicate the code. + if r.pendingIndex < len(r.pending) { + next := r.pending[r.pendingIndex] + r.pendingIndex++ + if r.pendingIndex == len(r.pending) { + r.pendingIndex = 0 + r.pending = r.pending[:0] + } + return next, nil + } + + switch r.state { + case feeChangesState, metaChangesState: + tx, err := r.LedgerTransactionReader.Read() + if err != nil { + if err == io.EOF { + // If done streaming fee changes rewind to stream meta changes + if r.state == feeChangesState { + r.LedgerTransactionReader.Rewind() + } + r.state++ + return r.Read() + } + return Change{}, err + } + + switch r.state { + case feeChangesState: + r.pending = append(r.pending, tx.GetFeeChanges()...) + case metaChangesState: + metaChanges, err := tx.GetChanges() + if err != nil { + return Change{}, err + } + r.pending = append(r.pending, metaChanges...) + } + return r.Read() + case upgradeChangesState: + // Get upgrade changes + if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta) { + changes := GetChangesFromLedgerEntryChanges( + r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta[r.upgradeIndex], + ) + r.pending = append(r.pending, changes...) + r.upgradeIndex++ + return r.Read() + } + } + + return Change{}, io.EOF +} + +// Close should be called when reading is finished. +func (r *LedgerChangeReader) Close() error { + r.pending = nil + return r.LedgerTransactionReader.Close() +} diff --git a/exp/ingest/io/ledger_change_reader_test.go b/exp/ingest/io/ledger_change_reader_test.go index 85cf27ee30..4fc58d6180 100644 --- a/exp/ingest/io/ledger_change_reader_test.go +++ b/exp/ingest/io/ledger_change_reader_test.go @@ -1,7 +1,6 @@ package io import ( - "context" "fmt" "io" "testing" @@ -26,11 +25,11 @@ func TestNewLedgerChangeReaderFails(t *testing.T) { ledgerbackend.LedgerCloseMeta{}, fmt.Errorf("ledger error"), ).Once() - _, err := NewLedgerChangeReader(context.Background(), seq, mock) + _, err := NewLedgerChangeReader(mock, seq) assert.EqualError( t, err, - "error reading ledger from backend: ledger error", + "error getting ledger from the backend: ledger error", ) } @@ -42,7 +41,7 @@ func TestNewLedgerChangeReaderLedgerDoesNotExist(t *testing.T) { ledgerbackend.LedgerCloseMeta{}, nil, ).Once() - _, err := NewLedgerChangeReader(context.Background(), seq, mock) + _, err := NewLedgerChangeReader(mock, seq) assert.Equal( t, err, @@ -69,7 +68,7 @@ func TestNewLedgerChangeReaderSucceeds(t *testing.T) { nil, ).Once() - reader, err := NewLedgerChangeReader(context.Background(), seq, mock) + reader, err := NewLedgerChangeReader(mock, seq) assert.NoError(t, err) assert.Equal(t, reader.GetHeader(), header) @@ -108,7 +107,7 @@ func assertChangesEqual( backend ledgerbackend.LedgerBackend, expected []balanceEntry, ) { - reader, err := NewLedgerChangeReader(context.Background(), sequence, backend) + reader, err := NewLedgerChangeReader(backend, sequence) assert.NoError(t, err) changes := []balanceEntry{} @@ -266,73 +265,3 @@ func TestLedgerChangeReaderOrder(t *testing.T) { assertChangesEqual(t, seq, mock, []balanceEntry{}) mock.AssertExpectations(t) } - -func TestLedgerChangeReaderContext(t *testing.T) { - mock := &ledgerbackend.MockDatabaseBackend{} - seq := uint32(123) - - ledger := ledgerbackend.LedgerCloseMeta{ - TransactionResult: []xdr.TransactionResultPair{ - xdr.TransactionResultPair{}, - xdr.TransactionResultPair{}, - }, - TransactionEnvelope: []xdr.TransactionEnvelope{ - xdr.TransactionEnvelope{}, - xdr.TransactionEnvelope{}, - }, - TransactionMeta: []xdr.TransactionMeta{ - xdr.TransactionMeta{ - V: 1, - V1: &xdr.TransactionMetaV1{ - Operations: []xdr.OperationMeta{}, - }, - }, - xdr.TransactionMeta{ - V: 1, - V1: &xdr.TransactionMetaV1{ - Operations: []xdr.OperationMeta{}, - }, - }, - }, - TransactionFeeChanges: []xdr.LedgerEntryChanges{ - xdr.LedgerEntryChanges{ - buildChange(feeAddress, 100), - }, - xdr.LedgerEntryChanges{ - buildChange(feeAddress, 300), - }, - }, - UpgradesMeta: []xdr.LedgerEntryChanges{ - xdr.LedgerEntryChanges{ - buildChange(upgradeAddress, 2), - }, - xdr.LedgerEntryChanges{ - buildChange(upgradeAddress, 3), - }, - }, - } - - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() - ctx, cancel := context.WithCancel(context.Background()) - reader, err := NewLedgerChangeReader(ctx, seq, mock) - mock.AssertExpectations(t) - assert.NoError(t, err) - - cancel() - _, err = reader.Read() - assert.Equal(t, context.Canceled, err) - - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() - ctx, cancel = context.WithCancel(context.Background()) - reader, err = NewLedgerChangeReader(ctx, seq, mock) - mock.AssertExpectations(t) - assert.NoError(t, err) - - change, err := reader.Read() - assert.Equal(t, balanceEntry{feeAddress, 100}, parseChange(change)) - assert.NoError(t, err) - - cancel() - _, err = reader.Read() - assert.Equal(t, context.Canceled, err) -} diff --git a/exp/ingest/io/ledger_reader.go b/exp/ingest/io/ledger_reader.go deleted file mode 100644 index 63bc271cd8..0000000000 --- a/exp/ingest/io/ledger_reader.go +++ /dev/null @@ -1,144 +0,0 @@ -package io - -import ( - "context" - "io" - - "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -// LedgerReader provides convenient, streaming access to the transactions within a ledger. -type LedgerReader interface { - GetSequence() uint32 - GetHeader() xdr.LedgerHeaderHistoryEntry - // Read should return the next transaction. If there are no more - // transactions it should return `io.EOF` error. - Read() (LedgerTransaction, error) - // Close should be called when reading is finished. This is especially - // helpful when there are still some transactions available so reader can stop - // streaming them. - Close() error -} - -// DBLedgerReader is a database-backed implementation of the io.LedgerReader interface. -// Use NewDBLedgerReader to create a new instance. -type DBLedgerReader struct { - ctx context.Context - sequence uint32 - backend ledgerbackend.LedgerBackend - header xdr.LedgerHeaderHistoryEntry - transactions []LedgerTransaction - upgradeChanges []Change - readIdx int - upgradeReadIdx int -} - -// Ensure DBLedgerReader implements LedgerReader -var _ LedgerReader = (*DBLedgerReader)(nil) - -// NewDBLedgerReader creates a new DBLedgerReader instance. -// Note that DBLedgerReader is not thread safe and should not be shared by multiple goroutines -func NewDBLedgerReader( - ctx context.Context, sequence uint32, backend ledgerbackend.LedgerBackend, -) (*DBLedgerReader, error) { - reader := &DBLedgerReader{ - ctx: ctx, - sequence: sequence, - backend: backend, - } - - err := reader.init() - if err != nil { - return nil, err - } - - return reader, nil -} - -// GetSequence returns the sequence number of the ledger data stored by this object. -func (dblrc *DBLedgerReader) GetSequence() uint32 { - return dblrc.sequence -} - -// GetHeader returns the XDR Header data associated with the stored ledger. -func (dblrc *DBLedgerReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return dblrc.header -} - -// Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there -// are no more transactions to return, an EOF error is returned. -func (dblrc *DBLedgerReader) Read() (LedgerTransaction, error) { - if err := dblrc.ctx.Err(); err != nil { - return LedgerTransaction{}, err - } - - if dblrc.readIdx < len(dblrc.transactions) { - dblrc.readIdx++ - return dblrc.transactions[dblrc.readIdx-1], nil - } - return LedgerTransaction{}, io.EOF -} - -// readUpgradeChange returns the next upgrade change in the ledger, each time it -// is called. When there are no more upgrades to return, an EOF error is returned. -func (dblrc *DBLedgerReader) readUpgradeChange() (Change, error) { - if err := dblrc.ctx.Err(); err != nil { - return Change{}, err - } - - if dblrc.upgradeReadIdx < len(dblrc.upgradeChanges) { - dblrc.upgradeReadIdx++ - return dblrc.upgradeChanges[dblrc.upgradeReadIdx-1], nil - } - return Change{}, io.EOF -} - -// Rewind resets the reader back to the first transaction in the ledger -func (dblrc *DBLedgerReader) rewind() { - dblrc.readIdx = 0 -} - -// Init pulls data from the backend to set this object up for use. -func (dblrc *DBLedgerReader) init() error { - exists, ledgerCloseMeta, err := dblrc.backend.GetLedger(dblrc.sequence) - - if err != nil { - return errors.Wrap(err, "error reading ledger from backend") - } - if !exists { - return ErrNotFound - } - - dblrc.header = ledgerCloseMeta.LedgerHeader - - dblrc.storeTransactions(ledgerCloseMeta) - - for _, upgradeChanges := range ledgerCloseMeta.UpgradesMeta { - changes := getChangesFromLedgerEntryChanges(upgradeChanges) - dblrc.upgradeChanges = append(dblrc.upgradeChanges, changes...) - } - - return nil -} - -// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide -// a per-transaction view of the data when Read() is called. -func (dblrc *DBLedgerReader) storeTransactions(lcm ledgerbackend.LedgerCloseMeta) { - for i := range lcm.TransactionEnvelope { - dblrc.transactions = append(dblrc.transactions, LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: lcm.TransactionEnvelope[i], - Result: lcm.TransactionResult[i], - Meta: lcm.TransactionMeta[i], - FeeChanges: lcm.TransactionFeeChanges[i], - }) - } -} - -func (dblrc *DBLedgerReader) Close() error { - dblrc.transactions = nil - dblrc.upgradeChanges = nil - return nil -} diff --git a/exp/ingest/io/ledger_transaction.go b/exp/ingest/io/ledger_transaction.go index 0f222f4f8c..766d2afe67 100644 --- a/exp/ingest/io/ledger_transaction.go +++ b/exp/ingest/io/ledger_transaction.go @@ -24,7 +24,7 @@ func (t *LedgerTransaction) txInternalError() bool { // GetFeeChanges returns a developer friendly representation of LedgerEntryChanges // connected to fees. func (t *LedgerTransaction) GetFeeChanges() []Change { - return getChangesFromLedgerEntryChanges(t.FeeChanges) + return GetChangesFromLedgerEntryChanges(t.FeeChanges) } // GetChanges returns a developer friendly representation of LedgerEntryChanges. @@ -40,7 +40,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { return changes, errors.New("TransactionMeta.V=0 not supported") case 1: v1Meta := t.Meta.MustV1() - txChanges := getChangesFromLedgerEntryChanges(v1Meta.TxChanges) + txChanges := GetChangesFromLedgerEntryChanges(v1Meta.TxChanges) changes = append(changes, txChanges...) // Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111 @@ -49,7 +49,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { } for _, operationMeta := range v1Meta.Operations { - opChanges := getChangesFromLedgerEntryChanges( + opChanges := GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) changes = append(changes, opChanges...) @@ -57,7 +57,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { case 2: v2Meta := t.Meta.MustV2() - txChangesBefore := getChangesFromLedgerEntryChanges(v2Meta.TxChangesBefore) + txChangesBefore := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesBefore) changes = append(changes, txChangesBefore...) // Ignore operations meta and txChangesAfter if txInternalError @@ -67,13 +67,13 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { } for _, operationMeta := range v2Meta.Operations { - opChanges := getChangesFromLedgerEntryChanges( + opChanges := GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) changes = append(changes, opChanges...) } - txChangesAfter := getChangesFromLedgerEntryChanges(v2Meta.TxChangesAfter) + txChangesAfter := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesAfter) changes = append(changes, txChangesAfter...) default: return changes, errors.New("Unsupported TransactionMeta version") @@ -120,55 +120,7 @@ func operationChanges(ops []xdr.OperationMeta, index uint32) []Change { } operationMeta := ops[index] - return getChangesFromLedgerEntryChanges( + return GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) } - -// getChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. -// Each `update` and `removed` is preceded with `state` and `create` changes -// are alone, without `state`. The transformation we're doing is to move each -// change (state/update, state/removed or create) to an array of pre/post pairs. -// Then: -// - for create, pre is null and post is a new entry, -// - for update, pre is previous state and post is the current state, -// - for removed, pre is previous state and post is null. -// -// stellar-core source: -// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582 -func getChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change { - changes := []Change{} - - for i, entryChange := range ledgerEntryChanges { - switch entryChange.Type { - case xdr.LedgerEntryChangeTypeLedgerEntryCreated: - created := entryChange.MustCreated() - changes = append(changes, Change{ - Type: created.Data.Type, - Pre: nil, - Post: &created, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: - state := ledgerEntryChanges[i-1].MustState() - updated := entryChange.MustUpdated() - changes = append(changes, Change{ - Type: state.Data.Type, - Pre: &state, - Post: &updated, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: - state := ledgerEntryChanges[i-1].MustState() - changes = append(changes, Change{ - Type: state.Data.Type, - Pre: &state, - Post: nil, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryState: - continue - default: - panic("Invalid LedgerEntryChangeType") - } - } - - return changes -} diff --git a/exp/ingest/io/ledger_transaction_reader.go b/exp/ingest/io/ledger_transaction_reader.go new file mode 100644 index 0000000000..534ca86a72 --- /dev/null +++ b/exp/ingest/io/ledger_transaction_reader.go @@ -0,0 +1,81 @@ +package io + +import ( + "io" + + "github.com/stellar/go/exp/ingest/ledgerbackend" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// LedgerTransactionReader reads transactions for a given ledger sequence from a backend. +// Use NewTransactionReader to create a new instance. +type LedgerTransactionReader struct { + ledgerCloseMeta ledgerbackend.LedgerCloseMeta + transactions []LedgerTransaction + readIdx int +} + +// NewLedgerTransactionReader creates a new TransactionReader instance. +// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines +func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, sequence uint32) (*LedgerTransactionReader, error) { + exists, ledgerCloseMeta, err := backend.GetLedger(sequence) + if err != nil { + return nil, errors.Wrap(err, "error getting ledger from the backend") + } + + if !exists { + return nil, ErrNotFound + } + + reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta} + reader.storeTransactions(ledgerCloseMeta) + return reader, nil +} + +// GetSequence returns the sequence number of the ledger data stored by this object. +func (reader *LedgerTransactionReader) GetSequence() uint32 { + return uint32(reader.ledgerCloseMeta.LedgerHeader.Header.LedgerSeq) +} + +// GetHeader returns the XDR Header data associated with the stored ledger. +func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { + return reader.ledgerCloseMeta.LedgerHeader +} + +// Read returns the next transaction in the ledger, ordered by tx number, each time +// it is called. When there are no more transactions to return, an EOF error is returned. +func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { + if reader.readIdx < len(reader.transactions) { + reader.readIdx++ + return reader.transactions[reader.readIdx-1], nil + } + return LedgerTransaction{}, io.EOF +} + +// Rewind resets the reader back to the first transaction in the ledger +func (reader *LedgerTransactionReader) Rewind() { + reader.readIdx = 0 +} + +// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide +// a per-transaction view of the data when Read() is called. +func (reader *LedgerTransactionReader) storeTransactions(lcm ledgerbackend.LedgerCloseMeta) { + for i := range lcm.TransactionEnvelope { + reader.transactions = append(reader.transactions, LedgerTransaction{ + Index: uint32(i + 1), // Transactions start at '1' + Envelope: lcm.TransactionEnvelope[i], + Result: lcm.TransactionResult[i], + Meta: lcm.TransactionMeta[i], + FeeChanges: lcm.TransactionFeeChanges[i], + }) + } +} + +// Close should be called when reading is finished. This is especially +// helpful when there are still some transactions available so reader can stop +// streaming them. +func (reader *LedgerTransactionReader) Close() error { + reader.transactions = nil + return nil +} diff --git a/exp/ingest/io/mock_ledger_reader.go b/exp/ingest/io/mock_ledger_reader.go deleted file mode 100644 index aa05bde538..0000000000 --- a/exp/ingest/io/mock_ledger_reader.go +++ /dev/null @@ -1,46 +0,0 @@ -package io - -import ( - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/mock" -) - -var _ LedgerReader = (*MockLedgerReader)(nil) - -type MockLedgerReader struct { - mock.Mock -} - -func (m *MockLedgerReader) GetSequence() uint32 { - args := m.Called() - return args.Get(0).(uint32) -} - -func (m *MockLedgerReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - args := m.Called() - return args.Get(0).(xdr.LedgerHeaderHistoryEntry) -} - -func (m *MockLedgerReader) Read() (LedgerTransaction, error) { - args := m.Called() - return args.Get(0).(LedgerTransaction), args.Error(1) -} - -func (m *MockLedgerReader) ReadUpgradeChange() (Change, error) { - args := m.Called() - return args.Get(0).(Change), args.Error(1) -} - -func (m *MockLedgerReader) GetUpgradeChanges() []Change { - args := m.Called() - return args.Get(0).([]Change) -} - -func (m *MockLedgerReader) IgnoreUpgradeChanges() { - m.Called() -} - -func (m *MockLedgerReader) Close() error { - args := m.Called() - return args.Error(0) -} diff --git a/exp/ingest/io/mock_ledger_transaction_processor.go b/exp/ingest/io/mock_ledger_transaction_processor.go deleted file mode 100644 index 4834682cc5..0000000000 --- a/exp/ingest/io/mock_ledger_transaction_processor.go +++ /dev/null @@ -1,14 +0,0 @@ -package io - -import "github.com/stretchr/testify/mock" - -var _ LedgerTransactionProcessor = (*MockLedgerTransactionProcessor)(nil) - -type MockLedgerTransactionProcessor struct { - mock.Mock -} - -func (m *MockLedgerTransactionProcessor) ProcessTransaction(transaction LedgerTransaction) error { - args := m.Called(transaction) - return args.Error(0) -} diff --git a/exp/ingest/io/processors.go b/exp/ingest/io/processors.go index 6f5271370e..ee4f2e81e1 100644 --- a/exp/ingest/io/processors.go +++ b/exp/ingest/io/processors.go @@ -16,7 +16,7 @@ type LedgerTransactionProcessor interface { func StreamLedgerTransactions( txProcessor LedgerTransactionProcessor, - reader LedgerReader, + reader *LedgerTransactionReader, ) error { for { tx, err := reader.Read() diff --git a/services/horizon/internal/expingest/processor_runner.go b/services/horizon/internal/expingest/processor_runner.go index f7bd978628..658abed151 100644 --- a/services/horizon/internal/expingest/processor_runner.go +++ b/services/horizon/internal/expingest/processor_runner.go @@ -158,20 +158,19 @@ func (s *ProcessorRunner) validateBucketList(ledgerSequence uint32) error { return errors.Wrap(err, "Error getting bucket list hash") } - ledgerReader, err := io.NewDBLedgerReader(s.ctx, ledgerSequence, s.ledgerBackend) + exists, ledgerCloseMeta, err := s.ledgerBackend.GetLedger(ledgerSequence) if err != nil { - if err == io.ErrNotFound { - return fmt.Errorf( - "cannot validate bucket hash list. Checkpoint ledger (%d) must exist in Stellar-Core database.", - ledgerSequence, - ) - } else { - return errors.Wrap(err, "Error getting ledger") - } + return errors.Wrap(err, "Error getting ledger") + } + + if !exists { + return fmt.Errorf( + "cannot validate bucket hash list. Checkpoint ledger (%d) must exist in Stellar-Core database.", + ledgerSequence, + ) } - ledgerHeader := ledgerReader.GetHeader() - ledgerBucketHashList := ledgerHeader.Header.BucketListHash + ledgerBucketHashList := ledgerCloseMeta.LedgerHeader.Header.BucketListHash if !bytes.Equal(historyBucketListHash[:], ledgerBucketHashList[:]) { return fmt.Errorf( @@ -238,7 +237,7 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( ) error { var changeReader io.ChangeReader var err error - changeReader, err = io.NewLedgerChangeReader(s.ctx, ledger, s.ledgerBackend) + changeReader, err = io.NewLedgerChangeReader(s.ledgerBackend, ledger) if err != nil { return errors.Wrap(err, "Error creating ledger change reader") } @@ -264,13 +263,13 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger uint32) (io.StatsLedgerTransactionProcessorResults, error) { ledgerTransactionStats := io.StatsLedgerTransactionProcessor{} - ledgerReader, err := io.NewDBLedgerReader(s.ctx, ledger, s.ledgerBackend) + transactionReader, err := io.NewLedgerTransactionReader(s.ledgerBackend, ledger) if err != nil { return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Error creating ledger reader") } - txProcessor := s.buildTransactionProcessor(&ledgerTransactionStats, ledgerReader.GetHeader()) - err = io.StreamLedgerTransactions(txProcessor, ledgerReader) + txProcessor := s.buildTransactionProcessor(&ledgerTransactionStats, transactionReader.GetHeader()) + err = io.StreamLedgerTransactions(txProcessor, transactionReader) if err != nil { return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Error streaming changes from ledger") }