Skip to content

Commit

Permalink
exp/ingest: Fix fee and transaction meta processing (#2050)
Browse files Browse the repository at this point in the history
This commit changes `io.LedgerTransaction` to return fee and tx meta
separately and updates Horizon processors to apply changes in correct
order. This issue was found by `StateVerifier`.

The order of applying meta changes in Horizon processors was incorrect.
Fee changes must be applied before everything else [1]. In other words
instead of processing meta like:

TX1_FEE_META, TX1_TX_META, TX2_FEE_META, TX2_TX_META, ...

we should do it like:

TX1_FEE_META, TX2_FEE_META, TX1_TX_META, TX2_TX_META, ...

### Known limitations

The current interface of pipeline processor doesn't make sense because
all transactions need to be read into memory first to apply fee changes.
We either need to refactor the processors or use @tamirms design where
pipeline is removed.

[1] https://github.com/stellar/stellar-core/blob/master/docs/integration.md
  • Loading branch information
bartekn authored Dec 12, 2019
1 parent 76835eb commit fb5f489
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 19 deletions.
12 changes: 8 additions & 4 deletions exp/ingest/io/ledger_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,16 @@ func (c *Change) AccountSignersChanged() bool {
return false
}

// GetFeeChanges returns a developer friendly representation of LedgerEntryChanges
// connected to fees.
func (t *LedgerTransaction) GetFeeChanges() []Change {
return getChangesFromLedgerEntryChanges(t.FeeChanges)
}

// GetChanges returns a developer friendly representation of LedgerEntryChanges.
// It contains fee changes, transaction changes and operation changes in that
// order.
// It contains transaction changes and operation changes in that order.
func (t *LedgerTransaction) GetChanges() []Change {
// Fee meta
changes := getChangesFromLedgerEntryChanges(t.FeeChanges)
var changes []Change

// Transaction meta
switch t.Meta.V {
Expand Down
72 changes: 72 additions & 0 deletions exp/ingest/io/ledger_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,78 @@ func TestChangeAccountChangedExceptSignersInvalidType(t *testing.T) {
})
}

func TestFeeAndMetaChangesSeparate(t *testing.T) {
tx := LedgerTransaction{
FeeChanges: xdr.LedgerEntryChanges{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 100,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 200,
},
},
},
},
},
Meta: xdr.TransactionMeta{
Operations: &[]xdr.OperationMeta{
{
Changes: xdr.LedgerEntryChanges{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 300,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 400,
},
},
},
},
},
},
},
}}

feeChanges := tx.GetFeeChanges()
assert.Len(t, feeChanges, 1)
assert.Equal(t, feeChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(100))
assert.Equal(t, feeChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(200))

metaChanges := tx.GetChanges()
assert.Len(t, metaChanges, 1)
assert.Equal(t, metaChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(300))
assert.Equal(t, metaChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(400))
}

func TestChangeAccountChangedExceptSignersLastModifiedLedgerSeq(t *testing.T) {
change := Change{
Type: xdr.LedgerEntryTypeAccount,
Expand Down
4 changes: 3 additions & 1 deletion services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ const (
// when preauth tx is failed.
// - 9: Fixes a bug in asset stats processor that counted unauthorized
// trustlines.
CurrentVersion = 9
// - 10: Fixes a bug in meta processing (fees are now processed before
// everything else).
CurrentVersion = 10
)

var log = logpkg.DefaultLogger.WithField("service", "expingest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,93 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() {

s.Assert().NoError(err)
}

func (s *AccountsProcessorTestSuiteLedger) TestFeeProcessedBeforeEverythingElse() {
s.mockLedgerReader.On("Read").
Return(io.LedgerTransaction{
Meta: createTransactionMeta([]xdr.OperationMeta{
xdr.OperationMeta{
Changes: []xdr.LedgerEntryChange{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 100,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 300,
},
},
},
},
},
},
}),
}, nil).Once()

s.mockLedgerReader.On("Read").
Return(io.LedgerTransaction{
FeeChanges: []xdr.LedgerEntryChange{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 200,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 100,
},
},
},
},
},
Meta: xdr.TransactionMeta{
Operations: &[]xdr.OperationMeta{},
},
}, nil).Once()

expectedAccount := xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
// If fee meta wasn't procesed before everything else, this would be 100
Balance: 300,
}

s.mockQ.On("UpdateAccount", expectedAccount, xdr.Uint32(0)).Return(int64(1), nil).Once()

s.mockLedgerReader.
On("Read").
Return(io.LedgerTransaction{}, stdio.EOF).Once()

err := s.processor.ProcessLedger(
context.Background(),
&supportPipeline.Store{},
s.mockLedgerReader,
s.mockLedgerWriter,
)

s.Assert().NoError(err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S

var successTxCount, failedTxCount, opCount int

// Process transaction meta
// Get all transactions
var transactions []io.LedgerTransaction
for {
transaction, err := r.Read()
if err != nil {
Expand All @@ -232,27 +233,49 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S
failedTxCount++
}

transactions = append(transactions, transaction)
}

if p.Action != Ledgers {
// Remember that it's possible that transaction can remove a preauth
// tx signer even when it's a failed transaction.
if p.Action != Ledgers {
for _, change := range transaction.GetChanges() {
// tx signer even when it's a failed transaction so we need to check
// failed transactions too.

// Fees are processed before everything else.
for _, transaction := range transactions {
for _, change := range transaction.GetFeeChanges() {
err := ledgerCache.AddChange(change)
if err != nil {
return errors.Wrap(err, "error addint to ledgerCache")
return errors.Wrap(err, "error adding to ledgerCache")
}
}

select {
case <-ctx.Done():
return nil
default:
continue
}
}

select {
case <-ctx.Done():
return nil
default:
continue
// Tx meta
for _, transaction := range transactions {
for _, change := range transaction.GetChanges() {
err := ledgerCache.AddChange(change)
if err != nil {
return errors.Wrap(err, "error adding to ledgerCache")
}
}

select {
case <-ctx.Done():
return nil
default:
continue
}
}
}

// Process upgrades meta
if p.Action != Ledgers {
// Process upgrades meta
for {
change, err := r.ReadUpgradeChange()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/expingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const assetStatsBatchSize = 500
// check them.
// There is a test that checks it, to fix it: update the actual `verifyState`
// method instead of just updating this value!
const stateVerifierExpectedIngestionVersion = 9
const stateVerifierExpectedIngestionVersion = 10

// verifyState is called as a go routine from pipeline post hook every 64
// ledgers. It checks if the state is correct. If another go routine is already
Expand Down

0 comments on commit fb5f489

Please sign in to comment.