Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Allow captive core to start from any ledger. #3160

Merged
merged 3 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions exp/tools/captive-core-start-tester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"fmt"

"github.com/stellar/go/ingest/ledgerbackend"
)

// This little app helped testing CaptiveStellarCore.runFromParams on a living
// Stellar-Core. Adding it to the repo because it can be useful in a future if
// Stellar-Core behaviour changes again.
// To make it work, run standalone network (RUN_STANDALONE=false to allow outside
// connections) and update paths below.
func main() {
// check(1) // err expected, cannot stream in captive core
checkLedgers := []uint32{2, 3, 62, 63, 64, 65, 126, 127, 128}
for _, ledger := range checkLedgers {
ok := check(ledger)
if !ok {
panic("test failed error")
}
}
}

func check(ledger uint32) bool {
c, err := ledgerbackend.NewCaptive(
"stellar-core",
"stellar-core-standalone2.cfg",
"Standalone Network ; February 2017",
[]string{"http://localhost:1570"},
)
if err != nil {
panic(err)
}
defer c.Close()

err = c.PrepareRange(ledgerbackend.UnboundedRange(ledger))
if err != nil {
fmt.Println(err)
return false
}

ok, meta, err := c.GetLedger(ledger)
if err != nil {
fmt.Println(err)
return false
}

if !ok {
fmt.Println("no ledger")
return false
}

if meta.LedgerSequence() != ledger {
fmt.Println("wrong ledger", meta.LedgerSequence())
return false
}

fmt.Println(ledger, "ok")
return true
}
77 changes: 43 additions & 34 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,52 +254,61 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {

// if nextLedger is behind - fast-forward until expected ledger
if c.nextLedger < from {
// make GetFrom blocking temporarily
c.blocking = true
_, _, err := c.GetLedger(from)
return errors.Wrapf(err, "Error fast-forwarding to %d", from)
c.blocking = false
if err != nil {
return errors.Wrapf(err, "Error fast-forwarding to %d", from)
}
}

return nil
}

// runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash
func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerHash string, nextLedger uint32, err error) {
if historyarchive.IsCheckpoint(from) {
// To start replaying ledger metadata from a checkpoint ledger
// (including that ledger), we need to start at the previous ledger
// which forces the stream to start at the first ledger in the
// checkpoint.
//
// If we start at the checkpoint ledger, then the first ledger metadata in the stream would be for L+1 (not L)
//
ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
return
}
runFrom = from - 1
ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
nextLedger = roundDownToFirstReplayAfterCheckpointStart(runFrom)
if from == 1 {
// Trying to start-from 1 results in an error from Stellar-Core:
// Target ledger 1 is not newer than last closed ledger 1 - nothing to do
// TODO maybe we can fix it by generating 1st ledger meta
// like GenesisLedgerStateReader?
err = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
return
}

if from <= 63 {
// For ledgers before (and including) first checkpoint, get/wait the first
// checkpoint to get the ledger header. It will always start streaming
// from ledger 2.
nextLedger = 2
// The line below is to support a special case for streaming ledger 2
// that works for all other ledgers <= 63 (fast-forward).
// We can't set from=2 because Stellar-Core will not allow starting from 1.
// To solve this we start from 3 and exploit the fast that Stellar-Core
// will stream data from 2 for the first checkpoint.
from = 3
} else {
// This is a workaround for now since we are not passing the ledger
// header hash.
//
// We need a way to get the hash from the previous ledger without having
// to rely on the history archive
//
// For now, we run stellar-core starting at the previous checkpoint
// ledger and then fast-forward to the desire ledger
//
//
runFrom = roundDownToFirstReplayAfterCheckpointStart(from) - 1
ledgerHeader, err2 := c.archive.GetLedgerHeader(runFrom)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", runFrom)
return
// For ledgers after the first checkpoint, start at the previous checkpoint
// and fast-forward from there.
if !historyarchive.IsCheckpoint(from) {
from = historyarchive.PrevCheckpoint(from)
}
// Streaming will start from the previous checkpoint + 1
nextLedger = from - 63
if nextLedger < 2 {
// Stellar-Core always streams from ledger 2 at min.
nextLedger = 2
}
ledgerHash = hex.EncodeToString(ledgerHeader.Hash[:])
nextLedger = runFrom + 1
}

runFrom = from - 1
ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
return
}
ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
return
}

