From 6398a583abe1d5747f543ee7e80eb377437110e6 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 12 Dec 2019 18:06:11 +0100 Subject: [PATCH 1/6] exp/ingest/io: Add LedgerEntryChangeCache to improve performance of tx meta (#2004) This commit adds `exp/ingest/io.LedgerEntryChangeCache` that squashes all the ledger entry changes. This can be later used to decrease number of DB queries when applying them. See #2003. Some ledgers that add a lot of changes connected to a small set of entries are causing a performance issues because every ledger entry change is applied to a DB. `LedgerEntryChangeCache` solves this problem because it makes holds a final version of a ledger entry after all the changes. Before this fix, extreme cases when two accounts send a payment between each other 1000 times in a ledger required 3000 DB updates (2000 account changes due to payment and 500 fee meta per account). After the fix, it requires just 2 DB updates. Algorithm used in `LedgerEntryChangeCache` is explained below: 1. If the change is CREATED it checks if any change connected to given entry is already in the cache. If not, it adds CREATED change. Otherwise, if existing change is: a. CREATED it returns error because we can't add an entry that already exists. b. UPDATED it returns error because we can't add an entry that already exists. c. REMOVED it means that due to previous transitions we want to remove this from a DB what means that it already exists in a DB so we need to update the type of change to UPDATED. 2. If the change is UPDATE it checks if any change connected to given entry is already in the cache. If not, it adds UPDATE change. Otherwise, if existing change is: a. CREATED it means that due to previous transitions we want to create this in a DB what means that it doesn't exist in a DB so we need to update the entry but stay with CREATED type. b. UPDATED we simply update it with the new value. c. REMOVED it means that at this point in the ledger the entry is removed so updating it returns an error. 3. If the change is REMOVE it checks if any change connected to given entry is already in the cache. If not, it adds REMOVE change. Otherwise, if existing change is: a. CREATED it means that due to previous transitions we want to create this in a DB what means that it doesn't exist in a DB. If it was created and removed in the same ledger it's a noop so we remove entry from the cache. b. UPDATED we simply update it to be a REMOVE change because the UPDATE change means the entry exists in a DB. c. REMOVED it returns error because we can't remove an entry that was already removed. --- exp/ingest/errors/main.go | 13 + exp/ingest/io/ledger_entry_change_cache.go | 217 ++++++++++ .../io/ledger_entry_change_cache_test.go | 397 ++++++++++++++++++ exp/ingest/io/ledger_transaction.go | 14 + exp/ingest/verify/main.go | 29 +- exp/ingest/verify/main_test.go | 3 +- .../horizon/internal/expingest/pipelines.go | 6 +- .../accounts_data_processor_test.go | 17 +- .../processors/accounts_processor_test.go | 18 +- .../accounts_signer_processor_test.go | 67 ++- .../processors/database_processor.go | 108 +++-- .../processors/ledgers_processor_test.go | 10 +- .../processors/offers_processor_test.go | 99 +++-- .../processors/trust_lines_processor_test.go | 44 +- services/horizon/internal/expingest/verify.go | 17 +- xdr/ledger_entry_change.go | 16 +- xdr/ledger_key.go | 12 + 17 files changed, 845 insertions(+), 242 deletions(-) create mode 100644 exp/ingest/errors/main.go create mode 100644 exp/ingest/io/ledger_entry_change_cache.go create mode 100644 exp/ingest/io/ledger_entry_change_cache_test.go diff --git a/exp/ingest/errors/main.go b/exp/ingest/errors/main.go new file mode 100644 index 0000000000..8d41f1538b --- /dev/null +++ b/exp/ingest/errors/main.go @@ -0,0 +1,13 @@ +package errors + +// StateError are errors indicating invalid state. Type is used to differentiate +// between network, i/o, marshaling, bad usage etc. errors and actual state errors. +// You can use type assertion or type switch to check for type. +type StateError struct { + error +} + +// NewStateError creates a new StateError. +func NewStateError(err error) StateError { + return StateError{err} +} diff --git a/exp/ingest/io/ledger_entry_change_cache.go b/exp/ingest/io/ledger_entry_change_cache.go new file mode 100644 index 0000000000..8ce5963791 --- /dev/null +++ b/exp/ingest/io/ledger_entry_change_cache.go @@ -0,0 +1,217 @@ +package io + +import ( + "sync" + + ingesterrors "github.com/stellar/go/exp/ingest/errors" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// LedgerEntryChangeCache is a cache of ledger entry changes that squashes all +// changes within a single ledger. By doing this, it decreases number of DB +// queries sent to a DB to update the current state of the ledger. +// It has integrity checks built in so ex. removing an account that was +// previously removed returns an error. In such case verify.StateError is +// returned. +// +// It applies changes to the cache using the following algorithm: +// +// 1. If the change is CREATED it checks if any change connected to given entry +// is already in the cache. If not, it adds CREATED change. Otherwise, if +// existing change is: +// a. CREATED it returns error because we can't add an entry that already +// exists. +// b. UPDATED it returns error because we can't add an entry that already +// exists. +// c. REMOVED it means that due to previous transitions we want to remove +// this from a DB what means that it already exists in a DB so we need to +// update the type of change to UPDATED. +// 2. If the change is UPDATE it checks if any change connected to given entry +// is already in the cache. If not, it adds UPDATE change. Otherwise, if +// existing change is: +// a. CREATED it means that due to previous transitions we want to create +// this in a DB what means that it doesn't exist in a DB so we need to +// update the entry but stay with CREATED type. +// b. UPDATED we simply update it with the new value. +// c. REMOVED it means that at this point in the ledger the entry is removed +// so updating it returns an error. +// 3. If the change is REMOVE it checks if any change connected to given entry +// is already in the cache. If not, it adds REMOVE change. Otherwise, if +// existing change is: +// a. CREATED it means that due to previous transitions we want to create +// this in a DB what means that it doesn't exist in a DB. If it was +// created and removed in the same ledger it's a noop so we remove entry +// from the cache. +// b. UPDATED we simply update it to be a REMOVE change because the UPDATE +// change means the entry exists in a DB. +// c. REMOVED it returns error because we can't remove an entry that was +// already removed. +type LedgerEntryChangeCache struct { + // ledger key => Change + cache map[string]Change + mutex sync.Mutex +} + +// NewLedgerEntryChangeCache returns a new LedgerEntryChangeCache. +func NewLedgerEntryChangeCache() *LedgerEntryChangeCache { + return &LedgerEntryChangeCache{ + cache: make(map[string]Change), + } +} + +// AddChange adds a change to LedgerEntryChangeCache. All changes are stored +// in memory. To get the final, squashed changes call GetChanges. +// +// Please note that the current ledger capacity in pubnet (max 1000 ops/ledger) +// makes LedgerEntryChangeCache safe to use in terms of memory usage. If the +// cache takes too much memory, you apply changes returned by GetChanges and +// create a new LedgerEntryChangeCache object to continue ingestion. +func (c *LedgerEntryChangeCache) AddChange(change Change) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + switch { + case change.Pre == nil && change.Post != nil: + return c.addCreatedChange(change) + case change.Pre != nil && change.Post != nil: + return c.addUpdatedChange(change) + case change.Pre != nil && change.Post == nil: + return c.addRemovedChange(change) + default: + return errors.New("Unknown entry change state") + } +} + +// addCreatedChange adds a change to the cache, but returns an error if create +// change is unexpected. +func (c *LedgerEntryChangeCache) addCreatedChange(change Change) error { + ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64() + if err != nil { + return errors.Wrap(err, "Error MarshalBinaryBase64") + } + + existingChange, exist := c.cache[ledgerKeyString] + if !exist { + c.cache[ledgerKeyString] = change + return nil + } + + switch existingChange.LedgerEntryChangeType() { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + return ingesterrors.NewStateError(errors.Errorf( + "can't create an entry that already exists (ledger key = %s)", + ledgerKeyString, + )) + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + return ingesterrors.NewStateError(errors.Errorf( + "can't create an entry that already exists (ledger key = %s)", + ledgerKeyString, + )) + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + // If existing type is removed it means that this entry does exist + // in a DB so we update entry change. + c.cache[ledgerKeyString] = Change{ + Type: change.Post.LedgerKey().Type, + Pre: existingChange.Pre, + Post: change.Post, + } + default: + return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType()) + } + + return nil +} + +// addUpdatedChange adds a change to the cache, but returns an error if update +// change is unexpected. +func (c *LedgerEntryChangeCache) addUpdatedChange(change Change) error { + ledgerKeyString, err := change.Post.LedgerKey().MarshalBinaryBase64() + if err != nil { + return errors.Wrap(err, "Error MarshalBinaryBase64") + } + + existingChange, exist := c.cache[ledgerKeyString] + if !exist { + c.cache[ledgerKeyString] = change + return nil + } + + switch existingChange.LedgerEntryChangeType() { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + // If existing type is created it means that this entry does not + // exist in a DB so we update entry change. + c.cache[ledgerKeyString] = Change{ + Type: change.Post.LedgerKey().Type, + Pre: existingChange.Pre, // = nil + Post: change.Post, + } + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + c.cache[ledgerKeyString] = Change{ + Type: change.Post.LedgerKey().Type, + Pre: existingChange.Pre, + Post: change.Post, + } + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + return ingesterrors.NewStateError(errors.Errorf( + "can't update an entry that was previously removed (ledger key = %s)", + ledgerKeyString, + )) + default: + return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type) + } + + return nil +} + +// addRemovedChange adds a change to the cache, but returns an error if remove +// change is unexpected. +func (c *LedgerEntryChangeCache) addRemovedChange(change Change) error { + ledgerKeyString, err := change.Pre.LedgerKey().MarshalBinaryBase64() + if err != nil { + return errors.Wrap(err, "Error MarshalBinaryBase64") + } + + existingChange, exist := c.cache[ledgerKeyString] + if !exist { + c.cache[ledgerKeyString] = change + return nil + } + + switch existingChange.LedgerEntryChangeType() { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + // If existing type is created it means that this will be no op. + // Entry was created and is now removed in a single ledger. + delete(c.cache, ledgerKeyString) + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + c.cache[ledgerKeyString] = Change{ + Type: change.Pre.LedgerKey().Type, + Pre: existingChange.Pre, + Post: nil, + } + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + return ingesterrors.NewStateError(errors.Errorf( + "can't remove an entry that was previously removed (ledger key = %s)", + ledgerKeyString, + )) + default: + return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type) + } + + return nil +} + +// GetChanges returns a slice of Changes in the cache. The order of changes is +// random but each change is connected to a separate entry. +func (c *LedgerEntryChangeCache) GetChanges() []Change { + c.mutex.Lock() + defer c.mutex.Unlock() + + changes := make([]Change, 0, len(c.cache)) + + for _, entryChange := range c.cache { + changes = append(changes, entryChange) + } + + return changes +} diff --git a/exp/ingest/io/ledger_entry_change_cache_test.go b/exp/ingest/io/ledger_entry_change_cache_test.go new file mode 100644 index 0000000000..c2132655f2 --- /dev/null +++ b/exp/ingest/io/ledger_entry_change_cache_test.go @@ -0,0 +1,397 @@ +package io + +import ( + "testing" + + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func TestLedgerEntryChangeCacheExistingCreated(t *testing.T) { + suite.Run(t, new(TestLedgerEntryChangeCacheExistingCreatedSuite)) +} + +// TestLedgerEntryChangeCacheExistingCreatedSuite tests transitions from +// existing CREATED state in the cache. +type TestLedgerEntryChangeCacheExistingCreatedSuite struct { + suite.Suite + cache *LedgerEntryChangeCache +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) SetupTest() { + s.cache = NewLedgerEntryChangeCache() + + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryCreated) +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) TestChangeCreated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't create an entry that already exists (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) TestChangeUpdated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryCreated) +} + +func (s *TestLedgerEntryChangeCacheExistingCreatedSuite) TestChangeRemoved() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 0) +} + +func TestLedgerEntryChangeCacheExistingUpdated(t *testing.T) { + suite.Run(t, new(TestLedgerEntryChangeCacheExistingUpdatedSuite)) +} + +// TestLedgerEntryChangeCacheExistingUpdatedSuite tests transitions from +// existing UPDATED state in the cache. +type TestLedgerEntryChangeCacheExistingUpdatedSuite struct { + suite.Suite + cache *LedgerEntryChangeCache +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) SetupTest() { + s.cache = NewLedgerEntryChangeCache() + + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) TestChangeCreated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't create an entry that already exists (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) TestChangeUpdated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) + s.Assert().Equal(changes[0].Post.LastModifiedLedgerSeq, xdr.Uint32(12)) +} + +func (s *TestLedgerEntryChangeCacheExistingUpdatedSuite) TestChangeRemoved() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryRemoved) +} + +func TestLedgerEntryChangeCacheExistingRemoved(t *testing.T) { + suite.Run(t, new(TestLedgerEntryChangeCacheExistingRemovedSuite)) +} + +// TestLedgerEntryChangeCacheExistingRemovedSuite tests transitions from +// existing REMOVED state in the cache. +type TestLedgerEntryChangeCacheExistingRemovedSuite struct { + suite.Suite + cache *LedgerEntryChangeCache +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) SetupTest() { + s.cache = NewLedgerEntryChangeCache() + + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryRemoved) +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) TestChangeCreated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().NoError(s.cache.AddChange(change)) + changes := s.cache.GetChanges() + s.Assert().Len(changes, 1) + s.Assert().Equal(changes[0].LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) + s.Assert().Equal(changes[0].Post.LastModifiedLedgerSeq, xdr.Uint32(12)) +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) TestChangeUpdated() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't update an entry that was previously removed (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +func (s *TestLedgerEntryChangeCacheExistingRemovedSuite) TestChangeRemoved() { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 11, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + }, + }, + }, + Post: nil, + } + s.Assert().EqualError( + s.cache.AddChange(change), + "can't remove an entry that was previously removed (ledger key = AAAAAAAAAAC2LgFRDBZ3J52nLm30kq2iMgrO7dYzYAN3hvjtf1IHWg==)", + ) +} + +// TestLedgerEntryChangeCacheSquashMultiplePayments simulates sending multiple +// payments between two accounts. Ledger cache should squash multiple changes +// into just two. +// GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ sends money +// GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML receives money +func TestLedgerEntryChangeCacheSquashMultiplePayments(t *testing.T) { + cache := NewLedgerEntryChangeCache() + + for i := 1; i <= 1000; i++ { + change := Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ"), + Balance: xdr.Int64(2000 - i + 1), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ"), + Balance: xdr.Int64(2000 - i), + }, + }, + }, + } + assert.NoError(t, cache.AddChange(change)) + + change = Change{ + Type: xdr.LedgerEntryTypeAccount, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: xdr.Int64(2000 + i - 1), + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + Balance: xdr.Int64(2000 + i), + }, + }, + }, + } + assert.NoError(t, cache.AddChange(change)) + } + + changes := cache.GetChanges() + assert.Len(t, changes, 2) + for _, change := range changes { + assert.Equal(t, change.LedgerEntryChangeType(), xdr.LedgerEntryChangeTypeLedgerEntryUpdated) + account := change.Post.Data.MustAccount() + switch account.AccountId.Address() { + case "GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ": + assert.Equal(t, account.Balance, xdr.Int64(1000)) + case "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML": + assert.Equal(t, account.Balance, xdr.Int64(3000)) + default: + assert.Fail(t, "Invalid account") + } + } +} diff --git a/exp/ingest/io/ledger_transaction.go b/exp/ingest/io/ledger_transaction.go index 3af2f1346c..2c4623731e 100644 --- a/exp/ingest/io/ledger_transaction.go +++ b/exp/ingest/io/ledger_transaction.go @@ -20,6 +20,20 @@ type Change struct { Post *xdr.LedgerEntry } +// LedgerEntryChangeType returns type in terms of LedgerEntryChangeType. +func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { + switch { + case c.Pre == nil && c.Post != nil: + return xdr.LedgerEntryChangeTypeLedgerEntryCreated + case c.Pre != nil && c.Post == nil: + return xdr.LedgerEntryChangeTypeLedgerEntryRemoved + case c.Pre != nil && c.Post != nil: + return xdr.LedgerEntryChangeTypeLedgerEntryUpdated + default: + panic("Invalid state of Change (Pre == nil && Post == nil)") + } +} + // AccountChangedExceptSigners returns true if account has changed WITHOUT // checking the signers (except master key weight!). In other words, if the only // change is connected to signers, this function will return false. diff --git a/exp/ingest/verify/main.go b/exp/ingest/verify/main.go index eb7d1f8b5a..07245689f5 100644 --- a/exp/ingest/verify/main.go +++ b/exp/ingest/verify/main.go @@ -7,6 +7,7 @@ import ( "encoding/base64" stdio "io" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" @@ -20,18 +21,6 @@ import ( // that will be used for equality check. type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) -// StateError are errors indicating invalid state. Type is used to differentiate -// between network, i/o, marshaling, bad usage etc. errors and actual state errors. -// You can use type assertion or type switch to check for type. -type StateError struct { - error -} - -// NewStateError creates a new StateError. -func NewStateError(err error) StateError { - return StateError{err} -} - // StateVerifier verifies if ledger entries provided by Add method are the same // as in the checkpoint ledger entries provided by SingleLedgerStateReader. // The algorithm works in the following way: @@ -123,11 +112,11 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { expectedEntry, exist := v.currentEntries[key] if !exist { - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Cannot find entry in currentEntries map: %s (key = %s)", base64.StdEncoding.EncodeToString(actualEntryMarshaled), key, - )} + )) } delete(v.currentEntries, key) @@ -156,12 +145,12 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { } if !bytes.Equal(actualEntryMarshaled, expectedEntryMarshaled) { - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Entry does not match the fetched entry. Expected: %s (pretransform = %s), actual: %s", base64.StdEncoding.EncodeToString(expectedEntryMarshaled), base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled), base64.StdEncoding.EncodeToString(actualEntryMarshaled), - )} + )) } return nil @@ -186,11 +175,11 @@ func (v *StateVerifier) Verify(countAll int) error { } if v.readEntries != countAll { - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Number of entries read using GetEntries (%d) does not match number of entries in your storage (%d).", v.readEntries, countAll, - )} + )) } return nil @@ -206,11 +195,11 @@ func (v *StateVerifier) checkUnreadEntries() error { // Ignore error as StateError below is more important entryString, _ := xdr.MarshalBase64(entry) - return StateError{errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Entries (%d) not found locally, example: %s", len(v.currentEntries), entryString, - )} + )) } return nil diff --git a/exp/ingest/verify/main_test.go b/exp/ingest/verify/main_test.go index 8bd4a1fff7..0274dfb3bf 100644 --- a/exp/ingest/verify/main_test.go +++ b/exp/ingest/verify/main_test.go @@ -5,6 +5,7 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" @@ -13,7 +14,7 @@ import ( ) func assertStateError(t *testing.T, err error, expectStateError bool) { - _, ok := err.(StateError) + _, ok := err.(ingesterrors.StateError) if expectStateError { assert.True(t, ok, "err should be StateError") } else { diff --git a/services/horizon/internal/expingest/pipelines.go b/services/horizon/internal/expingest/pipelines.go index 0c0df48aa7..377289395c 100644 --- a/services/horizon/internal/expingest/pipelines.go +++ b/services/horizon/internal/expingest/pipelines.go @@ -5,9 +5,9 @@ import ( "fmt" "github.com/stellar/go/exp/ingest" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/pipeline" "github.com/stellar/go/exp/ingest/processors" - "github.com/stellar/go/exp/ingest/verify" "github.com/stellar/go/exp/orderbook" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -230,7 +230,7 @@ func postProcessingHook( } switch errors.Cause(err).(type) { - case verify.StateError: + case ingesterrors.StateError: markStateInvalid(historySession, err) default: log. @@ -305,7 +305,7 @@ func postProcessingHook( if err != nil { errorCount := system.incrementStateVerificationErrors() switch errors.Cause(err).(type) { - case verify.StateError: + case ingesterrors.StateError: markStateInvalid(historySession, err) default: logger := log.WithField("err", err).Warn diff --git a/services/horizon/internal/expingest/processors/accounts_data_processor_test.go b/services/horizon/internal/expingest/processors/accounts_data_processor_test.go index 9621479626..591f463488 100644 --- a/services/horizon/internal/expingest/processors/accounts_data_processor_test.go +++ b/services/horizon/internal/expingest/processors/accounts_data_processor_test.go @@ -210,12 +210,6 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestNewAccount() { }), }, nil).Once() - s.mockQ.On( - "InsertAccountData", - data, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedData := xdr.DataEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), DataName: "test", @@ -252,8 +246,9 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestNewAccount() { }, }), }, nil).Once() + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateAccountData", + "InsertAccountData", updatedData, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -360,12 +355,6 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertAccountData", - data, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() @@ -402,7 +391,7 @@ func (s *AccountsDataProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.mockLedgerReader.On("Close").Return(nil).Once() s.mockQ.On( - "UpdateAccountData", + "InsertAccountData", modifiedData, lastModifiedLedgerSeq+1, ).Return(int64(1), nil).Once() diff --git a/services/horizon/internal/expingest/processors/accounts_processor_test.go b/services/horizon/internal/expingest/processors/accounts_processor_test.go index 1adfe3e692..2cf52c976a 100644 --- a/services/horizon/internal/expingest/processors/accounts_processor_test.go +++ b/services/horizon/internal/expingest/processors/accounts_processor_test.go @@ -208,12 +208,6 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccount() { }), }, nil).Once() - s.mockQ.On( - "InsertAccount", - account, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedAccount := xdr.AccountEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Thresholds: [4]byte{0, 1, 2, 3}, @@ -259,8 +253,10 @@ func (s *AccountsProcessorTestSuiteLedger) TestNewAccount() { }, }), }, nil).Once() + + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateAccount", + "InsertAccount", updatedAccount, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -362,12 +358,6 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertAccount", - account, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedAccount := xdr.AccountEntry{ AccountId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), Thresholds: [4]byte{0, 1, 2, 3}, @@ -406,7 +396,7 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.mockLedgerReader.On("Close").Return(nil).Once() s.mockQ.On( - "UpdateAccount", + "InsertAccount", updatedAccount, lastModifiedLedgerSeq+1, ).Return(int64(1), nil).Once() diff --git a/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go b/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go index 951dd1b152..5ea019ec48 100644 --- a/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go +++ b/services/horizon/internal/expingest/processors/accounts_signer_processor_test.go @@ -5,8 +5,8 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/ingest/verify" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -603,10 +603,6 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccount() { } func (s *AccountsSignerProcessorTestSuiteLedger) TestNewAccountNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{ @@ -630,6 +626,8 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestNewAccountNoRowsAffected() }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ. On( "CreateAccountSigner", @@ -647,20 +645,16 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestNewAccountNoRowsAffected() ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) s.Assert().EqualError( err, - "Error in AccountsForSigner handler: No rows affected when inserting "+ + "Error in AccountsForSigner handler: 0 rows affected when inserting "+ "account=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML "+ "signer=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML to database", ) } func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccountNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{ @@ -693,6 +687,8 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccountNoRowsAffected }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ. On( "RemoveAccountSigner", @@ -709,12 +705,13 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestRemoveAccountNoRowsAffected ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) s.Assert().EqualError( err, "Error in AccountsForSigner handler: Expected "+ "account=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML "+ - "signer=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML in database but not found when removing", + "signer=GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML in database but not found when removing "+ + "(rows affected = 0)", ) } @@ -773,34 +770,6 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - // Remove old signer - s.mockQ. - On( - "RemoveAccountSigner", - "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - "GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV", - ). - Return(int64(1), nil).Once() - - // Create new and old signer - s.mockQ. - On( - "CreateAccountSigner", - "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - "GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV", - int32(10), - ). - Return(int64(1), nil).Once() - - s.mockQ. - On( - "CreateAccountSigner", - "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - "GCAHY6JSXQFKWKP6R7U5JPXDVNV4DJWOWRFLY3Y6YPBF64QRL4BPFDNS", - int32(15), - ). - Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() @@ -836,13 +805,17 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { Key: xdr.MustSigner("GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV"), Weight: 12, }, + xdr.Signer{ + Key: xdr.MustSigner("GCAHY6JSXQFKWKP6R7U5JPXDVNV4DJWOWRFLY3Y6YPBF64QRL4BPFDNS"), + Weight: 15, + }, }, }, }, }, }, nil).Once() - // Update signer + // Remove old signer s.mockQ. On( "RemoveAccountSigner", @@ -851,6 +824,7 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { ). Return(int64(1), nil).Once() + // Create new and old (updated) signer s.mockQ. On( "CreateAccountSigner", @@ -860,6 +834,15 @@ func (s *AccountsSignerProcessorTestSuiteLedger) TestProcessUpgradeChange() { ). Return(int64(1), nil).Once() + s.mockQ. + On( + "CreateAccountSigner", + "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + "GCAHY6JSXQFKWKP6R7U5JPXDVNV4DJWOWRFLY3Y6YPBF64QRL4BPFDNS", + int32(15), + ). + Return(int64(1), nil).Once() + s.mockLedgerReader. On("ReadUpgradeChange"). Return(io.Change{}, stdio.EOF).Once() diff --git a/services/horizon/internal/expingest/processors/database_processor.go b/services/horizon/internal/expingest/processors/database_processor.go index 3610ea1e21..44f95b790f 100644 --- a/services/horizon/internal/expingest/processors/database_processor.go +++ b/services/horizon/internal/expingest/processors/database_processor.go @@ -7,9 +7,9 @@ import ( stdio "io" "math/big" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" ingestpipeline "github.com/stellar/go/exp/ingest/pipeline" - "github.com/stellar/go/exp/ingest/verify" "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -191,6 +191,7 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S }() defer w.Close() + ledgerCache := io.NewLedgerEntryChangeCache() p.AssetStatSet = AssetStatSet{} actionHandlers := map[DatabaseProcessorActionType]func(change io.Change) error{ @@ -231,22 +232,13 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S failedTxCount++ } - for _, action := range actions { - handler, ok := actionHandlers[action] - if !ok { - return errors.New("Unknown action") - } - - // Remember that it's possible that transaction can remove a preauth - // tx signer even when it's a failed transaction. - + // Remember that it's possible that transaction can remove a preauth + // tx signer even when it's a failed transaction. + if p.Action != Ledgers { for _, change := range transaction.GetChanges() { - err := handler(change) + err := ledgerCache.AddChange(change) if err != nil { - return errors.Wrap( - err, - fmt.Sprintf("Error in %s handler", action), - ) + return errors.Wrap(err, "error addint to ledgerCache") } } } @@ -260,22 +252,39 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S } // Process upgrades meta - for { - change, err := r.ReadUpgradeChange() - if err != nil { - if err == stdio.EOF { - break - } else { - return err + if p.Action != Ledgers { + for { + change, err := r.ReadUpgradeChange() + if err != nil { + if err == stdio.EOF { + break + } else { + return err + } + } + + err = ledgerCache.AddChange(change) + if err != nil { + return errors.Wrap(err, "error addint to ledgerCache") } - } - for _, action := range actions { - handler, ok := actionHandlers[action] - if !ok { - return errors.New("Unknown action") + select { + case <-ctx.Done(): + return nil + default: + continue } + } + } + + changes := ledgerCache.GetChanges() + for _, action := range actions { + handler, ok := actionHandlers[action] + if !ok { + return errors.New("Unknown action") + } + for _, change := range changes { err := handler(change) if err != nil { return errors.Wrap( @@ -312,7 +321,7 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S if assetStatNotFound { // Insert if delta.NumAccounts < 0 { - return verify.NewStateError(errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "NumAccounts negative but DB entry does not exist for asset: %s %s %s", delta.AssetType, delta.AssetCode, @@ -343,7 +352,7 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S if statAccounts == 0 { // Remove stats if statBalance.Cmp(big.NewInt(0)) != 0 { - return verify.NewStateError(errors.Errorf( + return ingesterrors.NewStateError(errors.Errorf( "Removing asset stat by final amount non-zero for: %s %s %s", delta.AssetType, delta.AssetCode, @@ -374,8 +383,9 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when adjusting asset stat for asset: %s %s %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when adjusting asset stat for asset: %s %s %s", + rowsAffected, delta.AssetType, delta.AssetCode, delta.AssetIssuer, @@ -415,9 +425,9 @@ func (p *DatabaseProcessor) ingestLedgerHeader( if rowsAffected != 1 { log.WithField("rowsAffected", rowsAffected). WithField("sequence", r.GetSequence()). - Error("No rows affected when ingesting new ledger") + Error("Invalid number of rows affected when ingesting new ledger") return errors.Errorf( - "No rows affected when ingesting new ledger: %v", + "0 rows affected when ingesting new ledger: %v", r.GetSequence(), ) } @@ -495,8 +505,9 @@ func (p *DatabaseProcessor) processLedgerAccounts(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s account %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d No rows affected when %s account %s", + rowsAffected, action, accountID, )) @@ -550,8 +561,9 @@ func (p *DatabaseProcessor) processLedgerAccountData(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s data: %s %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when %s data: %s %s", + rowsAffected, action, ledgerKey.Data.AccountId.Address(), ledgerKey.Data.DataName, @@ -581,10 +593,11 @@ func (p *DatabaseProcessor) processLedgerAccountSigners(change io.Change) error } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "Expected account=%s signer=%s in database but not found when removing", + return ingesterrors.NewStateError(errors.Errorf( + "Expected account=%s signer=%s in database but not found when removing (rows affected = %d)", preAccountEntry.AccountId.Address(), signer, + rowsAffected, )) } } @@ -599,8 +612,9 @@ func (p *DatabaseProcessor) processLedgerAccountSigners(change io.Change) error } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when inserting account=%s signer=%s to database", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when inserting account=%s signer=%s to database", + rowsAffected, postAccountEntry.AccountId.Address(), signer, )) @@ -647,8 +661,9 @@ func (p *DatabaseProcessor) processLedgerOffers(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s offer %d", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when %s offer %d", + rowsAffected, action, offerID, )) @@ -700,7 +715,7 @@ func (p *DatabaseProcessor) adjustAssetStat( // else, trustline was unauthorized and remains unauthorized // so there is no change to accounts or balances } else { - return verify.NewStateError(errors.New("both pre and post trustlines cannot be nil")) + return ingesterrors.NewStateError(errors.New("both pre and post trustlines cannot be nil")) } err := p.AssetStatSet.AddDelta(trustline.Asset, int64(deltaBalance), deltaAccounts) @@ -768,8 +783,9 @@ func (p *DatabaseProcessor) processLedgerTrustLines(change io.Change) error { } if rowsAffected != 1 { - return verify.NewStateError(errors.Errorf( - "No rows affected when %s trustline: %s %s", + return ingesterrors.NewStateError(errors.Errorf( + "%d rows affected when %s trustline: %s %s", + rowsAffected, action, ledgerKey.TrustLine.AccountId.Address(), ledgerKey.TrustLine.Asset.String(), diff --git a/services/horizon/internal/expingest/processors/ledgers_processor_test.go b/services/horizon/internal/expingest/processors/ledgers_processor_test.go index f3a0733a8b..c250d8cbb1 100644 --- a/services/horizon/internal/expingest/processors/ledgers_processor_test.go +++ b/services/horizon/internal/expingest/processors/ledgers_processor_test.go @@ -74,10 +74,6 @@ func (s *LedgersProcessorTestSuiteLedger) SetupTest() { s.mockLedgerReader.On("GetSequence").Return(uint32(20)).Maybe() - s.mockLedgerReader. - On("ReadUpgradeChange"). - Return(io.Change{}, stdio.EOF).Once() - s.mockLedgerReader. On("Read"). Return(createTransaction(true, 1), nil).Once() @@ -120,10 +116,6 @@ func (s *LedgersProcessorTestSuiteLedger) TestInsertExpLedgerIgnoredWhenNotDatab On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() - s.mockLedgerReader. - On("ReadUpgradeChange"). - Return(io.Change{}, stdio.EOF).Once() - s.mockLedgerReader. On("Close"). Return(nil).Once() @@ -261,5 +253,5 @@ func (s *LedgersProcessorTestSuiteLedger) TestInsertExpLedgerNoRowsAffected() { s.mockLedgerWriter, ) s.Assert().Error(err) - s.Assert().EqualError(err, "No rows affected when ingesting new ledger: 20") + s.Assert().EqualError(err, "0 rows affected when ingesting new ledger: 20") } diff --git a/services/horizon/internal/expingest/processors/offers_processor_test.go b/services/horizon/internal/expingest/processors/offers_processor_test.go index b4f34c198b..6101f6e74a 100644 --- a/services/horizon/internal/expingest/processors/offers_processor_test.go +++ b/services/horizon/internal/expingest/processors/offers_processor_test.go @@ -5,8 +5,8 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/ingest/verify" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -56,8 +56,9 @@ func (s *OffersProcessorTestSuiteState) TearDownTest() { func (s *OffersProcessorTestSuiteState) TestCreateOffer() { offer := xdr.OfferEntry{ - OfferId: xdr.Int64(1), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(1), + Price: xdr.Price{1, 2}, } lastModifiedLedgerSeq := xdr.Uint32(123) s.mockStateReader. @@ -164,8 +165,9 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { // add offer offer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 2}, } lastModifiedLedgerSeq := xdr.Uint32(1234) s.mockLedgerReader.On("Read"). @@ -189,15 +191,10 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { }), }, nil).Once() - s.mockQ.On( - "InsertOffer", - offer, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedOffer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 6}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 6}, } s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ @@ -230,8 +227,10 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { }, }), }, nil).Once() + + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateOffer", + "InsertOffer", updatedOffer, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -251,19 +250,17 @@ func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { } func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - lastModifiedLedgerSeq := xdr.Uint32(1234) offer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 2}, } updatedOffer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 6}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 6}, } s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ @@ -296,6 +293,8 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ.On( "UpdateOffer", updatedOffer, @@ -310,8 +309,8 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in Offers handler: No rows affected when updating offer 2") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in Offers handler: 0 rows affected when updating offer 2") } func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { @@ -327,8 +326,9 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { Data: xdr.LedgerEntryData{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(3), - Price: xdr.Price{3, 1}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), + Price: xdr.Price{3, 1}, }, }, }, @@ -338,7 +338,8 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { Removed: &xdr.LedgerKey{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.LedgerKeyOffer{ - OfferId: xdr.Int64(3), + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), }, }, }, @@ -347,15 +348,15 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { }), }, nil).Once() + s.mockLedgerReader. + On("Read"). + Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ.On( "RemoveOffer", xdr.Int64(3), ).Return(int64(1), nil).Once() - s.mockLedgerReader. - On("Read"). - Return(io.LedgerTransaction{}, stdio.EOF).Once() - err := s.processor.ProcessLedger( context.Background(), &supportPipeline.Store{}, @@ -372,8 +373,9 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { // add offer offer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 2}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 2}, } lastModifiedLedgerSeq := xdr.Uint32(1234) s.mockLedgerReader.On("Read"). @@ -397,19 +399,14 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertOffer", - offer, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() updatedOffer := xdr.OfferEntry{ - OfferId: xdr.Int64(2), - Price: xdr.Price{1, 6}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(2), + Price: xdr.Price{1, 6}, } s.mockLedgerReader. @@ -433,8 +430,9 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { }, }, nil).Once() + // We use LedgerEntryChangesCache so all changes are squashed s.mockQ.On( - "UpdateOffer", + "InsertOffer", updatedOffer, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -456,10 +454,6 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { } func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - // add offer s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ @@ -472,8 +466,9 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { Data: xdr.LedgerEntryData{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.OfferEntry{ - OfferId: xdr.Int64(3), - Price: xdr.Price{3, 1}, + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), + Price: xdr.Price{3, 1}, }, }, }, @@ -483,7 +478,8 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { Removed: &xdr.LedgerKey{ Type: xdr.LedgerEntryTypeOffer, Offer: &xdr.LedgerKeyOffer{ - OfferId: xdr.Int64(3), + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), }, }, }, @@ -491,6 +487,7 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() s.mockQ.On( "RemoveOffer", @@ -505,6 +502,6 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOfferNoRowsAffected() { ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in Offers handler: No rows affected when removing offer 3") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in Offers handler: 0 rows affected when removing offer 3") } diff --git a/services/horizon/internal/expingest/processors/trust_lines_processor_test.go b/services/horizon/internal/expingest/processors/trust_lines_processor_test.go index 7e86bdbf87..bf396f2a9d 100644 --- a/services/horizon/internal/expingest/processors/trust_lines_processor_test.go +++ b/services/horizon/internal/expingest/processors/trust_lines_processor_test.go @@ -6,8 +6,8 @@ import ( stdio "io" "testing" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" - "github.com/stellar/go/exp/ingest/verify" supportPipeline "github.com/stellar/go/exp/support/pipeline" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/errors" @@ -271,17 +271,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestInsertTrustLine() { }), }, nil).Once() - s.mockQ.On( - "InsertTrustLine", - trustLine, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockQ.On( - "InsertTrustLine", - unauthorizedTrustline, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - updatedTrustLine := xdr.TrustLineEntry{ AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"), Asset: xdr.MustNewCreditAsset("EUR", trustLineIssuer.Address()), @@ -347,12 +336,12 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestInsertTrustLine() { }), }, nil).Once() s.mockQ.On( - "UpdateTrustLine", + "InsertTrustLine", updatedTrustLine, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() s.mockQ.On( - "UpdateTrustLine", + "InsertTrustLine", updatedUnauthorizedTrustline, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -478,10 +467,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLine() { } func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - lastModifiedLedgerSeq := xdr.Uint32(1234) trustLine := xdr.TrustLineEntry{ @@ -527,6 +512,8 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineNoRowsAffected() }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() + s.mockQ.On( "UpdateTrustLine", updatedTrustLine, @@ -541,8 +528,8 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineNoRowsAffected() ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in TrustLines handler: No rows affected when updating trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in TrustLines handler: 0 rows affected when updating trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") } func (s *TrustLinesProcessorTestSuiteLedger) TestUpdateTrustLineAuthorization() { @@ -817,12 +804,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestProcessUpgradeChange() { }), }, nil).Once() - s.mockQ.On( - "InsertTrustLine", - trustLine, - lastModifiedLedgerSeq, - ).Return(int64(1), nil).Once() - s.mockLedgerReader. On("Read"). Return(io.LedgerTransaction{}, stdio.EOF).Once() @@ -856,7 +837,7 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestProcessUpgradeChange() { }, nil).Once() s.mockQ.On( - "UpdateTrustLine", + "InsertTrustLine", updatedTrustLine, lastModifiedLedgerSeq, ).Return(int64(1), nil).Once() @@ -891,10 +872,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestProcessUpgradeChange() { } func (s *TrustLinesProcessorTestSuiteLedger) TestRemoveTrustlineNoRowsAffected() { - // Removes ReadUpgradeChange assertion - s.mockLedgerReader = &io.MockLedgerReader{} - s.mockLedgerReader.On("Close").Return(nil).Once() - s.mockLedgerReader.On("Read"). Return(io.LedgerTransaction{ Meta: createTransactionMeta([]xdr.OperationMeta{ @@ -928,6 +905,7 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestRemoveTrustlineNoRowsAffected() }, }), }, nil).Once() + s.mockLedgerReader.On("Read").Return(io.LedgerTransaction{}, stdio.EOF).Once() s.mockQ.On( "RemoveTrustLine", @@ -945,6 +923,6 @@ func (s *TrustLinesProcessorTestSuiteLedger) TestRemoveTrustlineNoRowsAffected() ) s.Assert().Error(err) - s.Assert().IsType(verify.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "Error in TrustLines handler: No rows affected when removing trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") + s.Assert().IsType(ingesterrors.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "Error in TrustLines handler: 0 rows affected when removing trustline: GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB credit_alphanum4/EUR/GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H") } diff --git a/services/horizon/internal/expingest/verify.go b/services/horizon/internal/expingest/verify.go index 1b5ebf8865..ef0e7011a2 100644 --- a/services/horizon/internal/expingest/verify.go +++ b/services/horizon/internal/expingest/verify.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stellar/go/exp/ingest/adapters" + ingesterrors "github.com/stellar/go/exp/ingest/errors" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/exp/ingest/verify" "github.com/stellar/go/services/horizon/internal/db2" @@ -195,7 +196,7 @@ func (s *System) verifyState(graphOffers map[xdr.Int64]xdr.OfferEntry) error { localLog.WithField("total", total).Info("Finished writing to StateVerifier") if len(graphOffers) != 0 { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "orderbook graph contains %v offers missing from HAS", len(graphOffers), @@ -255,7 +256,7 @@ func checkAssetStats(set processors.AssetStatSet, q *history.Q) error { for _, assetStat := range assetStats { fromSet, removed := set.Remove(assetStat.AssetType, assetStat.AssetCode, assetStat.AssetIssuer) if !removed { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "db contains asset stat with code %s issuer %s which is missing from HAS", assetStat.AssetCode, assetStat.AssetIssuer, @@ -264,7 +265,7 @@ func checkAssetStats(set processors.AssetStatSet, q *history.Q) error { } if fromSet != assetStat { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "db asset stat with code %s issuer %s does not match asset stat from HAS", assetStat.AssetCode, assetStat.AssetIssuer, @@ -277,7 +278,7 @@ func checkAssetStats(set processors.AssetStatSet, q *history.Q) error { } if len(set) > 0 { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "HAS contains %d more asset stats than db", len(set), @@ -327,7 +328,7 @@ func addAccountsToStateVerifier(verifier *verify.StateVerifier, q *history.Q, id // Ensure master weight matches, if not it's a state error! if int32(row.MasterWeight) != masterWeightMap[row.AccountID] { - return verify.NewStateError(errors.New("Master key weight in accounts does not match ")) + return ingesterrors.NewStateError(errors.New("Master key weight in accounts does not match ")) } account := &xdr.AccountEntry{ @@ -453,14 +454,14 @@ func addOffersToStateVerifier( } graphOffer, ok := graphOffers[row.OfferID] if !ok { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf("offer %v is not in orderbook graph", row.OfferID), ) } if equal, err := offerEntryEquals(graphOffer, *entry.Data.Offer); err != nil { return errors.Wrap(err, "could not compare offers") } else if !equal { - return verify.NewStateError( + return ingesterrors.NewStateError( fmt.Errorf( "offer %v from db does not match offer in orderbook graph", row.OfferID, @@ -521,7 +522,7 @@ func addTrustLinesToStateVerifier( return err } if err := assetStats.Add(trustline); err != nil { - return verify.NewStateError( + return ingesterrors.NewStateError( errors.Wrap(err, "could not add trustline to asset stats"), ) } diff --git a/xdr/ledger_entry_change.go b/xdr/ledger_entry_change.go index 551d4543dc..277b671d44 100644 --- a/xdr/ledger_entry_change.go +++ b/xdr/ledger_entry_change.go @@ -1,6 +1,9 @@ package xdr -import "fmt" +import ( + "encoding/base64" + "fmt" +) // EntryType is a helper to get at the entry type for a change. func (change *LedgerEntryChange) EntryType() LedgerEntryType { @@ -26,3 +29,14 @@ func (change *LedgerEntryChange) LedgerKey() LedgerKey { panic(fmt.Errorf("Unknown change type: %v", change.Type)) } } + +// MarshalBinaryBase64 marshals XDR into a binary form and then encodes it +// using base64. +func (change LedgerEntryChange) MarshalBinaryBase64() (string, error) { + b, err := change.MarshalBinary() + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(b), nil +} diff --git a/xdr/ledger_key.go b/xdr/ledger_key.go index 170c24d05d..cf58661723 100644 --- a/xdr/ledger_key.go +++ b/xdr/ledger_key.go @@ -1,6 +1,7 @@ package xdr import ( + "encoding/base64" "fmt" "strings" ) @@ -146,3 +147,14 @@ func (key LedgerKey) MarshalBinaryCompress() ([]byte, error) { return m, nil } + +// MarshalBinaryBase64 marshals XDR into a binary form and then encodes it +// using base64. +func (key LedgerKey) MarshalBinaryBase64() (string, error) { + b, err := key.MarshalBinary() + if err != nil { + return "", err + } + + return base64.StdEncoding.EncodeToString(b), nil +} From 76835eb61689d9de783e602ae116d3a852c44431 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 12 Dec 2019 20:23:26 +0100 Subject: [PATCH 2/6] Fix circleci publish_latest_horizon_docker_image job (#2049) Because CircleCI does not build tags by default, any child jobs that rely on a job that only builds on tags will also require the tag filter. In this case, the hold and publish_latest_horizon_docker_image jobs will need the filter for /^horizon-v.*/ tags as well, otherwise, they will not be picked up for running. --- .circleci/config.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c978708769..bf7c53f13b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -328,10 +328,20 @@ workflows: branches: ignore: /.*/ - hold: # <<< A job that will require manual approval in the CircleCI web application. + filters: + tags: + only: /^horizon-v.*/ + branches: + ignore: /.*/ type: approval # <<< This key-value pair will set your workflow to a status of "On Hold" requires: # We only run the "hold" job when publish_horizon_docker_image has succeeded - publish_horizon_docker_image - # Pushing stellar/horizon:latest to docker hub requires manual approval - publish_latest_horizon_docker_image: + filters: + tags: + only: /^horizon-v.*/ + branches: + ignore: /.*/ + # Pushing stellar/horizon:latest to docker hub requires manual approval requires: - hold From fb5f48910be5b2985ef38415f4e3fe7a0e846282 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 12 Dec 2019 22:57:33 +0100 Subject: [PATCH 3/6] exp/ingest: Fix fee and transaction meta processing (#2050) This commit changes `io.LedgerTransaction` to return fee and tx meta separately and updates Horizon processors to apply changes in correct order. This issue was found by `StateVerifier`. The order of applying meta changes in Horizon processors was incorrect. Fee changes must be applied before everything else [1]. In other words instead of processing meta like: TX1_FEE_META, TX1_TX_META, TX2_FEE_META, TX2_TX_META, ... we should do it like: TX1_FEE_META, TX2_FEE_META, TX1_TX_META, TX2_TX_META, ... ### Known limitations The current interface of pipeline processor doesn't make sense because all transactions need to be read into memory first to apply fee changes. We either need to refactor the processors or use @tamirms design where pipeline is removed. [1] https://github.com/stellar/stellar-core/blob/master/docs/integration.md --- exp/ingest/io/ledger_transaction.go | 12 ++- exp/ingest/io/ledger_transaction_test.go | 72 +++++++++++++++ services/horizon/internal/expingest/main.go | 4 +- .../processors/accounts_processor_test.go | 90 +++++++++++++++++++ .../processors/database_processor.go | 49 +++++++--- services/horizon/internal/expingest/verify.go | 2 +- 6 files changed, 210 insertions(+), 19 deletions(-) diff --git a/exp/ingest/io/ledger_transaction.go b/exp/ingest/io/ledger_transaction.go index 2c4623731e..6719f2d323 100644 --- a/exp/ingest/io/ledger_transaction.go +++ b/exp/ingest/io/ledger_transaction.go @@ -146,12 +146,16 @@ func (c *Change) AccountSignersChanged() bool { return false } +// GetFeeChanges returns a developer friendly representation of LedgerEntryChanges +// connected to fees. +func (t *LedgerTransaction) GetFeeChanges() []Change { + return getChangesFromLedgerEntryChanges(t.FeeChanges) +} + // GetChanges returns a developer friendly representation of LedgerEntryChanges. -// It contains fee changes, transaction changes and operation changes in that -// order. +// It contains transaction changes and operation changes in that order. func (t *LedgerTransaction) GetChanges() []Change { - // Fee meta - changes := getChangesFromLedgerEntryChanges(t.FeeChanges) + var changes []Change // Transaction meta switch t.Meta.V { diff --git a/exp/ingest/io/ledger_transaction_test.go b/exp/ingest/io/ledger_transaction_test.go index 0c2a312c49..fe80f02b27 100644 --- a/exp/ingest/io/ledger_transaction_test.go +++ b/exp/ingest/io/ledger_transaction_test.go @@ -17,6 +17,78 @@ func TestChangeAccountChangedExceptSignersInvalidType(t *testing.T) { }) } +func TestFeeAndMetaChangesSeparate(t *testing.T) { + tx := LedgerTransaction{ + FeeChanges: xdr.LedgerEntryChanges{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 100, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 200, + }, + }, + }, + }, + }, + Meta: xdr.TransactionMeta{ + Operations: &[]xdr.OperationMeta{ + { + Changes: xdr.LedgerEntryChanges{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 300, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 400, + }, + }, + }, + }, + }, + }, + }, + }} + + feeChanges := tx.GetFeeChanges() + assert.Len(t, feeChanges, 1) + assert.Equal(t, feeChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(100)) + assert.Equal(t, feeChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(200)) + + metaChanges := tx.GetChanges() + assert.Len(t, metaChanges, 1) + assert.Equal(t, metaChanges[0].Pre.Data.MustAccount().Balance, xdr.Int64(300)) + assert.Equal(t, metaChanges[0].Post.Data.MustAccount().Balance, xdr.Int64(400)) +} + func TestChangeAccountChangedExceptSignersLastModifiedLedgerSeq(t *testing.T) { change := Change{ Type: xdr.LedgerEntryTypeAccount, diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 9159f203b1..77412c5e39 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -39,7 +39,9 @@ const ( // when preauth tx is failed. // - 9: Fixes a bug in asset stats processor that counted unauthorized // trustlines. - CurrentVersion = 9 + // - 10: Fixes a bug in meta processing (fees are now processed before + // everything else). + CurrentVersion = 10 ) var log = logpkg.DefaultLogger.WithField("service", "expingest") diff --git a/services/horizon/internal/expingest/processors/accounts_processor_test.go b/services/horizon/internal/expingest/processors/accounts_processor_test.go index 2cf52c976a..32c7bfe3a4 100644 --- a/services/horizon/internal/expingest/processors/accounts_processor_test.go +++ b/services/horizon/internal/expingest/processors/accounts_processor_test.go @@ -410,3 +410,93 @@ func (s *AccountsProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.Assert().NoError(err) } + +func (s *AccountsProcessorTestSuiteLedger) TestFeeProcessedBeforeEverythingElse() { + s.mockLedgerReader.On("Read"). + Return(io.LedgerTransaction{ + Meta: createTransactionMeta([]xdr.OperationMeta{ + xdr.OperationMeta{ + Changes: []xdr.LedgerEntryChange{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 100, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 300, + }, + }, + }, + }, + }, + }, + }), + }, nil).Once() + + s.mockLedgerReader.On("Read"). + Return(io.LedgerTransaction{ + FeeChanges: []xdr.LedgerEntryChange{ + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 200, + }, + }, + }, + }, + xdr.LedgerEntryChange{ + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + Balance: 100, + }, + }, + }, + }, + }, + Meta: xdr.TransactionMeta{ + Operations: &[]xdr.OperationMeta{}, + }, + }, nil).Once() + + expectedAccount := xdr.AccountEntry{ + AccountId: xdr.MustAddress("GAHK7EEG2WWHVKDNT4CEQFZGKF2LGDSW2IVM4S5DP42RBW3K6BTODB4A"), + // If fee meta wasn't procesed before everything else, this would be 100 + Balance: 300, + } + + s.mockQ.On("UpdateAccount", expectedAccount, xdr.Uint32(0)).Return(int64(1), nil).Once() + + s.mockLedgerReader. + On("Read"). + Return(io.LedgerTransaction{}, stdio.EOF).Once() + + err := s.processor.ProcessLedger( + context.Background(), + &supportPipeline.Store{}, + s.mockLedgerReader, + s.mockLedgerWriter, + ) + + s.Assert().NoError(err) +} diff --git a/services/horizon/internal/expingest/processors/database_processor.go b/services/horizon/internal/expingest/processors/database_processor.go index 44f95b790f..7cf0ef442a 100644 --- a/services/horizon/internal/expingest/processors/database_processor.go +++ b/services/horizon/internal/expingest/processors/database_processor.go @@ -214,7 +214,8 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S var successTxCount, failedTxCount, opCount int - // Process transaction meta + // Get all transactions + var transactions []io.LedgerTransaction for { transaction, err := r.Read() if err != nil { @@ -232,27 +233,49 @@ func (p *DatabaseProcessor) ProcessLedger(ctx context.Context, store *pipeline.S failedTxCount++ } + transactions = append(transactions, transaction) + } + + if p.Action != Ledgers { // Remember that it's possible that transaction can remove a preauth - // tx signer even when it's a failed transaction. - if p.Action != Ledgers { - for _, change := range transaction.GetChanges() { + // tx signer even when it's a failed transaction so we need to check + // failed transactions too. + + // Fees are processed before everything else. + for _, transaction := range transactions { + for _, change := range transaction.GetFeeChanges() { err := ledgerCache.AddChange(change) if err != nil { - return errors.Wrap(err, "error addint to ledgerCache") + return errors.Wrap(err, "error adding to ledgerCache") } } + + select { + case <-ctx.Done(): + return nil + default: + continue + } } - select { - case <-ctx.Done(): - return nil - default: - continue + // Tx meta + for _, transaction := range transactions { + for _, change := range transaction.GetChanges() { + err := ledgerCache.AddChange(change) + if err != nil { + return errors.Wrap(err, "error adding to ledgerCache") + } + } + + select { + case <-ctx.Done(): + return nil + default: + continue + } } - } - // Process upgrades meta - if p.Action != Ledgers { + // Process upgrades meta for { change, err := r.ReadUpgradeChange() if err != nil { diff --git a/services/horizon/internal/expingest/verify.go b/services/horizon/internal/expingest/verify.go index ef0e7011a2..2c9d577f65 100644 --- a/services/horizon/internal/expingest/verify.go +++ b/services/horizon/internal/expingest/verify.go @@ -28,7 +28,7 @@ const assetStatsBatchSize = 500 // check them. // There is a test that checks it, to fix it: update the actual `verifyState` // method instead of just updating this value! -const stateVerifierExpectedIngestionVersion = 9 +const stateVerifierExpectedIngestionVersion = 10 // verifyState is called as a go routine from pipeline post hook every 64 // ledgers. It checks if the state is correct. If another go routine is already From 5e4d247fd1d8e1f6a8ff364de0954de07d0fd6b7 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 13 Dec 2019 15:28:17 +0100 Subject: [PATCH 4/6] services/horizon/internal/expingest: Trim experimental ingestion historical data when starting ingestion (#2055) Horizon always reingests state using the latest checkpoint ledger and in most cases the sequence of this ledger will be smaller than the latest ingested ledger. Because of this Horizon will attempt to insert old ledgers into exp_history_ledger again. Inserting old ledgers into exp_history_ledger triggers unique constraint errors. To fix this issue we remove rows from historical ingestion tables that ingestion will insert during the ledger pipeline phase. --- .../horizon/internal/db2/history/ledger.go | 22 ++ .../internal/db2/history/ledger_test.go | 77 ++++++ services/horizon/internal/expingest/main.go | 3 + .../internal/expingest/pipeline_hooks_test.go | 228 +++++++++++------- .../horizon/internal/expingest/pipelines.go | 21 +- .../internal/expingest/run_ingestion_test.go | 11 + 6 files changed, 268 insertions(+), 94 deletions(-) diff --git a/services/horizon/internal/db2/history/ledger.go b/services/horizon/internal/db2/history/ledger.go index 8d238d3784..a410f92df3 100644 --- a/services/horizon/internal/db2/history/ledger.go +++ b/services/horizon/internal/db2/history/ledger.go @@ -165,6 +165,28 @@ func (q *Q) InsertExpLedger( return result.RowsAffected() } +// ExpIngestRemovalSummary describes how many rows in the experimental ingestion +// history tables have been deleted by RemoveExpIngestHistory() +type ExpIngestRemovalSummary struct { + LedgersRemoved int64 +} + +// RemoveExpIngestHistory removes all rows in the experimental ingestion +// history tables which have a ledger sequence higher than `newerThanSequence` +func (q *Q) RemoveExpIngestHistory(newerThanSequence uint32) (ExpIngestRemovalSummary, error) { + result, err := q.Exec( + sq.Delete("exp_history_ledgers"). + Where("sequence > ?", newerThanSequence), + ) + if err != nil { + return ExpIngestRemovalSummary{}, err + } + + summary := ExpIngestRemovalSummary{} + summary.LedgersRemoved, err = result.RowsAffected() + return summary, err +} + func ledgerHeaderToMap( ledger xdr.LedgerHeaderHistoryEntry, successTxsCount int, diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index 2037f64e84..603ae5a968 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -247,3 +247,80 @@ func TestCheckExpLedger(t *testing.T) { tt.Assert.True(valid) } } + +func TestRemoveExpIngestHistory(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + summary, err := q.RemoveExpIngestHistory(69859) + tt.Assert.Equal(ExpIngestRemovalSummary{}, summary) + tt.Assert.NoError(err) + + ledger := Ledger{ + Sequence: 69859, + PreviousLedgerHash: null.NewString("4b0b8bace3b2438b2404776ce57643966855487ba6384724a3c664c7aa4cd9e4", true), + ImporterVersion: 321, + TransactionCount: 12, + SuccessfulTransactionCount: new(int32), + FailedTransactionCount: new(int32), + OperationCount: 23, + TotalCoins: 23451, + FeePool: 213, + BaseReserve: 687, + MaxTxSetSize: 345, + ProtocolVersion: 12, + BaseFee: 100, + ClosedAt: time.Now().UTC().Truncate(time.Second), + LedgerHeaderXDR: null.NewString("temp", true), + } + *ledger.SuccessfulTransactionCount = 12 + *ledger.FailedTransactionCount = 3 + hashes := []string{ + "4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "5db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "6db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "7db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "8db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + } + + for i, hash := range hashes { + ledger.TotalOrderID.ID = toid.New(ledger.Sequence, 0, 0).ToInt64() + ledger.LedgerHash = hash + if i > 0 { + ledger.PreviousLedgerHash = null.NewString(hashes[i-1], true) + } + + insertSQL := sq.Insert("exp_history_ledgers").SetMap(ledgerToMap(ledger)) + _, err = q.Exec(insertSQL) + tt.Assert.NoError(err) + + ledger.Sequence++ + } + + var ledgers []Ledger + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 5) + + summary, err = q.RemoveExpIngestHistory(69863) + tt.Assert.Equal(ExpIngestRemovalSummary{}, summary) + tt.Assert.NoError(err) + + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 5) + + summary, err = q.RemoveExpIngestHistory(69861) + tt.Assert.Equal(ExpIngestRemovalSummary{2}, summary) + tt.Assert.NoError(err) + + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 3) + + for _, ledger := range ledgers { + tt.Assert.LessOrEqual(ledger.Sequence, int32(69861)) + } +} diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 77412c5e39..42bb0e1106 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/jmoiron/sqlx" "github.com/stellar/go/clients/stellarcore" "github.com/stellar/go/exp/ingest" "github.com/stellar/go/exp/ingest/io" @@ -66,12 +67,14 @@ type Config struct { type dbQ interface { Begin() error Rollback() error + GetTx() *sqlx.Tx GetLastLedgerExpIngest() (uint32, error) GetExpIngestVersion() (int, error) UpdateLastLedgerExpIngest(uint32) error UpdateExpStateInvalid(bool) error GetExpStateInvalid() (bool, error) GetAllOffers() ([]history.Offer, error) + RemoveExpIngestHistory(uint32) (history.ExpIngestRemovalSummary, error) } type dbSession interface { diff --git a/services/horizon/internal/expingest/pipeline_hooks_test.go b/services/horizon/internal/expingest/pipeline_hooks_test.go index e85ffcbd9e..ebd4ed6536 100644 --- a/services/horizon/internal/expingest/pipeline_hooks_test.go +++ b/services/horizon/internal/expingest/pipeline_hooks_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/ingest/pipeline" "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2" @@ -12,102 +13,157 @@ import ( "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" + "github.com/stretchr/testify/suite" ) -func TestStatePreProcessingHook(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() +type PreProcessingHookTestSuite struct { + suite.Suite + historyQ *mockDBQ + system *System + ctx context.Context + ledgerSeqFromContext uint32 +} - system := &System{} - session := tt.HorizonSession() - defer session.Rollback() - ctx := context.WithValue( +func TestPreProcessingHookTestSuite(t *testing.T) { + suite.Run(t, new(PreProcessingHookTestSuite)) +} + +func (s *PreProcessingHookTestSuite) SetupTest() { + s.system = &System{} + s.historyQ = &mockDBQ{} + s.ledgerSeqFromContext = uint32(5) + + s.ctx = context.WithValue( context.Background(), pipeline.LedgerSequenceContextKey, - uint32(0), + s.ledgerSeqFromContext, ) - pipelineType := statePipeline - historyQ := &history.Q{session} - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(0)) - - tt.Assert.Nil(session.GetTx()) - newCtx, err := preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.False(system.StateReady()) } -func TestLedgerPreProcessingHook(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() +func (s *PreProcessingHookTestSuite) TearDownTest() { + s.historyQ.AssertExpectations(s.T()) +} - system := &System{} - session := tt.HorizonSession() - defer session.Rollback() - ctx := context.WithValue( - context.Background(), - pipeline.LedgerSequenceContextKey, - uint32(2), +func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithPreExistingTx() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{3}, nil, ) - pipelineType := ledgerPipeline - historyQ := &history.Q{session} - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(1)) - - tt.Assert.Nil(session.GetTx()) - newCtx, err := preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(2)) - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.Nil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.Nil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.True(system.StateReady()) + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithoutPreExistingTx() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{3}, nil, + ) + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnGetLastLedgerExpIngestError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error getting last ledger: transient error") +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnRemoveExpIngestHistoryError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{}, errors.New("transient error"), + ) + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error removing exp ingest history: transient error") +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnBeginError() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error starting a transaction: transient error") +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithPreExistingTx() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithoutPreExistingTx() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsAsMaster() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(s.ledgerSeqFromContext-1, nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().NotNil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnGetLastLedgerExpIngestError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error getting last ledger: transient error") +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnBeginError() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error starting a transaction: transient error") } func TestPostProcessingHook(t *testing.T) { diff --git a/services/horizon/internal/expingest/pipelines.go b/services/horizon/internal/expingest/pipelines.go index 377289395c..19713241fb 100644 --- a/services/horizon/internal/expingest/pipelines.go +++ b/services/horizon/internal/expingest/pipelines.go @@ -141,7 +141,7 @@ func preProcessingHook( ctx context.Context, pipelineType pType, system *System, - historySession *db.Session, + historyQ dbQ, ) (context.Context, error) { var err error defer func() { @@ -149,18 +149,16 @@ func preProcessingHook( // queries will fail indefinietly with a following error: // current transaction is aborted, commands ignored until end of transaction block if err != nil { - historySession.Rollback() + historyQ.Rollback() } }() - historyQ := &history.Q{historySession} - // Start a transaction only if not in a transaction already. // The only case this can happen is during the first run when // a transaction is started to get the latest ledger `FOR UPDATE` // in `System.Run()`. - if tx := historySession.GetTx(); tx == nil { - err = historySession.Begin() + if tx := historyQ.GetTx(); tx == nil { + err = historyQ.Begin() if err != nil { return ctx, errors.Wrap(err, "Error starting a transaction") } @@ -180,6 +178,12 @@ func preProcessingHook( // State pipeline is always fully run because loading offers // from a database is done outside the pipeline. updateDatabase = true + var summary history.ExpIngestRemovalSummary + summary, err = historyQ.RemoveExpIngestHistory(ledgerSeq) + if err != nil { + return ctx, errors.Wrap(err, "Error removing exp ingest history") + } + log.WithField("historyRemoved", summary).Info("removed entries from historical ingestion tables") } else { // mark the system as ready because we have progressed to running // the ledger pipeline @@ -197,7 +201,7 @@ func preProcessingHook( // If we are not going to update a DB release a lock by rolling back a // transaction. if !updateDatabase { - historySession.Rollback() + historyQ.Rollback() } log.WithFields(logpkg.F{ @@ -348,9 +352,10 @@ func addPipelineHooks( default: panic(fmt.Sprintf("Unknown pipeline type %T", p)) } + historyQ := &history.Q{historySession} p.AddPreProcessingHook(func(ctx context.Context) (context.Context, error) { - return preProcessingHook(ctx, pipelineType, system, historySession) + return preProcessingHook(ctx, pipelineType, system, historyQ) }) p.AddPostProcessingHook(func(ctx context.Context, err error) error { diff --git a/services/horizon/internal/expingest/run_ingestion_test.go b/services/horizon/internal/expingest/run_ingestion_test.go index b260093f49..a62de34053 100644 --- a/services/horizon/internal/expingest/run_ingestion_test.go +++ b/services/horizon/internal/expingest/run_ingestion_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/db" @@ -44,6 +45,11 @@ func (m *mockDBQ) Rollback() error { return args.Error(0) } +func (m *mockDBQ) GetTx() *sqlx.Tx { + args := m.Called() + return args.Get(0).(*sqlx.Tx) +} + func (m *mockDBQ) GetLastLedgerExpIngest() (uint32, error) { args := m.Called() return args.Get(0).(uint32), args.Error(1) @@ -74,6 +80,11 @@ func (m *mockDBQ) GetAllOffers() ([]history.Offer, error) { return args.Get(0).([]history.Offer), args.Error(1) } +func (m *mockDBQ) RemoveExpIngestHistory(newerThanSequence uint32) (history.ExpIngestRemovalSummary, error) { + args := m.Called(newerThanSequence) + return args.Get(0).(history.ExpIngestRemovalSummary), args.Error(1) +} + type mockIngestSession struct { mock.Mock } From 837b12c09b832c3e2cb2187081d0c6e68252844c Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Mon, 16 Dec 2019 13:07:56 +0100 Subject: [PATCH 5/6] exp/ingest/pipeline: Fix pipeline data race during shutdown (#2058) This commit fixes data race in `exp/ingest/pipeline` that can occur when `LiveSession` (and Horizon) is shut down. It also removes `updateStats` method that was known to have a data race (see comment in that method). It is not actively used right now but was being reported by race detector. Previous code handling shutdown signal in `LiveSession` can be found below: ``` errChan := s.LedgerPipeline.Process(ledgerReader) select { case err2 := <-errChan: if err2 != nil { // Return with no errors if pipeline shutdown if err2 == pipeline.ErrShutdown { s.LedgerReporter.OnEndLedger(nil, true) return nil } if s.LedgerReporter != nil { s.LedgerReporter.OnEndLedger(err2, false) } return errors.Wrap(err2, "Ledger pipeline errored") } case <-s.standardSession.shutdown: if s.LedgerReporter != nil { s.LedgerReporter.OnEndLedger(nil, true) } s.LedgerPipeline.Shutdown() return nil } ``` The problem is when shutdown signal is received, `Resume` returns `nil` so Horizon starts it's shutdown code which calls `Rollback()` (using internal `tx` object) but at the same time pipeline is still running until the code receiving from `ctx.Done` channel is executed. It means that pipeline processors can execute transactions using `tx` transaction object in DB session. To fix this: 1. We don't `select` ingest session shutdown signal when waiting for pipeline to finish processing. 2. Instead we call `Shutdown` on pipelines inside `LiveSession.Shutdown`. 3. Then we wait/block until pipelines gracefully shutdown by calling `Pipeline.IsRunning` method. 4. Finally we `close(s.shutdown)` inside `expingest/System.Shutdown()`. So the components now shut down exactly in the following order: 1. Pipelines. 2. Session. 3. Horizon Expingest System. One comment on `-1` change in tests. When `ingestSession.Run()` returns `nil` we shouldn't continue to `ingestSession.Resume()` because `nil` value means that session ended. I updated the comment in `LiveSession` and also fixed Horizon code. --- exp/ingest/live_session.go | 92 ++++++++++++------- exp/support/pipeline/pipeline.go | 53 +++-------- services/horizon/internal/expingest/main.go | 4 + .../internal/expingest/run_ingestion_test.go | 1 - 4 files changed, 76 insertions(+), 74 deletions(-) diff --git a/exp/ingest/live_session.go b/exp/ingest/live_session.go index 7a68b3e40c..277d5c7148 100644 --- a/exp/ingest/live_session.go +++ b/exp/ingest/live_session.go @@ -17,6 +17,8 @@ var _ Session = &LiveSession{} const defaultCoreCursorName = "EXPINGESTLIVESESSION" +// Run runs the session starting from the last checkpoint ledger. +// Returns nil when session has been shutdown. func (s *LiveSession) Run() error { s.standardSession.shutdown = make(chan bool) @@ -96,6 +98,7 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error { } // Resume resumes the session from `ledgerSequence`. +// Returns nil when session has been shutdown. // // WARNING: it's likely that developers will use `GetLatestSuccessfullyProcessedLedger()` // to get the latest successfuly processed ledger after `Resume` returns error. @@ -107,6 +110,14 @@ func (s *LiveSession) updateCursor(ledgerSequence uint32) error { func (s *LiveSession) Resume(ledgerSequence uint32) error { s.standardSession.shutdown = make(chan bool) + err := s.validate() + if err != nil { + return errors.Wrap(err, "Validation error") + } + + s.setRunningState(true) + defer s.setRunningState(false) + ledgerAdapter := &adapters.LedgerBackendAdapter{ Backend: s.LedgerBackend, } @@ -176,7 +187,6 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg select { case <-s.standardSession.shutdown: - s.LedgerPipeline.Shutdown() return nil case <-time.After(time.Second): // TODO make the idle time smaller @@ -193,27 +203,20 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg ledgerReader = reporterLedgerReader{ledgerReader, s.LedgerReporter} } - errChan := s.LedgerPipeline.Process(ledgerReader) - select { - case err2 := <-errChan: - if err2 != nil { - // Return with no errors if pipeline shutdown - if err2 == pipeline.ErrShutdown { - s.LedgerReporter.OnEndLedger(nil, true) - return nil - } - + err = <-s.LedgerPipeline.Process(ledgerReader) + if err != nil { + // Return with no errors if pipeline shutdown + if err == pipeline.ErrShutdown { if s.LedgerReporter != nil { - s.LedgerReporter.OnEndLedger(err2, false) + s.LedgerReporter.OnEndLedger(nil, true) } - return errors.Wrap(err2, "Ledger pipeline errored") + return nil } - case <-s.standardSession.shutdown: + if s.LedgerReporter != nil { - s.LedgerReporter.OnEndLedger(nil, true) + s.LedgerReporter.OnEndLedger(err, false) } - s.LedgerPipeline.Shutdown() - return nil + return errors.Wrap(err, "Ledger pipeline errored") } if s.LedgerReporter != nil { @@ -229,6 +232,14 @@ func (s *LiveSession) resume(ledgerSequence uint32, ledgerAdapter *adapters.Ledg } ledgerSequence++ + + // Exit early if Shutdown() was called. + select { + case <-s.standardSession.shutdown: + return nil + default: + // Continue + } } return nil @@ -276,26 +287,20 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter, stateReader = reporterStateReader{stateReader, s.StateReporter} } - errChan := s.StatePipeline.Process(stateReader) - select { - case err := <-errChan: - if err != nil { - // Return with no errors if pipeline shutdown - if err == pipeline.ErrShutdown { - s.StateReporter.OnEndState(nil, true) - return nil - } - + err = <-s.StatePipeline.Process(stateReader) + if err != nil { + // Return with no errors if pipeline shutdown + if err == pipeline.ErrShutdown { if s.StateReporter != nil { - s.StateReporter.OnEndState(err, false) + s.StateReporter.OnEndState(nil, true) } - return errors.Wrap(err, "State pipeline errored") + return nil } - case <-s.standardSession.shutdown: + if s.StateReporter != nil { - s.StateReporter.OnEndState(nil, true) + s.StateReporter.OnEndState(err, false) } - s.StatePipeline.Shutdown() + return errors.Wrap(err, "State pipeline errored") } if s.StateReporter != nil { @@ -303,3 +308,26 @@ func (s *LiveSession) initState(historyAdapter *adapters.HistoryArchiveAdapter, } return nil } + +// Shutdown gracefully stops the pipelines and the session. This method blocks +// until pipelines are gracefully shutdown. +func (s *LiveSession) Shutdown() { + // Send shutdown signal + s.standardSession.Shutdown() + + // Shutdown pipelines + s.StatePipeline.Shutdown() + s.LedgerPipeline.Shutdown() + + // Shutdown signals sent, block/wait until pipelines are done + // shutting down. + for { + stateRunning := s.StatePipeline.IsRunning() + ledgerRunning := s.LedgerPipeline.IsRunning() + if stateRunning || ledgerRunning { + time.Sleep(time.Second) + continue + } + break + } +} diff --git a/exp/support/pipeline/pipeline.go b/exp/support/pipeline/pipeline.go index 24eed9c10f..edefb14dd2 100644 --- a/exp/support/pipeline/pipeline.go +++ b/exp/support/pipeline/pipeline.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" "sync" - "time" "github.com/stellar/go/support/errors" ) @@ -86,6 +85,14 @@ func (p *Pipeline) setRunning(setRunning bool) error { return nil } +// IsRunning returns true if pipeline is running +func (p *Pipeline) IsRunning() bool { + // Protects internal fields + p.mutex.Lock() + defer p.mutex.Unlock() + return p.running +} + // reset resets internal state of the pipeline and all the nodes and processors. func (p *Pipeline) reset() { p.cancelled = false @@ -192,8 +199,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip } }() - finishUpdatingStats := p.updateStats(node, reader, writer) - for i, child := range node.Children { wg.Add(1) go func(i int, child *PipelineNode) { @@ -209,8 +214,6 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip go func() { wg.Wait() - finishUpdatingStats <- true - if node == p.root { // If pipeline processing is finished run post-hooks and send error // if not already sent. @@ -259,40 +262,8 @@ func (p *Pipeline) Shutdown() { } p.shutDown = true p.cancelled = true - p.cancelFunc() -} - -func (p *Pipeline) updateStats(node *PipelineNode, reader Reader, writer *multiWriter) chan<- bool { - // Update stats - interval := time.Second - done := make(chan bool) - ticker := time.NewTicker(interval) - - go func() { - defer ticker.Stop() - - for { - // This is not thread-safe: check if Mutex slows it down a lot... - readBuffer, readBufferIsBufferedReadWriter := reader.(*BufferedReadWriter) - - node.writesPerSecond = (writer.wroteEntries - node.wroteEntries) * int(time.Second/interval) - node.wroteEntries = writer.wroteEntries - - if readBufferIsBufferedReadWriter { - node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval) - node.readEntries = readBuffer.readEntries - node.queuedEntries = readBuffer.QueuedEntries() - } - - select { - case <-ticker.C: - continue - case <-done: - // Pipeline done - return - } - } - }() - - return done + // It's possible that Shutdown will be called before first run. + if p.cancelFunc != nil { + p.cancelFunc() + } } diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 42bb0e1106..cec34705cc 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -297,6 +297,10 @@ func (s *System) Run() { "err": err, "last_ingested_ledger": lastIngestedLedger, }).Error("Error running session, resuming from the last ingested ledger") + } else { + // LiveSession.Run returns nil => shutdown + log.Info("Session shut down") + return nil } } else { // The other node already ingested a state (just now or in the past) diff --git a/services/horizon/internal/expingest/run_ingestion_test.go b/services/horizon/internal/expingest/run_ingestion_test.go index a62de34053..8aa15b1069 100644 --- a/services/horizon/internal/expingest/run_ingestion_test.go +++ b/services/horizon/internal/expingest/run_ingestion_test.go @@ -253,7 +253,6 @@ func (s *RunIngestionTestSuite) TestOutdatedIngestVersion() { s.session.On("TruncateTables", history.ExperimentalIngestionTables).Return(nil).Once() s.ingestSession.On("Run").Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() - s.ingestSession.On("Resume", uint32(4)).Return(nil).Once() s.system.retry = expectError(s.Assert(), "") } From e632f45c553b24f210ebe955779a4025fbeb250c Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 16 Dec 2019 18:03:27 +0100 Subject: [PATCH 6/6] Add changelog for v0.24.1 release (#2062) --- services/horizon/CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 20ea319175..8acded941a 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -6,6 +6,13 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). As this project is pre 1.0, breaking changes may happen for minor version bumps. A breaking change will get clearly notified in this log. +## v0.24.1 + +* Add cache to improve performance of experimental ingestion system (#[2004](https://github.com/stellar/go/pull/2004)). +* Fix experimental ingestion bug where ledger changes were not applied in the correct order (#[2050](https://github.com/stellar/go/pull/2050)). +* Fix experimental ingestion bug where unique constraint errors are incurred when the ingestion system has to reingest state from history archive checkpoints (#[2055](https://github.com/stellar/go/pull/2055)). +* Fix experimental ingestion bug where a race condition during shutdown leads to a crash (#[2058](https://github.com/stellar/go/pull/2058)). + ## v0.24.0 * Add `fee_charged` and `max_fee` objects to `/fee_stats` endpoint ([#1964](https://github.com/stellar/go/pull/1964)).