From 85387b610f62e263c2484c1834c13e1380284341 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 11 Jun 2021 16:00:35 +0100 Subject: [PATCH 1/4] Restart captive core when a new version of core is detected on disk --- ingest/ledgerbackend/file_watcher.go | 89 +++++++++ ingest/ledgerbackend/file_watcher_test.go | 207 ++++++++++++++++++++ ingest/ledgerbackend/stellar_core_runner.go | 14 +- services/horizon/CHANGELOG.md | 1 + 4 files changed, 309 insertions(+), 2 deletions(-) create mode 100644 ingest/ledgerbackend/file_watcher.go create mode 100644 ingest/ledgerbackend/file_watcher_test.go diff --git a/ingest/ledgerbackend/file_watcher.go b/ingest/ledgerbackend/file_watcher.go new file mode 100644 index 0000000000..af68596e3d --- /dev/null +++ b/ingest/ledgerbackend/file_watcher.go @@ -0,0 +1,89 @@ +package ledgerbackend + +import ( + "os" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/stellar/go/support/log" +) + +type fileWatcher struct { + pathToFile string + duration time.Duration + onChange func() + exit <-chan struct{} + log *log.Entry + stat func(string) (os.FileInfo, error) + lastModTime time.Time +} + +func newFileWatcher(runner *stellarCoreRunner) (*fileWatcher, error) { + return newFileWatcherWithOptions(runner, os.Stat, 10*time.Second) +} + +func newFileWatcherWithOptions( + runner *stellarCoreRunner, + stat func(string) (os.FileInfo, error), + tickerDuration time.Duration, +) (*fileWatcher, error) { + info, err := stat(runner.executablePath) + if err != nil { + return nil, errors.Wrap(err, "could not stat captive core binary") + } + + once := &sync.Once{} + return &fileWatcher{ + pathToFile: runner.executablePath, + duration: tickerDuration, + onChange: func() { + once.Do(func() { + runner.log.Warnf("detected new version of captive core binary %s , aborting session.", runner.executablePath) + if err := runner.close(); err != nil { + runner.log.Warnf("could not close captive core %v", err) + } + }) + }, + exit: runner.ctx.Done(), + log: runner.log, + stat: stat, + lastModTime: info.ModTime(), + }, nil +} + +func (f *fileWatcher) loop() { + ticker := time.NewTicker(f.duration) + + for { + select { + case <-f.exit: + ticker.Stop() + return + case <-ticker.C: + if f.fileChanged() { + f.onChange() + } + } + } +} + +func (f *fileWatcher) fileChanged() bool { + info, err := f.stat(f.pathToFile) + if err != nil { + f.log.Warnf("could not stat %s: %v", f.pathToFile, err) + return false + } + + if modTime := info.ModTime(); !f.lastModTime.Equal(modTime) { + f.log.Infof( + "detected update to %s. previous file timestamp was %v current timestamp is %v", + f.pathToFile, + f.lastModTime, + modTime, + ) + return true + } + return false +} diff --git a/ingest/ledgerbackend/file_watcher_test.go b/ingest/ledgerbackend/file_watcher_test.go new file mode 100644 index 0000000000..a2720c6342 --- /dev/null +++ b/ingest/ledgerbackend/file_watcher_test.go @@ -0,0 +1,207 @@ +package ledgerbackend + +import ( + "context" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/stellar/go/support/log" + + "github.com/stretchr/testify/assert" +) + +type mockFile struct { + modTime time.Time +} + +func (mockFile) Name() string { + return "" +} + +func (mockFile) Size() int64 { + return 0 +} + +func (mockFile) Mode() os.FileMode { + return 0 +} + +func (mockFile) IsDir() bool { + return false +} + +func (mockFile) Sys() interface{} { + return nil +} +func (m mockFile) ModTime() time.Time { + return m.modTime +} + +type mockStat struct { + sync.Mutex + t *testing.T + expectedPath string + modTime time.Time + err error + callCount int +} + +func (m *mockStat) setResponse(modTime time.Time, err error) { + m.Lock() + defer m.Unlock() + m.modTime = modTime + m.err = err +} + +func (m *mockStat) getCallCount() int { + m.Lock() + defer m.Unlock() + return m.callCount +} + +func (m *mockStat) stat(fp string) (os.FileInfo, error) { + m.Lock() + defer m.Unlock() + m.callCount++ + assert.Equal(m.t, m.expectedPath, fp) + //defer m.onCall(m) + return mockFile{m.modTime}, m.err +} + +func createFWFixtures(t *testing.T) (*mockStat, *stellarCoreRunner, *fileWatcher) { + ms := &mockStat{ + modTime: time.Now(), + expectedPath: "/some/path", + t: t, + } + + captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) + assert.NoError(t, err) + + runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/some/path", + HistoryArchiveURLs: []string{"http://localhost"}, + Log: log.New(), + Context: context.Background(), + Toml: captiveCoreToml, + }, stellarCoreRunnerModeOffline) + assert.NoError(t, err) + + fw, err := newFileWatcherWithOptions(runner, ms.stat, time.Millisecond) + assert.NoError(t, err) + assert.Equal(t, 1, ms.getCallCount()) + + return ms, runner, fw +} + +func TestNewFileWatcherError(t *testing.T) { + ms := &mockStat{ + modTime: time.Now(), + expectedPath: "/some/path", + t: t, + } + ms.setResponse(time.Time{}, fmt.Errorf("test error")) + + captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) + assert.NoError(t, err) + + runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + BinaryPath: "/some/path", + HistoryArchiveURLs: []string{"http://localhost"}, + Log: log.New(), + Context: context.Background(), + Toml: captiveCoreToml, + }, stellarCoreRunnerModeOffline) + assert.NoError(t, err) + + _, err = newFileWatcherWithOptions(runner, ms.stat, time.Millisecond) + assert.EqualError(t, err, "could not stat captive core binary: test error") + assert.Equal(t, 1, ms.getCallCount()) +} + +func TestFileChanged(t *testing.T) { + ms, _, fw := createFWFixtures(t) + + modTime := ms.modTime + + assert.False(t, fw.fileChanged()) + assert.False(t, fw.fileChanged()) + assert.Equal(t, 3, ms.getCallCount()) + + ms.setResponse(time.Time{}, fmt.Errorf("test error")) + assert.False(t, fw.fileChanged()) + assert.Equal(t, 4, ms.getCallCount()) + + ms.setResponse(modTime, nil) + assert.False(t, fw.fileChanged()) + assert.Equal(t, 5, ms.getCallCount()) + + ms.setResponse(time.Now().Add(time.Hour), nil) + assert.True(t, fw.fileChanged()) + assert.Equal(t, 6, ms.getCallCount()) +} + +func TestCloseRunnerBeforeFileWatcherLoop(t *testing.T) { + _, runner, fw := createFWFixtures(t) + + assert.NoError(t, runner.close()) + + // loop should exit almost immediately because the runner is closed + fw.loop() +} + +func TestCloseRunnerDuringFileWatcherLoop(t *testing.T) { + ms, runner, fw := createFWFixtures(t) + done := make(chan struct{}) + go func() { + fw.loop() + close(done) + }() + + // fw.loop will repeatedly check if the file has changed by calling stat. + // This test ensures that closing the runner will exit fw.loop so that the goroutine is not leaked. + + closedRunner := false + for { + select { + case <-done: + assert.True(t, closedRunner) + return + default: + if ms.getCallCount() > 20 { + runner.close() + closedRunner = true + } + } + } +} + +func TestFileChangesTriggerRunnerClose(t *testing.T) { + ms, runner, fw := createFWFixtures(t) + done := make(chan struct{}) + go func() { + fw.loop() + close(done) + }() + + // fw.loop will repeatedly check if the file has changed by calling stat. + // This test ensures that modifying the file will trigger the closing of the runner. + modifiedFile := false + for { + select { + case <-done: + assert.True(t, modifiedFile) + // the runner is closed if and only if runner.ctx.Err() is non-nil + assert.Error(t, runner.ctx.Err()) + return + default: + if ms.getCallCount() > 20 { + ms.setResponse(time.Now().Add(time.Hour), nil) + modifiedFile = true + } + } + } +} diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 5f45395686..e14b987c20 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -249,6 +249,11 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { return errors.Wrap(err, "error waiting for `stellar-core new-db` subprocess") } + binaryWatcher, err := newFileWatcher(r) + if err != nil { + return errors.Wrap(err, "could not create captive core binary watcher") + } + rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) cmd := r.createCmd( "catchup", rangeArg, @@ -256,7 +261,6 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { "--in-memory", ) - var err error r.pipe, err = r.start(cmd) if err != nil { r.closeLogLineWriters(cmd) @@ -266,6 +270,7 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { r.started = true r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) go r.ledgerBuffer.start() + go binaryWatcher.loop() r.wg.Add(1) go r.handleExit(cmd) @@ -286,6 +291,11 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { return errors.New("runner already started") } + binaryWatcher, err := newFileWatcher(r) + if err != nil { + return errors.Wrap(err, "could not create captive core binary watcher") + } + cmd := r.createCmd( "run", "--in-memory", @@ -294,7 +304,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { "--metadata-output-stream", r.getPipeName(), ) - var err error r.pipe, err = r.start(cmd) if err != nil { r.closeLogLineWriters(cmd) @@ -304,6 +313,7 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { r.started = true r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) go r.ledgerBuffer.start() + go binaryWatcher.loop() r.wg.Add(1) go r.handleExit(cmd) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 4e5ad55536..d8b52e3d5e 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -11,6 +11,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). * Performance improvement: Captive Core now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.1.0](#v2.1.0) rather than generating a one-time temporary sub-directory ([](https://github.com/stellar/go/pull/XXXX)). **This feature requires Stellar-Core version 17.1 or later.** +* Horizon monitors captive-core on disk, and restarts captive-core if it detects a change (e.g. a more recent file timestamp for the captive-core binary). ([3687](https://github.com/stellar/go/pull/3687)). ## v2.4.1 From 6ac423ca8977cb049956ab3fef6f03b677ff489d Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 14 Jun 2021 16:04:14 +0100 Subject: [PATCH 2/4] Update services/horizon/CHANGELOG.md Co-authored-by: George --- services/horizon/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index d8b52e3d5e..7191564d50 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -11,7 +11,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). * Performance improvement: Captive Core now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.1.0](#v2.1.0) rather than generating a one-time temporary sub-directory ([](https://github.com/stellar/go/pull/XXXX)). **This feature requires Stellar-Core version 17.1 or later.** -* Horizon monitors captive-core on disk, and restarts captive-core if it detects a change (e.g. a more recent file timestamp for the captive-core binary). ([3687](https://github.com/stellar/go/pull/3687)). +* Horizon now monitors the Stellar Core binary on disk (pointed to by `--stellar-core-binary-path`/`STELLAR_CORE_BINARY_PATH`) and restarts its Captive Core subprocess if it detects changes (i.e a more recent file timestamp for the Stellar Core binary) ([3687](https://github.com/stellar/go/pull/3687)). ## v2.4.1 From 64cb6e0a7d77ff7075784347c090295784b2deee Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 14 Jun 2021 16:05:40 +0100 Subject: [PATCH 3/4] fix pr link --- services/horizon/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 7191564d50..c30301f711 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -9,7 +9,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ### New Features * Add new command `horizon db detect-gaps`, which detects ingestion gaps in the database. The command prints out the `db reingest` commands to run in order to fill the gaps found. -* Performance improvement: Captive Core now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.1.0](#v2.1.0) rather than generating a one-time temporary sub-directory ([](https://github.com/stellar/go/pull/XXXX)). **This feature requires Stellar-Core version 17.1 or later.** +* Performance improvement: Captive Core now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.1.0](#v2.1.0) rather than generating a one-time temporary sub-directory ([3670](https://github.com/stellar/go/pull/3670)). **This feature requires Stellar-Core version 17.1 or later.** * Horizon now monitors the Stellar Core binary on disk (pointed to by `--stellar-core-binary-path`/`STELLAR_CORE_BINARY_PATH`) and restarts its Captive Core subprocess if it detects changes (i.e a more recent file timestamp for the Stellar Core binary) ([3687](https://github.com/stellar/go/pull/3687)). From 45131a28d0632cf59bf69c4f6dfe76d9a45d50fd Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 14 Jun 2021 16:11:05 +0100 Subject: [PATCH 4/4] Make binarywatcher optional in case of error --- ingest/ledgerbackend/stellar_core_runner.go | 28 ++++++++++++--------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index e14b987c20..5baa980f80 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -249,11 +249,6 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { return errors.Wrap(err, "error waiting for `stellar-core new-db` subprocess") } - binaryWatcher, err := newFileWatcher(r) - if err != nil { - return errors.Wrap(err, "could not create captive core binary watcher") - } - rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) cmd := r.createCmd( "catchup", rangeArg, @@ -261,6 +256,7 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { "--in-memory", ) + var err error r.pipe, err = r.start(cmd) if err != nil { r.closeLogLineWriters(cmd) @@ -270,7 +266,13 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { r.started = true r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) go r.ledgerBuffer.start() - go binaryWatcher.loop() + + if binaryWatcher, err := newFileWatcher(r); err != nil { + r.log.Warnf("could not create captive core binary watcher: %v", err) + } else { + go binaryWatcher.loop() + } + r.wg.Add(1) go r.handleExit(cmd) @@ -291,11 +293,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { return errors.New("runner already started") } - binaryWatcher, err := newFileWatcher(r) - if err != nil { - return errors.Wrap(err, "could not create captive core binary watcher") - } - cmd := r.createCmd( "run", "--in-memory", @@ -304,6 +301,7 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { "--metadata-output-stream", r.getPipeName(), ) + var err error r.pipe, err = r.start(cmd) if err != nil { r.closeLogLineWriters(cmd) @@ -313,7 +311,13 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { r.started = true r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader) go r.ledgerBuffer.start() - go binaryWatcher.loop() + + if binaryWatcher, err := newFileWatcher(r); err != nil { + r.log.Warnf("could not create captive core binary watcher: %v", err) + } else { + go binaryWatcher.loop() + } + r.wg.Add(1) go r.handleExit(cmd)