Skip to content

Commit

Permalink
stellar#5099: added 'ledgers-per-flush' reingest flag, default to 0.
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Nov 16, 2023
1 parent fd46a14 commit e43e291
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 42 deletions.
10 changes: 10 additions & 0 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ var dbReingestCmd = &cobra.Command{
var (
reingestForce bool
parallelWorkers uint
maxLedgersPerFlush uint32
parallelJobSize uint32
retries uint
retryBackoffSeconds uint
Expand Down Expand Up @@ -275,6 +276,14 @@ func ingestRangeCmdOpts() support.ConfigOptions {
FlagDefault: uint32(100000),
Usage: "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
},
{
Name: "ledgers-per-flush",
ConfigKey: &maxLedgersPerFlush,
OptType: types.Uint32,
Required: false,
FlagDefault: uint32(0),
Usage: "[optional] size of ledgers batch for tx processors to retain in memory first, before flushing once to the workers ongoing db transaction, default is 0, which disables batching, effectively flush to db tx per each ledger.",
},
{
Name: "retries",
ConfigKey: &retries,
Expand Down Expand Up @@ -416,6 +425,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
StellarCoreURL: config.StellarCoreURL,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
MaxLedgerPerFlush: maxLedgersPerFlush,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h historyRangeState) run(s *system) (transition, error) {
}).Info("Ledger returned from the backend")
ledgers = append(ledgers, ledgerCloseMeta)

if s.maxLedgerPerFlush > 0 && len(ledgers)%int(s.maxLedgerPerFlush) == 0 {
if s.maxLedgerPerFlush < 1 || len(ledgers)%int(s.maxLedgerPerFlush) == 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u

ledgers = append(ledgers, ledgerCloseMeta)

if s.maxLedgerPerFlush > 0 && len(ledgers)%int(s.maxLedgerPerFlush) == 0 {
if s.maxLedgerPerFlush < 1 || len(ledgers)%int(s.maxLedgerPerFlush) == 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
Expand Down
67 changes: 27 additions & 40 deletions services/horizon/internal/ingest/ingest_history_range_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,22 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeRunTransactionProcess
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once()
s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once()

ledgers := []xdr.LedgerCloseMeta{}
for i := 100; i <= 200; i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(i),
},
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(100),
},
},
}
ledgers = append(ledgers, meta)
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
},
}
s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(errors.New("my error")).Once()

s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once()

next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system)
s.Assert().Error(err)
s.Assert().EqualError(err, "error processing ledger range 100 - 200: my error")
s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error")
s.Assert().Equal(transition{node: startState{}, sleepDuration: defaultSleep}, next)
}

Expand All @@ -170,7 +167,6 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessNoFlushMax() {
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), nil).Once()
s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(99), nil).Once()

ledgers := []xdr.LedgerCloseMeta{}
for i := 100; i <= 200; i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
Expand All @@ -181,10 +177,10 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeSuccessNoFlushMax() {
},
},
}
ledgers = append(ledgers, meta)
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}
s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once()

s.historyQ.On("Commit").Return(nil).Once()

next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system)
Expand Down Expand Up @@ -244,9 +240,9 @@ func (s *IngestHistoryRangeStateTestSuite) TestHistoryRangeCommitsWorkOnLedgerBa
},
}
s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
s.ledgerBackend.On("GetLedger", s.ctx, uint32(101)).
Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()
s.runner.AssertNotCalled(s.T(), "RunTransactionProcessorsOnLedgers")

next, err := historyRangeState{fromLedger: 100, toLedger: 200}.run(s.system)
s.Assert().Error(err)
Expand Down Expand Up @@ -365,24 +361,21 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTra
"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

ledgers := []xdr.LedgerCloseMeta{}
for i := uint32(100); i <= uint32(200); i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(i),
},
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(100),
},
},
}
ledgers = append(ledgers, meta)
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
},
}
s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(errors.New("my error")).Once()

s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
s.Assert().EqualError(err, "error processing ledger range 100 - 200: my error")
s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error")
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommitFails() {
Expand All @@ -398,7 +391,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit
"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

ledgers := []xdr.LedgerCloseMeta{}
for i := uint32(100); i <= uint32(200); i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
Expand All @@ -409,12 +401,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit
},
},
}
ledgers = append(ledgers, meta)
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}

s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
s.Assert().EqualError(err, "Error committing db transaction: my error")
}
Expand All @@ -432,7 +422,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
s.historyQ.On("Commit").Return(nil).Once()
s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(100), uint32(200), 0).Return(nil).Once()

ledgers := []xdr.LedgerCloseMeta{}
for i := uint32(100); i <= uint32(200); i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
Expand All @@ -443,11 +432,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
},
},
}
ledgers = append(ledgers, meta)
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}

s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once()
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
s.Assert().NoError(err)
}
Expand Down Expand Up @@ -549,7 +537,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW
"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

ledgers := []xdr.LedgerCloseMeta{}
for i := 100; i <= 200; i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
Expand All @@ -560,10 +547,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW
},
},
}
ledgers = append(ledgers, meta)
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}
s.runner.On("RunTransactionProcessorsOnLedgers", ledgers).Return(nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
s.Assert().NoError(err)
}
Expand Down

0 comments on commit e43e291

Please sign in to comment.