Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Refactor stellarCoreRunner #4480

Merged
merged 9 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type CaptiveStellarCore struct {
stellarCoreLock sync.RWMutex

// For testing
stellarCoreRunnerFactory func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error)
stellarCoreRunnerFactory func() stellarCoreRunnerInterface

// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta
Expand Down Expand Up @@ -175,8 +175,8 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency),
}

c.stellarCoreRunnerFactory = func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return newStellarCoreRunner(config, mode)
c.stellarCoreRunnerFactory = func() stellarCoreRunnerInterface {
return newStellarCoreRunner(config)
}
return c, nil
}
Expand Down Expand Up @@ -212,15 +212,7 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
)
}

var runner stellarCoreRunnerInterface
if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOffline); err != nil {
return errors.Wrap(err, "error creating stellar-core runner")
} else {
// only assign c.stellarCoreRunner if runner is not nil to avoid nil interface check
// see https://golang.org/doc/faq#nil_error
c.stellarCoreRunner = runner
}

c.stellarCoreRunner = c.stellarCoreRunnerFactory()
err = c.stellarCoreRunner.catchup(from, to)
if err != nil {
return errors.Wrap(err, "error running stellar-core")
Expand Down Expand Up @@ -256,12 +248,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
)
}

var runner stellarCoreRunnerInterface
if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOnline); err != nil {
return errors.Wrap(err, "error creating stellar-core runner")
}
c.stellarCoreRunner = runner

c.stellarCoreRunner = c.stellarCoreRunnerFactory()
runFrom, ledgerHash, err := c.runFromParams(ctx, from)
if err != nil {
return errors.Wrap(err, "error calculating ledger and hash for stellar-core run")
Expand Down
76 changes: 38 additions & 38 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ func TestCaptivePrepareRange(t *testing.T) {
cancelCalled := false
captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
cancel: context.CancelFunc(func() {
Expand Down Expand Up @@ -222,8 +222,8 @@ func TestCaptivePrepareRangeCrash(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -261,8 +261,8 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -300,8 +300,8 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -393,8 +393,8 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -422,8 +422,8 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) {
cancelCalled := false
captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
cancel: context.CancelFunc(func() {
cancelCalled = true
Expand Down Expand Up @@ -461,8 +461,8 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
cancelCalled := false
captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
cancel: context.CancelFunc(func() {
Expand Down Expand Up @@ -513,8 +513,8 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -561,8 +561,8 @@ func TestGetLatestLedgerSequence(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -606,8 +606,8 @@ func TestCaptiveGetLedger(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -704,8 +704,8 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -759,8 +759,8 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T)

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -809,8 +809,8 @@ func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *t

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -910,8 +910,8 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -963,8 +963,8 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -1005,8 +1005,8 @@ func TestCaptiveAfterClose(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
cancel: cancel,
Expand Down Expand Up @@ -1059,8 +1059,8 @@ func TestGetLedgerBoundsCheck(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -1161,8 +1161,8 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
checkpointManager: historyarchive.NewCheckpointManager(64),
}
Expand Down Expand Up @@ -1444,8 +1444,8 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) {

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
stellarCoreRunnerFactory: func() stellarCoreRunnerInterface {
return mockRunner
},
ledgerHashStore: mockLedgerHashStore,
checkpointManager: historyarchive.NewCheckpointManager(64),
Expand Down
10 changes: 4 additions & 6 deletions ingest/ledgerbackend/file_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ func createFWFixtures(t *testing.T) (*mockHash, *stellarCoreRunner, *fileWatcher
captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
assert.NoError(t, err)

runner, err := newStellarCoreRunner(CaptiveCoreConfig{
runner := newStellarCoreRunner(CaptiveCoreConfig{
BinaryPath: "/some/path",
HistoryArchiveURLs: []string{"http://localhost"},
Log: log.New(),
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: storagePath,
}, stellarCoreRunnerModeOffline)
assert.NoError(t, err)
})

fw, err := newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
assert.NoError(t, err)
Expand All @@ -90,15 +89,14 @@ func TestNewFileWatcherError(t *testing.T) {
captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
assert.NoError(t, err)

runner, err := newStellarCoreRunner(CaptiveCoreConfig{
runner := newStellarCoreRunner(CaptiveCoreConfig{
BinaryPath: "/some/path",
HistoryArchiveURLs: []string{"http://localhost"},
Log: log.New(),
Context: context.Background(),
Toml: captiveCoreToml,
StoragePath: storagePath,
}, stellarCoreRunnerModeOffline)
assert.NoError(t, err)
})

_, err = newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
assert.EqualError(t, err, "could not hash captive core binary: test error")
Expand Down
90 changes: 90 additions & 0 deletions ingest/ledgerbackend/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package ledgerbackend

import (
"io"
"io/fs"
"io/ioutil"
"os"
"os/exec"
)

type isDir interface {
IsDir() bool
}

type systemCaller interface {
removeAll(path string) error
writeFile(filename string, data []byte, perm fs.FileMode) error
mkdirAll(path string, perm os.FileMode) error
stat(name string) (isDir, error)
command(name string, arg ...string) cmdI
}

type realSystemCaller struct{}

func (realSystemCaller) removeAll(path string) error {
return os.RemoveAll(path)
}

func (realSystemCaller) writeFile(filename string, data []byte, perm fs.FileMode) error {
return ioutil.WriteFile(filename, data, perm)
}

func (realSystemCaller) mkdirAll(path string, perm os.FileMode) error {
return os.MkdirAll(path, perm)
}

func (realSystemCaller) stat(name string) (isDir, error) {
return os.Stat(name)
}

func (realSystemCaller) command(name string, arg ...string) cmdI {
cmd := exec.Command(name, arg...)
return &realCmd{cmd}
}

type cmdI interface {
Output() ([]byte, error)
Wait() error
Start() error
Run() error
setDir(dir string)
setStdout(stdout io.Writer)
getStdout() io.Writer
setStderr(stderr io.Writer)
getStderr() io.Writer
getProcess() *os.Process
setExtraFiles([]*os.File)
}

type realCmd struct {
*exec.Cmd
}

func (r *realCmd) setDir(dir string) {
r.Cmd.Dir = dir
}

func (r *realCmd) setStdout(stdout io.Writer) {
r.Cmd.Stdout = stdout
}

func (r *realCmd) getStdout() io.Writer {
return r.Cmd.Stdout
}

func (r *realCmd) setStderr(stderr io.Writer) {
r.Cmd.Stderr = stderr
}

func (r *realCmd) getStderr() io.Writer {
return r.Cmd.Stderr
}

func (r *realCmd) getProcess() *os.Process {
return r.Cmd.Process
}

func (r *realCmd) setExtraFiles(extraFiles []*os.File) {
r.ExtraFiles = extraFiles
}
Loading