diff --git a/exp/tools/captive-core-start-tester/main.go b/exp/tools/captive-core-start-tester/main.go new file mode 100644 index 0000000000..547942bb70 --- /dev/null +++ b/exp/tools/captive-core-start-tester/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" + + "github.com/stellar/go/ingest/ledgerbackend" +) + +// This little app helped testing CaptiveStellarCore.runFromParams on a living +// Stellar-Core. Adding it to the repo because it can be useful in a future if +// Stellar-Core behaviour changes again. +// To make it work, run standalone network (RUN_STANDALONE=false to allow outside +// connections) and update paths below. +func main() { + // check(1) // err expected, cannot stream in captive core + checkLedgers := []uint32{2, 3, 62, 63, 64, 65, 126, 127, 128} + for _, ledger := range checkLedgers { + ok := check(ledger) + if !ok { + panic("test failed error") + } + } +} + +func check(ledger uint32) bool { + c, err := ledgerbackend.NewCaptive( + "stellar-core", + "stellar-core-standalone2.cfg", + "Standalone Network ; February 2017", + []string{"http://localhost:1570"}, + ) + if err != nil { + panic(err) + } + defer c.Close() + + err = c.PrepareRange(ledgerbackend.UnboundedRange(ledger)) + if err != nil { + fmt.Println(err) + return false + } + + ok, meta, err := c.GetLedger(ledger) + if err != nil { + fmt.Println(err) + return false + } + + if !ok { + fmt.Println("no ledger") + return false + } + + if meta.LedgerSequence() != ledger { + fmt.Println("wrong ledger", meta.LedgerSequence()) + return false + } + + fmt.Println(ledger, "ok") + return true +} diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index fd1bbb87a6..ca682b428b 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -254,8 +254,13 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { // if nextLedger is behind - fast-forward until expected ledger if c.nextLedger < from { + // make GetFrom blocking temporarily + c.blocking = true _, _, err := c.GetLedger(from) - return errors.Wrapf(err, "Error fast-forwarding to %d", from) + c.blocking = false + if err != nil { + return errors.Wrapf(err, "Error fast-forwarding to %d", from) + } } return nil @@ -263,43 +268,47 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { // runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerHash string, nextLedger uint32, err error) { - if historyarchive.IsCheckpoint(from) { - // To start replaying ledger metadata from a checkpoint ledger - // (including that ledger), we need to start at the previous ledger - // which forces the stream to start at the first ledger in the - // checkpoint. - // - // If we start at the checkpoint ledger, then the first ledger metadata in the stream would be for L+1 (not L) - // - ledgerHeader, err2 := c.archive.GetLedgerHeader(from) - if err2 != nil { - err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from) - return - } - runFrom = from - 1 - ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:]) - nextLedger = roundDownToFirstReplayAfterCheckpointStart(runFrom) + if from == 1 { + // Trying to start-from 1 results in an error from Stellar-Core: + // Target ledger 1 is not newer than last closed ledger 1 - nothing to do + // TODO maybe we can fix it by generating 1st ledger meta + // like GenesisLedgerStateReader? + err = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2") + return + } + + if from <= 63 { + // For ledgers before (and including) first checkpoint, get/wait the first + // checkpoint to get the ledger header. It will always start streaming + // from ledger 2. + nextLedger = 2 + // The line below is to support a special case for streaming ledger 2 + // that works for all other ledgers <= 63 (fast-forward). + // We can't set from=2 because Stellar-Core will not allow starting from 1. + // To solve this we start from 3 and exploit the fast that Stellar-Core + // will stream data from 2 for the first checkpoint. + from = 3 } else { - // This is a workaround for now since we are not passing the ledger - // header hash. - // - // We need a way to get the hash from the previous ledger without having - // to rely on the history archive - // - // For now, we run stellar-core starting at the previous checkpoint - // ledger and then fast-forward to the desire ledger - // - // - runFrom = roundDownToFirstReplayAfterCheckpointStart(from) - 1 - ledgerHeader, err2 := c.archive.GetLedgerHeader(runFrom) - if err2 != nil { - err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", runFrom) - return + // For ledgers after the first checkpoint, start at the previous checkpoint + // and fast-forward from there. + if !historyarchive.IsCheckpoint(from) { + from = historyarchive.PrevCheckpoint(from) + } + // Streaming will start from the previous checkpoint + 1 + nextLedger = from - 63 + if nextLedger < 2 { + // Stellar-Core always streams from ledger 2 at min. + nextLedger = 2 } - ledgerHash = hex.EncodeToString(ledgerHeader.Hash[:]) - nextLedger = runFrom + 1 } + runFrom = from - 1 + ledgerHeader, err2 := c.archive.GetLedgerHeader(from) + if err2 != nil { + err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from) + return + } + ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:]) return } diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 705e55cd15..cce39fcf66 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -384,7 +384,7 @@ func TestCaptivePrepareRangeUnboundedRange_FromIsTooFarAheadOfLatestHAS(t *testi func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(127), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once() + mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once() mockRunner.On("close").Return(nil) mockArchive := &historyarchive.MockArchive{} @@ -434,13 +434,15 @@ func TestCaptivePrepareRangeUnboundedRange_ErrClosingExistingSession(t *testing. func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { var buf bytes.Buffer - for i := 60; i <= 65; i++ { + for i := 2; i <= 65; i++ { writeLedgerHeader(&buf, uint32(i)) } mockRunner := &stellarCoreRunnerMock{} + mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return(&buf) + mockRunner.On("getProcessExitChan").Return(make(chan error)) mockRunner.On("close").Return(nil) mockArchive := &historyarchive.MockArchive{} @@ -470,12 +472,12 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { func TestGetLatestLedgerSequence(t *testing.T) { var buf bytes.Buffer - for i := 64; i <= 99; i++ { + for i := 2; i <= 99; i++ { writeLedgerHeader(&buf, uint32(i)) } mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("getProcessExitChan").Return(make(chan error)) mockRunner.On("close").Return(nil).Once() @@ -505,8 +507,9 @@ func TestGetLatestLedgerSequence(t *testing.T) { latest, err := captiveBackend.GetLatestLedgerSequence() assert.NoError(t, err) - // readAheadBufferSize is 2 so 2 ledgers are buffered: 64 and 65 - assert.Equal(t, uint32(65), latest) + // readAheadBufferSize is 2 so 2 ledgers are buffered: 65 and 66. + // 64 is already read and in the cache. + assert.Equal(t, uint32(66), latest) exists, _, err := captiveBackend.GetLedger(64) assert.NoError(t, err) @@ -795,66 +798,56 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) { } func TestCaptiveRunFromParams(t *testing.T) { - tt := assert.New(t) - mockRunner := &stellarCoreRunnerMock{} - mockArchive := &historyarchive.MockArchive{} - mockArchive. - On("GetLedgerHeader", uint32(63)). - Return(xdr.LedgerHeaderHistoryEntry{ - Hash: xdr.Hash{1, 1, 1, 1}, - }, nil) - - captiveBackend := CaptiveStellarCore{ - archive: mockArchive, - networkPassphrase: network.PublicNetworkPassphrase, - stellarCoreRunner: mockRunner, + var tests = []struct { + from uint32 + runFrom uint32 + ledgerArchives uint32 + nextLedger uint32 + }{ + // Before and including 1st checkpoint: + {2, 2, 3, 2}, + {3, 2, 3, 2}, + {3, 2, 3, 2}, + {4, 2, 3, 2}, + {62, 2, 3, 2}, + {63, 2, 3, 2}, + + // Starting from 64 we go normal path: between 1st and 2nd checkpoint: + {64, 62, 63, 2}, + {65, 62, 63, 2}, + {66, 62, 63, 2}, + {126, 62, 63, 2}, + + // between 2nd and 3rd checkpoint... and so on. + {127, 126, 127, 64}, + {128, 126, 127, 64}, + {129, 126, 127, 64}, } - runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(70) - tt.NoError(err) - tt.Equal(uint32(63), runFrom) - tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) - tt.Equal(uint32(64), nextLedger) - - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(64) - tt.NoError(err) - tt.Equal(uint32(63), runFrom) - tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) - tt.Equal(uint32(64), nextLedger) - - mockArchive. - On("GetLedgerHeader", uint32(127)). - Return(xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - PreviousLedgerHash: xdr.Hash{1}, - }, - }, nil) - - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(127) - tt.NoError(err) - tt.Equal(uint32(126), runFrom) - tt.Equal("0100000000000000000000000000000000000000000000000000000000000000", ledgerHash) - tt.Equal(uint32(64), nextLedger) - - mockArchive. - On("GetLedgerHeader", uint32(319)). - Return(xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - PreviousLedgerHash: xdr.Hash{1}, - }, - }, errors.New("missing ledger checkpoint")) - - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(319) - tt.EqualError(err, "error trying to read ledger header 319 from HAS: missing ledger checkpoint") - - mockArchive. - On("GetLedgerHeader", uint32(191)). - Return(xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - PreviousLedgerHash: xdr.Hash{1}, - }, - }, errors.New("missing ledger checkpoint")) - - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(195) - tt.EqualError(err, "error trying to read ledger header 191 from HAS: missing ledger checkpoint") + for _, tc := range tests { + t.Run(fmt.Sprintf("from_%d", tc.from), func(t *testing.T) { + tt := assert.New(t) + mockRunner := &stellarCoreRunnerMock{} + mockArchive := &historyarchive.MockArchive{} + mockArchive. + On("GetLedgerHeader", uint32(tc.ledgerArchives)). + Return(xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + PreviousLedgerHash: xdr.Hash{1, 1, 1, 1}, + }, + }, nil) + + captiveBackend := CaptiveStellarCore{ + archive: mockArchive, + networkPassphrase: network.PublicNetworkPassphrase, + stellarCoreRunner: mockRunner, + } + + runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(tc.from) + tt.NoError(err) + tt.Equal(tc.runFrom, runFrom, "runFrom") + tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) + tt.Equal(tc.nextLedger, nextLedger, "nextLedger") + }) + } } diff --git a/ingest/ledgerbackend/ledger_backend.go b/ingest/ledgerbackend/ledger_backend.go index b34813f7ec..b2c65f0263 100644 --- a/ingest/ledgerbackend/ledger_backend.go +++ b/ingest/ledgerbackend/ledger_backend.go @@ -84,7 +84,7 @@ type LedgerBackend interface { GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) // PrepareRange prepares the given range (including from and to) to be loaded. // Some backends (like captive stellar-core) need to initalize data to be - // able to stream ledgers. + // able to stream ledgers. Blocks until the first ledger is available. PrepareRange(ledgerRange Range) error // IsPrepared returns true if a given ledgerRange is prepared. IsPrepared(ledgerRange Range) (bool, error)