Expand Down
125 changes: 59 additions & 66 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TestCaptivePrepareRangeUnboundedRange_FromIsTooFarAheadOfLatestHAS(t *testi

func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(127), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once()
mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once()
mockRunner.On("close").Return(nil)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -434,13 +434,15 @@ func TestCaptivePrepareRangeUnboundedRange_ErrClosingExistingSession(t *testing.
func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
var buf bytes.Buffer

for i := 60; i <= 65; i++ {
for i := 2; i <= 65; i++ {
writeLedgerHeader(&buf, uint32(i))
}

mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return(&buf)
mockRunner.On("getProcessExitChan").Return(make(chan error))
mockRunner.On("close").Return(nil)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -470,12 +472,12 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
func TestGetLatestLedgerSequence(t *testing.T) {
var buf bytes.Buffer

for i := 64; i <= 99; i++ {
for i := 2; i <= 99; i++ {
writeLedgerHeader(&buf, uint32(i))
}

mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return(&buf)
mockRunner.On("getProcessExitChan").Return(make(chan error))
mockRunner.On("close").Return(nil).Once()
Expand Down Expand Up @@ -505,8 +507,9 @@ func TestGetLatestLedgerSequence(t *testing.T) {

latest, err := captiveBackend.GetLatestLedgerSequence()
assert.NoError(t, err)
// readAheadBufferSize is 2 so 2 ledgers are buffered: 64 and 65
assert.Equal(t, uint32(65), latest)
// readAheadBufferSize is 2 so 2 ledgers are buffered: 65 and 66.
// 64 is already read and in the cache.
assert.Equal(t, uint32(66), latest)

exists, _, err := captiveBackend.GetLedger(64)
assert.NoError(t, err)
Expand Down Expand Up @@ -795,66 +798,56 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) {
}

func TestCaptiveRunFromParams(t *testing.T) {
tt := assert.New(t)
mockRunner := &stellarCoreRunnerMock{}
mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetLedgerHeader", uint32(63)).
Return(xdr.LedgerHeaderHistoryEntry{
Hash: xdr.Hash{1, 1, 1, 1},
}, nil)

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunner: mockRunner,
var tests = []struct {
from uint32
runFrom uint32
ledgerArchives uint32
nextLedger uint32
}{
// Before and including 1st checkpoint:
{2, 2, 3, 2},
{3, 2, 3, 2},
{3, 2, 3, 2},
{4, 2, 3, 2},
{62, 2, 3, 2},
{63, 2, 3, 2},

// Starting from 64 we go normal path: between 1st and 2nd checkpoint:
{64, 62, 63, 2},
{65, 62, 63, 2},
{66, 62, 63, 2},
{126, 62, 63, 2},

// between 2nd and 3rd checkpoint... and so on.
{127, 126, 127, 64},
{128, 126, 127, 64},
{129, 126, 127, 64},
}

runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(70)
tt.NoError(err)
tt.Equal(uint32(63), runFrom)
tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash)
tt.Equal(uint32(64), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(64)
tt.NoError(err)
tt.Equal(uint32(63), runFrom)
tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash)
tt.Equal(uint32(64), nextLedger)

mockArchive.
On("GetLedgerHeader", uint32(127)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1},
},
}, nil)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(127)
tt.NoError(err)
tt.Equal(uint32(126), runFrom)
tt.Equal("0100000000000000000000000000000000000000000000000000000000000000", ledgerHash)
tt.Equal(uint32(64), nextLedger)

mockArchive.
On("GetLedgerHeader", uint32(319)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1},
},
}, errors.New("missing ledger checkpoint"))

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(319)
tt.EqualError(err, "error trying to read ledger header 319 from HAS: missing ledger checkpoint")

mockArchive.
On("GetLedgerHeader", uint32(191)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1},
},
}, errors.New("missing ledger checkpoint"))

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(195)
tt.EqualError(err, "error trying to read ledger header 191 from HAS: missing ledger checkpoint")
for _, tc := range tests {
t.Run(fmt.Sprintf("from_%d", tc.from), func(t *testing.T) {
tt := assert.New(t)
mockRunner := &stellarCoreRunnerMock{}
mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetLedgerHeader", uint32(tc.ledgerArchives)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1, 1, 1, 1},
},
}, nil)

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunner: mockRunner,
}

runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(tc.from)
tt.NoError(err)
tt.Equal(tc.runFrom, runFrom, "runFrom")
tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash)
tt.Equal(tc.nextLedger, nextLedger, "nextLedger")
})
}
}
2 changes: 1 addition & 1 deletion ingest/ledgerbackend/ledger_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type LedgerBackend interface {
GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error)
// PrepareRange prepares the given range (including from and to) to be loaded.
// Some backends (like captive stellar-core) need to initalize data to be
// able to stream ledgers.
// able to stream ledgers. Blocks until the first ledger is available.
PrepareRange(ledgerRange Range) error
// IsPrepared returns true if a given ledgerRange is prepared.
IsPrepared(ledgerRange Range) (bool, error)
Expand Down