Skip to content

Commit

Permalink
Log tx meta when ingestion failures occur (#5268)
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms authored Mar 29, 2024
1 parent 2501c39 commit a30c441
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 86 deletions.
33 changes: 28 additions & 5 deletions ingest/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingest

import (
"bytes"
"fmt"
"sort"

"github.com/stellar/go/support/errors"
Expand All @@ -21,7 +22,29 @@ type Change struct {
Post *xdr.LedgerEntry
}

func (c *Change) ledgerKey() (xdr.LedgerKey, error) {
// String returns a best effort string representation of the change.
// If the Pre or Post xdr is invalid, the field will be omitted from the string.
func (c Change) String() string {
var pre, post string
if c.Pre != nil {
if b64, err := xdr.MarshalBase64(c.Pre); err == nil {
pre = b64
}
}
if c.Post != nil {
if b64, err := xdr.MarshalBase64(c.Post); err == nil {
post = b64
}
}
return fmt.Sprintf(
"Change{Type: %s, Pre: %s, Post: %s}",
c.Type.String(),
pre,
post,
)
}

func (c Change) ledgerKey() (xdr.LedgerKey, error) {
if c.Pre != nil {
return c.Pre.LedgerKey()
}
Expand Down Expand Up @@ -124,7 +147,7 @@ func sortChanges(changes []Change) {
}

// LedgerEntryChangeType returns type in terms of LedgerEntryChangeType.
func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType {
func (c Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType {
switch {
case c.Pre == nil && c.Post != nil:
return xdr.LedgerEntryChangeTypeLedgerEntryCreated
Expand All @@ -138,7 +161,7 @@ func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType {
}

// getLiquidityPool gets the most recent state of the LiquidityPool that exists or existed.
func (c *Change) getLiquidityPool() (*xdr.LiquidityPoolEntry, error) {
func (c Change) getLiquidityPool() (*xdr.LiquidityPoolEntry, error) {
var entry *xdr.LiquidityPoolEntry
if c.Pre != nil {
entry = c.Pre.Data.LiquidityPool
Expand All @@ -153,7 +176,7 @@ func (c *Change) getLiquidityPool() (*xdr.LiquidityPoolEntry, error) {
}

// GetLiquidityPoolType returns the liquidity pool type.
func (c *Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error) {
func (c Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error) {
lp, err := c.getLiquidityPool()
if err != nil {
return xdr.LiquidityPoolType(0), err
Expand All @@ -164,7 +187,7 @@ func (c *Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error) {
// AccountChangedExceptSigners returns true if account has changed WITHOUT
// checking the signers (except master key weight!). In other words, if the only
// change is connected to signers, this function will return false.
func (c *Change) AccountChangedExceptSigners() (bool, error) {
func (c Change) AccountChangedExceptSigners() (bool, error) {
if c.Type != xdr.LedgerEntryTypeAccount {
panic("This should not be called on changes other than Account changes")
}
Expand Down
85 changes: 85 additions & 0 deletions services/horizon/internal/ingest/change_processors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package ingest

import (
"context"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

func TestStreamReaderError(t *testing.T) {
tt := assert.New(t)
ctx := context.Background()

mockChangeReader := &ingest.MockChangeReader{}
mockChangeReader.
On("Read").
Return(ingest.Change{}, errors.New("transient error")).Once()
mockChangeProcessor := &processors.MockChangeProcessor{}

err := streamChanges(ctx, mockChangeProcessor, 1, mockChangeReader)
tt.EqualError(err, "could not read transaction: transient error")
}

func TestStreamChangeProcessorError(t *testing.T) {
tt := assert.New(t)
ctx := context.Background()

change := ingest.Change{
Type: xdr.LedgerEntryTypeAccount,
Pre: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 10,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
},
Post: &xdr.LedgerEntry{
LastModifiedLedgerSeq: 11,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.AccountEntry{
AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
Balance: 200,
},
},
},
}
mockChangeReader := &ingest.MockChangeReader{}
mockChangeReader.
On("Read").
Return(change, nil).Once()

mockChangeProcessor := &processors.MockChangeProcessor{}
mockChangeProcessor.
On(
"ProcessChange", ctx,
change,
).
Return(errors.New("transient error")).Once()

logsGet := log.StartTest(logrus.ErrorLevel)
err := streamChanges(ctx, mockChangeProcessor, 1, mockChangeReader)
tt.EqualError(err, "could not process change: transient error")
logs := logsGet()
line, err := logs[0].String()
tt.NoError(err)

preB64, err := xdr.MarshalBase64(change.Pre)
tt.NoError(err)
postB64, err := xdr.MarshalBase64(change.Post)
tt.NoError(err)
expectedTokens := []string{"LedgerEntryTypeAccount", preB64, postB64}

for _, token := range expectedTokens {
tt.Contains(line, token)
}
}
35 changes: 33 additions & 2 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"time"

"github.com/stellar/go/ingest"
Expand Down Expand Up @@ -272,7 +273,7 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
log.WithField("sequence", checkpointLedger).
Info("Processing entries from History Archive Snapshot")

err = processors.StreamChanges(s.ctx, changeProcessor, newloggingChangeReader(
err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader(
changeReader,
"historyArchive",
checkpointLedger,
Expand Down Expand Up @@ -301,7 +302,7 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger(
return errors.Wrap(err, "Error creating ledger change reader")
}
changeReader = ingest.NewCompactingChangeReader(changeReader)
if err = processors.StreamChanges(s.ctx, changeProcessor, changeReader); err != nil {
if err = streamChanges(s.ctx, changeProcessor, ledger.LedgerSequence(), changeReader); err != nil {
return errors.Wrap(err, "Error streaming changes from ledger")

}
Expand All @@ -314,6 +315,36 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger(
return nil
}

func streamChanges(
ctx context.Context,
changeProcessor processors.ChangeProcessor,
ledger uint32,
reader ingest.ChangeReader,
) error {

for {
change, err := reader.Read()
if err == io.EOF {
return nil
}
if err != nil {
return errors.Wrap(err, "could not read transaction")
}

if err = changeProcessor.ProcessChange(ctx, change); err != nil {
if !isCancelledError(ctx, err) {
log.WithError(err).WithField("sequence", ledger).WithField(
"change", change.String(),
).Error("error processing change")
}
return errors.Wrap(
err,
"could not process change",
)
}
}
}

func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta,
groupFilterers *groupTransactionFilterers,
groupFilteredOutProcessors *groupTransactionProcessors,
Expand Down
32 changes: 0 additions & 32 deletions services/horizon/internal/ingest/processors/change_processors.go

This file was deleted.

This file was deleted.

0 comments on commit a30c441

Please sign in to comment.