Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/expingest/main.go: Trim experimental ingestion historical data when starting ingestion #2055

Merged
merged 2 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,28 @@ func (q *Q) InsertExpLedger(
return result.RowsAffected()
}

// ExpIngestRemovalSummary describes how many rows in the experimental ingestion
// history tables have been deleted by RemoveExpIngestHistory()
type ExpIngestRemovalSummary struct {
LedgersRemoved int64
}

// RemoveExpIngestHistory removes all rows in the experimental ingestion
// history tables which have a ledger sequence higher than `newerThanSequence`
func (q *Q) RemoveExpIngestHistory(newerThanSequence uint32) (ExpIngestRemovalSummary, error) {
result, err := q.Exec(
sq.Delete("exp_history_ledgers").
Where("sequence > ?", newerThanSequence),
)
if err != nil {
return ExpIngestRemovalSummary{}, err
}

summary := ExpIngestRemovalSummary{}
summary.LedgersRemoved, err = result.RowsAffected()
return summary, err
}

func ledgerHeaderToMap(
ledger xdr.LedgerHeaderHistoryEntry,
successTxsCount int,
Expand Down
77 changes: 77 additions & 0 deletions services/horizon/internal/db2/history/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,80 @@ func TestCheckExpLedger(t *testing.T) {
tt.Assert.True(valid)
}
}

