Skip to content

Commit

Permalink
services/horizon/internal/expingest: Trim experimental ingestion hist…
Browse files Browse the repository at this point in the history
…orical data when starting ingestion (#2055)

Horizon always reingests state using the latest checkpoint ledger and in most cases the sequence of this ledger will be smaller than the latest ingested ledger. Because of this Horizon will attempt to insert old ledgers into exp_history_ledger again. Inserting old ledgers into exp_history_ledger triggers unique constraint errors.

To fix this issue we remove rows from historical ingestion tables that ingestion will insert during the ledger pipeline phase.
  • Loading branch information
tamirms committed Dec 13, 2019
1 parent 86c43bd commit bb82e6e
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 94 deletions.
22 changes: 22 additions & 0 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,28 @@ func (q *Q) InsertExpLedger(
return result.RowsAffected()
}

// ExpIngestRemovalSummary describes how many rows in the experimental ingestion
// history tables have been deleted by RemoveExpIngestHistory()
type ExpIngestRemovalSummary struct {
LedgersRemoved int64
}

// RemoveExpIngestHistory removes all rows in the experimental ingestion
// history tables which have a ledger sequence higher than `newerThanSequence`
func (q *Q) RemoveExpIngestHistory(newerThanSequence uint32) (ExpIngestRemovalSummary, error) {
result, err := q.Exec(
sq.Delete("exp_history_ledgers").
Where("sequence > ?", newerThanSequence),
)
if err != nil {
return ExpIngestRemovalSummary{}, err
}

summary := ExpIngestRemovalSummary{}
summary.LedgersRemoved, err = result.RowsAffected()
return summary, err
}

func ledgerHeaderToMap(
ledger xdr.LedgerHeaderHistoryEntry,
successTxsCount int,
Expand Down
77 changes: 77 additions & 0 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,80 @@ func TestCheckExpLedger(t *testing.T) {
tt.Assert.True(valid)
}
}

func TestRemoveExpIngestHistory(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

summary, err := q.RemoveExpIngestHistory(69859)
tt.Assert.Equal(ExpIngestRemovalSummary{}, summary)
tt.Assert.NoError(err)

ledger := Ledger{
Sequence: 69859,
PreviousLedgerHash: null.NewString("4b0b8bace3b2438b2404776ce57643966855487ba6384724a3c664c7aa4cd9e4", true),
ImporterVersion: 321,
TransactionCount: 12,
SuccessfulTransactionCount: new(int32),
FailedTransactionCount: new(int32),
OperationCount: 23,
TotalCoins: 23451,
FeePool: 213,
BaseReserve: 687,
MaxTxSetSize: 345,
ProtocolVersion: 12,
BaseFee: 100,
ClosedAt: time.Now().UTC().Truncate(time.Second),
LedgerHeaderXDR: null.NewString("temp", true),
}
*ledger.SuccessfulTransactionCount = 12
*ledger.FailedTransactionCount = 3
hashes := []string{
"4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"5db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"6db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"7db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"8db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
}

for i, hash := range hashes {
ledger.TotalOrderID.ID = toid.New(ledger.Sequence, 0, 0).ToInt64()
ledger.LedgerHash = hash
if i > 0 {
ledger.PreviousLedgerHash = null.NewString(hashes[i-1], true)
}

insertSQL := sq.Insert("exp_history_ledgers").SetMap(ledgerToMap(ledger))
_, err = q.Exec(insertSQL)
tt.Assert.NoError(err)

ledger.Sequence++
}

var ledgers []Ledger
err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl"))
tt.Assert.NoError(err)
tt.Assert.Len(ledgers, 5)

summary, err = q.RemoveExpIngestHistory(69863)
tt.Assert.Equal(ExpIngestRemovalSummary{}, summary)
tt.Assert.NoError(err)

err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl"))
tt.Assert.NoError(err)
tt.Assert.Len(ledgers, 5)

summary, err = q.RemoveExpIngestHistory(69861)
tt.Assert.Equal(ExpIngestRemovalSummary{2}, summary)
tt.Assert.NoError(err)

err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl"))
tt.Assert.NoError(err)
tt.Assert.Len(ledgers, 3)

for _, ledger := range ledgers {
tt.Assert.LessOrEqual(ledger.Sequence, int32(69861))
}
}
3 changes: 3 additions & 0 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/jmoiron/sqlx"
"github.com/stellar/go/clients/stellarcore"
"github.com/stellar/go/exp/ingest"
"github.com/stellar/go/exp/ingest/io"
Expand Down Expand Up @@ -64,12 +65,14 @@ type Config struct {
type dbQ interface {
Begin() error
Rollback() error
GetTx() *sqlx.Tx
GetLastLedgerExpIngest() (uint32, error)
GetExpIngestVersion() (int, error)
UpdateLastLedgerExpIngest(uint32) error
UpdateExpStateInvalid(bool) error
GetExpStateInvalid() (bool, error)
GetAllOffers() ([]history.Offer, error)
RemoveExpIngestHistory(uint32) (history.ExpIngestRemovalSummary, error)
}

type dbSession interface {
Expand Down
228 changes: 142 additions & 86 deletions services/horizon/internal/expingest/pipeline_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/jmoiron/sqlx"
"github.com/stellar/go/exp/ingest/pipeline"
"github.com/stellar/go/exp/orderbook"
"github.com/stellar/go/services/horizon/internal/db2"
Expand All @@ -12,102 +13,157 @@ import (
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/suite"
)

func TestStatePreProcessingHook(t *testing.T) {
tt := test.Start(t).Scenario("base")
defer tt.Finish()
type PreProcessingHookTestSuite struct {
suite.Suite
historyQ *mockDBQ
system *System
ctx context.Context
ledgerSeqFromContext uint32
}

system := &System{}
session := tt.HorizonSession()
defer session.Rollback()
ctx := context.WithValue(
func TestPreProcessingHookTestSuite(t *testing.T) {
suite.Run(t, new(PreProcessingHookTestSuite))
}

func (s *PreProcessingHookTestSuite) SetupTest() {
s.system = &System{}
s.historyQ = &mockDBQ{}
s.ledgerSeqFromContext = uint32(5)

s.ctx = context.WithValue(
context.Background(),
pipeline.LedgerSequenceContextKey,
uint32(0),
s.ledgerSeqFromContext,
)
pipelineType := statePipeline
historyQ := &history.Q{session}
tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(0))

tt.Assert.Nil(session.GetTx())
newCtx, err := preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.False(system.StateReady())

tt.Assert.Nil(session.Rollback())
tt.Assert.Nil(session.GetTx())

tt.Assert.Nil(session.Begin())
tt.Assert.NotNil(session.GetTx())

newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.False(system.StateReady())
}

func TestLedgerPreProcessingHook(t *testing.T) {
tt := test.Start(t).Scenario("base")
defer tt.Finish()
func (s *PreProcessingHookTestSuite) TearDownTest() {
s.historyQ.AssertExpectations(s.T())
}

system := &System{}
session := tt.HorizonSession()
defer session.Rollback()
ctx := context.WithValue(
context.Background(),
pipeline.LedgerSequenceContextKey,
uint32(2),
func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithPreExistingTx() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once()
s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return(
history.ExpIngestRemovalSummary{3}, nil,
)
pipelineType := ledgerPipeline
historyQ := &history.Q{session}
tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(1))

tt.Assert.Nil(session.GetTx())
newCtx, err := preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true)
tt.Assert.True(system.StateReady())

tt.Assert.Nil(session.Rollback())
tt.Assert.Nil(session.GetTx())
system.stateReady = false
tt.Assert.False(system.StateReady())

tt.Assert.Nil(session.Begin())
tt.Assert.NotNil(session.GetTx())
newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true)
tt.Assert.True(system.StateReady())

tt.Assert.Nil(session.Rollback())
tt.Assert.Nil(session.GetTx())
system.stateReady = false
tt.Assert.False(system.StateReady())

tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(2))
newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.Nil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.True(system.StateReady())

tt.Assert.Nil(session.Begin())
tt.Assert.NotNil(session.GetTx())
system.stateReady = false
tt.Assert.False(system.StateReady())

newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.Nil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.True(system.StateReady())

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithoutPreExistingTx() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once()
s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return(
history.ExpIngestRemovalSummary{3}, nil,
)

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnGetLastLedgerExpIngestError() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error getting last ledger: transient error")
}

func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnRemoveExpIngestHistoryError() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once()
s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return(
history.ExpIngestRemovalSummary{}, errors.New("transient error"),
)
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error removing exp ingest history: transient error")
}

func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnBeginError() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error starting a transaction: transient error")
}

func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithPreExistingTx() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().True(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithoutPreExistingTx() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().True(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsAsMaster() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(s.ledgerSeqFromContext-1, nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().NotNil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().True(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnGetLastLedgerExpIngestError() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error getting last ledger: transient error")
}

func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnBeginError() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error starting a transaction: transient error")
}

func TestPostProcessingHook(t *testing.T) {
Expand Down
Loading

0 comments on commit bb82e6e

Please sign in to comment.