Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish porting all tests from Python to Golang from test_harvester.py #24397

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Config stores the options of a file stream.
type config struct {
readerConfig
ReaderConfig `config:",inline"`

Paths []string `config:"paths"`
Close closerConfig `config:"close"`
Expand Down Expand Up @@ -59,7 +59,7 @@ type stateChangeCloserConfig struct {
Renamed bool `config:"renamed"`
}

type readerConfig struct {
type ReaderConfig struct {
Backoff backoffConfig `config:"backoff"`
BufferSize int `config:"buffer_size"`
Encoding string `config:"encoding"`
Expand All @@ -79,7 +79,7 @@ type backoffConfig struct {

func defaultConfig() config {
return config{
readerConfig: defaultReaderConfig(),
ReaderConfig: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
Expand All @@ -104,8 +104,8 @@ func defaultCloserConfig() closerConfig {
}
}

func defaultReaderConfig() readerConfig {
return readerConfig{
func defaultReaderConfig() ReaderConfig {
return ReaderConfig{
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 10 * time.Second,
Expand Down
123 changes: 123 additions & 0 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,69 @@ func (e *inputTestingEnvironment) mustWriteLinesToFile(filename string, lines []
}
}

func (e *inputTestingEnvironment) mustAppendLinesToFile(filename string, lines []byte) {
path := e.abspath(filename)
f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
e.t.Fatalf("failed to open file '%s': %+v", path, err)
}
defer f.Close()

_, err = f.Write(lines)
if err != nil {
e.t.Fatalf("append lines to file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) mustRenameFile(oldname, newname string) {
err := os.Rename(e.abspath(oldname), e.abspath(newname))
if err != nil {
e.t.Fatalf("failed to rename file '%s': %+v", oldname, err)
}
}

func (e *inputTestingEnvironment) mustRemoveFile(filename string) {
path := e.abspath(filename)
err := os.Remove(path)
if err != nil {
e.t.Fatalf("failed to rename file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) mustSymlink(filename, symlinkname string) {
err := os.Symlink(e.abspath(filename), e.abspath(symlinkname))
if err != nil {
e.t.Fatalf("failed to create symlink to file '%s': %+v", filename, err)
}
}

func (e *inputTestingEnvironment) mustTruncateFile(filename string, size int64) {
path := e.abspath(filename)
err := os.Truncate(path, size)
if err != nil {
e.t.Fatalf("failed to truncate file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) abspath(filename string) string {
return filepath.Join(e.workingDir, filename)
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access()

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
actual += 1
return true, nil
})
if err != nil {
e.t.Fatalf("error while iterating through registry: %+v", err)
}

require.Equal(e.t, actual, expectedCount)
}

// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expectedOffset int) {
filepath := e.abspath(filename)
Expand All @@ -131,6 +183,32 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expec
require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
}

func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) {
filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

inputStore, _ := e.stateStore.Access()

identifier, _ := newINodeDeviceIdentifier(nil)
src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath})

var entry registryEntry
err = inputStore.Get(src.Name(), &entry)
if err == nil {
e.t.Fatalf("key is not expected to be present '%s'", src.Name())
}
}

// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) {
entry := e.getRegistryState(key)

require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
}

func (e *inputTestingEnvironment) getRegistryState(key string) registryEntry {
inputStore, _ := e.stateStore.Access()

Expand All @@ -157,6 +235,51 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
}
}

// waitUntilHarvesterIsDone detects Harvester stop by checking if the last client has been closed
// as when a Harvester stops the client is closed.
func (e *inputTestingEnvironment) waitUntilHarvesterIsDone() {
for !e.pipeline.clients[len(e.pipeline.clients)-1].closed {
time.Sleep(10 * time.Millisecond)
}
}

// requireEventReceived requires that the list of messages has made it into the output.
func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
foundEvents := make([]bool, len(events))
checkedEventCount := 0
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
if len(events) == checkedEventCount {
e.t.Fatalf("not enough expected elements")
}
message := evt.Fields["message"].(string)
if message == events[checkedEventCount] {
foundEvents[checkedEventCount] = true
}
checkedEventCount += 1
}
}

var missingEvents []string
for i, found := range foundEvents {
if !found {
missingEvents = append(missingEvents, events[i])
}
}

require.Equal(e.t, 0, len(missingEvents), "following events are missing: %+v", missingEvents)
}

func (e *inputTestingEnvironment) getOutputMessages() []string {
messages := make([]string, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
messages = append(messages, evt.Fields["message"].(string))
}
}
return messages
}

type testInputStore struct {
registry *statestore.Registry
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newFileReader(
log *logp.Logger,
canceler input.Canceler,
f *os.File,
config readerConfig,
config ReaderConfig,
closerConfig closerConfig,
) (*logFile, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/filestream/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestLogFileTimedClosing(t *testing.T) {
logp.L(),
context.TODO(),
f,
readerConfig{},
ReaderConfig{},
closerConfig{
OnStateChange: stateChangeCloserConfig{
CheckInterval: 1 * time.Second,
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestLogFileTruncated(t *testing.T) {
defer f.Close()
defer os.Remove(f.Name())

reader, err := newFileReader(logp.L(), context.TODO(), f, readerConfig{}, closerConfig{})
reader, err := newFileReader(logp.L(), context.TODO(), f, ReaderConfig{}, closerConfig{})
if err != nil {
t.Fatalf("error while creating logReader: %+v", err)
}
Expand Down
33 changes: 23 additions & 10 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type fileWatcherConfig struct {
// Interval is the time between two scans.
Interval time.Duration `config:"check_interval"`
// Scanner is the configuration of the scanner.
Scanner fileScannerConfig `config:",inline"`
Scanner FileScannerConfig `config:",inline"`
}

// fileWatcher gets the list of files from a FSWatcher and creates events by
Expand Down Expand Up @@ -142,10 +142,18 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
}

if prevInfo.ModTime() != info.ModTime() {
select {
case <-ctx.Done():
return
case w.events <- writeEvent(path, info):
if prevInfo.Size() > info.Size() {
select {
case <-ctx.Done():
return
case w.events <- truncateEvent(path, info):
}
} else {
select {
case <-ctx.Done():
return
case w.events <- writeEvent(path, info):
}
}
}

Expand Down Expand Up @@ -198,6 +206,10 @@ func writeEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi}
}

func truncateEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpTruncate, OldPath: path, NewPath: path, Info: fi}
}

func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Info: fi}
}
Expand All @@ -214,20 +226,21 @@ func (w *fileWatcher) GetFiles() map[string]os.FileInfo {
return w.scanner.GetFiles()
}

type fileScannerConfig struct {
type FileScannerConfig struct {
ExcludedFiles []match.Matcher `config:"exclude_files"`
Symlinks bool `config:"symlinks"`
RecursiveGlob bool `config:"recursive_glob"`
}

func defaultFileScannerConfig() fileScannerConfig {
return fileScannerConfig{
func defaultFileScannerConfig() FileScannerConfig {
return FileScannerConfig{
Symlinks: false,
RecursiveGlob: true,
}
}

func newFileScanner(paths []string, cfg fileScannerConfig) (loginp.FSScanner, error) {
func newFileScanner(paths []string, cfg FileScannerConfig) (loginp.FSScanner, error) {
fmt.Println(cfg.Symlinks)
fs := fileScanner{
paths: paths,
excludedFiles: cfg.ExcludedFiles,
Expand All @@ -247,7 +260,7 @@ func newFileScanner(paths []string, cfg fileScannerConfig) (loginp.FSScanner, er
}

// resolveRecursiveGlobs expands `**` from the globs in multiple patterns
func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error {
func (s *fileScanner) resolveRecursiveGlobs(c FileScannerConfig) error {
if !c.RecursiveGlob {
s.log.Debug("recursive glob disabled")
return nil
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestFileScanner(t *testing.T) {
test := test

t.Run(name, func(t *testing.T) {
cfg := fileScannerConfig{
cfg := FileScannerConfig{
ExcludedFiles: test.excludedFiles,
Symlinks: test.symlinks,
RecursiveGlob: false,
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/filestream/fswatch_test_non_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestFileScannerSymlinks(t *testing.T) {
test := test

t.Run(name, func(t *testing.T) {
cfg := fileScannerConfig{
cfg := FileScannerConfig{
ExcludedFiles: test.excludedFiles,
Symlinks: true,
RecursiveGlob: false,
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestFileWatcherRenamedFile(t *testing.T) {
t.Fatal(err)
}

cfg := fileScannerConfig{
cfg := FileScannerConfig{
ExcludedFiles: nil,
Symlinks: false,
RecursiveGlob: false,
Expand Down
9 changes: 6 additions & 3 deletions filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ type fileIdentifier interface {
// fileSource implements the Source interface
// It is required to identify and manage file sources.
type fileSource struct {
info os.FileInfo
newPath string
oldPath string
info os.FileInfo
newPath string
oldPath string
truncated bool

name string
identifierGenerator string
Expand Down Expand Up @@ -103,6 +104,7 @@ func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: i.name + identitySep + file.GetOSState(e.Info).String(),
identifierGenerator: i.name,
}
Expand Down Expand Up @@ -140,6 +142,7 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: p.name + identitySep + path,
identifierGenerator: p.name,
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/filestream/identifier_inode_deviceid.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(),
identifierGenerator: i.name,
}
Expand Down
Loading