Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Restart captive core when a new version of core is detected on disk #3687

Merged
merged 5 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions ingest/ledgerbackend/file_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package ledgerbackend

import (
"os"
"sync"
"time"

"github.com/pkg/errors"

"github.com/stellar/go/support/log"
)

type fileWatcher struct {
pathToFile string
duration time.Duration
onChange func()
exit <-chan struct{}
log *log.Entry
stat func(string) (os.FileInfo, error)
lastModTime time.Time
}

func newFileWatcher(runner *stellarCoreRunner) (*fileWatcher, error) {
return newFileWatcherWithOptions(runner, os.Stat, 10*time.Second)
}

func newFileWatcherWithOptions(
runner *stellarCoreRunner,
stat func(string) (os.FileInfo, error),
tickerDuration time.Duration,
) (*fileWatcher, error) {
info, err := stat(runner.executablePath)
if err != nil {
return nil, errors.Wrap(err, "could not stat captive core binary")
}

once := &sync.Once{}
return &fileWatcher{
pathToFile: runner.executablePath,
duration: tickerDuration,
onChange: func() {
once.Do(func() {
runner.log.Warnf("detected new version of captive core binary %s , aborting session.", runner.executablePath)
if err := runner.close(); err != nil {
runner.log.Warnf("could not close captive core %v", err)
}
})
},
exit: runner.ctx.Done(),
log: runner.log,
stat: stat,
lastModTime: info.ModTime(),
}, nil
}

func (f *fileWatcher) loop() {
ticker := time.NewTicker(f.duration)

for {
select {
case <-f.exit:
ticker.Stop()
return
case <-ticker.C:
if f.fileChanged() {
f.onChange()
}
}
}
}

func (f *fileWatcher) fileChanged() bool {
info, err := f.stat(f.pathToFile)
if err != nil {
f.log.Warnf("could not stat %s: %v", f.pathToFile, err)
return false
}

if modTime := info.ModTime(); !f.lastModTime.Equal(modTime) {
f.log.Infof(
"detected update to %s. previous file timestamp was %v current timestamp is %v",
f.pathToFile,
f.lastModTime,
modTime,
)
return true
}
return false
}
207 changes: 207 additions & 0 deletions ingest/ledgerbackend/file_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package ledgerbackend

import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/stellar/go/support/log"

"github.com/stretchr/testify/assert"
)

type mockFile struct {
modTime time.Time
}

func (mockFile) Name() string {
return ""
}

func (mockFile) Size() int64 {
return 0
}

func (mockFile) Mode() os.FileMode {
return 0
}

func (mockFile) IsDir() bool {
return false
}

func (mockFile) Sys() interface{} {
return nil
}
func (m mockFile) ModTime() time.Time {
return m.modTime
}

type mockStat struct {
sync.Mutex
t *testing.T
expectedPath string
modTime time.Time
err error
callCount int
}

func (m *mockStat) setResponse(modTime time.Time, err error) {
m.Lock()
defer m.Unlock()
m.modTime = modTime
m.err = err
}

func (m *mockStat) getCallCount() int {
m.Lock()
defer m.Unlock()
return m.callCount
}

func (m *mockStat) stat(fp string) (os.FileInfo, error) {
m.Lock()
defer m.Unlock()
m.callCount++
assert.Equal(m.t, m.expectedPath, fp)
//defer m.onCall(m)
return mockFile{m.modTime}, m.err
}

func createFWFixtures(t *testing.T) (*mockStat, *stellarCoreRunner, *fileWatcher) {
ms := &mockStat{
modTime: time.Now(),
expectedPath: "/some/path",
t: t,
}

captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
assert.NoError(t, err)

runner, err := newStellarCoreRunner(CaptiveCoreConfig{
BinaryPath: "/some/path",
HistoryArchiveURLs: []string{"http://localhost"},
Log: log.New(),
Context: context.Background(),
Toml: captiveCoreToml,
}, stellarCoreRunnerModeOffline)
assert.NoError(t, err)

fw, err := newFileWatcherWithOptions(runner, ms.stat, time.Millisecond)
assert.NoError(t, err)
assert.Equal(t, 1, ms.getCallCount())

return ms, runner, fw
}

func TestNewFileWatcherError(t *testing.T) {
ms := &mockStat{
modTime: time.Now(),
expectedPath: "/some/path",
t: t,
}
ms.setResponse(time.Time{}, fmt.Errorf("test error"))

captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
assert.NoError(t, err)

runner, err := newStellarCoreRunner(CaptiveCoreConfig{
BinaryPath: "/some/path",
HistoryArchiveURLs: []string{"http://localhost"},
Log: log.New(),
Context: context.Background(),
Toml: captiveCoreToml,
}, stellarCoreRunnerModeOffline)
assert.NoError(t, err)

_, err = newFileWatcherWithOptions(runner, ms.stat, time.Millisecond)
assert.EqualError(t, err, "could not stat captive core binary: test error")
assert.Equal(t, 1, ms.getCallCount())
}

