diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 7543abb660..8fe5a46f3b 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -83,7 +83,7 @@ type CaptiveStellarCore struct { stellarCoreLock sync.RWMutex // For testing - stellarCoreRunnerFactory func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) + stellarCoreRunnerFactory func() stellarCoreRunnerInterface // cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger(). cachedMeta *xdr.LedgerCloseMeta @@ -175,8 +175,8 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency), } - c.stellarCoreRunnerFactory = func(mode stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return newStellarCoreRunner(config, mode) + c.stellarCoreRunnerFactory = func() stellarCoreRunnerInterface { + return newStellarCoreRunner(config) } return c, nil } @@ -212,15 +212,7 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error ) } - var runner stellarCoreRunnerInterface - if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOffline); err != nil { - return errors.Wrap(err, "error creating stellar-core runner") - } else { - // only assign c.stellarCoreRunner if runner is not nil to avoid nil interface check - // see https://golang.org/doc/faq#nil_error - c.stellarCoreRunner = runner - } - + c.stellarCoreRunner = c.stellarCoreRunnerFactory() err = c.stellarCoreRunner.catchup(from, to) if err != nil { return errors.Wrap(err, "error running stellar-core") @@ -256,12 +248,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro ) } - var runner stellarCoreRunnerInterface - if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOnline); err != nil { - return errors.Wrap(err, "error creating stellar-core runner") - } - c.stellarCoreRunner = runner - + c.stellarCoreRunner = c.stellarCoreRunnerFactory() runFrom, ledgerHash, err := c.runFromParams(ctx, from) if err != nil { return errors.Wrap(err, "error calculating ledger and hash for stellar-core run") diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 6138bb1293..f0d7d30316 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -183,8 +183,8 @@ func TestCaptivePrepareRange(t *testing.T) { cancelCalled := false captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), cancel: context.CancelFunc(func() { @@ -222,8 +222,8 @@ func TestCaptivePrepareRangeCrash(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -261,8 +261,8 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -300,8 +300,8 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -393,8 +393,8 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -422,8 +422,8 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { cancelCalled := false captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, cancel: context.CancelFunc(func() { cancelCalled = true @@ -461,8 +461,8 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { cancelCalled := false captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), cancel: context.CancelFunc(func() { @@ -513,8 +513,8 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -561,8 +561,8 @@ func TestGetLatestLedgerSequence(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -606,8 +606,8 @@ func TestCaptiveGetLedger(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -704,8 +704,8 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -759,8 +759,8 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -809,8 +809,8 @@ func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *t captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -910,8 +910,8 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -963,8 +963,8 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -1005,8 +1005,8 @@ func TestCaptiveAfterClose(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), cancel: cancel, @@ -1059,8 +1059,8 @@ func TestGetLedgerBoundsCheck(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -1161,8 +1161,8 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, checkpointManager: historyarchive.NewCheckpointManager(64), } @@ -1444,8 +1444,8 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { captiveBackend := CaptiveStellarCore{ archive: mockArchive, - stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { - return mockRunner, nil + stellarCoreRunnerFactory: func() stellarCoreRunnerInterface { + return mockRunner }, ledgerHashStore: mockLedgerHashStore, checkpointManager: historyarchive.NewCheckpointManager(64), diff --git a/ingest/ledgerbackend/file_watcher_test.go b/ingest/ledgerbackend/file_watcher_test.go index 837ab1166b..7e84bbfcf2 100644 --- a/ingest/ledgerbackend/file_watcher_test.go +++ b/ingest/ledgerbackend/file_watcher_test.go @@ -58,15 +58,14 @@ func createFWFixtures(t *testing.T) (*mockHash, *stellarCoreRunner, *fileWatcher captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) - runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + runner := newStellarCoreRunner(CaptiveCoreConfig{ BinaryPath: "/some/path", HistoryArchiveURLs: []string{"http://localhost"}, Log: log.New(), Context: context.Background(), Toml: captiveCoreToml, StoragePath: storagePath, - }, stellarCoreRunnerModeOffline) - assert.NoError(t, err) + }) fw, err := newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond) assert.NoError(t, err) @@ -90,15 +89,14 @@ func TestNewFileWatcherError(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) - runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + runner := newStellarCoreRunner(CaptiveCoreConfig{ BinaryPath: "/some/path", HistoryArchiveURLs: []string{"http://localhost"}, Log: log.New(), Context: context.Background(), Toml: captiveCoreToml, StoragePath: storagePath, - }, stellarCoreRunnerModeOffline) - assert.NoError(t, err) + }) _, err = newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond) assert.EqualError(t, err, "could not hash captive core binary: test error") diff --git a/ingest/ledgerbackend/main.go b/ingest/ledgerbackend/main.go new file mode 100644 index 0000000000..4a5d119de2 --- /dev/null +++ b/ingest/ledgerbackend/main.go @@ -0,0 +1,90 @@ +package ledgerbackend + +import ( + "io" + "io/fs" + "io/ioutil" + "os" + "os/exec" +) + +type isDir interface { + IsDir() bool +} + +type systemCaller interface { + removeAll(path string) error + writeFile(filename string, data []byte, perm fs.FileMode) error + mkdirAll(path string, perm os.FileMode) error + stat(name string) (isDir, error) + command(name string, arg ...string) cmdI +} + +type realSystemCaller struct{} + +func (realSystemCaller) removeAll(path string) error { + return os.RemoveAll(path) +} + +func (realSystemCaller) writeFile(filename string, data []byte, perm fs.FileMode) error { + return ioutil.WriteFile(filename, data, perm) +} + +func (realSystemCaller) mkdirAll(path string, perm os.FileMode) error { + return os.MkdirAll(path, perm) +} + +func (realSystemCaller) stat(name string) (isDir, error) { + return os.Stat(name) +} + +func (realSystemCaller) command(name string, arg ...string) cmdI { + cmd := exec.Command(name, arg...) + return &realCmd{cmd} +} + +type cmdI interface { + Output() ([]byte, error) + Wait() error + Start() error + Run() error + setDir(dir string) + setStdout(stdout io.Writer) + getStdout() io.Writer + setStderr(stderr io.Writer) + getStderr() io.Writer + getProcess() *os.Process + setExtraFiles([]*os.File) +} + +type realCmd struct { + *exec.Cmd +} + +func (r *realCmd) setDir(dir string) { + r.Cmd.Dir = dir +} + +func (r *realCmd) setStdout(stdout io.Writer) { + r.Cmd.Stdout = stdout +} + +func (r *realCmd) getStdout() io.Writer { + return r.Cmd.Stdout +} + +func (r *realCmd) setStderr(stderr io.Writer) { + r.Cmd.Stderr = stderr +} + +func (r *realCmd) getStderr() io.Writer { + return r.Cmd.Stderr +} + +func (r *realCmd) getProcess() *os.Process { + return r.Cmd.Process +} + +func (r *realCmd) setExtraFiles(extraFiles []*os.File) { + r.ExtraFiles = extraFiles +} diff --git a/ingest/ledgerbackend/mock_cmd_test.go b/ingest/ledgerbackend/mock_cmd_test.go new file mode 100644 index 0000000000..a1d280421a --- /dev/null +++ b/ingest/ledgerbackend/mock_cmd_test.go @@ -0,0 +1,77 @@ +package ledgerbackend + +import ( + "io" + "os" + + "github.com/stretchr/testify/mock" +) + +type mockCmd struct { + mock.Mock +} + +func (m *mockCmd) Output() ([]byte, error) { + args := m.Called() + return args.Get(0).([]byte), args.Error(1) +} + +func (m *mockCmd) Wait() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockCmd) Start() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockCmd) Run() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockCmd) setDir(dir string) { + m.Called(dir) +} + +func (m *mockCmd) setStdout(stdout io.Writer) { + m.Called(stdout) +} + +func (m *mockCmd) getStdout() io.Writer { + args := m.Called() + return args.Get(0).(io.Writer) +} + +func (m *mockCmd) setStderr(stderr io.Writer) { + m.Called(stderr) +} + +func (m *mockCmd) getStderr() io.Writer { + args := m.Called() + return args.Get(0).(io.Writer) +} + +func (m *mockCmd) getProcess() *os.Process { + args := m.Called() + return args.Get(0).(*os.Process) +} + +func (m *mockCmd) setExtraFiles(files []*os.File) { + m.Called(files) +} + +func simpleCommandMock() *mockCmd { + _, writer := io.Pipe() + cmdMock := &mockCmd{} + cmdMock.On("setDir", mock.Anything) + cmdMock.On("setStdout", mock.Anything) + cmdMock.On("getStdout").Return(writer) + cmdMock.On("setStderr", mock.Anything) + cmdMock.On("getStderr").Return(writer) + cmdMock.On("getProcess").Return(&os.Process{}) + cmdMock.On("setExtraFiles", mock.Anything) + cmdMock.On("Start").Return(nil) + return cmdMock +} diff --git a/ingest/ledgerbackend/mock_system_caller_test.go b/ingest/ledgerbackend/mock_system_caller_test.go new file mode 100644 index 0000000000..99e1faede9 --- /dev/null +++ b/ingest/ledgerbackend/mock_system_caller_test.go @@ -0,0 +1,47 @@ +package ledgerbackend + +import ( + "io/fs" + "os" + + "github.com/stretchr/testify/mock" +) + +type isDirImpl bool + +func (i isDirImpl) IsDir() bool { + return bool(i) +} + +type mockSystemCaller struct { + mock.Mock +} + +func (m *mockSystemCaller) removeAll(path string) error { + args := m.Called(path) + return args.Error(0) +} + +func (m *mockSystemCaller) writeFile(filename string, data []byte, perm fs.FileMode) error { + args := m.Called(filename, data, perm) + return args.Error(0) +} + +func (m *mockSystemCaller) mkdirAll(path string, perm os.FileMode) error { + args := m.Called(path, perm) + return args.Error(0) +} + +func (m *mockSystemCaller) stat(name string) (isDir, error) { + args := m.Called(name) + return args.Get(0).(isDir), args.Error(1) +} + +func (m *mockSystemCaller) command(name string, arg ...string) cmdI { + a := []interface{}{name} + for _, ar := range arg { + a = append(a, ar) + } + args := m.Called(a...) + return args.Get(0).(cmdI) +} diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 0cf4a63882..1bf2870810 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -6,10 +6,8 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "math/rand" "os" - "os/exec" "path" "path/filepath" "regexp" @@ -35,7 +33,8 @@ type stellarCoreRunnerInterface interface { type stellarCoreRunnerMode int const ( - stellarCoreRunnerModeOnline stellarCoreRunnerMode = iota + _ stellarCoreRunnerMode = iota // unset + stellarCoreRunnerModeOnline stellarCoreRunnerModeOffline ) @@ -55,7 +54,7 @@ type stellarCoreRunner struct { executablePath string started bool - cmd *exec.Cmd + cmd cmdI wg sync.WaitGroup ctx context.Context cancel context.CancelFunc @@ -63,6 +62,8 @@ type stellarCoreRunner struct { pipe pipe mode stellarCoreRunnerMode + systemCaller systemCaller + lock sync.Mutex processExited bool processExitError error @@ -84,71 +85,62 @@ func createRandomHexString(n int) string { return string(b) } -func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode) (*stellarCoreRunner, error) { - var fullStoragePath string - if runtime.GOOS == "windows" || mode == stellarCoreRunnerModeOffline { - // On Windows, first we ALWAYS append something to the base storage path, - // because we will delete the directory entirely when Horizon stops. We also - // add a random suffix in order to ensure that there aren't naming - // conflicts. - // This is done because it's impossible to send SIGINT on Windows so - // buckets can become corrupted. - // We also want to use random directories in offline mode (reingestion) - // because it's possible it's running multiple Stellar-Cores on a single - // machine. - fullStoragePath = path.Join(config.StoragePath, "captive-core-"+createRandomHexString(8)) - } else { - // Use the specified directory to store Captive Core's data: - // https://github.com/stellar/go/issues/3437 - // but be sure to re-use rather than replace it: - // https://github.com/stellar/go/issues/3631 - fullStoragePath = path.Join(config.StoragePath, "captive-core") - } - - err := createCheckDirectory(fullStoragePath) - if err != nil { - return nil, err - } - +func newStellarCoreRunner(config CaptiveCoreConfig) *stellarCoreRunner { ctx, cancel := context.WithCancel(config.Context) runner := &stellarCoreRunner{ executablePath: config.BinaryPath, ctx: ctx, cancel: cancel, - storagePath: fullStoragePath, + storagePath: config.StoragePath, useDB: config.UseDB, - mode: mode, nonce: fmt.Sprintf( "captive-stellar-core-%x", rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), ), log: config.Log, toml: config.Toml, + + systemCaller: realSystemCaller{}, } - if conf, err := runner.writeConf(); err != nil { - return nil, errors.Wrap(err, "error writing configuration") + return runner +} + +func (r *stellarCoreRunner) getFullStoragePath() string { + if runtime.GOOS == "windows" || r.mode == stellarCoreRunnerModeOffline { + // On Windows, first we ALWAYS append something to the base storage path, + // because we will delete the directory entirely when Horizon stops. We also + // add a random suffix in order to ensure that there aren't naming + // conflicts. + // This is done because it's impossible to send SIGINT on Windows so + // buckets can become corrupted. + // We also want to use random directories in offline mode (reingestion) + // because it's possible it's running multiple Stellar-Cores on a single + // machine. + return path.Join(r.storagePath, "captive-core-"+createRandomHexString(8)) } else { - runner.log.Debugf("captive core config file contents:\n%s", conf) + // Use the specified directory to store Captive Core's data: + // https://github.com/stellar/go/issues/3437 + // but be sure to re-use rather than replace it: + // https://github.com/stellar/go/issues/3631 + return path.Join(r.storagePath, "captive-core") } - - return runner, nil } -func createCheckDirectory(fullStoragePath string) error { - info, err := os.Stat(fullStoragePath) +func (r *stellarCoreRunner) establishStorageDirectory() error { + info, err := r.systemCaller.stat(r.storagePath) if os.IsNotExist(err) { - innerErr := os.MkdirAll(fullStoragePath, os.FileMode(int(0755))) // rwx|rx|rx + innerErr := r.systemCaller.mkdirAll(r.storagePath, os.FileMode(int(0755))) // rwx|rx|rx if innerErr != nil { return errors.Wrap(innerErr, fmt.Sprintf( - "failed to create storage directory (%s)", fullStoragePath)) + "failed to create storage directory (%s)", r.storagePath)) } } else if !info.IsDir() { - return errors.New(fmt.Sprintf("%s is not a directory", fullStoragePath)) + return errors.New(fmt.Sprintf("%s is not a directory", r.storagePath)) } else if err != nil { return errors.Wrap(err, fmt.Sprintf( - "error accessing storage directory (%s)", fullStoragePath)) + "error accessing storage directory (%s)", r.storagePath)) } return nil @@ -160,7 +152,7 @@ func (r *stellarCoreRunner) writeConf() (string, error) { return "", err } - return string(text), ioutil.WriteFile(r.getConfFileName(), text, 0644) + return string(text), r.systemCaller.writeFile(r.getConfFileName(), text, 0644) } func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) { @@ -249,8 +241,8 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer { func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) { allParams := []string{"--conf", r.getConfFileName(), "offline-info"} - cmd := exec.Command(r.executablePath, allParams...) - cmd.Dir = r.storagePath + cmd := r.systemCaller.command(r.executablePath, allParams...) + cmd.setDir(r.storagePath) output, err := cmd.Output() if err != nil { return stellarcore.InfoResponse{}, errors.Wrap(err, "error executing offline-info cmd") @@ -263,13 +255,24 @@ func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) { return info, nil } -func (r *stellarCoreRunner) createCmd(params ...string) *exec.Cmd { +func (r *stellarCoreRunner) createCmd(params ...string) (cmdI, error) { + err := r.establishStorageDirectory() + if err != nil { + return nil, err + } + + if conf, err := r.writeConf(); err != nil { + return nil, errors.Wrap(err, "error writing configuration") + } else { + r.log.Debugf("captive core config file contents:\n%s", conf) + } + allParams := append([]string{"--conf", r.getConfFileName()}, params...) - cmd := exec.Command(r.executablePath, allParams...) - cmd.Dir = r.storagePath - cmd.Stdout = r.getLogLineWriter() - cmd.Stderr = r.getLogLineWriter() - return cmd + cmd := r.systemCaller.command(r.executablePath, allParams...) + cmd.setDir(r.storagePath) + cmd.setStdout(r.getLogLineWriter()) + cmd.setStderr(r.getLogLineWriter()) + return cmd, nil } // context returns the context.Context instance associated with the running captive core instance @@ -282,6 +285,9 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { r.lock.Lock() defer r.lock.Unlock() + r.mode = stellarCoreRunnerModeOffline + r.storagePath = r.getFullStoragePath() + // check if we have already been closed if r.ctx.Err() != nil { return r.ctx.Err() @@ -300,16 +306,23 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { // when using external storage of ledgers, use new-db to first set the state of // remote db storage to genesis to purge any prior state and reset. if r.useDB { - if err := r.createCmd("new-db").Run(); err != nil { + cmd, err := r.createCmd("new-db") + if err != nil { + return errors.Wrap(err, "error creating command") + } + if err := cmd.Run(); err != nil { return errors.Wrap(err, "error initializing core db") } } else { params = append(params, "--in-memory") } - r.cmd = r.createCmd(params...) - var err error + r.cmd, err = r.createCmd(params...) + if err != nil { + return errors.Wrap(err, "error creating command") + } + r.pipe, err = r.start(r.cmd) if err != nil { r.closeLogLineWriters(r.cmd) @@ -337,6 +350,9 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { r.lock.Lock() defer r.lock.Unlock() + r.mode = stellarCoreRunnerModeOnline + r.storagePath = r.getFullStoragePath() + // check if we have already been closed if r.ctx.Err() != nil { return r.ctx.Err() @@ -346,11 +362,14 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { return errors.New("runner already started") } + var err error + if r.useDB { // Check if on-disk core DB exists and what's the LCL there. If not what // we need remove storage dir and start from scratch. removeStorageDir := false - info, err := r.offlineInfo() + var info stellarcore.InfoResponse + info, err = r.offlineInfo() if err != nil { r.log.Infof("Error running offline-info: %v, removing existing storage-dir contents", err) removeStorageDir = true @@ -360,40 +379,44 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { } if removeStorageDir { - if err = os.RemoveAll(r.storagePath); err != nil { + if err = r.systemCaller.removeAll(r.storagePath); err != nil { return errors.Wrap(err, "error removing existing storage-dir contents") } - if err = createCheckDirectory(r.storagePath); err != nil { - return err - } - - if _, err = r.writeConf(); err != nil { - return errors.Wrap(err, "error writing configuration") + var cmd cmdI + cmd, err = r.createCmd("new-db") + if err != nil { + return errors.Wrap(err, "error creating command") } - if err = r.createCmd("new-db").Run(); err != nil { + if err = cmd.Run(); err != nil { return errors.Wrap(err, "error initializing core db") } // Do a quick catch-up to set the LCL in core to be our expected starting // point. if from > 2 { - if err = r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil { - return errors.Wrap(err, "error runing stellar-core catchup") - } - } else if err = r.createCmd("catchup", "2/0").Run(); err != nil { + cmd, err = r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)) + } else { + cmd, err = r.createCmd("catchup", "2/0") + } + + if err != nil { + return errors.Wrap(err, "error creating command") + } + + if err = cmd.Run(); err != nil { return errors.Wrap(err, "error runing stellar-core catchup") } } - r.cmd = r.createCmd( + r.cmd, err = r.createCmd( "run", "--metadata-output-stream", r.getPipeName(), ) } else { - r.cmd = r.createCmd( + r.cmd, err = r.createCmd( "run", "--in-memory", "--start-at-ledger", fmt.Sprintf("%d", from), @@ -402,7 +425,10 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { ) } - var err error + if err != nil { + return errors.Wrap(err, "error creating command") + } + r.pipe, err = r.start(r.cmd) if err != nil { r.closeLogLineWriters(r.cmd) @@ -446,7 +472,7 @@ func (r *stellarCoreRunner) handleExit() { case <-r.ctx.Done(): } - err := r.cmd.Process.Signal(interrupt) + err := r.cmd.getProcess().Signal(interrupt) if err == nil { err = r.ctx.Err() // Report ctx.Err() as the reason we interrupted. } else if err.Error() == "os: process already finished" { @@ -470,7 +496,7 @@ func (r *stellarCoreRunner) handleExit() { // Ignore any error: if cmd.Process has already terminated, we still // want to send ctx.Err() (or the error from the Interrupt call) // to properly attribute the signal that may have terminated it. - _ = r.cmd.Process.Kill() + _ = r.cmd.getProcess().Kill() errc <- err }() @@ -497,9 +523,9 @@ func (r *stellarCoreRunner) handleExit() { } // closeLogLineWriters closes the go routines created by getLogLineWriter() -func (r *stellarCoreRunner) closeLogLineWriters(cmd *exec.Cmd) { - cmd.Stdout.(*io.PipeWriter).Close() - cmd.Stderr.(*io.PipeWriter).Close() +func (r *stellarCoreRunner) closeLogLineWriters(cmd cmdI) { + cmd.getStdout().(*io.PipeWriter).Close() + cmd.getStderr().(*io.PipeWriter).Close() } // getMetaPipe returns a channel which contains ledgers streamed from the captive core subprocess @@ -557,15 +583,15 @@ func (r *stellarCoreRunner) close() error { r.pipe.Reader.Close() } - if runtime.GOOS == "windows" || + if r.mode != 0 && (runtime.GOOS == "windows" || (r.processExitError != nil && r.processExitError != context.Canceled) || - r.mode == stellarCoreRunnerModeOffline { + r.mode == stellarCoreRunnerModeOffline) { // It's impossible to send SIGINT on Windows so buckets can become // corrupted. If we can't reuse it, then remove it. // We also remove the storage path if there was an error terminating the // process (files can be corrupted). // We remove all files when reingesting to save disk space. - return os.RemoveAll(storagePath) + return r.systemCaller.removeAll(storagePath) } return nil diff --git a/ingest/ledgerbackend/stellar_core_runner_posix.go b/ingest/ledgerbackend/stellar_core_runner_posix.go index 2b0f2b4115..6f34a49a75 100644 --- a/ingest/ledgerbackend/stellar_core_runner_posix.go +++ b/ingest/ledgerbackend/stellar_core_runner_posix.go @@ -5,7 +5,6 @@ package ledgerbackend import ( "os" - "os/exec" "github.com/pkg/errors" ) @@ -19,7 +18,7 @@ func (c *stellarCoreRunner) getPipeName() string { return "fd:3" } -func (c *stellarCoreRunner) start(cmd *exec.Cmd) (pipe, error) { +func (c *stellarCoreRunner) start(cmd cmdI) (pipe, error) { // First make an anonymous pipe. // Note io.File objects close-on-finalization. readFile, writeFile, err := os.Pipe() @@ -30,7 +29,7 @@ func (c *stellarCoreRunner) start(cmd *exec.Cmd) (pipe, error) { // Add the write-end to the set of inherited file handles. This is defined // to be fd 3 on posix platforms. - cmd.ExtraFiles = []*os.File{writeFile} + cmd.setExtraFiles([]*os.File{writeFile}) err = cmd.Start() if err != nil { writeFile.Close() diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index 54f13e05de..ef8a9286cc 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -2,107 +2,272 @@ package ledgerbackend import ( "context" - "os" + "encoding/json" "testing" + "time" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/mock" + "github.com/stellar/go/protocols/stellarcore" + "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" ) -func TestCloseBeforeStartOffline(t *testing.T) { - storagePath, err := os.MkdirTemp("", "captive-core-*") - require.NoError(t, err) - defer os.RemoveAll(storagePath) - +func TestCloseOffline(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) - runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + runner := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/usr/bin/stellar-core", HistoryArchiveURLs: []string{"http://localhost"}, Log: log.New(), Context: context.Background(), Toml: captiveCoreToml, - StoragePath: storagePath, - }, stellarCoreRunnerModeOffline) - assert.NoError(t, err) + StoragePath: "/tmp/captive-core", + }) - tempDir := runner.storagePath - info, err := os.Stat(tempDir) - assert.NoError(t, err) - assert.True(t, info.IsDir()) + cmdMock := simpleCommandMock() + cmdMock.On("Wait").Return(nil) - assert.NoError(t, runner.close()) + // Replace system calls with a mock + scMock := &mockSystemCaller{} + defer scMock.AssertExpectations(t) + scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) + scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "catchup", + "200/101", + "--metadata-output-stream", + "fd:3", + "--in-memory", + ).Return(cmdMock) + scMock.On("removeAll", mock.Anything).Return(nil) + runner.systemCaller = scMock - // Directory cleaned up on shutdown when reingesting to save space - _, err = os.Stat(tempDir) - assert.Error(t, err) - assert.Contains(t, err.Error(), "no such file or directory") + assert.NoError(t, runner.catchup(100, 200)) + assert.NoError(t, runner.close()) } -func TestCloseBeforeStartOnline(t *testing.T) { - storagePath, err := os.MkdirTemp("", "captive-core-*") - require.NoError(t, err) - defer os.RemoveAll(storagePath) - +func TestCloseOnline(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) captiveCoreToml.AddExamplePubnetValidators() - runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + runner := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/usr/bin/stellar-core", HistoryArchiveURLs: []string{"http://localhost"}, Log: log.New(), Context: context.Background(), Toml: captiveCoreToml, - StoragePath: storagePath, - }, stellarCoreRunnerModeOnline) - assert.NoError(t, err) + StoragePath: "/tmp/captive-core", + }) - tempDir := runner.storagePath - info, err := os.Stat(tempDir) - assert.NoError(t, err) - assert.True(t, info.IsDir()) + cmdMock := simpleCommandMock() + cmdMock.On("Wait").Return(nil) + + // Replace system calls with a mock + scMock := &mockSystemCaller{} + defer scMock.AssertExpectations(t) + scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) + scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "run", + "--in-memory", + "--start-at-ledger", + "100", + "--start-at-hash", + "hash", + "--metadata-output-stream", + "fd:3", + ).Return(cmdMock) + runner.systemCaller = scMock + assert.NoError(t, runner.runFrom(100, "hash")) assert.NoError(t, runner.close()) +} - // Directory no longer cleaned up on shutdown (perf. bump in v2.5.0) - _, err = os.Stat(tempDir) +func TestCloseOnlineWithError(t *testing.T) { + captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) -} -func TestCloseBeforeStartOnlineWithError(t *testing.T) { - storagePath, err := os.MkdirTemp("", "captive-core-*") - require.NoError(t, err) - defer os.RemoveAll(storagePath) + captiveCoreToml.AddExamplePubnetValidators() + + runner := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/usr/bin/stellar-core", + HistoryArchiveURLs: []string{"http://localhost"}, + Log: log.New(), + Context: context.Background(), + Toml: captiveCoreToml, + StoragePath: "/tmp/captive-core", + }) + + cmdMock := simpleCommandMock() + cmdMock.On("Wait").Return(errors.New("wait error")) + + // Replace system calls with a mock + scMock := &mockSystemCaller{} + defer scMock.AssertExpectations(t) + scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) + scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "run", + "--in-memory", + "--start-at-ledger", + "100", + "--start-at-hash", + "hash", + "--metadata-output-stream", + "fd:3", + ).Return(cmdMock) + scMock.On("removeAll", mock.Anything).Return(nil) + runner.systemCaller = scMock + + assert.NoError(t, runner.runFrom(100, "hash")) + + // Wait with calling close until r.processExitError is set to Wait() error + for { + _, err := runner.getProcessExitError() + if err != nil { + break + } + time.Sleep(10 * time.Millisecond) + } + assert.NoError(t, runner.close()) +} +func TestRunFromUseDBLedgersMatch(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) captiveCoreToml.AddExamplePubnetValidators() - runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + runner := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/usr/bin/stellar-core", HistoryArchiveURLs: []string{"http://localhost"}, Log: log.New(), Context: context.Background(), Toml: captiveCoreToml, - StoragePath: storagePath, - }, stellarCoreRunnerModeOnline) - assert.NoError(t, err) + StoragePath: "/tmp/captive-core", + UseDB: true, + }) - runner.processExitError = errors.New("some error") + cmdMock := simpleCommandMock() + cmdMock.On("Wait").Return(nil) - tempDir := runner.storagePath - info, err := os.Stat(tempDir) + offlineInfoCmdMock := simpleCommandMock() + infoResponse := stellarcore.InfoResponse{} + infoResponse.Info.Ledger.Num = 100 + infoResponseBytes, err := json.Marshal(infoResponse) assert.NoError(t, err) - assert.True(t, info.IsDir()) + offlineInfoCmdMock.On("Output").Return(infoResponseBytes, nil) + offlineInfoCmdMock.On("Wait").Return(nil) + // Replace system calls with a mock + scMock := &mockSystemCaller{} + defer scMock.AssertExpectations(t) + scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) + scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "offline-info", + ).Return(offlineInfoCmdMock) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "run", + "--metadata-output-stream", + "fd:3", + ).Return(cmdMock) + // removeAll not called + runner.systemCaller = scMock + + assert.NoError(t, runner.runFrom(100, "hash")) assert.NoError(t, runner.close()) +} + +func TestRunFromUseDBLedgersNotMatch(t *testing.T) { + captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) + assert.NoError(t, err) + + captiveCoreToml.AddExamplePubnetValidators() + + runner := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/usr/bin/stellar-core", + HistoryArchiveURLs: []string{"http://localhost"}, + Log: log.New(), + Context: context.Background(), + Toml: captiveCoreToml, + StoragePath: "/tmp/captive-core", + UseDB: true, + }) + + newDBCmdMock := simpleCommandMock() + newDBCmdMock.On("Run").Return(nil) + + catchupCmdMock := simpleCommandMock() + catchupCmdMock.On("Run").Return(nil) - // Directory cleaned up on shutdown with error (potentially corrupted files) - _, err = os.Stat(tempDir) - assert.Error(t, err) - assert.Contains(t, err.Error(), "no such file or directory") + cmdMock := simpleCommandMock() + cmdMock.On("Wait").Return(nil) + + offlineInfoCmdMock := simpleCommandMock() + infoResponse := stellarcore.InfoResponse{} + infoResponse.Info.Ledger.Num = 101 // runner is one ledger behind + infoResponseBytes, err := json.Marshal(infoResponse) + assert.NoError(t, err) + offlineInfoCmdMock.On("Output").Return(infoResponseBytes, nil) + offlineInfoCmdMock.On("Wait").Return(nil) + + // Replace system calls with a mock + scMock := &mockSystemCaller{} + defer scMock.AssertExpectations(t) + // Storage dir is removed because ledgers do not match + scMock.On("removeAll", mock.Anything).Return(nil) + scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil) + scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "offline-info", + ).Return(offlineInfoCmdMock) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "new-db", + ).Return(newDBCmdMock) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "catchup", + "99/0", + ).Return(catchupCmdMock) + scMock.On("command", + "/usr/bin/stellar-core", + "--conf", + mock.Anything, + "run", + "--metadata-output-stream", + "fd:3", + ).Return(cmdMock) + runner.systemCaller = scMock + + assert.NoError(t, runner.runFrom(100, "hash")) + assert.NoError(t, runner.close()) } diff --git a/ingest/ledgerbackend/stellar_core_runner_windows.go b/ingest/ledgerbackend/stellar_core_runner_windows.go index 80932aaf50..f942a44971 100644 --- a/ingest/ledgerbackend/stellar_core_runner_windows.go +++ b/ingest/ledgerbackend/stellar_core_runner_windows.go @@ -16,7 +16,7 @@ func (c *stellarCoreRunner) getPipeName() string { return fmt.Sprintf(`\\.\pipe\%s`, c.nonce) } -func (c *stellarCoreRunner) start(cmd *exec.Cmd) (pipe, error) { +func (c *stellarCoreRunner) start(cmd cmd) (pipe, error) { // First set up the server pipe. listener, err := winio.ListenPipe(c.getPipeName(), nil) if err != nil { diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 5626ec2dbb..a4fbbea460 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -6,9 +6,8 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ## Pending - Run postgres autovacuum on `history_trades_60000` table more frequently ([4412](https://github.com/stellar/go/pull/4412)). - - Added indexes by id for claimable balance and liquidity pool id's in the respective tx/ops tables ([4455](https://github.com/stellar/go/pull/4477)) - +- Improve restart time of Captive-Core when started with `--captive-core-use-db` flag. The solution does not work on Windows. ([4471)](https://github.com/stellar/go/pull/4471)) ## 2.18.1 * Enabled txsub system to work if/when underlying horizon db connection is read only. ([4418](https://github.com/stellar/go/pull/4418))