func TestRemoveExpIngestHistory(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

summary, err := q.RemoveExpIngestHistory(69859)
tt.Assert.Equal(ExpIngestRemovalSummary{}, summary)
tt.Assert.NoError(err)

ledger := Ledger{
Sequence: 69859,
PreviousLedgerHash: null.NewString("4b0b8bace3b2438b2404776ce57643966855487ba6384724a3c664c7aa4cd9e4", true),
ImporterVersion: 321,
TransactionCount: 12,
SuccessfulTransactionCount: new(int32),
FailedTransactionCount: new(int32),
OperationCount: 23,
TotalCoins: 23451,
FeePool: 213,
BaseReserve: 687,
MaxTxSetSize: 345,
ProtocolVersion: 12,
BaseFee: 100,
ClosedAt: time.Now().UTC().Truncate(time.Second),
LedgerHeaderXDR: null.NewString("temp", true),
}
*ledger.SuccessfulTransactionCount = 12
*ledger.FailedTransactionCount = 3
hashes := []string{
"4db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"5db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"6db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"7db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
"8db1e4f145e9ee75162040d26284795e0697e2e84084624e7c6c723ebbf80118",
}

for i, hash := range hashes {
ledger.TotalOrderID.ID = toid.New(ledger.Sequence, 0, 0).ToInt64()
ledger.LedgerHash = hash
if i > 0 {
ledger.PreviousLedgerHash = null.NewString(hashes[i-1], true)
}

insertSQL := sq.Insert("exp_history_ledgers").SetMap(ledgerToMap(ledger))
_, err = q.Exec(insertSQL)
tt.Assert.NoError(err)

ledger.Sequence++
}

var ledgers []Ledger
err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl"))
tt.Assert.NoError(err)
tt.Assert.Len(ledgers, 5)

summary, err = q.RemoveExpIngestHistory(69863)
tt.Assert.Equal(ExpIngestRemovalSummary{}, summary)
tt.Assert.NoError(err)

err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl"))
tt.Assert.NoError(err)
tt.Assert.Len(ledgers, 5)

summary, err = q.RemoveExpIngestHistory(69861)
tt.Assert.Equal(ExpIngestRemovalSummary{2}, summary)
tt.Assert.NoError(err)

err = q.Select(&ledgers, selectLedgerFields.From("exp_history_ledgers hl"))
tt.Assert.NoError(err)
tt.Assert.Len(ledgers, 3)

for _, ledger := range ledgers {
tt.Assert.LessOrEqual(ledger.Sequence, int32(69861))
}
}
3 changes: 3 additions & 0 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/jmoiron/sqlx"
"github.com/stellar/go/clients/stellarcore"
"github.com/stellar/go/exp/ingest"
"github.com/stellar/go/exp/ingest/io"
Expand Down Expand Up @@ -66,12 +67,14 @@ type Config struct {
type dbQ interface {
Begin() error
Rollback() error
GetTx() *sqlx.Tx
GetLastLedgerExpIngest() (uint32, error)
GetExpIngestVersion() (int, error)
UpdateLastLedgerExpIngest(uint32) error
UpdateExpStateInvalid(bool) error
GetExpStateInvalid() (bool, error)
GetAllOffers() ([]history.Offer, error)
RemoveExpIngestHistory(uint32) (history.ExpIngestRemovalSummary, error)
}

type dbSession interface {
Expand Down
228 changes: 142 additions & 86 deletions services/horizon/internal/expingest/pipeline_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/jmoiron/sqlx"
"github.com/stellar/go/exp/ingest/pipeline"
"github.com/stellar/go/exp/orderbook"
"github.com/stellar/go/services/horizon/internal/db2"
Expand All @@ -12,102 +13,157 @@ import (
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/suite"
)

func TestStatePreProcessingHook(t *testing.T) {
tt := test.Start(t).Scenario("base")
defer tt.Finish()
type PreProcessingHookTestSuite struct {
suite.Suite
historyQ *mockDBQ
system *System
ctx context.Context
ledgerSeqFromContext uint32
}

system := &System{}
session := tt.HorizonSession()
defer session.Rollback()
ctx := context.WithValue(
func TestPreProcessingHookTestSuite(t *testing.T) {
suite.Run(t, new(PreProcessingHookTestSuite))
}

func (s *PreProcessingHookTestSuite) SetupTest() {
s.system = &System{}
s.historyQ = &mockDBQ{}
s.ledgerSeqFromContext = uint32(5)

s.ctx = context.WithValue(
context.Background(),
pipeline.LedgerSequenceContextKey,
uint32(0),
s.ledgerSeqFromContext,
)
pipelineType := statePipeline
historyQ := &history.Q{session}
tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(0))

tt.Assert.Nil(session.GetTx())
newCtx, err := preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.False(system.StateReady())

tt.Assert.Nil(session.Rollback())
tt.Assert.Nil(session.GetTx())

tt.Assert.Nil(session.Begin())
tt.Assert.NotNil(session.GetTx())

newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.False(system.StateReady())
}

func TestLedgerPreProcessingHook(t *testing.T) {
tt := test.Start(t).Scenario("base")
defer tt.Finish()
func (s *PreProcessingHookTestSuite) TearDownTest() {
s.historyQ.AssertExpectations(s.T())
}

system := &System{}
session := tt.HorizonSession()
defer session.Rollback()
ctx := context.WithValue(
context.Background(),
pipeline.LedgerSequenceContextKey,
uint32(2),
func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithPreExistingTx() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once()
s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return(
history.ExpIngestRemovalSummary{3}, nil,
)
pipelineType := ledgerPipeline
historyQ := &history.Q{session}
tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(1))

tt.Assert.Nil(session.GetTx())
newCtx, err := preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true)
tt.Assert.True(system.StateReady())

tt.Assert.Nil(session.Rollback())
tt.Assert.Nil(session.GetTx())
system.stateReady = false
tt.Assert.False(system.StateReady())

tt.Assert.Nil(session.Begin())
tt.Assert.NotNil(session.GetTx())
newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.NotNil(session.GetTx())
tt.Assert.Equal(newCtx.Value(horizonProcessors.IngestUpdateDatabase), true)
tt.Assert.True(system.StateReady())

tt.Assert.Nil(session.Rollback())
tt.Assert.Nil(session.GetTx())
system.stateReady = false
tt.Assert.False(system.StateReady())

tt.Assert.Nil(historyQ.UpdateLastLedgerExpIngest(2))
newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.Nil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.True(system.StateReady())

tt.Assert.Nil(session.Begin())
tt.Assert.NotNil(session.GetTx())
system.stateReady = false
tt.Assert.False(system.StateReady())

newCtx, err = preProcessingHook(ctx, pipelineType, system, session)
tt.Assert.NoError(err)
tt.Assert.Nil(session.GetTx())
tt.Assert.Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
tt.Assert.True(system.StateReady())

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestStateHookSucceedsWithoutPreExistingTx() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once()
s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return(
history.ExpIngestRemovalSummary{3}, nil,
)

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnGetLastLedgerExpIngestError() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error getting last ledger: transient error")
}

func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnRemoveExpIngestHistoryError() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), nil).Once()
s.historyQ.On("RemoveExpIngestHistory", s.ledgerSeqFromContext).Return(
history.ExpIngestRemovalSummary{}, errors.New("transient error"),
)
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error removing exp ingest history: transient error")
}

func (s *PreProcessingHookTestSuite) TestStateHookRollsbackOnBeginError() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, statePipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error starting a transaction: transient error")
}

func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithPreExistingTx() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().True(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsWithoutPreExistingTx() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(nil).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(1), nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().True(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestLedgerHookSucceedsAsMaster() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(s.ledgerSeqFromContext-1, nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().NoError(err)
s.Assert().NotNil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().True(s.system.StateReady())
}

func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnGetLastLedgerExpIngestError() {
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(0), errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error getting last ledger: transient error")
}

func (s *PreProcessingHookTestSuite) TestLedgerHookRollsbackOnBeginError() {
var nilTx *sqlx.Tx
s.historyQ.On("GetTx").Return(nilTx).Once()
s.historyQ.On("Begin").Return(errors.New("transient error")).Once()
s.historyQ.On("Rollback").Return(nil).Once()

newCtx, err := preProcessingHook(s.ctx, ledgerPipeline, s.system, s.historyQ)
s.Assert().Nil(newCtx.Value(horizonProcessors.IngestUpdateDatabase))
s.Assert().False(s.system.StateReady())
s.Assert().EqualError(err, "Error starting a transaction: transient error")
}

func TestPostProcessingHook(t *testing.T) {
Expand Down
Loading