Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Let filewatcher check for binary hash to detect captive core updates. #4050

Merged
merged 2 commits into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ All notable changes to this project will be documented in this file. This projec

## Unreleased

* Let filewatcher use binary hash instead of timestamp to detect core version update. [4050](https://github.com/stellar/go/pull/4050)

### New Features
* **Performance improvement**: the Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.

Expand Down
69 changes: 46 additions & 23 deletions ingest/ledgerbackend/file_watcher.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,60 @@
package ledgerbackend

import (
"bytes"
"crypto/sha1"
"io"
"os"
"sync"
"time"

"github.com/pkg/errors"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)

// hash holds a file digest as a raw byte slice; hashFile returns the SHA-1 sum
// of a file's contents as this type.
type hash []byte
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I don't think this type alias provides much benefit since we end up using bytes.Equal to compare hashes anyways

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. I slightly prefer to keep the type definition though, so I defined an Equals method for type hash that just calls bytes.Equal.


// Equals reports whether h and other contain the same sequence of bytes.
// A nil hash and an empty hash are considered equal.
func (h hash) Equals(other hash) bool {
	lhs, rhs := []byte(h), []byte(other)
	return bytes.Equal(lhs, rhs)
}

// fileWatcher holds the state used to detect that the file at pathToFile has
// changed (see fileChanged).
//
// NOTE(review): this span is rendered from a diff without +/- markers, so it
// lists both the earlier field set (stat, lastModTime) and the current one
// (hashFile, lastHash) — confirm against the real file before editing.
type fileWatcher struct {
pathToFile string
duration time.Duration
onChange func()
exit <-chan struct{}
log *log.Entry
stat func(string) (os.FileInfo, error)
lastModTime time.Time
// pathToFile is the path passed to hashFile on every check.
pathToFile string
// duration is presumably the interval between checks — TODO confirm in loop.
duration time.Duration
// onChange is presumably invoked when a change is detected — TODO confirm in loop.
onChange func()
// exit is populated from the runner context's Done channel.
exit <-chan struct{}
// log receives the warning and info messages emitted by fileChanged.
log *log.Entry
// hashFile computes the digest of a file; injectable so tests can stub it.
hashFile func(string) (hash, error)
// lastHash is the digest recorded when the watcher was constructed.
lastHash hash
}

// hashFile returns the SHA-1 digest of the contents of the file at filename.
//
// The file is streamed through the hasher instead of being read into memory
// in one piece. If the file cannot be opened or read, the returned error
// wraps the underlying cause and names the offending path.
func hashFile(filename string) (hash, error) {
	f, err := os.Open(filename)
	if err != nil {
		// f is nil when Open fails, so report the requested path instead.
		return hash{}, errors.Wrapf(err, "unable to open %v", filename)
	}
	defer f.Close()

	h := sha1.New()
	if _, err := io.Copy(h, f); err != nil {
		// Format the path rather than the *os.File handle, which would
		// print as an opaque pointer value.
		return hash{}, errors.Wrapf(err, "unable to copy %v into buffer", filename)
	}

	return h.Sum(nil), nil
}

// newFileWatcher builds a fileWatcher for runner by delegating to
// newFileWatcherWithOptions with the default file-inspection function and a
// 10 second duration.
//
// NOTE(review): two return statements appear because this span is rendered
// from a diff without +/- markers; the os.Stat line is the earlier version and
// the hashFile line is the current one — confirm against the real file.
func newFileWatcher(runner *stellarCoreRunner) (*fileWatcher, error) {
return newFileWatcherWithOptions(runner, os.Stat, 10*time.Second)
return newFileWatcherWithOptions(runner, hashFile, 10*time.Second)
}

func newFileWatcherWithOptions(
runner *stellarCoreRunner,
stat func(string) (os.FileInfo, error),
hashFile func(string) (hash, error),
tickerDuration time.Duration,
) (*fileWatcher, error) {
info, err := stat(runner.executablePath)
hashResult, err := hashFile(runner.executablePath)
if err != nil {
return nil, errors.Wrap(err, "could not stat captive core binary")
return nil, errors.Wrap(err, "could not hash captive core binary")
}

once := &sync.Once{}
Expand All @@ -46,10 +69,10 @@ func newFileWatcherWithOptions(
}
})
},
exit: runner.ctx.Done(),
log: runner.log,
stat: stat,
lastModTime: info.ModTime(),
exit: runner.ctx.Done(),
log: runner.log,
hashFile: hashFile,
lastHash: hashResult,
}, nil
}

