From 16dd367283de8581a926cd6c3d80671bb4d2e344 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Thu, 3 Dec 2020 14:31:14 +0100 Subject: [PATCH] Fix streaming from genesis ledger --- ingest/ledgerbackend/captive_core_backend.go | 27 +++++++--- .../captive_core_backend_test.go | 54 +++++++++++++++++-- ingest/ledgerbackend/stellar_core_runner.go | 23 ++++++++ 3 files changed, 94 insertions(+), 10 deletions(-) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index c047a7ad19..c5adcfdbce 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -254,12 +254,26 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { } } - runFrom, ledgerHash, nextLedger, err := c.runFromParams(from) - if err != nil { - return errors.Wrap(err, "error calculating ledger and hash for stelar-core run") - } + var nextLedger uint32 + if latestCheckpointSequence == 0 { + if c.log != nil { + c.log.Info("checkpoint ledger has not been published, attempting to run from genesis ledger") + } + // we haven't published the first checkpoint ledger yet + // so we can run stellar-core run --in-memory without providing a starting point + // and captive core will start streaming from the genesis ledger + err = c.stellarCoreRunner.run() + nextLedger = 2 + } else { + var runFrom uint32 + var ledgerHash string + runFrom, ledgerHash, nextLedger, err = c.runFromParams(from) + if err != nil { + return errors.Wrap(err, "error calculating ledger and hash for stelar-core run") + } - err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash) + err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash) + } if err != nil { return errors.Wrap(err, "error running stellar-core") } @@ -268,8 +282,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { c.lastLedger = nil if c.ledgerHashStore != nil { - var exists bool - ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(nextLedger - 1) + ledgerHash, exists, err := c.ledgerHashStore.GetLedgerHash(nextLedger - 1) if err != nil { return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1) } diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index ad30b47821..71c705523b 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -28,6 +28,11 @@ func (m *stellarCoreRunnerMock) catchup(from, to uint32) error { return a.Error(0) } +func (m *stellarCoreRunnerMock) run() error { + a := m.Called() + return a.Error(0) +} + func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string) error { a := m.Called(from, hash) return a.Error(0) @@ -402,6 +407,47 @@ func TestCaptivePrepareRangeUnboundedRange_ErrGettingRootHAS(t *testing.T) { assert.EqualError(t, err, "opening subprocess: error getting latest checkpoint sequence: error getting root HAS: transient error") } +func TestCaptivePrepareRangeUnboundedRange_LatestCheckPointIsZero(t *testing.T) { + var buf bytes.Buffer + for i := 2; i <= 65; i++ { + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) + } + + mockRunner := &stellarCoreRunnerMock{} + mockRunner.On("run").Return(nil).Once() + mockRunner.On("getMetaPipe").Return(&buf) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) + mockRunner.On("close").Return(nil).Once() + + mockArchive := &historyarchive.MockArchive{} + mockArchive. + On("GetRootHAS"). + Return(historyarchive.HistoryArchiveState{ + CurrentLedger: uint32(0), + }, nil).Twice() + + captiveBackend := CaptiveStellarCore{ + archive: mockArchive, + configPath: "foo", + networkPassphrase: network.PublicNetworkPassphrase, + stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) { + return mockRunner, nil + }, + } + + err := captiveBackend.PrepareRange(UnboundedRange(2)) + assert.NoError(t, err) + + assert.NoError(t, captiveBackend.Close()) + + mockRunner.On("run").Return(fmt.Errorf("transient error")).Once() + err = captiveBackend.PrepareRange(UnboundedRange(2)) + assert.EqualError(t, err, "opening subprocess: error running stellar-core: transient error") + + mockArchive.AssertExpectations(t) + mockRunner.AssertExpectations(t) +} + func TestCaptivePrepareRangeUnboundedRange_FromIsTooFarAheadOfLatestHAS(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("close").Return(nil) @@ -999,8 +1045,10 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { Return("", false, fmt.Errorf("transient error")).Once() mockLedgerHashStore.On("GetLedgerHash", uint32(254)). Return("", false, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", uint32(62)). + mockLedgerHashStore.On("GetLedgerHash", uint32(2)). Return("cde", true, nil).Once() + mockLedgerHashStore.On("GetLedgerHash", uint32(62)). + Return("jkl", true, nil).Once() mockLedgerHashStore.On("GetLedgerHash", uint32(126)). Return("ghi", true, nil).Once() @@ -1014,13 +1062,13 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(24) assert.NoError(t, err) assert.Equal(t, uint32(2), runFrom) - assert.Equal(t, "", ledgerHash) + assert.Equal(t, "cde", ledgerHash) assert.Equal(t, uint32(2), nextLedger) runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(86) assert.NoError(t, err) assert.Equal(t, uint32(62), runFrom) - assert.Equal(t, "cde", ledgerHash) + assert.Equal(t, "jkl", ledgerHash) assert.Equal(t, uint32(2), nextLedger) runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(128) diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 7066c47574..1921d8d537 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -20,6 +20,7 @@ import ( type stellarCoreRunnerInterface interface { catchup(from, to uint32) error + run() error runFrom(from uint32, hash string) error getMetaPipe() io.Reader // getProcessExitChan returns a channel that closes on process exit @@ -253,6 +254,28 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { return nil } +func (r *stellarCoreRunner) run() error { + if r.started { + return errors.New("runner already started") + } + var err error + r.cmd, err = r.createCmd( + "run", + "--in-memory", + "--metadata-output-stream", r.getPipeName(), + ) + if err != nil { + return errors.Wrap(err, "error creating `stellar-core run` subprocess") + } + r.metaPipe, err = r.start() + if err != nil { + return errors.Wrap(err, "error starting `stellar-core run` subprocess") + } + r.started = true + + return nil +} + func (r *stellarCoreRunner) getMetaPipe() io.Reader { return r.metaPipe }