From 00e000798e3b66d18072f18ca8213e90377fef32 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Fri, 13 Dec 2019 05:32:31 +0100 Subject: [PATCH 1/2] Trim experimental ingestion historical data when starting ingestion from a HAS Horizon always reingests state using the latest checkpoint ledger and in most cases the sequence of this ledger will be smaller than the latest ingested ledger. Because of this Horizon will attempt to insert old ledgers into exp_history_ledger again. Inserting old ledgers into exp_history_ledger triggers unique constraint errors. To fix this issue we remove rows from historical ingestion tables that we will insert during the ingestion pipeline. --- .../horizon/internal/db2/history/ledger.go | 22 ++ .../internal/db2/history/ledger_test.go | 77 ++++++ services/horizon/internal/expingest/main.go | 3 + .../internal/expingest/pipeline_hooks_test.go | 228 +++++++++++------- .../horizon/internal/expingest/pipelines.go | 21 +- .../internal/expingest/run_ingestion_test.go | 11 + 6 files changed, 268 insertions(+), 94 deletions(-) diff --git a/services/horizon/internal/db2/history/ledger.go b/services/horizon/internal/db2/history/ledger.go index 8d238d3784..a410f92df3 100644 --- a/services/horizon/internal/db2/history/ledger.go +++ b/services/horizon/internal/db2/history/ledger.go @@ -165,6 +165,28 @@ func (q *Q) InsertExpLedger( return result.RowsAffected() } +// ExpIngestRemovalSummary describes how many rows in the experimental ingestion +// history tables have been deleted by RemoveExpIngestHistory() +type ExpIngestRemovalSummary struct { + LedgersRemoved int64 +} + +// RemoveExpIngestHistory removes all rows in the experimental ingestion +// history tables which have a ledger sequence higher than `newerThanSequence` +func (q *Q) RemoveExpIngestHistory(newerThanSequence uint32) (ExpIngestRemovalSummary, error) { + result, err := q.Exec( + sq.Delete("exp_history_ledgers"). + Where("sequence > ?", newerThanSequence), + ) + if err != nil { + return ExpIngestRemovalSummary{}, err + } + + summary := ExpIngestRemovalSummary{} + summary.LedgersRemoved, err = result.RowsAffected() + return summary, err +} + func ledgerHeaderToMap( ledger xdr.LedgerHeaderHistoryEntry, successTxsCount int, diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index 2037f64e84..f327b1bcf0 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -247,3 +247,80 @@ func TestCheckExpLedger(t *testing.T) { tt.Assert.True(valid) } } + +func TestRemoveExpIngestHistory(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + summary, err := q.RemoveExpIngestHistory(69859) + tt.Assert.Equal(ExpIngestRemovalSummary{}, summary) + tt.Assert.NoError(err) + + ledger := Ledger{ + Sequence: 69859, + PreviousLedgerHash: null.NewString("4b0b8bace3b2438b2404776ce57643966855487ba6384724a3c664c7aa4cd9e4", true), + ImporterVersion: 321, + TransactionCount: 12, + SuccessfulTransactionCount: new(int32), + FailedTransactionCount: new(int32), + OperationCount: 23, + TotalCoins: 23451, + FeePool: 213, + BaseReserve: 687, + MaxTxSetSize: 345, + ProtocolVersion: 12, + BaseFee: 100, + ClosedAt: time.Now().UTC().Truncate(time.Second), + LedgerHeaderXDR: null.NewString("temp", true), + } + *ledger.SuccessfulTransactionCount = 12 + *ledger.FailedTransactionCount = 3 + hashes := []string{ + "4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "5db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "6db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "7db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + "8db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118", + } + + for i, hash := range hashes { + ledger.TotalOrderID.ID = toid.New(ledger.Sequence, 0, 0).ToInt64() + ledger.LedgerHash = hash + if i > 0 { + ledger.PreviousLedgerHash = null.NewString(hashes[i-1], true) + } + + insertSQL := sq.Insert("exp_history_ledgers").SetMap(ledgerToMap(ledger)) + _, err := q.Exec(insertSQL) + tt.Assert.NoError(err) + + ledger.Sequence++ + } + + var ledgers []Ledger + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 5) + + summary, err = q.RemoveExpIngestHistory(69863) + tt.Assert.Equal(ExpIngestRemovalSummary{}, summary) + tt.Assert.NoError(err) + + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 5) + + summary, err = q.RemoveExpIngestHistory(69861) + tt.Assert.Equal(ExpIngestRemovalSummary{2}, summary) + tt.Assert.NoError(err) + + err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl")) + tt.Assert.NoError(err) + tt.Assert.Len(ledgers, 3) + + for _, ledger := range ledgers { + tt.Assert.LessOrEqual(ledger.Sequence, int32(69861)) + } +} diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 77412c5e39..42bb0e1106 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/jmoiron/sqlx" "github.com/stellar/go/clients/stellarcore" "github.com/stellar/go/exp/ingest" "github.com/stellar/go/exp/ingest/io" @@ -66,12 +67,14 @@ type Config struct { type dbQ interface { Begin() error Rollback() error + GetTx() *sqlx.Tx GetLastLedgerExpIngest() (uint32, error) GetExpIngestVersion() (int, error) UpdateLastLedgerExpIngest(uint32) error UpdateExpStateInvalid(bool) error GetExpStateInvalid() (bool, error) GetAllOffers() ([]history.Offer, error) + RemoveExpIngestHistory(uint32) (history.ExpIngestRemovalSummary, error) } type dbSession interface { diff --git a/services/horizon/internal/expingest/pipeline_hooks_test.go b/services/horizon/internal/expingest/pipeline_hooks_test.go index e85ffcbd9e..ebd4ed6536 100644 --- a/services/horizon/internal/expingest/pipeline_hooks_test.go +++ b/services/horizon/internal/expingest/pipeline_hooks_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/ingest/pipeline" "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2" @@ -12,102 +13,157 @@ import ( "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" + "github.com/stretchr/testify/suite" ) -func TestStatePreProcessingHook(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() +type PreProcessingHookTestSuite struct { + suite.Suite + historyQ *mockDBQ + system *System + ctx context.Context + ledgerSeqFromContext uint32 +} - system := &System{} - session := tt.HorizonSession() - defer session.Rollback() - ctx := context.WithValue( +func TestPreProcessingHookTestSuite(t *testing.T) { + suite.Run(t, new(PreProcessingHookTestSuite)) +} + +func (s *PreProcessingHookTestSuite) SetupTest() { + s.system = &System{} + s.historyQ = &mockDBQ{} + s.ledgerSeqFromContext = uint32(5) + + s.ctx = context.WithValue( context.Background(), pipeline.LedgerSequenceContextKey, - uint32(0), + s.ledgerSeqFromContext, ) - pipelineType := statePipeline - historyQ := &history.Q{session} - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(0)) - - tt.Assert.Nil(session.GetTx()) - newCtx, err := preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.False(system.StateReady()) } -func TestLedgerPreProcessingHook(t *testing.T) { - tt := test.Start(t).Scenario("base") - defer tt.Finish() +func (s *PreProcessingHookTestSuite) TearDownTest() { + s.historyQ.AssertExpectations(s.T()) +} - system := &System{} - session := tt.HorizonSession() - defer session.Rollback() - ctx := context.WithValue( - context.Background(), - pipeline.LedgerSequenceContextKey, - uint32(2), +func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithPreExistingTx() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{3}, nil, ) - pipelineType := ledgerPipeline - historyQ := &history.Q{session} - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(1)) - - tt.Assert.Nil(session.GetTx()) - newCtx, err := preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.NotNil(session.GetTx()) - tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Rollback()) - tt.Assert.Nil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(2)) - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.Nil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.True(system.StateReady()) - - tt.Assert.Nil(session.Begin()) - tt.Assert.NotNil(session.GetTx()) - system.stateReady = false - tt.Assert.False(system.StateReady()) - - newCtx, err = preProcessingHook(ctx, pipelineType, system, session) - tt.Assert.NoError(err) - tt.Assert.Nil(session.GetTx()) - tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) - tt.Assert.True(system.StateReady()) + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithoutPreExistingTx() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{3}, nil, + ) + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnGetLastLedgerExpIngestError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error getting last ledger: transient error") +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnRemoveExpIngestHistoryError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once() + s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return( + history.ExpIngestRemovalSummary{}, errors.New("transient error"), + ) + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error removing exp ingest history: transient error") +} + +func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnBeginError() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error starting a transaction: transient error") +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithPreExistingTx() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithoutPreExistingTx() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsAsMaster() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(s.ledgerSeqFromContext-1, nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().NoError(err) + s.Assert().NotNil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().True(s.system.StateReady()) +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnGetLastLedgerExpIngestError() { + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error getting last ledger: transient error") +} + +func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnBeginError() { + var nilTx *sqlx.Tx + s.historyQ.On("GetTx").Return(nilTx).Once() + s.historyQ.On("Begin").Return(errors.New("transient error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + + newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ) + s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase)) + s.Assert().False(s.system.StateReady()) + s.Assert().EqualError(err, "Error starting a transaction: transient error") } func TestPostProcessingHook(t *testing.T) { diff --git a/services/horizon/internal/expingest/pipelines.go b/services/horizon/internal/expingest/pipelines.go index 377289395c..19713241fb 100644 --- a/services/horizon/internal/expingest/pipelines.go +++ b/services/horizon/internal/expingest/pipelines.go @@ -141,7 +141,7 @@ func preProcessingHook( ctx context.Context, pipelineType pType, system *System, - historySession *db.Session, + historyQ dbQ, ) (context.Context, error) { var err error defer func() { @@ -149,18 +149,16 @@ func preProcessingHook( // queries will fail indefinietly with a following error: // current transaction is aborted, commands ignored until end of transaction block if err != nil { - historySession.Rollback() + historyQ.Rollback() } }() - historyQ := &history.Q{historySession} - // Start a transaction only if not in a transaction already. // The only case this can happen is during the first run when // a transaction is started to get the latest ledger `FOR UPDATE` // in `System.Run()`. - if tx := historySession.GetTx(); tx == nil { - err = historySession.Begin() + if tx := historyQ.GetTx(); tx == nil { + err = historyQ.Begin() if err != nil { return ctx, errors.Wrap(err, "Error starting a transaction") } @@ -180,6 +178,12 @@ func preProcessingHook( // State pipeline is always fully run because loading offers // from a database is done outside the pipeline. updateDatabase = true + var summary history.ExpIngestRemovalSummary + summary, err = historyQ.RemoveExpIngestHistory(ledgerSeq) + if err != nil { + return ctx, errors.Wrap(err, "Error removing exp ingest history") + } + log.WithField("historyRemoved", summary).Info("removed entries from historical ingestion tables") } else { // mark the system as ready because we have progressed to running // the ledger pipeline @@ -197,7 +201,7 @@ func preProcessingHook( // If we are not going to update a DB release a lock by rolling back a // transaction. if !updateDatabase { - historySession.Rollback() + historyQ.Rollback() } log.WithFields(logpkg.F{ @@ -348,9 +352,10 @@ func addPipelineHooks( default: panic(fmt.Sprintf("Unknown pipeline type %T", p)) } + historyQ := &history.Q{historySession} p.AddPreProcessingHook(func(ctx context.Context) (context.Context, error) { - return preProcessingHook(ctx, pipelineType, system, historySession) + return preProcessingHook(ctx, pipelineType, system, historyQ) }) p.AddPostProcessingHook(func(ctx context.Context, err error) error { diff --git a/services/horizon/internal/expingest/run_ingestion_test.go b/services/horizon/internal/expingest/run_ingestion_test.go index b260093f49..a62de34053 100644 --- a/services/horizon/internal/expingest/run_ingestion_test.go +++ b/services/horizon/internal/expingest/run_ingestion_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/db" @@ -44,6 +45,11 @@ func (m *mockDBQ) Rollback() error { return args.Error(0) } +func (m *mockDBQ) GetTx() *sqlx.Tx { + args := m.Called() + return args.Get(0).(*sqlx.Tx) +} + func (m *mockDBQ) GetLastLedgerExpIngest() (uint32, error) { args := m.Called() return args.Get(0).(uint32), args.Error(1) @@ -74,6 +80,11 @@ func (m *mockDBQ) GetAllOffers() ([]history.Offer, error) { return args.Get(0).([]history.Offer), args.Error(1) } +func (m *mockDBQ) RemoveExpIngestHistory(newerThanSequence uint32) (history.ExpIngestRemovalSummary, error) { + args := m.Called(newerThanSequence) + return args.Get(0).(history.ExpIngestRemovalSummary), args.Error(1) +} + type mockIngestSession struct { mock.Mock } From c981ad9119737096dce87eb8804722a0c3ff505f Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Fri, 13 Dec 2019 05:46:22 +0100 Subject: [PATCH 2/2] Fix shadow declaration --- services/horizon/internal/db2/history/ledger_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index f327b1bcf0..603ae5a968 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -293,7 +293,7 @@ func TestRemoveExpIngestHistory(t *testing.T) { } insertSQL := sq.Insert("exp_history_ledgers").SetMap(ledgerToMap(ledger)) - _, err := q.Exec(insertSQL) + _, err = q.Exec(insertSQL) tt.Assert.NoError(err) ledger.Sequence++