Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/ingest: Fix fee and transaction meta processing #2050

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions exp/ingest/io/ledger_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,16 @@ func (c *Change) AccountSignersChanged() bool {
return false
}

// GetFeeChanges returns a developer friendly representation of LedgerEntryChanges
// connected to fees.
func (t *LedgerTransaction) GetFeeChanges() []Change {
return getChangesFromLedgerEntryChanges(t.FeeChanges)
}

// GetChanges returns a developer friendly representation of LedgerEntryChanges.
// It contains fee changes, transaction changes and operation changes in that
// order.
// It contains transaction changes and operation changes in that order.
func (t *LedgerTransaction) GetChanges() []Change {
// Fee meta
changes := getChangesFromLedgerEntryChanges(t.FeeChanges)
var changes []Change

// Transaction meta
switch t.Meta.V {
Expand Down
72 changes: 72 additions & 0 deletions exp/ingest/io/ledger_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,78 @@ func TestChangeAccountChangedExceptSignersInvalidType(t *testing.T) {
})
}

func TestFeeAndMetaChangesSeparate(t *testing.T) {
tx := LedgerTransaction{
FeeChanges: xdr.LedgerEntryChanges{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 100,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 200,
},
},
},
},
},
Meta: xdr.TransactionMeta{
Operations: &[]xdr.OperationMeta{
{
Changes: xdr.LedgerEntryChanges{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 300,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 400,
},
},
},
},
},
},
},
}}

feeChanges := tx.GetFeeChanges()
assert.Len(t, feeChanges, 1)
assert.Equal(t, feeChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(100))
assert.Equal(t, feeChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(200))

metaChanges := tx.GetChanges()
assert.Len(t, metaChanges, 1)
assert.Equal(t, metaChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(300))
assert.Equal(t, metaChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(400))
}

func TestChangeAccountChangedExceptSignersLastModifiedLedgerSeq(t *testing.T) {
change := Change{
Type: xdr.LedgerEntryTypeAccount,
Expand Down
4 changes: 3 additions & 1 deletion services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ const (
// when preauth tx is failed.
// - 9: Fixes a bug in asset stats processor that counted unauthorized
// trustlines.
CurrentVersion = 9
// - 9: Fixes a bug in meta processing (fees are now processed before
bartekn marked this conversation as resolved.
Show resolved Hide resolved
// everything else).
CurrentVersion = 10
)

var log = logpkg.DefaultLogger.WithField("service", "expingest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,93 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() {

s.Assert().NoError(err)
}

func (s *AccountsProcessorTestSuiteLedger) TestFeeProcessedBeforeEverythingElse() {
s.mockLedgerReader.On("Read").
Return(io.LedgerTransaction{
Meta: createTransactionMeta([]xdr.OperationMeta{
xdr.OperationMeta{
Changes: []xdr.LedgerEntryChange{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 100,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 300,
},
},
},
},
},
},
}),
}, nil).Once()

s.mockLedgerReader.On("Read").
Return(io.LedgerTransaction{
FeeChanges: []xdr.LedgerEntryChange{
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryState,
State: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 200,
},
},
},
},
xdr.LedgerEntryChange{
Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
Updated: &xdr.LedgerEntry{
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
Balance: 100,
},
},
},
},
},
Meta: xdr.TransactionMeta{
Operations: &[]xdr.OperationMeta{},
},
}, nil).Once()

expectedAccount := xdr.AccountEntry{
AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"),
// If fee meta wasn't procesed before everything else, this would be 100
Balance: 300,
}

s.mockQ.On("UpdateAccount", expectedAccount, xdr.Uint32(0)).Return(int64(1), nil).Once()

s.mockLedgerReader.
On("Read").
Return(io.LedgerTransaction{}, stdio.EOF).Once()

err := s.processor.ProcessLedger(
context.Background(),
&supportPipeline.Store{},
s.mockLedgerReader,
s.mockLedgerWriter,
)

s.Assert().NoError(err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S

var successTxCount, failedTxCount, opCount int

// Process transaction meta
// Get all transactions
var transactions []io.LedgerTransaction
for {
transaction, err := r.Read()
if err != nil {
Expand All @@ -232,27 +233,49 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S
failedTxCount++
}

transactions = append(transactions, transaction)
}

if p.Action != Ledgers {
// Remember that it's possible that transaction can remove a preauth
// tx signer even when it's a failed transaction.
if p.Action != Ledgers {
for _, change := range transaction.GetChanges() {
// tx signer even when it's a failed transaction so we need to check
// failed transactions too.

// Fees are processed before everything else.
for _, transaction := range transactions {
for _, change := range transaction.GetFeeChanges() {
err := ledgerCache.AddChange(change)
if err != nil {
return errors.Wrap(err, "error addint to ledgerCache")
return errors.Wrap(err, "error adding to ledgerCache")
}
}

select {
case <-ctx.Done():
return nil
default:
continue
}
}

select {
case <-ctx.Done():
return nil
default:
continue
// Tx meta
for _, transaction := range transactions {
for _, change := range transaction.GetChanges() {
err := ledgerCache.AddChange(change)
if err != nil {
return errors.Wrap(err, "error adding to ledgerCache")
}
}

select {
case <-ctx.Done():
return nil
default:
continue
}
}
}

// Process upgrades meta
if p.Action != Ledgers {
// Process upgrades meta
for {
change, err := r.ReadUpgradeChange()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/expingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const assetStatsBatchSize = 500
// check them.
// There is a test that checks it, to fix it: update the actual `verifyState`
// method instead of just updating this value!
const stateVerifierExpectedIngestionVersion = 9
const stateVerifierExpectedIngestionVersion = 10

// verifyState is called as a go routine from pipeline post hook every 64
// ledgers. It checks if the state is correct. If another go routine is already
Expand Down