Skip to content

Commit

Permalink
Fix streaming from genesis ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Dec 4, 2020
1 parent ab882e6 commit 16dd367
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 10 deletions.
27 changes: 20 additions & 7 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,26 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {
}
}

runFrom, ledgerHash, nextLedger, err := c.runFromParams(from)
if err != nil {
return errors.Wrap(err, "error calculating ledger and hash for stelar-core run")
}
var nextLedger uint32
if latestCheckpointSequence == 0 {
if c.log != nil {
c.log.Info("checkpoint ledger has not been published, attempting to run from genesis ledger")
}
// we haven't published the first checkpoint ledger yet
// so we can run stellar-core run --in-memory without providing a starting point
// and captive core will start streaming from the genesis ledger
err = c.stellarCoreRunner.run()
nextLedger = 2
} else {
var runFrom uint32
var ledgerHash string
runFrom, ledgerHash, nextLedger, err = c.runFromParams(from)
if err != nil {
return errors.Wrap(err, "error calculating ledger and hash for stelar-core run")
}

err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash)
err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash)
}
if err != nil {
return errors.Wrap(err, "error running stellar-core")
}
Expand All @@ -268,8 +282,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {
c.lastLedger = nil

if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(nextLedger - 1)
ledgerHash, exists, err := c.ledgerHashStore.GetLedgerHash(nextLedger - 1)
if err != nil {
return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1)
}
Expand Down
54 changes: 51 additions & 3 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (m *stellarCoreRunnerMock) catchup(from, to uint32) error {
return a.Error(0)
}

func (m *stellarCoreRunnerMock) run() error {
a := m.Called()
return a.Error(0)
}

func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string) error {
a := m.Called(from, hash)
return a.Error(0)
Expand Down Expand Up @@ -402,6 +407,47 @@ func TestCaptivePrepareRangeUnboundedRange_ErrGettingRootHAS(t *testing.T) {
assert.EqualError(t, err, "opening subprocess: error getting latest checkpoint sequence: error getting root HAS: transient error")
}

func TestCaptivePrepareRangeUnboundedRange_LatestCheckPointIsZero(t *testing.T) {
var buf bytes.Buffer
for i := 2; i <= 65; i++ {
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("run").Return(nil).Once()
mockRunner.On("getMetaPipe").Return(&buf)
mockRunner.On("getProcessExitChan").Return(make(chan struct{}))
mockRunner.On("close").Return(nil).Once()

mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetRootHAS").
Return(historyarchive.HistoryArchiveState{
CurrentLedger: uint32(0),
}, nil).Twice()

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
configPath: "foo",
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
},
}

err := captiveBackend.PrepareRange(UnboundedRange(2))
assert.NoError(t, err)

assert.NoError(t, captiveBackend.Close())

mockRunner.On("run").Return(fmt.Errorf("transient error")).Once()
err = captiveBackend.PrepareRange(UnboundedRange(2))
assert.EqualError(t, err, "opening subprocess: error running stellar-core: transient error")

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
}

func TestCaptivePrepareRangeUnboundedRange_FromIsTooFarAheadOfLatestHAS(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(nil)
Expand Down Expand Up @@ -999,8 +1045,10 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) {
Return("", false, fmt.Errorf("transient error")).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(254)).
Return("", false, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(62)).
mockLedgerHashStore.On("GetLedgerHash", uint32(2)).
Return("cde", true, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(62)).
Return("jkl", true, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(126)).
Return("ghi", true, nil).Once()

Expand All @@ -1014,13 +1062,13 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) {
runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(24)
assert.NoError(t, err)
assert.Equal(t, uint32(2), runFrom)
assert.Equal(t, "", ledgerHash)
assert.Equal(t, "cde", ledgerHash)
assert.Equal(t, uint32(2), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(86)
assert.NoError(t, err)
assert.Equal(t, uint32(62), runFrom)
assert.Equal(t, "cde", ledgerHash)
assert.Equal(t, "jkl", ledgerHash)
assert.Equal(t, uint32(2), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(128)
Expand Down
23 changes: 23 additions & 0 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

type stellarCoreRunnerInterface interface {
catchup(from, to uint32) error
run() error
runFrom(from uint32, hash string) error
getMetaPipe() io.Reader
// getProcessExitChan returns a channel that closes on process exit
Expand Down Expand Up @@ -253,6 +254,28 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return nil
}

func (r *stellarCoreRunner) run() error {
if r.started {
return errors.New("runner already started")
}
var err error
r.cmd, err = r.createCmd(
"run",
"--in-memory",
"--metadata-output-stream", r.getPipeName(),
)
if err != nil {
return errors.Wrap(err, "error creating `stellar-core run` subprocess")
}
r.metaPipe, err = r.start()
if err != nil {
return errors.Wrap(err, "error starting `stellar-core run` subprocess")
}
r.started = true

return nil
}

func (r *stellarCoreRunner) getMetaPipe() io.Reader {
return r.metaPipe
}
Expand Down

0 comments on commit 16dd367

Please sign in to comment.