Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Captive-Core fixes to support Stellar-Core 17.1.0 #3694

Merged
71 changes: 32 additions & 39 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type CaptiveStellarCore struct {
// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta

prepared *Range // non-nil if any range is prepared
nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online
previousLedgerHash *string
Expand Down Expand Up @@ -213,6 +214,8 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error

// The next ledger should be the first ledger of the checkpoint containing
// the requested ledger
ran := BoundedRange(from, to)
c.prepared = &ran
c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from)
c.lastLedger = &to
c.previousLedgerHash = nil
Expand Down Expand Up @@ -248,7 +251,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
c.stellarCoreRunner = runner
}

runFrom, ledgerHash, nextLedger, err := c.runFromParams(ctx, from)
runFrom, ledgerHash, err := c.runFromParams(ctx, from)
if err != nil {
return errors.Wrap(err, "error calculating ledger and hash for stelar-core run")
}
Expand All @@ -258,26 +261,20 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
return errors.Wrap(err, "error running stellar-core")
}

c.nextLedger = nextLedger
// In the online mode we update nextLedger after streaming the first ledger.
// This is to support versions before and after/including v17.1.0 that
// introduced minimal persistent DB.
c.nextLedger = 0
ran := UnboundedRange(from)
c.prepared = &ran
c.lastLedger = nil
c.previousLedgerHash = nil

if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, nextLedger-1)
if err != nil {
return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1)
}
if exists {
c.previousLedgerHash = &ledgerHash
}
}

return nil
}

// runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, nextLedger uint32, err error) {
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) {
if from == 1 {
// Trying to start-from 1 results in an error from Stellar-Core:
// Target ledger 1 is not newer than last closed ledger 1 - nothing to do
Expand All @@ -288,28 +285,12 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
}

if from <= 63 {
// For ledgers before (and including) first checkpoint, get/wait the first
// checkpoint to get the ledger header. It will always start streaming
// from ledger 2.
nextLedger = 2
// The line below is to support a special case for streaming ledger 2
// that works for all other ledgers <= 63 (fast-forward).
// We can't set from=2 because Stellar-Core will not allow starting from 1.
// To solve this we start from 3 and exploit the fast that Stellar-Core
// will stream data from 2 for the first checkpoint.
from = 3
} else {
// For ledgers after the first checkpoint, start at the previous checkpoint
// and fast-forward from there.
if !c.checkpointManager.IsCheckpoint(from) {
from = c.checkpointManager.PrevCheckpoint(from)
}
// Streaming will start from the previous checkpoint + 1
nextLedger = from - 63
if nextLedger < 2 {
// Stellar-Core always streams from ledger 2 at min.
nextLedger = 2
}
}

runFrom = from - 1
Expand All @@ -334,6 +315,18 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
return
}

// nextExpectedSequence returns nextLedger (if currently set) or start of
// prepared range. Otherwise it returns 0.
// This is done because `nextLedger` is 0 between the moment Stellar-Core is
// started and streaming the first ledger (in such case we return first ledger
// in requested range).
func (c *CaptiveStellarCore) nextExpectedSequence() uint32 {
if c.nextLedger == 0 && c.prepared != nil {
return c.prepared.from
}
return c.nextLedger
}

func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRange Range) (bool, error) {
c.stellarCoreLock.Lock()
defer c.stellarCoreLock.Unlock()
Expand Down Expand Up @@ -408,18 +401,18 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
cachedLedger = c.cachedMeta.LedgerSequence()
}

if c.nextLedger == 0 {
if c.prepared == nil {
return false
}

if lastLedger == 0 {
return c.nextLedger <= ledgerRange.from || cachedLedger == ledgerRange.from
return c.nextExpectedSequence() <= ledgerRange.from || cachedLedger == ledgerRange.from
}

// From now on: lastLedger != 0 so current range is bounded

if ledgerRange.bounded {
return (c.nextLedger <= ledgerRange.from || cachedLedger == ledgerRange.from) &&
return (c.nextExpectedSequence() <= ledgerRange.from || cachedLedger == ledgerRange.from) &&
lastLedger >= ledgerRange.to
}

Expand Down Expand Up @@ -458,11 +451,11 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
return xdr.LedgerCloseMeta{}, errors.New("session is closed, call PrepareRange first")
}

if sequence < c.nextLedger {
if sequence < c.nextExpectedSequence() {
return xdr.LedgerCloseMeta{}, errors.Errorf(
"requested ledger %d is behind the captive core stream (expected=%d)",
sequence,
c.nextLedger,
c.nextExpectedSequence(),
)
}

Expand Down Expand Up @@ -495,7 +488,7 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe
}

seq := result.LedgerCloseMeta.LedgerSequence()
if seq != c.nextLedger {
if c.nextLedger != 0 && seq != c.nextLedger {
bartekn marked this conversation as resolved.
Show resolved Hide resolved
// We got something unexpected; close and reset
c.stellarCoreRunner.close()
return false, xdr.LedgerCloseMeta{}, errors.Errorf(
Expand All @@ -517,7 +510,7 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe
)
}

c.nextLedger++
c.nextLedger = result.LedgerSequence() + 1
currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString()
c.previousLedgerHash = &currentLedgerHash

Expand Down Expand Up @@ -583,13 +576,13 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
}

if c.lastLedger == nil {
return c.nextLedger - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil
return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil
}
return *c.lastLedger, nil
}

func (c *CaptiveStellarCore) isClosed() bool {
return c.nextLedger == 0 || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
return c.prepared == nil || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
}

// Close closes existing Stellar-Core process, streaming sessions and removes all
Expand Down
Loading