Skip to content

Commit

Permalink
Migrate all tests from test_harvester.py
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Mar 5, 2021
1 parent 8c75f1a commit e18e32f
Show file tree
Hide file tree
Showing 15 changed files with 514 additions and 38 deletions.
10 changes: 5 additions & 5 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Config stores the options of a file stream.
type config struct {
readerConfig
ReaderConfig `config:",inline"`

Paths []string `config:"paths"`
Close closerConfig `config:"close"`
Expand Down Expand Up @@ -59,7 +59,7 @@ type stateChangeCloserConfig struct {
Renamed bool `config:"renamed"`
}

type readerConfig struct {
type ReaderConfig struct {
Backoff backoffConfig `config:"backoff"`
BufferSize int `config:"buffer_size"`
Encoding string `config:"encoding"`
Expand All @@ -79,7 +79,7 @@ type backoffConfig struct {

func defaultConfig() config {
return config{
readerConfig: defaultReaderConfig(),
ReaderConfig: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
Expand All @@ -104,8 +104,8 @@ func defaultCloserConfig() closerConfig {
}
}

func defaultReaderConfig() readerConfig {
return readerConfig{
func defaultReaderConfig() ReaderConfig {
return ReaderConfig{
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 10 * time.Second,
Expand Down
51 changes: 51 additions & 0 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,40 @@ func (e *inputTestingEnvironment) mustRemoveFile(filename string) {
}
}

func (e *inputTestingEnvironment) mustSymlink(filename, symlinkname string) {
err := os.Symlink(e.abspath(filename), e.abspath(symlinkname))
if err != nil {
e.t.Fatalf("failed to create symlink to file '%s': %+v", filename, err)
}
}

func (e *inputTestingEnvironment) mustTruncateFile(filename string, size int64) {
path := e.abspath(filename)
err := os.Truncate(path, size)
if err != nil {
e.t.Fatalf("failed to truncate file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) abspath(filename string) string {
return filepath.Join(e.workingDir, filename)
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access()

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
actual += 1
return true, nil
})
if err != nil {
e.t.Fatalf("error while iterating through registry: %+v", err)
}

require.Equal(e.t, actual, expectedCount)
}

// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expectedOffset int) {
filepath := e.abspath(filename)
Expand Down Expand Up @@ -205,12 +235,23 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
}
}

// waitUntilHarvesterIsDone detects Harvester stop by checking if the last client has been closed
// as when a Harvester stops the client is closed.
func (e *inputTestingEnvironment) waitUntilHarvesterIsDone() {
for !e.pipeline.clients[len(e.pipeline.clients)-1].closed {
time.Sleep(10 * time.Millisecond)
}
}

// requireEventReceived requires that the list of messages has made it into the output.
func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
foundEvents := make([]bool, len(events))
checkedEventCount := 0
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
if len(events) == checkedEventCount {
e.t.Fatalf("not enough expected elements")
}
message := evt.Fields["message"].(string)
if message == events[checkedEventCount] {
foundEvents[checkedEventCount] = true
Expand All @@ -229,6 +270,16 @@ func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
require.Equal(e.t, 0, len(missingEvents), "following events are missing: %+v", missingEvents)
}

func (e *inputTestingEnvironment) getOutputMessages() []string {
messages := make([]string, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
messages = append(messages, evt.Fields["message"].(string))
}
}
return messages
}

type testInputStore struct {
registry *statestore.Registry
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newFileReader(
log *logp.Logger,
canceler input.Canceler,
f *os.File,
config readerConfig,
config ReaderConfig,
closerConfig closerConfig,
) (*logFile, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/filestream/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestLogFileTimedClosing(t *testing.T) {
logp.L(),
context.TODO(),
f,
readerConfig{},
ReaderConfig{},
closerConfig{
OnStateChange: stateChangeCloserConfig{
CheckInterval: 1 * time.Second,
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestLogFileTruncated(t *testing.T) {
defer f.Close()
defer os.Remove(f.Name())

reader, err := newFileReader(logp.L(), context.TODO(), f, readerConfig{}, closerConfig{})
reader, err := newFileReader(logp.L(), context.TODO(), f, ReaderConfig{}, closerConfig{})
if err != nil {
t.Fatalf("error while creating logReader: %+v", err)
}
Expand Down
33 changes: 23 additions & 10 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type fileWatcherConfig struct {
// Interval is the time between two scans.
Interval time.Duration `config:"check_interval"`
// Scanner is the configuration of the scanner.
Scanner fileScannerConfig `config:",inline"`
Scanner FileScannerConfig `config:",inline"`
}

// fileWatcher gets the list of files from a FSWatcher and creates events by
Expand Down Expand Up @@ -142,10 +142,18 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
}

if prevInfo.ModTime() != info.ModTime() {
select {
case <-ctx.Done():
return
case w.events <- writeEvent(path, info):
if prevInfo.Size() > info.Size() {
select {
case <-ctx.Done():
return
case w.events <- truncateEvent(path, info):
}
} else {
select {
case <-ctx.Done():
return
case w.events <- writeEvent(path, info):
}
}
}

Expand Down Expand Up @@ -198,6 +206,10 @@ func writeEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi}
}

func truncateEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpTruncate, OldPath: path, NewPath: path, Info: fi}
}

func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Info: fi}
}
Expand All @@ -214,20 +226,21 @@ func (w *fileWatcher) GetFiles() map[string]os.FileInfo {
return w.scanner.GetFiles()
}

type fileScannerConfig struct {
type FileScannerConfig struct {
ExcludedFiles []match.Matcher `config:"exclude_files"`
Symlinks bool `config:"symlinks"`
RecursiveGlob bool `config:"recursive_glob"`
}

func defaultFileScannerConfig() fileScannerConfig {
return fileScannerConfig{
func defaultFileScannerConfig() FileScannerConfig {
return FileScannerConfig{
Symlinks: false,
RecursiveGlob: true,
}
}

func newFileScanner(paths []string, cfg fileScannerConfig) (loginp.FSScanner, error) {
func newFileScanner(paths []string, cfg FileScannerConfig) (loginp.FSScanner, error) {
fmt.Println(cfg.Symlinks)
fs := fileScanner{
paths: paths,
excludedFiles: cfg.ExcludedFiles,
Expand All @@ -247,7 +260,7 @@ func newFileScanner(paths []string, cfg fileScannerConfig) (loginp.FSScanner, er
}

// resolveRecursiveGlobs expands `**` from the globs in multiple patterns
func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error {
func (s *fileScanner) resolveRecursiveGlobs(c FileScannerConfig) error {
if !c.RecursiveGlob {
s.log.Debug("recursive glob disabled")
return nil
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestFileScanner(t *testing.T) {
test := test

t.Run(name, func(t *testing.T) {
cfg := fileScannerConfig{
cfg := FileScannerConfig{
ExcludedFiles: test.excludedFiles,
Symlinks: test.symlinks,
RecursiveGlob: false,
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/filestream/fswatch_test_non_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestFileScannerSymlinks(t *testing.T) {
test := test

t.Run(name, func(t *testing.T) {
cfg := fileScannerConfig{
cfg := FileScannerConfig{
ExcludedFiles: test.excludedFiles,
Symlinks: true,
RecursiveGlob: false,
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestFileWatcherRenamedFile(t *testing.T) {
t.Fatal(err)
}

cfg := fileScannerConfig{
cfg := FileScannerConfig{
ExcludedFiles: nil,
Symlinks: false,
RecursiveGlob: false,
Expand Down
9 changes: 6 additions & 3 deletions filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ type fileIdentifier interface {
// fileSource implements the Source interface
// It is required to identify and manage file sources.
type fileSource struct {
info os.FileInfo
newPath string
oldPath string
info os.FileInfo
newPath string
oldPath string
truncated bool

name string
identifierGenerator string
Expand Down Expand Up @@ -103,6 +104,7 @@ func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: i.name + identitySep + file.GetOSState(e.Info).String(),
identifierGenerator: i.name,
}
Expand Down Expand Up @@ -140,6 +142,7 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: p.name + identitySep + path,
identifierGenerator: p.name,
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/filestream/identifier_inode_deviceid.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(),
identifierGenerator: i.name,
}
Expand Down
16 changes: 10 additions & 6 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type fileMeta struct {
// filestream is the input for reading from files which
// are actively written by other applications.
type filestream struct {
readerConfig readerConfig
readerConfig ReaderConfig
bufferSize int
tailFile bool // TODO
encodingFactory encoding.EncodingFactory
Expand Down Expand Up @@ -111,7 +111,7 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
}

filestream := &filestream{
readerConfig: config.readerConfig,
readerConfig: config.ReaderConfig,
bufferSize: config.BufferSize,
encodingFactory: encodingFactory,
lineTerminator: config.LineTerminator,
Expand Down Expand Up @@ -173,7 +173,7 @@ func (inp *filestream) Run(

func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
var state state
if c.IsNew() {
if c.IsNew() || s.truncated {
return state
}

Expand Down Expand Up @@ -236,7 +236,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
// is returned and the harvester is closed. The file will be picked up again the next time
// the file system is scanned
func (inp *filestream) openFile(path string, offset int64) (*os.File, error) {
err := inp.checkFileBeforeOpening(path)
err := inp.checkFileBeforeOpening(path, &offset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func (inp *filestream) openFile(path string, offset int64) (*os.File, error) {
return f, nil
}

func (inp *filestream) checkFileBeforeOpening(path string) error {
func (inp *filestream) checkFileBeforeOpening(path string, offset *int64) error {
fi, err := os.Stat(path)
if err != nil {
return fmt.Errorf("failed to stat source file %s: %v", path, err)
Expand All @@ -278,6 +278,11 @@ func (inp *filestream) checkFileBeforeOpening(path string) error {
return fmt.Errorf("failed to open file %s, named pipes are not supported", path)
}

// file was truncated
if fi.Size() < *offset {
*offset = 0
}

return nil
}

Expand Down Expand Up @@ -306,7 +311,6 @@ func (inp *filestream) readFromSource(
switch err {
case ErrFileTruncate:
log.Info("File was truncated. Begin reading file from offset 0.")
s.Offset = 0
case ErrClosed:
log.Info("Reader was closed. Closing.")
case reader.ErrLineUnparsable:
Expand Down
Loading

0 comments on commit e18e32f

Please sign in to comment.