Expand All @@ -70,18 +93,18 @@ func (f *fileWatcher) loop() {
}

func (f *fileWatcher) fileChanged() bool {
info, err := f.stat(f.pathToFile)
hashResult, err := f.hashFile(f.pathToFile)
if err != nil {
f.log.Warnf("could not stat %s: %v", f.pathToFile, err)
f.log.Warnf("could not hash contents of %s: %v", f.pathToFile, err)
return false
}

if modTime := info.ModTime(); !f.lastModTime.Equal(modTime) {
if !f.lastHash.Equals(hashResult) {
f.log.Infof(
"detected update to %s. previous file timestamp was %v current timestamp is %v",
"detected update to %s. previous file hash was %v current hash is %v",
f.pathToFile,
f.lastModTime,
modTime,
f.lastHash,
hashResult,
)
return true
}
Expand Down
75 changes: 22 additions & 53 deletions ingest/ledgerbackend/file_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ledgerbackend
import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"
Expand All @@ -13,67 +12,39 @@ import (
"github.com/stretchr/testify/assert"
)

// mockFile is a stub file-info value for tests. It carries only a
// modification time; every other accessor returns a fixed zero value.
type mockFile struct {
	modTime time.Time
}

// Name always returns the empty string.
func (mockFile) Name() string { return "" }

// Size always returns 0.
func (mockFile) Size() int64 { return int64(0) }

// Mode always returns the zero file mode.
func (mockFile) Mode() os.FileMode { return os.FileMode(0) }

// IsDir always returns false.
func (mockFile) IsDir() bool { return false }

// Sys always returns nil.
func (mockFile) Sys() interface{} { return nil }

// ModTime returns the modification time stored in the mock.
func (f mockFile) ModTime() time.Time { return f.modTime }

// mockHash (mockStat in the earlier version) is a mutex-guarded test double
// for the function the file watcher uses to inspect the watched file.
//
// NOTE(review): this span is rendered from a diff without +/- markers, so both
// the old and new type headers and result fields are shown — confirm against
// the real file before editing.
type mockStat struct {
type mockHash struct {
sync.Mutex
// t is used for the path assertion made on every mocked call.
t *testing.T
// expectedPath is the path every mocked call is asserted to receive.
expectedPath string
modTime time.Time
// hashResult is the value returned by the mocked hashFile.
hashResult hash
// err is returned alongside the result.
err error
// callCount is incremented on every mocked call.
callCount int
}

// setResponse replaces, while holding the mock's lock, the result and error
// that the mocked function returns.
//
// NOTE(review): both the earlier (modTime) and current (hashResult) signature
// and assignment are shown because this span is rendered from a diff.
func (m *mockStat) setResponse(modTime time.Time, err error) {
func (m *mockHash) setResponse(hashResult hash, err error) {
m.Lock()
defer m.Unlock()
m.modTime = modTime
m.hashResult = hashResult
m.err = err
}

// getCallCount returns, while holding the mock's lock, how many times the
// mocked function has been called.
//
// NOTE(review): both the earlier (mockStat) and current (mockHash) receiver
// lines are shown because this span is rendered from a diff.
func (m *mockStat) getCallCount() int {
func (m *mockHash) getCallCount() int {
m.Lock()
defer m.Unlock()
return m.callCount
}

// hashFile (stat in the earlier version) is the mocked function handed to the
// file watcher. Under the mock's lock it increments callCount, asserts that fp
// equals expectedPath, and returns the configured result and error.
//
// NOTE(review): both the earlier and current signature and return statement
// are shown because this span is rendered from a diff.
func (m *mockStat) stat(fp string) (os.FileInfo, error) {
func (m *mockHash) hashFile(fp string) (hash, error) {
m.Lock()
defer m.Unlock()
m.callCount++
assert.Equal(m.t, m.expectedPath, fp)
//defer m.onCall(m)
return mockFile{m.modTime}, m.err
return m.hashResult, m.err
}

func createFWFixtures(t *testing.T) (*mockStat, *stellarCoreRunner, *fileWatcher) {
ms := &mockStat{
modTime: time.Now(),
func createFWFixtures(t *testing.T) (*mockHash, *stellarCoreRunner, *fileWatcher) {
ms := &mockHash{
hashResult: hash{},
expectedPath: "/some/path",
t: t,
}
Expand All @@ -90,20 +61,20 @@ func createFWFixtures(t *testing.T) (*mockStat, *stellarCoreRunner, *fileWatcher
}, stellarCoreRunnerModeOffline)
assert.NoError(t, err)

fw, err := newFileWatcherWithOptions(runner, ms.stat, time.Millisecond)
fw, err := newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
assert.NoError(t, err)
assert.Equal(t, 1, ms.getCallCount())

return ms, runner, fw
}

func TestNewFileWatcherError(t *testing.T) {
ms := &mockStat{
modTime: time.Now(),
ms := &mockHash{
hashResult: hash{},
expectedPath: "/some/path",
t: t,
}
ms.setResponse(time.Time{}, fmt.Errorf("test error"))
ms.setResponse(hash{}, fmt.Errorf("test error"))

captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
assert.NoError(t, err)
Expand All @@ -117,29 +88,27 @@ func TestNewFileWatcherError(t *testing.T) {
}, stellarCoreRunnerModeOffline)
assert.NoError(t, err)

_, err = newFileWatcherWithOptions(runner, ms.stat, time.Millisecond)
assert.EqualError(t, err, "could not stat captive core binary: test error")
_, err = newFileWatcherWithOptions(runner, ms.hashFile, time.Millisecond)
assert.EqualError(t, err, "could not hash captive core binary: test error")
assert.Equal(t, 1, ms.getCallCount())
}

// TestFileChanged checks that fileChanged reports a change only when the
// mocked function returns a different value without an error, and that every
// check calls the mocked function exactly once.
//
// NOTE(review): earlier (modTime-based) and current (hash-based) lines are
// interleaved because this span is rendered from a diff.
func TestFileChanged(t *testing.T) {
ms, _, fw := createFWFixtures(t)

modTime := ms.modTime

// The fixture already made one call, so two unchanged checks bring the count to 3.
assert.False(t, fw.fileChanged())
assert.False(t, fw.fileChanged())
assert.Equal(t, 3, ms.getCallCount())

// An error from the mocked function is reported as "no change".
ms.setResponse(time.Time{}, fmt.Errorf("test error"))
ms.setResponse(hash{}, fmt.Errorf("test error"))
assert.False(t, fw.fileChanged())
assert.Equal(t, 4, ms.getCallCount())

// Clearing the error while keeping the original value still reports no change.
ms.setResponse(modTime, nil)
ms.setResponse(ms.hashResult, nil)
assert.False(t, fw.fileChanged())
assert.Equal(t, 5, ms.getCallCount())

// A different value is reported as a change.
ms.setResponse(time.Now().Add(time.Hour), nil)
ms.setResponse(hash{1}, nil)
assert.True(t, fw.fileChanged())
assert.Equal(t, 6, ms.getCallCount())
}
Expand All @@ -161,7 +130,7 @@ func TestCloseRunnerDuringFileWatcherLoop(t *testing.T) {
close(done)
}()

// fw.loop will repeatedly check if the file has changed by calling stat.
// fw.loop will repeatedly check if the file has changed by calling hash.
// This test ensures that closing the runner will exit fw.loop so that the goroutine is not leaked.

closedRunner := false
Expand All @@ -187,7 +156,7 @@ func TestFileChangesTriggerRunnerClose(t *testing.T) {
close(done)
}()

// fw.loop will repeatedly check if the file has changed by calling stat.
// fw.loop will repeatedly check if the file has changed by calling hash
// This test ensures that modifying the file will trigger the closing of the runner.
modifiedFile := false
for {
Expand All @@ -199,7 +168,7 @@ func TestFileChangesTriggerRunnerClose(t *testing.T) {
return
default:
if ms.getCallCount() > 20 {
ms.setResponse(time.Now().Add(time.Hour), nil)
ms.setResponse(hash{1}, nil)
modifiedFile = true
}
}
Expand Down