diff --git a/ingest/change.go b/ingest/change.go index 8d435f4229..0a2c063f1c 100644 --- a/ingest/change.go +++ b/ingest/change.go @@ -2,6 +2,7 @@ package ingest import ( "bytes" + "fmt" "sort" "github.com/stellar/go/support/errors" @@ -21,7 +22,29 @@ type Change struct { Post *xdr.LedgerEntry } -func (c *Change) ledgerKey() (xdr.LedgerKey, error) { +// String returns a best effort string representation of the change. +// If the Pre or Post xdr is invalid, the field will be omitted from the string. +func (c Change) String() string { + var pre, post string + if c.Pre != nil { + if b64, err := xdr.MarshalBase64(c.Pre); err == nil { + pre = b64 + } + } + if c.Post != nil { + if b64, err := xdr.MarshalBase64(c.Post); err == nil { + post = b64 + } + } + return fmt.Sprintf( + "Change{Type: %s, Pre: %s, Post: %s}", + c.Type.String(), + pre, + post, + ) +} + +func (c Change) ledgerKey() (xdr.LedgerKey, error) { if c.Pre != nil { return c.Pre.LedgerKey() } @@ -124,7 +147,7 @@ func sortChanges(changes []Change) { } // LedgerEntryChangeType returns type in terms of LedgerEntryChangeType. -func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { +func (c Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { switch { case c.Pre == nil && c.Post != nil: return xdr.LedgerEntryChangeTypeLedgerEntryCreated @@ -138,7 +161,7 @@ func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { } // getLiquidityPool gets the most recent state of the LiquidityPool that exists or existed. -func (c *Change) getLiquidityPool() (*xdr.LiquidityPoolEntry, error) { +func (c Change) getLiquidityPool() (*xdr.LiquidityPoolEntry, error) { var entry *xdr.LiquidityPoolEntry if c.Pre != nil { entry = c.Pre.Data.LiquidityPool @@ -153,7 +176,7 @@ func (c *Change) getLiquidityPool() (*xdr.LiquidityPoolEntry, error) { } // GetLiquidityPoolType returns the liquidity pool type. -func (c *Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error) { +func (c Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error) { lp, err := c.getLiquidityPool() if err != nil { return xdr.LiquidityPoolType(0), err @@ -164,7 +187,7 @@ func (c *Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error) { // AccountChangedExceptSigners returns true if account has changed WITHOUT // checking the signers (except master key weight!). In other words, if the only // change is connected to signers, this function will return false. -func (c *Change) AccountChangedExceptSigners() (bool, error) { +func (c Change) AccountChangedExceptSigners() (bool, error) { if c.Type != xdr.LedgerEntryTypeAccount { panic("This should not be called on changes other than Account changes") } diff --git a/services/horizon/internal/ingest/change_processors_test.go b/services/horizon/internal/ingest/change_processors_test.go new file mode 100644 index 0000000000..b6a8251b71 --- /dev/null +++ b/services/horizon/internal/ingest/change_processors_test.go @@ -0,0 +1,85 @@ +package ingest + +import ( + "context" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/services/horizon/internal/ingest/processors" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +func TestStreamReaderError(t *testing.T) { + tt := assert.New(t) + ctx := context.Background() + + mockChangeReader := &ingest.MockChangeReader{} + mockChangeReader. + On("Read"). + Return(ingest.Change{}, errors.New("transient error")).Once() + mockChangeProcessor := &processors.MockChangeProcessor{} + + err := streamChanges(ctx, mockChangeProcessor, 1, mockChangeReader) + tt.EqualError(err, "could not read transaction: transient error") +} + +func TestStreamChangeProcessorError(t *testing.T) { + tt := assert.New(t) + ctx := context.Background() + + change := ingest.Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: 200, + }, + }, + }, + } + mockChangeReader := &ingest.MockChangeReader{} + mockChangeReader. + On("Read"). + Return(change, nil).Once() + + mockChangeProcessor := &processors.MockChangeProcessor{} + mockChangeProcessor. + On( + "ProcessChange", ctx, + change, + ). + Return(errors.New("transient error")).Once() + + logsGet := log.StartTest(logrus.ErrorLevel) + err := streamChanges(ctx, mockChangeProcessor, 1, mockChangeReader) + tt.EqualError(err, "could not process change: transient error") + logs := logsGet() + line, err := logs[0].String() + tt.NoError(err) + + preB64, err := xdr.MarshalBase64(change.Pre) + tt.NoError(err) + postB64, err := xdr.MarshalBase64(change.Post) + tt.NoError(err) + expectedTokens := []string{"LedgerEntryTypeAccount", preB64, postB64} + + for _, token := range expectedTokens { + tt.Contains(line, token) + } +} diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index e26db5fe31..b98896fe1f 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "time" "github.com/stellar/go/ingest" @@ -272,7 +273,7 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion( log.WithField("sequence", checkpointLedger). Info("Processing entries from History Archive Snapshot") - err = processors.StreamChanges(s.ctx, changeProcessor, newloggingChangeReader( + err = streamChanges(s.ctx, changeProcessor, checkpointLedger, newloggingChangeReader( changeReader, "historyArchive", checkpointLedger, @@ -301,7 +302,7 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( return errors.Wrap(err, "Error creating ledger change reader") } changeReader = ingest.NewCompactingChangeReader(changeReader) - if err = processors.StreamChanges(s.ctx, changeProcessor, changeReader); err != nil { + if err = streamChanges(s.ctx, changeProcessor, ledger.LedgerSequence(), changeReader); err != nil { return errors.Wrap(err, "Error streaming changes from ledger") } @@ -314,6 +315,36 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( return nil } +func streamChanges( + ctx context.Context, + changeProcessor processors.ChangeProcessor, + ledger uint32, + reader ingest.ChangeReader, +) error { + + for { + change, err := reader.Read() + if err == io.EOF { + return nil + } + if err != nil { + return errors.Wrap(err, "could not read transaction") + } + + if err = changeProcessor.ProcessChange(ctx, change); err != nil { + if !isCancelledError(ctx, err) { + log.WithError(err).WithField("sequence", ledger).WithField( + "change", change.String(), + ).Error("error processing change") + } + return errors.Wrap( + err, + "could not process change", + ) + } + } +} + func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta, groupFilterers *groupTransactionFilterers, groupFilteredOutProcessors *groupTransactionProcessors, diff --git a/services/horizon/internal/ingest/processors/change_processors.go b/services/horizon/internal/ingest/processors/change_processors.go deleted file mode 100644 index ee9eb127f1..0000000000 --- a/services/horizon/internal/ingest/processors/change_processors.go +++ /dev/null @@ -1,32 +0,0 @@ -package processors - -import ( - "context" - "io" - - "github.com/stellar/go/ingest" - "github.com/stellar/go/support/errors" -) - -func StreamChanges( - ctx context.Context, - changeProcessor ChangeProcessor, - reader ingest.ChangeReader, -) error { - for { - change, err := reader.Read() - if err == io.EOF { - return nil - } - if err != nil { - return errors.Wrap(err, "could not read transaction") - } - - if err = changeProcessor.ProcessChange(ctx, change); err != nil { - return errors.Wrap( - err, - "could not process change", - ) - } - } -} diff --git a/services/horizon/internal/ingest/processors/change_processors_test.go b/services/horizon/internal/ingest/processors/change_processors_test.go deleted file mode 100644 index 829552543f..0000000000 --- a/services/horizon/internal/ingest/processors/change_processors_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package processors - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/stellar/go/ingest" - "github.com/stellar/go/support/errors" -) - -func TestStreamReaderError(t *testing.T) { - tt := assert.New(t) - ctx := context.Background() - - mockChangeReader := &ingest.MockChangeReader{} - mockChangeReader. - On("Read"). - Return(ingest.Change{}, errors.New("transient error")).Once() - mockChangeProcessor := &MockChangeProcessor{} - - err := StreamChanges(ctx, mockChangeProcessor, mockChangeReader) - tt.EqualError(err, "could not read transaction: transient error") -} - -func TestStreamChangeProcessorError(t *testing.T) { - tt := assert.New(t) - ctx := context.Background() - - change := ingest.Change{} - mockChangeReader := &ingest.MockChangeReader{} - mockChangeReader. - On("Read"). - Return(change, nil).Once() - - mockChangeProcessor := &MockChangeProcessor{} - mockChangeProcessor. - On( - "ProcessChange", ctx, - change, - ). - Return(errors.New("transient error")).Once() - - err := StreamChanges(ctx, mockChangeProcessor, mockChangeReader) - tt.EqualError(err, "could not process change: transient error") -}