Skip to content

Commit

Permalink
stellar#5099: review feedback, commit already flushed batches prior t…
Browse files Browse the repository at this point in the history
…o err
  • Loading branch information
sreuland committed Nov 27, 2023
1 parent c2aea6a commit 2386a20
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,11 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
return stop(), errors.Wrap(err, getLastIngestedErrMsg)
}

if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil {
return stop(), err
if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger); ingestErr != nil {
if err := s.historyQ.Commit(); err != nil {
return stop(), errors.Wrap(ingestErr, commitErrMsg)
}
return stop(), ingestErr
}

if err := s.historyQ.Commit(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,74 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce(
s.Assert().NoError(err)
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalError() {
s.historyQ.On("Begin", s.ctx).Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once()
s.historyQ.On("Commit").Return(nil).Once()

toidFrom := toid.New(100, 0, 0)
toidTo := toid.New(201, 0, 0)
s.historyQ.On(
"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

for i := 100; i <= 105; i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(i),
},
},
},
}
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}

s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
s.Assert().EqualError(err, "error getting ledger: my error")
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceLedgerRetrievalAndCommitError() {
s.historyQ.On("Begin", s.ctx).Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(190), nil).Once()
s.historyQ.On("Commit").Return(errors.New("commit error")).Once()

toidFrom := toid.New(100, 0, 0)
toidTo := toid.New(201, 0, 0)
s.historyQ.On(
"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(nil).Once()

for i := 100; i <= 105; i++ {
meta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(i),
},
},
},
}
s.ledgerBackend.On("GetLedger", s.ctx, uint32(i)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}

s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
s.Assert().EqualError(err, "Error committing db transaction: error getting ledger: my error")
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceWithFlushMax() {
s.historyQ.On("Begin", s.ctx).Return(nil).Once()
s.historyQ.On("Rollback").Return(nil).Once()
Expand Down

0 comments on commit 2386a20

Please sign in to comment.