diff --git a/exp/ingest/errors/main.go b/exp/ingest/errors/main.go new file mode 100644 index 0000000000..8d41f1538b --- /dev/null +++ b/exp/ingest/errors/main.go @@ -0,0 +1,13 @@ +package errors + +// StateError are errors indicating invalid state. Type is used to differentiate +// between network, i/o, marshaling, bad usage etc. errors and actual state errors. +// You can use type assertion or type switch to check for type. +type StateError struct { + error +} + +// NewStateError creates a new StateError. +func NewStateError(err error) StateError { + return StateError{err} +} diff --git a/exp/ingest/io/ledger_entry_change_cache.go b/exp/ingest/io/ledger_entry_change_cache.go new file mode 100644 index 0000000000..8ce5963791 --- /dev/null +++ b/exp/ingest/io/ledger_entry_change_cache.go @@ -0,0 +1,217 @@ +package io + +import ( + "sync" + + ingesterrors "github.com/stellar/go/exp/ingest/errors" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// LedgerEntryChangeCache is a cache of ledger entry changes that squashes all +// changes within a single ledger. By doing this, it decreases number of DB +// queries sent to a DB to update the current state of the ledger. +// It has integrity checks built in so ex. removing an account that was +// previously removed returns an error. In such case verify.StateError is +// returned. +// +// It applies changes to the cache using the following algorithm: +// +// 1. If the change is CREATED it checks if any change connected to given entry +// is already in the cache. If not, it adds CREATED change. Otherwise, if +// existing change is: +// a. CREATED it returns error because we can't add an entry that already +// exists. +// b. UPDATED it returns error because we can't add an entry that already +// exists. +// c. REMOVED it means that due to previous transitions we want to remove +// this from a DB what means that it already exists in a DB so we need to +// update the type of change to UPDATED. +// 2. If the change is UPDATE it checks if any change connected to given entry +// is already in the cache. If not, it adds UPDATE change. Otherwise, if +// existing change is: +// a. CREATED it means that due to previous transitions we want to create +// this in a DB what means that it doesn't exist in a DB so we need to +// update the entry but stay with CREATED type. +// b. UPDATED we simply update it with the new value. +// c. REMOVED it means that at this point in the ledger the entry is removed +// so updating it returns an error. +// 3. If the change is REMOVE it checks if any change connected to given entry +// is already in the cache. If not, it adds REMOVE change. Otherwise, if +// existing change is: +// a. CREATED it means that due to previous transitions we want to create +// this in a DB what means that it doesn't exist in a DB. If it was +// created and removed in the same ledger it's a noop so we remove entry +// from the cache. +// b. UPDATED we simply update it to be a REMOVE change because the UPDATE +// change means the entry exists in a DB. +// c. REMOVED it returns error because we can't remove an entry that was +// already removed. +type LedgerEntryChangeCache struct { + // ledger key => Change + cache map[string]Change + mutex sync.Mutex +} + +// NewLedgerEntryChangeCache returns a new LedgerEntryChangeCache. +func NewLedgerEntryChangeCache() *LedgerEntryChangeCache { + return &LedgerEntryChangeCache{ + cache: make(map[string]Change), + } +} + +// AddChange adds a change to LedgerEntryChangeCache. All changes are stored +// in memory. To get the final, squashed changes call GetChanges. +// +// Please note that the current ledger capacity in pubnet (max 1000 ops/ledger) +// makes LedgerEntryChangeCache safe to use in terms of memory usage. If the +// cache takes too much memory, you apply changes returned by GetChanges and +// create a new LedgerEntryChangeCache object to continue ingestion. +func (c *LedgerEntryChangeCache) AddChange(change Change) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + switch { + case change.Pre == nil && change.Post != nil: + return c.addCreatedChange(change) + case change.Pre != nil && change.Post != nil: + return c.addUpdatedChange(change) + case change.Pre != nil && change.Post == nil: + return c.addRemovedChange(change) + default: + return errors.New("Unknown entry change state") + } +} + +// addCreatedChange adds a change to the cache, but returns an error if create +// change is unexpected. +func (c *LedgerEntryChangeCache) addCreatedChange(change Change) error { + ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64() + if err != nil { + return errors.Wrap(err, "Error MarshalBinaryBase64") + } + + existingChange, exist := c.cache[ledgerKeyString] + if !exist { + c.cache[ledgerKeyString] = change + return nil + } + + switch existingChange.LedgerEntryChangeType() { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + return ingesterrors.NewStateError(errors.Errorf( + "can't create an entry that already exists (ledger key = %s)", + ledgerKeyString, + )) + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + return ingesterrors.NewStateError(errors.Errorf( + "can't create an entry that already exists (ledger key = %s)", + ledgerKeyString, + )) + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + // If existing type is removed it means that this entry does exist + // in a DB so we update entry change. + c.cache[ledgerKeyString] = Change{ + Type: change.Post.LedgerKey().Type, + Pre: existingChange.Pre, + Post: change.Post, + } + default: + return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType()) + } + + return nil +} + +// addUpdatedChange adds a change to the cache, but returns an error if update +// change is unexpected. +func (c *LedgerEntryChangeCache) addUpdatedChange(change Change) error { + ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64() + if err != nil { + return errors.Wrap(err, "Error MarshalBinaryBase64") + } + + existingChange, exist := c.cache[ledgerKeyString] + if !exist { + c.cache[ledgerKeyString] = change + return nil + } + + switch existingChange.LedgerEntryChangeType() { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + // If existing type is created it means that this entry does not + // exist in a DB so we update entry change. + c.cache[ledgerKeyString] = Change{ + Type: change.Post.LedgerKey().Type, + Pre: existingChange.Pre, // = nil + Post: change.Post, + } + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + c.cache[ledgerKeyString] = Change{ + Type: change.Post.LedgerKey().Type, + Pre: existingChange.Pre, + Post: change.Post, + } + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + return ingesterrors.NewStateError(errors.Errorf( + "can't update an entry that was previously removed (ledger key = %s)", + ledgerKeyString, + )) + default: + return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type) + } + + return nil +} + +// addRemovedChange adds a change to the cache, but returns an error if remove +// change is unexpected. +func (c *LedgerEntryChangeCache) addRemovedChange(change Change) error { + ledgerKeyString, err := change.Pre.LedgerKey().MarshalBinaryBase64() + if err != nil { + return errors.Wrap(err, "Error MarshalBinaryBase64") + } + + existingChange, exist := c.cache[ledgerKeyString] + if !exist { + c.cache[ledgerKeyString] = change + return nil + } + + switch existingChange.LedgerEntryChangeType() { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + // If existing type is created it means that this will be no op. + // Entry was created and is now removed in a single ledger. + delete(c.cache, ledgerKeyString) + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + c.cache[ledgerKeyString] = Change{ + Type: change.Pre.LedgerKey().Type, + Pre: existingChange.Pre, + Post: nil, + } + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + return ingesterrors.NewStateError(errors.Errorf( + "can't remove an entry that was previously removed (ledger key = %s)", + ledgerKeyString, + )) + default: + return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type) + } + + return nil +} + +// GetChanges returns a slice of Changes in the cache. The order of changes is +// random but each change is connected to a separate entry. +func (c *LedgerEntryChangeCache) GetChanges() []Change { + c.mutex.Lock() + defer c.mutex.Unlock() + + changes := make([]Change, 0, len(c.cache)) + + for _, entryChange := range c.cache { + changes = append(changes, entryChange) + } + + return changes +} diff --git a/exp/ingest/io/ledger_entry_change_cache_test.go b/exp/ingest/io/ledger_entry_change_cache_test.go new file mode 100644 index 0000000000..c2132655f2 --- /dev/null +++ b/exp/ingest/io/ledger_entry_change_cache_test.go @@ -0,0 +1,397 @@ +package io + +import ( + "testing" + + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func TestLedgerEntryChangeCacheExistingCreated(t *testing.T) { + suite.Run(t, new(TestLedgerEntryChangeCacheExistingCreatedSuite)) +} + +// TestLedgerEntryChangeCacheExistingCreatedSuite tests transitions from +// existing CREATED state in the cache. +type TestLedgerEntryChangeCacheExistingCreatedSuite struct { + suite.Suite + cache *LedgerEntryChangeCache +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) SetupTest() { + s.cache = NewLedgerEntryChangeCache() + + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryCreated) +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) TestChangeCreated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't create an entry that already exists (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) TestChangeUpdated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryCreated) +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) TestChangeRemoved() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 0) +} + +func TestLedgerEntryChangeCacheExistingUpdated(t *testing.T) { + suite.Run(t, new(TestLedgerEntryChangeCacheExistingUpdatedSuite)) +} + +// TestLedgerEntryChangeCacheExistingUpdatedSuite tests transitions from +// existing UPDATED state in the cache. +type TestLedgerEntryChangeCacheExistingUpdatedSuite struct { + suite.Suite + cache *LedgerEntryChangeCache +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) SetupTest() { + s.cache = NewLedgerEntryChangeCache() + + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) TestChangeCreated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't create an entry that already exists (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) TestChangeUpdated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) + s.Assert().Equal(changes[0].Post.LastModifiedLedgerSeq, xdr.Uint32(12)) +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) TestChangeRemoved() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryRemoved) +} + +func TestLedgerEntryChangeCacheExistingRemoved(t *testing.T) { + suite.Run(t, new(TestLedgerEntryChangeCacheExistingRemovedSuite)) +} + +// TestLedgerEntryChangeCacheExistingRemovedSuite tests transitions from +// existing REMOVED state in the cache. +type TestLedgerEntryChangeCacheExistingRemovedSuite struct { + suite.Suite + cache *LedgerEntryChangeCache +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) SetupTest() { + s.cache = NewLedgerEntryChangeCache() + + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryRemoved) +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) TestChangeCreated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) + s.Assert().Equal(changes[0].Post.LastModifiedLedgerSeq, xdr.Uint32(12)) +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) TestChangeUpdated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't update an entry that was previously removed (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) TestChangeRemoved() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't remove an entry that was previously removed (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +// TestLedgerEntryChangeCacheSquashMultiplePayments simulates sending multiple +// payments between two accounts. Ledger cache should squash multiple changes +// into just two. +// GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ sends money +// GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML receives money +func TestLedgerEntryChangeCacheSquashMultiplePayments(t *testing.T) { + cache := NewLedgerEntryChangeCache() + + for i := 1; i <= 1000; i++ { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ"), + Balance: xdr.Int64(2000 - i + 1), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ"), + Balance: xdr.Int64(2000 - i), + }, + }, + }, + } + assert.NoError(t, cache.AddChange(change)) + + change = Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: xdr.Int64(2000 + i - 1), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: xdr.Int64(2000 + i), + }, + }, + }, + } + assert.NoError(t, cache.AddChange(change)) + } + + changes := cache.GetChanges() + assert.Len(t, changes, 2) + for _, change := range changes { + assert.Equal(t, change.LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) + account := change.Post.Data.MustAccount() + switch account.AccountId.Address() { + case "GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ": + assert.Equal(t, account.Balance, xdr.Int64(1000)) + case "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML": + assert.Equal(t, account.Balance, xdr.Int64(3000)) + default: + assert.Fail(t, "Invalid account") + } + } +} diff --git a/exp/ingest/io/ledger_transaction.go b/exp/ingest/io/ledger_transaction.go index 3af2f1346c..6719f2d323 100644 --- a/exp/ingest/io/ledger_transaction.go +++ b/exp/ingest/io/ledger_transaction.go @@ -20,6 +20,20 @@ type Change struct { Post *xdr.LedgerEntry } +// LedgerEntryChangeType returns type in terms of LedgerEntryChangeType. +func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { + switch { + case c.Pre == nil && c.Post != nil: + return xdr.LedgerEntryChangeTypeLedgerEntryCreated + case c.Pre != nil && c.Post == nil: + return xdr.LedgerEntryChangeTypeLedgerEntryRemoved + case c.Pre != nil && c.Post != nil: + return xdr.LedgerEntryChangeTypeLedgerEntryUpdated + default: + panic("Invalid state of Change (Pre == nil && Post == nil)") + } +} + // AccountChangedExceptSigners returns true if account has changed WITHOUT // checking the signers (except master key weight!). In other words, if the only // change is connected to signers, this function will return false. @@ -132,12 +146,16 @@ func (c *Change) AccountSignersChanged() bool { return false } +// GetFeeChanges returns a developer friendly representation of LedgerEntryChanges +// connected to fees. +func (t *LedgerTransaction) GetFeeChanges() []Change { + return getChangesFromLedgerEntryChanges(t.FeeChanges) +} + // GetChanges returns a developer friendly representation of LedgerEntryChanges. -// It contains fee changes, transaction changes and operation changes in that -// order. +// It contains transaction changes and operation changes in that order. func (t *LedgerTransaction) GetChanges() []Change { - // Fee meta - changes := getChangesFromLedgerEntryChanges(t.FeeChanges) + var changes []Change // Transaction meta switch t.Meta.V { diff --git a/exp/ingest/io/ledger_transaction_test.go b/exp/ingest/io/ledger_transaction_test.go index 0c2a312c49..fe80f02b27 100644 --- a/exp/ingest/io/ledger_transaction_test.go +++ b/exp/ingest/io/ledger_transaction_test.go @@ -17,6 +17,78 @@ func TestChangeAccountChangedExceptSignersInvalidType(t *testing.T) { }) } +func TestFeeAndMetaChangesSeparate(t *testing.T) { + tx := LedgerTransaction{ + FeeChanges: xdr.LedgerEntryChanges{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 100, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 200, + }, + }, + }, + }, + }, + Meta: xdr.TransactionMeta{ + Operations: &[]xdr.OperationMeta{ + { + Changes: xdr.LedgerEntryChanges{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 300, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 400, + }, + }, + }, + }, + }, + }, + }, + }} + + feeChanges := tx.GetFeeChanges() + assert.Len(t, feeChanges, 1) + assert.Equal(t, feeChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(100)) + assert.Equal(t, feeChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(200)) + + metaChanges := tx.GetChanges() + assert.Len(t, metaChanges, 1) + assert.Equal(t, metaChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(300)) + assert.Equal(t, metaChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(400)) +} + func TestChangeAccountChangedExceptSignersLastModifiedLedgerSeq(t *testing.T) { change := Change{ Type: xdr.LedgerEntryTypeAccount, diff --git a/exp/ingest/live_session.go b/exp/ingest/live_session.go index 7a68b3e40c..277d5c7148 100644 --- a/exp/ingest/live_session.go +++ b/exp/ingest/live_session.go @@ -17,6 +17,8 @@ var _ Session = &LiveSession{} const defaultCoreCursorName = "EXPINGESTLIVESESSION" +// Run runs the session starting from the last checkpoint ledger. +// Returns nil when session has been shutdown. func (s *LiveSession) Run() error { s.standardSession.shutdown = make(chan bool) @@ -96,6 +98,7 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error { } // Resume resumes the session from `ledgerSequence`. +// Returns nil when session has been shutdown. // // WARNING: it's likely that developers will use `GetLatestSuccessfullyProcessedLedger()` // to get the latest successfuly processed ledger after `Resume` returns error. @@ -107,6 +110,14 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error { func (s *LiveSession) Resume(ledgerSequence uint32) error { s.standardSession.shutdown = make(chan bool) + err := s.validate() + if err != nil { + return errors.Wrap(err, "Validation error") + } + + s.setRunningState(true) + defer s.setRunningState(false) + ledgerAdapter := &adapters.LedgerBackendAdapter{ Backend: s.LedgerBackend, } @@ -176,7 +187,6 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg select { case <-s.standardSession.shutdown: - s.LedgerPipeline.Shutdown() return nil case <-time.After(time.Second): // TODO make the idle time smaller @@ -193,27 +203,20 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg ledgerReader = reporterLedgerReader{ledgerReader, s.LedgerReporter} } - errChan := s.LedgerPipeline.Process(ledgerReader) - select { - case err2 := <-errChan: - if err2 != nil { - // Return with no errors if pipeline shutdown - if err2 == pipeline.ErrShutdown { - s.LedgerReporter.OnEndLedger(nil, true) - return nil - } - + err = <-s.LedgerPipeline.Process(ledgerReader) + if err != nil { + // Return with no errors if pipeline shutdown + if err == pipeline.ErrShutdown { if s.LedgerReporter != nil { - s.LedgerReporter.OnEndLedger(err2, false) + s.LedgerReporter.OnEndLedger(nil, true) } - return errors.Wrap(err2, "Ledger pipeline errored") + return nil } - case <-s.standardSession.shutdown: + if s.LedgerReporter != nil { - s.LedgerReporter.OnEndLedger(nil, true) + s.LedgerReporter.OnEndLedger(err, false) } - s.LedgerPipeline.Shutdown() - return nil + return errors.Wrap(err, "Ledger pipeline errored") } if s.LedgerReporter != nil { @@ -229,6 +232,14 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg } ledgerSequence++ + + // Exit early if Shutdown() was called. + select { + case <-s.standardSession.shutdown: + return nil + default: + // Continue + } } return nil @@ -276,26 +287,20 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter, stateReader = reporterStateReader{stateReader, s.StateReporter} } - errChan := s.StatePipeline.Process(stateReader) - select { - case err := <-errChan: - if err != nil { - // Return with no errors if pipeline shutdown - if err == pipeline.ErrShutdown { - s.StateReporter.OnEndState(nil, true) - return nil - } - + err = <-s.StatePipeline.Process(stateReader) + if err != nil { + // Return with no errors if pipeline shutdown + if err == pipeline.ErrShutdown { if s.StateReporter != nil { - s.StateReporter.OnEndState(err, false) + s.StateReporter.OnEndState(nil, true) } - return errors.Wrap(err, "State pipeline errored") + return nil } - case <-s.standardSession.shutdown: + if s.StateReporter != nil { - s.StateReporter.OnEndState(nil, true) + s.StateReporter.OnEndState(err, false) } - s.StatePipeline.Shutdown() + return errors.Wrap(err, "State pipeline errored") } if s.StateReporter != nil { @@ -303,3 +308,26 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter, } return nil } + +// Shutdown gracefully stops the pipelines and the session. This method blocks +// until pipelines are gracefully shutdown. +func (s *LiveSession) Shutdown() { + // Send shutdown signal + s.standardSession.Shutdown() + + // Shutdown pipelines + s.StatePipeline.Shutdown() + s.LedgerPipeline.Shutdown() + + // Shutdown signals sent, block/wait until pipelines are done + // shutting down. + for { + stateRunning := s.StatePipeline.IsRunning() + ledgerRunning := s.LedgerPipeline.IsRunning() + if stateRunning || ledgerRunning { + time.Sleep(time.Second) + continue + } + break + } +} diff --git a/exp/ingest/verify/main.go b/exp/ingest/verify/main.go index eb7d1f8b5a..07245689f5 100644 --- a/exp/ingest/verify/main.go +++ b/exp/ingest/verify/main.go @@ -7,6 +7,7 @@ import ( "encoding/base64" stdio "io" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" @@ -20,18 +21,6 @@ import ( // that will be used for equality check. type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) -// StateError are errors indicating invalid state. Type is used to differentiate -// between network, i/o, marshaling, bad usage etc. errors and actual state errors. -// You can use type assertion or type switch to check for type. -type StateError struct { - error -} - -// NewStateError creates a new StateError. -func NewStateError(err error) StateError { - return StateError{err} -} - // StateVerifier verifies if ledger entries provided by Add method are the same // as in the checkpoint ledger entries provided by SingleLedgerStateReader. // The algorithm works in the following way: @@ -123,11 +112,11 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { expectedEntry, exist := v.currentEntries[key] if !exist { - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Cannot find entry in currentEntries map: %s (key = %s)", base64.StdEncoding.EncodeToString(actualEntryMarshaled), key, - )} + )) } delete(v.currentEntries, key) @@ -156,12 +145,12 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { } if !bytes.Equal(actualEntryMarshaled, expectedEntryMarshaled) { - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Entry does not match the fetched entry. Expected: %s (pretransform = %s), actual: %s", base64.StdEncoding.EncodeToString(expectedEntryMarshaled), base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled), base64.StdEncoding.EncodeToString(actualEntryMarshaled), - )} + )) } return nil @@ -186,11 +175,11 @@ func (v *StateVerifier) Verify(countAll int) error { } if v.readEntries != countAll { - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Number of entries read using GetEntries (%d) does not match number of entries in your storage (%d).", v.readEntries, countAll, - )} + )) } return nil @@ -206,11 +195,11 @@ func (v *StateVerifier) checkUnreadEntries() error { // Ignore error as StateError below is more important entryString, _ := xdr.MarshalBase64(entry) - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Entries (%d) not found locally, example: %s", len(v.currentEntries), entryString, - )} + )) } return nil diff --git a/exp/ingest/verify/main_test.go b/exp/ingest/verify/main_test.go index 8bd4a1fff7..0274dfb3bf 100644 --- a/exp/ingest/verify/main_test.go +++ b/exp/ingest/verify/main_test.go @@ -5,6 +5,7 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" @@ -13,7 +14,7 @@ import ( ) func assertStateError(t *testing.T, err error, expectStateError bool) { - _, ok := err.(StateError) + _, ok := err.(ingesterrors.StateError) if expectStateError { assert.True(t, ok, "err should be StateError") } else { diff --git a/exp/support/pipeline/pipeline.go b/exp/support/pipeline/pipeline.go index 24eed9c10f..edefb14dd2 100644 --- a/exp/support/pipeline/pipeline.go +++ b/exp/support/pipeline/pipeline.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" "sync" - "time" "github.com/stellar/go/support/errors" ) @@ -86,6 +85,14 @@ func (p *Pipeline) setRunning(setRunning bool) error { return nil } +// IsRunning returns true if pipeline is running +func (p *Pipeline) IsRunning() bool { + // Protects internal fields + p.mutex.Lock() + defer p.mutex.Unlock() + return p.running +} + // reset resets internal state of the pipeline and all the nodes and processors. func (p *Pipeline) reset() { p.cancelled = false @@ -192,8 +199,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip } }() - finishUpdatingStats := p.updateStats(node, reader, writer) - for i, child := range node.Children { wg.Add(1) go func(i int, child *PipelineNode) { @@ -209,8 +214,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip go func() { wg.Wait() - finishUpdatingStats <- true - if node == p.root { // If pipeline processing is finished run post-hooks and send error // if not already sent. @@ -259,40 +262,8 @@ func (p *Pipeline) Shutdown() { } p.shutDown = true p.cancelled = true - p.cancelFunc() -} - -func (p *Pipeline) updateStats(node *PipelineNode, reader Reader, writer *multiWriter) chan<- bool { - // Update stats - interval := time.Second - done := make(chan bool) - ticker := time.NewTicker(interval) - - go func() { - defer ticker.Stop() - - for { - // This is not thread-safe: check if Mutex slows it down a lot... - readBuffer, readBufferIsBufferedReadWriter := reader.(*BufferedReadWriter) - - node.writesPerSecond = (writer.wroteEntries - node.wroteEntries) * int(time.Second/interval) - node.wroteEntries = writer.wroteEntries - - if readBufferIsBufferedReadWriter { - node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval) - node.readEntries = readBuffer.readEntries - node.queuedEntries = readBuffer.QueuedEntries() - } - - select { - case <-ticker.C: - continue - case <-done: - // Pipeline done - return - } - } - }() - - return done + // It's possible that Shutdown will be called before first run. + if p.cancelFunc != nil { + p.cancelFunc() + } } diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 20ea319175..8acded941a 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -6,6 +6,13 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). As this project is pre 1.0, breaking changes may happen for minor version bumps. A breaking change will get clearly notified in this log. +## v0.24.1 + +* Add cache to improve performance of experimental ingestion system (#[2004](https://github.com/stellar/go/pull/2004)). +* Fix experimental ingestion bug where ledger changes were not applied in the correct order (#[2050](https://github.com/stellar/go/pull/2050)). +* Fix experimental ingestion bug where unique constraint errors are incurred when the ingestion system has to reingest state from history archive checkpoints (#[2055](https://github.com/stellar/go/pull/2055)). +* Fix experimental ingestion bug where a race condition during shutdown leads to a crash (#[2058](https://github.com/stellar/go/pull/2058)). + ## v0.24.0 * Add `fee_charged` and `max_fee` objects to `/fee_stats` endpoint ([#1964](https://github.com/stellar/go/pull/1964)). diff --git a/services/horizon/internal/db2/history/ledger.go b/services/horizon/internal/db2/history/ledger.go index 8d238d3784..a410f92df3 100644 --- a/services/horizon/internal/db2/history/ledger.go +++ b/services/horizon/internal/db2/history/ledger.go @@ -165,6 +165,28 @@ func (q *Q) InsertExpLedger( return result.RowsAffected() } +// ExpIngestRemovalSummary describes how many rows in the experimental ingestion +// history tables have been deleted by RemoveExpIngestHistory() +type ExpIngestRemovalSummary struct { + LedgersRemoved int64 +} + +// RemoveExpIngestHistory removes all rows in the experimental ingestion +// history tables which have a ledger sequence higher than `newerThanSequence` +func (q *Q) RemoveExpIngestHistory(newerThanSequence uint32) (ExpIngestRemovalSummary, error) { + result, err := q.Exec( + sq.Delete("exp_history_ledgers"). + Where("sequence > ?", newerThanSequence), + ) + if err != nil { + return ExpIngestRemovalSummary{}, err + } + + summary := ExpIngestRemovalSummary{} + summary.LedgersRemoved, err = result.RowsAffected() + return summary, err +} + func ledgerHeaderToMap( ledger xdr.LedgerHeaderHistoryEntry, successTxsCount int, diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index 2037f64e84..603ae5a968 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -247,3 +247,80 @@ func TestCheckExpLedger(t *testing.T) { tt.Assert.True(valid) } } + +func TestRemoveExpIngestHistory(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + summary, err := q.RemoveExpIngestHistory(69859) + tt.Assert.Equal(ExpIngestRemovalSummary{}, summary) + tt.Assert.NoError(err) + + ledger := Ledger{ + Sequence: 69859, + PreviousLedgerHash: null.NewString("4b0b8bace3b2438b2404776ce57643966855487ba6384724a3c664c7aa4cd9e4", true), + ImporterVersion: 321, + TransactionCount: 12, + SuccessfulTransactionCount: new(int32), + FailedTransactionCount: new(int32), + OperationCount: 23, + TotalCoins: 23451, + FeePool: 213, + BaseReserve: 687, + MaxTxSetSize: 345, + ProtocolVersion: 12, + BaseFee: 100, + ClosedAt: time.Now().UTC().Truncate(time.Second), + LedgerHeaderXDR: null.NewString("temp", true), + } + *ledger.SuccessfulTransactionCount = 12 + *ledger.FailedTransactionCount = 3 + hashes := []string{ + "4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "5db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "6db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "7db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "8db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + } + + for i, hash := range hashes { + ledger.TotalOrderID.ID = toid.New(ledger.Sequence, 0, 0).ToInt64() + ledger.LedgerHash = hash + if i > 0 { + ledger.PreviousLedgerHash = null.NewString(hashes[i-1], true) + } + + insertSQL := sq.Insert("exp_history_ledgers").SetMap(ledgerToMap(ledger)) + _, err = q.Exec(insertSQL) + tt.Assert.NoError(err) + + ledger.Sequence++ + } + + var ledgers []Ledger + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 5) + + summary, err = q.RemoveExpIngestHistory(69863) + tt.Assert.Equal(ExpIngestRemovalSummary{}, summary) + tt.Assert.NoError(err) + + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 5) + + summary, err = q.RemoveExpIngestHistory(69861) + tt.Assert.Equal(ExpIngestRemovalSummary{2}, summary) + tt.Assert.NoError(err) + + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 3) + + for _, ledger := range ledgers { + tt.Assert.LessOrEqual(ledger.Sequence, int32(69861)) + } +} diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 9159f203b1..cec34705cc 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/jmoiron/sqlx" "github.com/stellar/go/clients/stellarcore" "github.com/stellar/go/exp/ingest" "github.com/stellar/go/exp/ingest/io" @@ -39,7 +40,9 @@ const ( // when preauth tx is failed. // - 9: Fixes a bug in asset stats processor that counted unauthorized // trustlines. - CurrentVersion = 9 + // - 10: Fixes a bug in meta processing (fees are now processed before + // everything else). + CurrentVersion = 10 ) var log = logpkg.DefaultLogger.WithField("service", "expingest") @@ -64,12 +67,14 @@ type Config struct { type dbQ interface { Begin() error Rollback() error + GetTx() *sqlx.Tx GetLastLedgerExpIngest() (uint32, error) GetExpIngestVersion() (int, error) UpdateLastLedgerExpIngest(uint32) error UpdateExpStateInvalid(bool) error GetExpStateInvalid() (bool, error) GetAllOffers() ([]history.Offer, error) + RemoveExpIngestHistory(uint32) (history.ExpIngestRemovalSummary, error) } type dbSession interface { @@ -292,6 +297,10 @@ func (s *System) Run() { "err": err, "last_ingested_ledger": lastIngestedLedger, }).Error("Error running session, resuming from the last ingested ledger") + } else { + // LiveSession.Run returns nil => shutdown + log.Info("Session shut down") + return nil } } else { // The other node already ingested a state (just now or in the past) diff --git a/services/horizon/internal/expingest/pipeline_hooks_test.go b/services/horizon/internal/expingest/pipeline_hooks_test.go index e85ffcbd9e..ebd4ed6536 100644 --- a/services/horizon/internal/expingest/pipeline_hooks_test.go +++ b/services/horizon/internal/expingest/pipeline_hooks_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/ingest/pipeline" "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2" @@ -12,102 +13,157 @@ import ( "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" + "github.com/stretchr/testify/suite" ) -func TestStatePreProcessingHook(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() +type PreProcessingHookTestSuite struct { + suite.Suite + historyQ *mockDBQ + system *System + ctx context.Context + ledgerSeqFromContext uint32 +} - system := &System{} - session := tt.HorizonSession() - defer session.Rollback() - ctx := context.WithValue( +func TestPreProcessingHookTestSuite(t *testing.T) { + suite.Run(t, new(PreProcessingHookTestSuite)) +} + +func (s *PreProcessingHookTestSuite) SetupTest() { + s.system = &System{} + s.historyQ = &mockDBQ{} + s.ledgerSeqFromContext = uint32(5) + + s.ctx = context.WithValue( context.Background(), pipeline.LedgerSequenceContextKey, - uint32(0), + s.ledgerSeqFromContext, ) - pipelineType := statePipeline - historyQ := &history.Q{session} - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(0)) - - tt.Assert.Nil(session.GetTx()) - newCtx, err := preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.False(system.StateReady()) } -func TestLedgerPreProcessingHook(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() +func (s *PreProcessingHookTestSuite) TearDownTest() { + s.historyQ.AssertExpectations(s.T()) +} - system := &System{} - session := tt.HorizonSession() - defer session.Rollback() - ctx := context.WithValue( - context.Background(), - pipeline.LedgerSequenceContextKey, - uint32(2), +func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithPreExistingTx() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{3}, nil, ) - pipelineType := ledgerPipeline - historyQ := &history.Q{session} - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(1)) - - tt.Assert.Nil(session.GetTx()) - newCtx, err := preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(2)) - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.Nil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.Nil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.True(system.StateReady()) + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithoutPreExistingTx() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{3}, nil, + ) + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnGetLastLedgerExpIngestError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error getting last ledger: transient error") +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnRemoveExpIngestHistoryError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{}, errors.New("transient error"), + ) + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error removing exp ingest history: transient error") +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnBeginError() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error starting a transaction: transient error") +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithPreExistingTx() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithoutPreExistingTx() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsAsMaster() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(s.ledgerSeqFromContext-1, nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().NotNil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnGetLastLedgerExpIngestError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error getting last ledger: transient error") +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnBeginError() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error starting a transaction: transient error") } func TestPostProcessingHook(t *testing.T) { diff --git a/services/horizon/internal/expingest/pipelines.go b/services/horizon/internal/expingest/pipelines.go index 0c0df48aa7..19713241fb 100644 --- a/services/horizon/internal/expingest/pipelines.go +++ b/services/horizon/internal/expingest/pipelines.go @@ -5,9 +5,9 @@ import ( "fmt" "github.com/stellar/go/exp/ingest" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/pipeline" "github.com/stellar/go/exp/ingest/processors" - "github.com/stellar/go/exp/ingest/verify" "github.com/stellar/go/exp/orderbook" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -141,7 +141,7 @@ func preProcessingHook( ctx context.Context, pipelineType pType, system *System, - historySession *db.Session, + historyQ dbQ, ) (context.Context, error) { var err error defer func() { @@ -149,18 +149,16 @@ func preProcessingHook( // queries will fail indefinietly with a following error: // current transaction is aborted, commands ignored until end of transaction block if err != nil { - historySession.Rollback() + historyQ.Rollback() } }() - historyQ := &history.Q{historySession} - // Start a transaction only if not in a transaction already. // The only case this can happen is during the first run when // a transaction is started to get the latest ledger `FOR UPDATE` // in `System.Run()`. - if tx := historySession.GetTx(); tx == nil { - err = historySession.Begin() + if tx := historyQ.GetTx(); tx == nil { + err = historyQ.Begin() if err != nil { return ctx, errors.Wrap(err, "Error starting a transaction") } @@ -180,6 +178,12 @@ func preProcessingHook( // State pipeline is always fully run because loading offers // from a database is done outside the pipeline. updateDatabase = true + var summary history.ExpIngestRemovalSummary + summary, err = historyQ.RemoveExpIngestHistory(ledgerSeq) + if err != nil { + return ctx, errors.Wrap(err, "Error removing exp ingest history") + } + log.WithField("historyRemoved", summary).Info("removed entries from historical ingestion tables") } else { // mark the system as ready because we have progressed to running // the ledger pipeline @@ -197,7 +201,7 @@ func preProcessingHook( // If we are not going to update a DB release a lock by rolling back a // transaction. if !updateDatabase { - historySession.Rollback() + historyQ.Rollback() } log.WithFields(logpkg.F{ @@ -230,7 +234,7 @@ func postProcessingHook( } switch errors.Cause(err).(type) { - case verify.StateError: + case ingesterrors.StateError: markStateInvalid(historySession, err) default: log. @@ -305,7 +309,7 @@ func postProcessingHook( if err != nil { errorCount := system.incrementStateVerificationErrors() switch errors.Cause(err).(type) { - case verify.StateError: + case ingesterrors.StateError: markStateInvalid(historySession, err) default: logger := log.WithField("err", err).Warn @@ -348,9 +352,10 @@ func addPipelineHooks( default: panic(fmt.Sprintf("Unknown pipeline type %T", p)) } + historyQ := &history.Q{historySession} p.AddPreProcessingHook(func(ctx context.Context) (context.Context, error) { - return preProcessingHook(ctx, pipelineType, system, historySession) + return preProcessingHook(ctx, pipelineType, system, historyQ) }) p.AddPostProcessingHook(func(ctx context.Context, err error) error { diff --git a/services/horizon/internal/expingest/processors/accounts_data_processor_test.go b/services/horizon/internal/expingest/processors/accounts_data_processor_test.go index 9621479626..591f463488 100644 --- a/services/horizon/internal/expingest/processors/accounts_data_processor_test.go +++ b/services/horizon/internal/expingest/processors/accounts_data_processor_test.go @@ -210,12 +210,6 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestNewAccount() { }), }, nil).Once() - s.mockQ.On( - "InsertAccountData", - data, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedData := xdr.DataEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), DataName: "test", @@ -252,8 +246,9 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestNewAccount() { }, }), }, nil).Once() + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateAccountData", + "InsertAccountData", updatedData, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -360,12 +355,6 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertAccountData", - data, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() @@ -402,7 +391,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.mockLedgerReader.On("Close").Return(nil).Once() s.mockQ.On( - "UpdateAccountData", + "InsertAccountData", modifiedData, lastModifiedLedgerSeq+1, ).Return(int64(1), nil).Once() diff --git a/services/horizon/internal/expingest/processors/accounts_processor_test.go b/services/horizon/internal/expingest/processors/accounts_processor_test.go index 1adfe3e692..32c7bfe3a4 100644 --- a/services/horizon/internal/expingest/processors/accounts_processor_test.go +++ b/services/horizon/internal/expingest/processors/accounts_processor_test.go @@ -208,12 +208,6 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccount() { }), }, nil).Once() - s.mockQ.On( - "InsertAccount", - account, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedAccount := xdr.AccountEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Thresholds: [4]byte{0, 1, 2, 3}, @@ -259,8 +253,10 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccount() { }, }), }, nil).Once() + + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateAccount", + "InsertAccount", updatedAccount, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -362,12 +358,6 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertAccount", - account, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedAccount := xdr.AccountEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Thresholds: [4]byte{0, 1, 2, 3}, @@ -406,7 +396,7 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.mockLedgerReader.On("Close").Return(nil).Once() s.mockQ.On( - "UpdateAccount", + "InsertAccount", updatedAccount, lastModifiedLedgerSeq+1, ).Return(int64(1), nil).Once() @@ -420,3 +410,93 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.Assert().NoError(err) } + +func (s *AccountsProcessorTestSuiteLedger) TestFeeProcessedBeforeEverythingElse() { + s.mockLedgerReader.On("Read"). + Return(io.LedgerTransaction{ + Meta: createTransactionMeta([]xdr.OperationMeta{ + xdr.OperationMeta{ + Changes: []xdr.LedgerEntryChange{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 100, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 300, + }, + }, + }, + }, + }, + }, + }), + }, nil).Once() + + s.mockLedgerReader.On("Read"). + Return(io.LedgerTransaction{ + FeeChanges: []xdr.LedgerEntryChange{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 200, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 100, + }, + }, + }, + }, + }, + Meta: xdr.TransactionMeta{ + Operations: &[]xdr.OperationMeta{}, + }, + }, nil).Once() + + expectedAccount := xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + // If fee meta wasn't procesed before everything else, this would be 100 + Balance: 300, + } + + s.mockQ.On("UpdateAccount", expectedAccount, xdr.Uint32(0)).Return(int64(1), nil).Once() + + s.mockLedgerReader. + On("Read"). + Return(io.LedgerTransaction{}, stdio.EOF).Once() + + err := s.processor.ProcessLedger( + context.Background(), + &supportPipeline.Store{}, + s.mockLedgerReader, + s.mockLedgerWriter, + ) + + s.Assert().NoError(err) +} diff --git a/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go b/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go index 951dd1b152..5ea019ec48 100644 --- a/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go +++ b/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go @@ -5,8 +5,8 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/ingest/verify" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -603,10 +603,6 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccount() { } func (s *AccountsSignerProcessorTestSuiteLedger) TestNewAccountNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{ @@ -630,6 +626,8 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestNewAccountNoRowsAffected() }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ. On( "CreateAccountSigner", @@ -647,20 +645,16 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestNewAccountNoRowsAffected() ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) s.Assert().EqualError( err, - "Error in AccountsForSigner handler: No rows affected when inserting "+ + "Error in AccountsForSigner handler: 0 rows affected when inserting "+ "account=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML "+ "signer=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML to database", ) } func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccountNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{ @@ -693,6 +687,8 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccountNoRowsAffected }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ. On( "RemoveAccountSigner", @@ -709,12 +705,13 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccountNoRowsAffected ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) s.Assert().EqualError( err, "Error in AccountsForSigner handler: Expected "+ "account=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML "+ - "signer=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML in database but not found when removing", + "signer=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML in database but not found when removing "+ + "(rows affected = 0)", ) } @@ -773,34 +770,6 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - // Remove old signer - s.mockQ. - On( - "RemoveAccountSigner", - "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - "GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV", - ). - Return(int64(1), nil).Once() - - // Create new and old signer - s.mockQ. - On( - "CreateAccountSigner", - "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - "GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV", - int32(10), - ). - Return(int64(1), nil).Once() - - s.mockQ. - On( - "CreateAccountSigner", - "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - "GCAHY6JSXQFKWKP6R7U5JPXDVNV4DJWOWRFLY3Y6YPBF64QRL4BPFDNS", - int32(15), - ). - Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() @@ -836,13 +805,17 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { Key: xdr.MustSigner("GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV"), Weight: 12, }, + xdr.Signer{ + Key: xdr.MustSigner("GCAHY6JSXQFKWKP6R7U5JPXDVNV4DJWOWRFLY3Y6YPBF64QRL4BPFDNS"), + Weight: 15, + }, }, }, }, }, }, nil).Once() - // Update signer + // Remove old signer s.mockQ. On( "RemoveAccountSigner", @@ -851,6 +824,7 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { ). Return(int64(1), nil).Once() + // Create new and old (updated) signer s.mockQ. On( "CreateAccountSigner", @@ -860,6 +834,15 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { ). Return(int64(1), nil).Once() + s.mockQ. + On( + "CreateAccountSigner", + "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + "GCAHY6JSXQFKWKP6R7U5JPXDVNV4DJWOWRFLY3Y6YPBF64QRL4BPFDNS", + int32(15), + ). + Return(int64(1), nil).Once() + s.mockLedgerReader. On("ReadUpgradeChange"). Return(io.Change{}, stdio.EOF).Once() diff --git a/services/horizon/internal/expingest/processors/database_processor.go b/services/horizon/internal/expingest/processors/database_processor.go index 3610ea1e21..7cf0ef442a 100644 --- a/services/horizon/internal/expingest/processors/database_processor.go +++ b/services/horizon/internal/expingest/processors/database_processor.go @@ -7,9 +7,9 @@ import ( stdio "io" "math/big" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" ingestpipeline "github.com/stellar/go/exp/ingest/pipeline" - "github.com/stellar/go/exp/ingest/verify" "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -191,6 +191,7 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S }() defer w.Close() + ledgerCache := io.NewLedgerEntryChangeCache() p.AssetStatSet = AssetStatSet{} actionHandlers := map[DatabaseProcessorActionType]func(change io.Change) error{ @@ -213,7 +214,8 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S var successTxCount, failedTxCount, opCount int - // Process transaction meta + // Get all transactions + var transactions []io.LedgerTransaction for { transaction, err := r.Read() if err != nil { @@ -231,51 +233,81 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S failedTxCount++ } - for _, action := range actions { - handler, ok := actionHandlers[action] - if !ok { - return errors.New("Unknown action") + transactions = append(transactions, transaction) + } + + if p.Action != Ledgers { + // Remember that it's possible that transaction can remove a preauth + // tx signer even when it's a failed transaction so we need to check + // failed transactions too. + + // Fees are processed before everything else. + for _, transaction := range transactions { + for _, change := range transaction.GetFeeChanges() { + err := ledgerCache.AddChange(change) + if err != nil { + return errors.Wrap(err, "error adding to ledgerCache") + } } - // Remember that it's possible that transaction can remove a preauth - // tx signer even when it's a failed transaction. + select { + case <-ctx.Done(): + return nil + default: + continue + } + } + // Tx meta + for _, transaction := range transactions { for _, change := range transaction.GetChanges() { - err := handler(change) + err := ledgerCache.AddChange(change) if err != nil { - return errors.Wrap( - err, - fmt.Sprintf("Error in %s handler", action), - ) + return errors.Wrap(err, "error adding to ledgerCache") } } - } - select { - case <-ctx.Done(): - return nil - default: - continue + select { + case <-ctx.Done(): + return nil + default: + continue + } } - } - // Process upgrades meta - for { - change, err := r.ReadUpgradeChange() - if err != nil { - if err == stdio.EOF { - break - } else { - return err + // Process upgrades meta + for { + change, err := r.ReadUpgradeChange() + if err != nil { + if err == stdio.EOF { + break + } else { + return err + } } - } - for _, action := range actions { - handler, ok := actionHandlers[action] - if !ok { - return errors.New("Unknown action") + err = ledgerCache.AddChange(change) + if err != nil { + return errors.Wrap(err, "error addint to ledgerCache") } + select { + case <-ctx.Done(): + return nil + default: + continue + } + } + } + + changes := ledgerCache.GetChanges() + for _, action := range actions { + handler, ok := actionHandlers[action] + if !ok { + return errors.New("Unknown action") + } + + for _, change := range changes { err := handler(change) if err != nil { return errors.Wrap( @@ -312,7 +344,7 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S if assetStatNotFound { // Insert if delta.NumAccounts < 0 { - return verify.NewStateError(errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "NumAccounts negative but DB entry does not exist for asset: %s %s %s", delta.AssetType, delta.AssetCode, @@ -343,7 +375,7 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S if statAccounts == 0 { // Remove stats if statBalance.Cmp(big.NewInt(0)) != 0 { - return verify.NewStateError(errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Removing asset stat by final amount non-zero for: %s %s %s", delta.AssetType, delta.AssetCode, @@ -374,8 +406,9 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when adjusting asset stat for asset: %s %s %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when adjusting asset stat for asset: %s %s %s", + rowsAffected, delta.AssetType, delta.AssetCode, delta.AssetIssuer, @@ -415,9 +448,9 @@ func (p *DatabaseProcessor) ingestLedgerHeader( if rowsAffected != 1 { log.WithField("rowsAffected", rowsAffected). WithField("sequence", r.GetSequence()). - Error("No rows affected when ingesting new ledger") + Error("Invalid number of rows affected when ingesting new ledger") return errors.Errorf( - "No rows affected when ingesting new ledger: %v", + "0 rows affected when ingesting new ledger: %v", r.GetSequence(), ) } @@ -495,8 +528,9 @@ func (p *DatabaseProcessor) processLedgerAccounts(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s account %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d No rows affected when %s account %s", + rowsAffected, action, accountID, )) @@ -550,8 +584,9 @@ func (p *DatabaseProcessor) processLedgerAccountData(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s data: %s %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when %s data: %s %s", + rowsAffected, action, ledgerKey.Data.AccountId.Address(), ledgerKey.Data.DataName, @@ -581,10 +616,11 @@ func (p *DatabaseProcessor) processLedgerAccountSigners(change io.Change) error } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "Expected account=%s signer=%s in database but not found when removing", + return ingesterrors.NewStateError(errors.Errorf( + "Expected account=%s signer=%s in database but not found when removing (rows affected = %d)", preAccountEntry.AccountId.Address(), signer, + rowsAffected, )) } } @@ -599,8 +635,9 @@ func (p *DatabaseProcessor) processLedgerAccountSigners(change io.Change) error } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when inserting account=%s signer=%s to database", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when inserting account=%s signer=%s to database", + rowsAffected, postAccountEntry.AccountId.Address(), signer, )) @@ -647,8 +684,9 @@ func (p *DatabaseProcessor) processLedgerOffers(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s offer %d", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when %s offer %d", + rowsAffected, action, offerID, )) @@ -700,7 +738,7 @@ func (p *DatabaseProcessor) adjustAssetStat( // else, trustline was unauthorized and remains unauthorized // so there is no change to accounts or balances } else { - return verify.NewStateError(errors.New("both pre and post trustlines cannot be nil")) + return ingesterrors.NewStateError(errors.New("both pre and post trustlines cannot be nil")) } err := p.AssetStatSet.AddDelta(trustline.Asset, int64(deltaBalance), deltaAccounts) @@ -768,8 +806,9 @@ func (p *DatabaseProcessor) processLedgerTrustLines(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s trustline: %s %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when %s trustline: %s %s", + rowsAffected, action, ledgerKey.TrustLine.AccountId.Address(), ledgerKey.TrustLine.Asset.String(), diff --git a/services/horizon/internal/expingest/processors/ledgers_processor_test.go b/services/horizon/internal/expingest/processors/ledgers_processor_test.go index f3a0733a8b..c250d8cbb1 100644 --- a/services/horizon/internal/expingest/processors/ledgers_processor_test.go +++ b/services/horizon/internal/expingest/processors/ledgers_processor_test.go @@ -74,10 +74,6 @@ func (s *LedgersProcessorTestSuiteLedger) SetupTest() { s.mockLedgerReader.On("GetSequence").Return(uint32(20)).Maybe() - s.mockLedgerReader. - On("ReadUpgradeChange"). - Return(io.Change{}, stdio.EOF).Once() - s.mockLedgerReader. On("Read"). Return(createTransaction(true, 1), nil).Once() @@ -120,10 +116,6 @@ func (s *LedgersProcessorTestSuiteLedger) TestInsertExpLedgerIgnoredWhenNotDatab On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() - s.mockLedgerReader. - On("ReadUpgradeChange"). - Return(io.Change{}, stdio.EOF).Once() - s.mockLedgerReader. On("Close"). Return(nil).Once() @@ -261,5 +253,5 @@ func (s *LedgersProcessorTestSuiteLedger) TestInsertExpLedgerNoRowsAffected() { s.mockLedgerWriter, ) s.Assert().Error(err) - s.Assert().EqualError(err, "No rows affected when ingesting new ledger: 20") + s.Assert().EqualError(err, "0 rows affected when ingesting new ledger: 20") } diff --git a/services/horizon/internal/expingest/processors/offers_processor_test.go b/services/horizon/internal/expingest/processors/offers_processor_test.go index b4f34c198b..6101f6e74a 100644 --- a/services/horizon/internal/expingest/processors/offers_processor_test.go +++ b/services/horizon/internal/expingest/processors/offers_processor_test.go @@ -5,8 +5,8 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/ingest/verify" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -56,8 +56,9 @@ func (s *OffersProcessorTestSuiteState) TearDownTest() { func (s *OffersProcessorTestSuiteState) TestCreateOffer() { offer := xdr.OfferEntry{ - OfferId: xdr.Int64(1), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(1), + Price: xdr.Price{1, 2}, } lastModifiedLedgerSeq := xdr.Uint32(123) s.mockStateReader. @@ -164,8 +165,9 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { // add offer offer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 2}, } lastModifiedLedgerSeq := xdr.Uint32(1234) s.mockLedgerReader.On("Read"). @@ -189,15 +191,10 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { }), }, nil).Once() - s.mockQ.On( - "InsertOffer", - offer, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedOffer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 6}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 6}, } s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ @@ -230,8 +227,10 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { }, }), }, nil).Once() + + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateOffer", + "InsertOffer", updatedOffer, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -251,19 +250,17 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { } func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - lastModifiedLedgerSeq := xdr.Uint32(1234) offer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 2}, } updatedOffer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 6}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 6}, } s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ @@ -296,6 +293,8 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ.On( "UpdateOffer", updatedOffer, @@ -310,8 +309,8 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in Offers handler: No rows affected when updating offer 2") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in Offers handler: 0 rows affected when updating offer 2") } func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { @@ -327,8 +326,9 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { Data: xdr.LedgerEntryData{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(3), - Price: xdr.Price{3, 1}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), + Price: xdr.Price{3, 1}, }, }, }, @@ -338,7 +338,8 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { Removed: &xdr.LedgerKey{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.LedgerKeyOffer{ - OfferId: xdr.Int64(3), + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), }, }, }, @@ -347,15 +348,15 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { }), }, nil).Once() + s.mockLedgerReader. + On("Read"). + Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ.On( "RemoveOffer", xdr.Int64(3), ).Return(int64(1), nil).Once() - s.mockLedgerReader. - On("Read"). - Return(io.LedgerTransaction{}, stdio.EOF).Once() - err := s.processor.ProcessLedger( context.Background(), &supportPipeline.Store{}, @@ -372,8 +373,9 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { // add offer offer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 2}, } lastModifiedLedgerSeq := xdr.Uint32(1234) s.mockLedgerReader.On("Read"). @@ -397,19 +399,14 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertOffer", - offer, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() updatedOffer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 6}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 6}, } s.mockLedgerReader. @@ -433,8 +430,9 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { }, }, nil).Once() + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateOffer", + "InsertOffer", updatedOffer, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -456,10 +454,6 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { } func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - // add offer s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ @@ -472,8 +466,9 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { Data: xdr.LedgerEntryData{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(3), - Price: xdr.Price{3, 1}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), + Price: xdr.Price{3, 1}, }, }, }, @@ -483,7 +478,8 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { Removed: &xdr.LedgerKey{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.LedgerKeyOffer{ - OfferId: xdr.Int64(3), + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), }, }, }, @@ -491,6 +487,7 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() s.mockQ.On( "RemoveOffer", @@ -505,6 +502,6 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in Offers handler: No rows affected when removing offer 3") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in Offers handler: 0 rows affected when removing offer 3") } diff --git a/services/horizon/internal/expingest/processors/trust_lines_processor_test.go b/services/horizon/internal/expingest/processors/trust_lines_processor_test.go index 7e86bdbf87..bf396f2a9d 100644 --- a/services/horizon/internal/expingest/processors/trust_lines_processor_test.go +++ b/services/horizon/internal/expingest/processors/trust_lines_processor_test.go @@ -6,8 +6,8 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/ingest/verify" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -271,17 +271,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestInsertTrustLine() { }), }, nil).Once() - s.mockQ.On( - "InsertTrustLine", - trustLine, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockQ.On( - "InsertTrustLine", - unauthorizedTrustline, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedTrustLine := xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), @@ -347,12 +336,12 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestInsertTrustLine() { }), }, nil).Once() s.mockQ.On( - "UpdateTrustLine", + "InsertTrustLine", updatedTrustLine, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() s.mockQ.On( - "UpdateTrustLine", + "InsertTrustLine", updatedUnauthorizedTrustline, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -478,10 +467,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLine() { } func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - lastModifiedLedgerSeq := xdr.Uint32(1234) trustLine := xdr.TrustLineEntry{ @@ -527,6 +512,8 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineNoRowsAffected() }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ.On( "UpdateTrustLine", updatedTrustLine, @@ -541,8 +528,8 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineNoRowsAffected() ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in TrustLines handler: No rows affected when updating trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in TrustLines handler: 0 rows affected when updating trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") } func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() { @@ -817,12 +804,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertTrustLine", - trustLine, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() @@ -856,7 +837,7 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestProcessUpgradeChange() { }, nil).Once() s.mockQ.On( - "UpdateTrustLine", + "InsertTrustLine", updatedTrustLine, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -891,10 +872,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestProcessUpgradeChange() { } func (s *TrustLinesProcessorTestSuiteLedger) TestRemoveTrustlineNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ Meta: createTransactionMeta([]xdr.OperationMeta{ @@ -928,6 +905,7 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestRemoveTrustlineNoRowsAffected() }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() s.mockQ.On( "RemoveTrustLine", @@ -945,6 +923,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestRemoveTrustlineNoRowsAffected() ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in TrustLines handler: No rows affected when removing trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in TrustLines handler: 0 rows affected when removing trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") } diff --git a/services/horizon/internal/expingest/run_ingestion_test.go b/services/horizon/internal/expingest/run_ingestion_test.go index b260093f49..8aa15b1069 100644 --- a/services/horizon/internal/expingest/run_ingestion_test.go +++ b/services/horizon/internal/expingest/run_ingestion_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/db" @@ -44,6 +45,11 @@ func (m *mockDBQ) Rollback() error { return args.Error(0) } +func (m *mockDBQ) GetTx() *sqlx.Tx { + args := m.Called() + return args.Get(0).(*sqlx.Tx) +} + func (m *mockDBQ) GetLastLedgerExpIngest() (uint32, error) { args := m.Called() return args.Get(0).(uint32), args.Error(1) @@ -74,6 +80,11 @@ func (m *mockDBQ) GetAllOffers() ([]history.Offer, error) { return args.Get(0).([]history.Offer), args.Error(1) } +func (m *mockDBQ) RemoveExpIngestHistory(newerThanSequence uint32) (history.ExpIngestRemovalSummary, error) { + args := m.Called(newerThanSequence) + return args.Get(0).(history.ExpIngestRemovalSummary), args.Error(1) +} + type mockIngestSession struct { mock.Mock } @@ -242,7 +253,6 @@ func (s *RunIngestionTestSuite) TestOutdatedIngestVersion() { s.session.On("TruncateTables", history.ExperimentalIngestionTables).Return(nil).Once() s.ingestSession.On("Run").Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.ingestSession.On("Resume", uint32(4)).Return(nil).Once() s.system.retry = expectError(s.Assert(), "") } diff --git a/services/horizon/internal/expingest/verify.go b/services/horizon/internal/expingest/verify.go index 1b5ebf8865..2c9d577f65 100644 --- a/services/horizon/internal/expingest/verify.go +++ b/services/horizon/internal/expingest/verify.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stellar/go/exp/ingest/adapters" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/exp/ingest/verify" "github.com/stellar/go/services/horizon/internal/db2" @@ -27,7 +28,7 @@ const assetStatsBatchSize = 500 // check them. // There is a test that checks it, to fix it: update the actual `verifyState` // method instead of just updating this value! -const stateVerifierExpectedIngestionVersion = 9 +const stateVerifierExpectedIngestionVersion = 10 // verifyState is called as a go routine from pipeline post hook every 64 // ledgers. It checks if the state is correct. If another go routine is already @@ -195,7 +196,7 @@ func (s *System) verifyState(graphOffers map[xdr.Int64]xdr.OfferEntry) error { localLog.WithField("total", total).Info("Finished writing to StateVerifier") if len(graphOffers) != 0 { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "orderbook graph contains %v offers missing from HAS", len(graphOffers), @@ -255,7 +256,7 @@ func checkAssetStats(set processors.AssetStatSet, q *history.Q) error { for _, assetStat := range assetStats { fromSet, removed := set.Remove(assetStat.AssetType, assetStat.AssetCode, assetStat.AssetIssuer) if !removed { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "db contains asset stat with code %s issuer %s which is missing from HAS", assetStat.AssetCode, assetStat.AssetIssuer, @@ -264,7 +265,7 @@ func checkAssetStats(set processors.AssetStatSet, q *history.Q) error { } if fromSet != assetStat { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "db asset stat with code %s issuer %s does not match asset stat from HAS", assetStat.AssetCode, assetStat.AssetIssuer, @@ -277,7 +278,7 @@ func checkAssetStats(set processors.AssetStatSet, q *history.Q) error { } if len(set) > 0 { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "HAS contains %d more asset stats than db", len(set), @@ -327,7 +328,7 @@ func addAccountsToStateVerifier(verifier *verify.StateVerifier, q *history.Q, id // Ensure master weight matches, if not it's a state error! if int32(row.MasterWeight) != masterWeightMap[row.AccountID] { - return verify.NewStateError(errors.New("Master key weight in accounts does not match ")) + return ingesterrors.NewStateError(errors.New("Master key weight in accounts does not match ")) } account := &xdr.AccountEntry{ @@ -453,14 +454,14 @@ func addOffersToStateVerifier( } graphOffer, ok := graphOffers[row.OfferID] if !ok { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf("offer %v is not in orderbook graph", row.OfferID), ) } if equal, err := offerEntryEquals(graphOffer, *entry.Data.Offer); err != nil { return errors.Wrap(err, "could not compare offers") } else if !equal { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "offer %v from db does not match offer in orderbook graph", row.OfferID, @@ -521,7 +522,7 @@ func addTrustLinesToStateVerifier( return err } if err := assetStats.Add(trustline); err != nil { - return verify.NewStateError( + return ingesterrors.NewStateError( errors.Wrap(err, "could not add trustline to asset stats"), ) } diff --git a/xdr/ledger_entry_change.go b/xdr/ledger_entry_change.go index 97df1f21b5..6330385871 100644 --- a/xdr/ledger_entry_change.go +++ b/xdr/ledger_entry_change.go @@ -1,6 +1,9 @@ package xdr -import "fmt" +import ( + "encoding/base64" + "fmt" +) // EntryType is a helper to get at the entry type for a change. func (change *LedgerEntryChange) EntryType() LedgerEntryType { @@ -27,6 +30,17 @@ func (change *LedgerEntryChange) LedgerKey() LedgerKey { } } +// MarshalBinaryBase64 marshals XDR into a binary form and then encodes it +// using base64. +func (change LedgerEntryChange) MarshalBinaryBase64() (string, error) { + b, err := change.MarshalBinary() + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(b), nil +} + // GetLedgerEntry returns the ledger entry that was changed in `change`, along // with a boolean indicating whether the entry value was valid. func (change *LedgerEntryChange) GetLedgerEntry() (LedgerEntry, bool) { diff --git a/xdr/ledger_key.go b/xdr/ledger_key.go index 170c24d05d..cf58661723 100644 --- a/xdr/ledger_key.go +++ b/xdr/ledger_key.go @@ -1,6 +1,7 @@ package xdr import ( + "encoding/base64" "fmt" "strings" ) @@ -146,3 +147,14 @@ func (key LedgerKey) MarshalBinaryCompress() ([]byte, error) { return m, nil } + +// MarshalBinaryBase64 marshals XDR into a binary form and then encodes it +// using base64. +func (key LedgerKey) MarshalBinaryBase64() (string, error) { + b, err := key.MarshalBinary() + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(b), nil +}