func TestFileChanged(t *testing.T) {
ms, _, fw := createFWFixtures(t)

modTime := ms.modTime

assert.False(t, fw.fileChanged())
assert.False(t, fw.fileChanged())
assert.Equal(t, 3, ms.getCallCount())

ms.setResponse(time.Time{}, fmt.Errorf("test error"))
assert.False(t, fw.fileChanged())
assert.Equal(t, 4, ms.getCallCount())

ms.setResponse(modTime, nil)
assert.False(t, fw.fileChanged())
assert.Equal(t, 5, ms.getCallCount())

ms.setResponse(time.Now().Add(time.Hour), nil)
assert.True(t, fw.fileChanged())
assert.Equal(t, 6, ms.getCallCount())
}

func TestCloseRunnerBeforeFileWatcherLoop(t *testing.T) {
_, runner, fw := createFWFixtures(t)

assert.NoError(t, runner.close())

// loop should exit almost immediately because the runner is closed
fw.loop()
}

func TestCloseRunnerDuringFileWatcherLoop(t *testing.T) {
ms, runner, fw := createFWFixtures(t)
done := make(chan struct{})
go func() {
fw.loop()
close(done)
}()

// fw.loop will repeatedly check if the file has changed by calling stat.
// This test ensures that closing the runner will exit fw.loop so that the goroutine is not leaked.

closedRunner := false
for {
select {
case <-done:
assert.True(t, closedRunner)
return
default:
if ms.getCallCount() > 20 {
runner.close()
closedRunner = true
}
}
}
}

func TestFileChangesTriggerRunnerClose(t *testing.T) {
ms, runner, fw := createFWFixtures(t)
done := make(chan struct{})
go func() {
fw.loop()
close(done)
}()

// fw.loop will repeatedly check if the file has changed by calling stat.
// This test ensures that modifying the file will trigger the closing of the runner.
modifiedFile := false
for {
select {
case <-done:
assert.True(t, modifiedFile)
// the runner is closed if and only if runner.ctx.Err() is non-nil
assert.Error(t, runner.ctx.Err())
return
default:
if ms.getCallCount() > 20 {
ms.setResponse(time.Now().Add(time.Hour), nil)
modifiedFile = true
}
}
}
}
14 changes: 12 additions & 2 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,18 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
return errors.Wrap(err, "error waiting for `stellar-core new-db` subprocess")
}

binaryWatcher, err := newFileWatcher(r)
if err != nil {
return errors.Wrap(err, "could not create captive core binary watcher")
}

tamirms marked this conversation as resolved.
Show resolved Hide resolved
rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
cmd := r.createCmd(
"catchup", rangeArg,
"--metadata-output-stream", r.getPipeName(),
"--in-memory",
)

var err error
r.pipe, err = r.start(cmd)
if err != nil {
r.closeLogLineWriters(cmd)
Expand All @@ -266,6 +270,7 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
r.started = true
r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader)
go r.ledgerBuffer.start()
go binaryWatcher.loop()
r.wg.Add(1)
go r.handleExit(cmd)

Expand All @@ -286,6 +291,11 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return errors.New("runner already started")
}

binaryWatcher, err := newFileWatcher(r)
if err != nil {
return errors.Wrap(err, "could not create captive core binary watcher")
}

cmd := r.createCmd(
"run",
"--in-memory",
Expand All @@ -294,7 +304,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
"--metadata-output-stream", r.getPipeName(),
)

var err error
r.pipe, err = r.start(cmd)
if err != nil {
r.closeLogLineWriters(cmd)
Expand All @@ -304,6 +313,7 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
r.started = true
r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader)
go r.ledgerBuffer.start()
go binaryWatcher.loop()
r.wg.Add(1)
go r.handleExit(cmd)

Expand Down
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).

* Performance improvement: Captive Core now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.1.0](#v2.1.0) rather than generating a one-time temporary sub-directory ([](https://github.com/stellar/go/pull/XXXX)). **This feature requires Stellar-Core version 17.1 or later.**
tamirms marked this conversation as resolved.
Show resolved Hide resolved

* Horizon monitors captive-core on disk, and restarts captive-core if it detects a change (e.g. a more recent file timestamp for the captive-core binary). ([3687](https://github.com/stellar/go/pull/3687)).
tamirms marked this conversation as resolved.
Show resolved Hide resolved

## v2.4.1

Expand Down