From c7c8b46f7641dc0397b2c09162161317cb6fd02e Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 23 Feb 2024 18:15:53 +0100 Subject: [PATCH 01/13] [WIP] Test filestream file truncation Test that fails if Filestream does not correctly update the registry when the file is truncated while Filebeat is not running. --- .../integration/filestream_truncation_test.go | 196 ++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 filebeat/tests/integration/filestream_truncation_test.go diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go new file mode 100644 index 00000000000..a0d2a0c7504 --- /dev/null +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -0,0 +1,196 @@ +//go:build integration + +package integration + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/tests/integration" +) + +var truncationCfg = ` +filebeat.inputs: + - type: filestream + id: id + enabled: true + paths: + - %s +output: + file: + enabled: true + codec.json: + pretty: false + path: %s + filename: "output" + rotate_on_startup: true +queue.mem: + flush: + timeout: 1s + min_events: 32 +filebeat.registry.flush: 1s +path.home: %s +logging: + files: + rotateeverybytes: 104857600 + rotateonstartup: false + level: debug + selectors: +# - file_watcher + - input.filestream + - input.harvester + metrics: + enabled: false +` + +func TestFilestreamFileTruncation(t *testing.T) { + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + tempDir := filebeat.TempDir() + logFile := path.Join(tempDir, "log.log") + filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir)) + + writeLogFile(t, logFile, 10, false) + fi, err := os.Stat(logFile) + if err != nil { + t.Fatal(err) + } + + if fi.Size() != 500 { + t.Fatalf("[%s] file size is wrong: %d", logFile, fi.Size()) + } + filebeat.Start() + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + filebeat.Stop() + + registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") + entries := readFilestreamRegistryLog(t, registryLogFile) + lastEntry := entries[len(entries)-1] + if lastEntry.Offset != 500 { + t.Fatalf("expecting offset 500 got %d instead", lastEntry.Offset) + } + + if err := os.Truncate(logFile, 0); err != nil { + t.Fatalf("could not truncate log file: %s", err) + } + + writeLogFile(t, logFile, 5, true) + + fi, err = os.Stat(logFile) + if err != nil { + t.Fatal(err) + } + + if fi.Size() != 250 { + t.Fatalf("[%s] file size is wrong: %d", logFile, fi.Size()) + } + + filebeat.Start() + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + filebeat.Stop() + + entries = readFilestreamRegistryLog(t, registryLogFile) + lastEntry = entries[len(entries)-1] + if lastEntry.Offset != 250 { + t.Errorf("expecting offset 250 got %d instead", lastEntry.Offset) + max := len(entries) + if max > 10 { + max = 10 + } + + t.Log("last registry entries") + for _, e := range entries[:max] { + t.Logf("%+v\n", e) + } + t.FailNow() + } +} + +// writeLogFile each line is 50 bytes +func writeLogFile(t *testing.T, path 
string, count int, append bool) { + var file *os.File + var err error + if !append { + file, err = os.Create(path) + if err != nil { + t.Fatalf("could not create file '%s': %s", path, err) + } + } else { + file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) + } + defer file.Close() + defer file.Sync() + + now := time.Now().Format(time.RFC3339Nano) + for i := 0; i < count; i++ { + if _, err := fmt.Fprintf(file, "%s %13d\n", now, i); err != nil { + t.Fatalf("could not write line %d to file: %s", count+1, err) + } + } +} + +type registryEntry struct { + Key string + Offset int + Filename string + TTL time.Duration +} + +func readFilestreamRegistryLog(t *testing.T, path string) []registryEntry { + file, err := os.Open(path) + if err != nil { + t.Fatalf("could not open file '%s': %s", path, err) + } + + entries := []registryEntry{} + s := bufio.NewScanner(file) + + for s.Scan() { + line := s.Bytes() + + e := entry{} + if err := json.Unmarshal(line, &e); err != nil { + t.Fatalf("could not read line '%s': %s", string(line), err) + } + + if e.K == "" { + continue + } + + entries = append(entries, registryEntry{ + Key: e.K, + Offset: e.V.Cursor.Offset, + TTL: e.V.TTL, + Filename: e.V.Meta.Source, + }) + } + + return entries +} + +type entry struct { + K string `json:"k"` + V struct { + Cursor struct { + Offset int `json:"offset"` + } `json:"cursor"` + Meta struct { + Source string `json:"source"` + IdentifierName string `json:"identifier_name"` + } `json:"meta"` + TTL time.Duration `json:"ttl"` + Updated []any `json:"updated"` + } `json:"v"` +} From f9bc32d0f8f49860cd2d27ece987ec4d3ac94032 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 23 Feb 2024 19:54:13 +0100 Subject: [PATCH 02/13] Refactor tests Refactor tests and set `logging.files.rotateonstartup=false` when starting the Beat so we only generate a single log file even when the Beat is started multiple times. 
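For reference, the offset assertions in these tests decode the newline-delimited JSON registry log at `data/registry/filebeat/log.json`, one JSON object per line, using the test's `entry` struct. Below is a minimal standalone sketch of that decoding; the two sample lines are illustrative stand-ins (the key format in particular is an assumption), not output captured from a real Filebeat run:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// entry mirrors the struct the test uses to decode one registry log line.
type entry struct {
	K string `json:"k"`
	V struct {
		Cursor struct {
			Offset int `json:"offset"`
		} `json:"cursor"`
		Meta struct {
			Source string `json:"source"`
		} `json:"meta"`
		TTL time.Duration `json:"ttl"`
	} `json:"v"`
}

func main() {
	// Illustrative lines: an operation record (no "k" field) followed by a
	// state record carrying the cursor offset for one file.
	lines := []string{
		`{"op":"set","id":46}`,
		`{"k":"filestream::id::native::2112-33","v":{"cursor":{"offset":500},"meta":{"source":"/tmp/log.log"},"ttl":1800000000000}}`,
	}

	for _, line := range lines {
		var e entry
		if err := json.Unmarshal([]byte(line), &e); err != nil {
			panic(err)
		}
		// Operation records have an empty key and are skipped, just like
		// readFilestreamRegistryLog does in the test.
		if e.K == "" {
			continue
		}
		fmt.Printf("key=%s offset=%d source=%s\n", e.K, e.V.Cursor.Offset, e.V.Meta.Source)
	}
}
```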
--- .../integration/filestream_truncation_test.go | 59 +++++++++---------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index a0d2a0c7504..dedef380d3e 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -37,9 +37,6 @@ queue.mem: filebeat.registry.flush: 1s path.home: %s logging: - files: - rotateeverybytes: 104857600 - rotateonstartup: false level: debug selectors: # - file_watcher @@ -58,28 +55,17 @@ func TestFilestreamFileTruncation(t *testing.T) { tempDir := filebeat.TempDir() logFile := path.Join(tempDir, "log.log") + registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir)) writeLogFile(t, logFile, 10, false) - fi, err := os.Stat(logFile) - if err != nil { - t.Fatal(err) - } - if fi.Size() != 500 { - t.Fatalf("[%s] file size is wrong: %d", logFile, fi.Size()) - } filebeat.Start() filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.Stop() - registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") - entries := readFilestreamRegistryLog(t, registryLogFile) - lastEntry := entries[len(entries)-1] - if lastEntry.Offset != 500 { - t.Fatalf("expecting offset 500 got %d instead", lastEntry.Offset) - } + assertLastOffset(t, registryLogFile, 500) if err := os.Truncate(logFile, 0); err != nil { t.Fatalf("could not truncate log file: %s", err) @@ -87,38 +73,34 @@ func TestFilestreamFileTruncation(t *testing.T) { writeLogFile(t, logFile, 5, true) - fi, err = os.Stat(logFile) - if err != nil { - t.Fatal(err) - } - - if fi.Size() != 250 { - t.Fatalf("[%s] file size is wrong: %d", logFile, fi.Size()) - } - filebeat.Start() filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.Stop() - entries = readFilestreamRegistryLog(t, registryLogFile) - lastEntry = entries[len(entries)-1] - if lastEntry.Offset != 250 { - t.Errorf("expecting offset 250 got %d instead", lastEntry.Offset) + assertLastOffset(t, registryLogFile, 250) +} + +func assertLastOffset(t *testing.T, path string, offset int) { + entries := readFilestreamRegistryLog(t, path) + lastEntry := entries[len(entries)-1] + if lastEntry.Offset != offset { + t.Errorf("expecting offset %d got %d instead", offset, lastEntry.Offset) + t.Log("last registry entries:") + max := len(entries) if max > 10 { max = 10 } - - t.Log("last registry entries") for _, e := range entries[:max] { t.Logf("%+v\n", e) } + t.FailNow() } } -// writeLogFile each line is 50 bytes +// writeLogFile writes count lines to path, each line is 50 bytes func writeLogFile(t *testing.T, path string, count int, append bool) { var file *os.File var err error @@ -130,6 +112,7 @@ func writeLogFile(t *testing.T, path string, count int, append bool) { } else { file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) } + defer assertFileSize(t, path, int64(count*50)) defer file.Close() defer file.Sync() @@ -141,6 +124,18 @@ func writeLogFile(t *testing.T, path string, count int, append bool) { } } +func 
assertFileSize(t *testing.T, path string, size int64) { + t.Helper() + fi, err := os.Stat(path) + if err != nil { + t.Fatalf("could not call Stat on '%s': %s", path, err) + } + + if fi.Size() != size { + t.Fatalf("[%s] expecting size %d, got: %d", path, size, fi.Size()) + } +} + type registryEntry struct { Key string Offset int From d9b76146ae833a075f8f43acdfe0feeef3bec9b6 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 4 Mar 2024 12:14:41 +0100 Subject: [PATCH 03/13] Better document tests --- filebeat/tests/integration/filestream_truncation_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index dedef380d3e..d8d8132fe8f 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -58,26 +58,31 @@ func TestFilestreamFileTruncation(t *testing.T) { registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir)) + // 1. Create a log file with some lines writeLogFile(t, logFile, 10, false) + // 2. Ingest the file and stop Filebeat filebeat.Start() filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.Stop() + // 3. Assert the offset is correctly set in the registry assertLastOffset(t, registryLogFile, 500) + // 4. Truncate the file and write some data (less than before) if err := os.Truncate(logFile, 0); err != nil { t.Fatalf("could not truncate log file: %s", err) } - writeLogFile(t, logFile, 5, true) + // 5. Read the file again and stop Filebeat filebeat.Start() filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.Stop() + // 6. Assert the registry offset is new, smaller file size. 
assertLastOffset(t, registryLogFile, 250) } From 0cb1db8651903d8c6e8c50cca0ea25fb8733bcaf Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 19 Mar 2024 12:53:34 +0100 Subject: [PATCH 04/13] Test case for truncating the file while Filebeat is running --- .../integration/filestream_truncation_test.go | 93 +++++++++++++++++-- filebeat/tests/integration/store_test.go | 4 +- libbeat/tests/integration/framework.go | 54 +++++++++++ 3 files changed, 140 insertions(+), 11 deletions(-) diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index d8d8132fe8f..bcc57ee1901 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -20,6 +20,7 @@ filebeat.inputs: - type: filestream id: id enabled: true + prospector.scanner.check_interval: 30s paths: - %s output: @@ -46,7 +47,65 @@ logging: enabled: false ` -func TestFilestreamFileTruncation(t *testing.T) { +func TestFilestreamLiveFileTruncation(t *testing.T) { + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + tempDir := filebeat.TempDir() + logFile := path.Join(tempDir, "log.log") + registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") + filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir)) + + // genLogCtx, cancel := context.WithCancel(context.Background()) + // t.Cleanup(cancel) + // integration.GenerateLogFile(genLogCtx, t, logFile, false) + + // 1. Create a log file and let Filebeat harvest all contents + writeLogFile(t, logFile, 200, false) + filebeat.Start() + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + + // 2. Truncate the file and wait Filebeat to close the file + // time.Sleep(10 * time.Second) + t.Log("sleeping done, truncating file") + // time.Sleep(1 * time.Second) + if err := os.Truncate(logFile, 0); err != nil { + t.Fatalf("could not truncate log file: %s", err) + } + + // 3. Ensure Filebeat detected the file truncation + filebeat.WaitForLogs("File was truncated as offset (10000) > size (0)", 20*time.Second, "file was not truncated") + filebeat.WaitForLogs("File was truncated, nothing to read", 20*time.Second, "reader loop did not stop") + filebeat.WaitForLogs("Stopped harvester for file", 20*time.Second, "harvester did not stop") + filebeat.WaitForLogs("Closing reader of filestream", 20*time.Second, "reader did not close") + + // 4. Now we need to stop Filebeat before the next scan cycle + filebeat.Stop() + + // Assert we offset in the registry + // TODO (Tiago): decide whether Filebeat can exit without + // updating the offset. Currently it does and it can be considered + // one of the root causes for the issue + assertLastOffset(t, registryLogFile, 10_000) + + // TODO: ensure data was read + + // Open for appending because the file has already been truncated + writeLogFile(t, logFile, 10, true) + // 5. Start Filebeat again. 
+ filebeat.Start() + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") + + assertLastOffset(t, registryLogFile, 500) + +} + +func TestFilestreamOfflineFileTruncation(t *testing.T) { filebeat := integration.NewBeat( t, "filebeat", @@ -87,6 +146,7 @@ func TestFilestreamFileTruncation(t *testing.T) { } func assertLastOffset(t *testing.T, path string, offset int) { + t.Helper() entries := readFilestreamRegistryLog(t, path) lastEntry := entries[len(entries)-1] if lastEntry.Offset != offset { @@ -117,13 +177,28 @@ func writeLogFile(t *testing.T, path string, count int, append bool) { } else { file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) } - defer assertFileSize(t, path, int64(count*50)) - defer file.Close() - defer file.Sync() - - now := time.Now().Format(time.RFC3339Nano) + defer func() { + assertFileSize(t, path, int64(count*50)) + if t.Failed() { + t.Log("Waiting a few seconds") + time.Sleep(time.Second * 2) + t.Log("asserting file size again") + assertFileSize(t, path, int64(count*50)) + } + }() + defer func() { + if err := file.Close(); err != nil { + t.Fatalf("could not close file: %s", err) + } + }() + defer func() { + if err := file.Sync(); err != nil { + t.Fatalf("could not sync file: %s", err) + } + }() + now := time.Now().Format(time.RFC3339) for i := 0; i < count; i++ { - if _, err := fmt.Fprintf(file, "%s %13d\n", now, i); err != nil { + if _, err := fmt.Fprintf(file, "%s %13d\n", now, i); err != nil { t.Fatalf("could not write line %d to file: %s", count+1, err) } } @@ -133,11 +208,11 @@ func assertFileSize(t *testing.T, path string, size int64) { t.Helper() fi, err := os.Stat(path) if err != nil { - t.Fatalf("could not call Stat on '%s': %s", path, err) + t.Errorf("could not call Stat on '%s': %s", path, err) } if fi.Size() != size { - t.Fatalf("[%s] expecting size %d, got: %d", path, size, fi.Size()) + t.Errorf("[%s] expecting size %d, got: %d", path, size, fi.Size()) } } diff --git a/filebeat/tests/integration/store_test.go b/filebeat/tests/integration/store_test.go index c3ddb1ca759..d4ee36298d5 100644 --- a/filebeat/tests/integration/store_test.go +++ b/filebeat/tests/integration/store_test.go @@ -106,10 +106,10 @@ func TestStore(t *testing.T) { filebeat.Stop() registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") - readFilestreamRegistryLog(t, registryLogFile, "remove", 10) + countOperationsFromFilestreamRegistry(t, registryLogFile, "remove", 10) } -func readFilestreamRegistryLog(t *testing.T, path, op string, expectedCount int) { +func countOperationsFromFilestreamRegistry(t *testing.T, path, op string, expectedCount int) { file, err := os.Open(path) if err != nil { t.Fatalf("could not open file '%s': %s", path, err) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 6017c63fd87..7b4a3a85b05 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -719,4 +719,58 @@ func GenerateLogFile(t *testing.T, path string, count int, append bool) { t.Fatalf("could not write line %d to file: %s", count+1, err) } } + // ======= + // // GenerateLogFile generates a log file by appending the current + // // time to it every second. 
+ // // TODO (Tiago): Find a better name + // func GenerateLogFile(ctx context.Context, t *testing.T, fullPath string, append bool) { + // var f *os.File + // var err error + // if !append { + // f, err = os.Create(fullPath) + // } else { + // f, err = os.OpenFile(fullPath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) + // } + // if err != nil { + // t.Fatalf("could not create file '%s': %s", fullPath, err) + // } + + // go func() { + // t.Helper() + // ticker := time.NewTicker(time.Second) + // t.Cleanup(ticker.Stop) + + // done := make(chan struct{}) + // t.Cleanup(func() { close(done) }) + + // defer func() { + // if err := f.Close(); err != nil { + // t.Errorf("could not close log file '%s': %s", fullPath, err) + // } + // }() + + // for { + // select { + // case <-ctx.Done(): + // return + // case <-done: + // return + // case now := <-ticker.C: + // fmt.Println(now.Format(time.RFC3339)) + // _, err := fmt.Fprintln(f, now.Format(time.RFC3339)) + // if err != nil { + // // The Go compiler does not allow me to call t.Fatalf from a non-test + // // goroutine, so just log it instead + // t.Errorf("could not write data to log file '%s': %s", fullPath, err) + // return + // } + // // make sure log lines are synced as quickly as possible + // if err := f.Sync(); err != nil { + // t.Errorf("could not sync file '%s': %s", fullPath, err) + // } + // } + // } + // }() + // + // >>>>>>> e1b34b611f (Test case for truncating the file while Filebeat is running) } From a687c752c749f73f5a4dac5d90c154fc75cd0838 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 19 Mar 2024 20:44:20 +0100 Subject: [PATCH 05/13] Fix file truncation handling This commit fixes file truncation handling for the case when the file is truncated when Filebeat is not running. --- filebeat/input/filestream/input.go | 67 +++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 43c6ddcb19f..2b9c7b2a266 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -114,7 +114,7 @@ func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error { return fmt.Errorf("not file source") } - reader, err := inp.open(ctx.Logger, ctx.Cancelation, fs, 0) + reader, _, err := inp.open(ctx.Logger, ctx.Cancelation, fs, 0, src) if err != nil { return err } @@ -136,12 +136,16 @@ func (inp *filestream) Run( log := ctx.Logger.With("path", fs.newPath).With("state-id", src.Name()) state := initState(log, cursor, fs) - r, err := inp.open(log, ctx.Cancelation, fs, state.Offset) + r, truncated, err := inp.open(log, ctx.Cancelation, fs, state.Offset, src) if err != nil { log.Errorf("File could not be opened for reading: %v", err) return err } + if truncated { + state.Offset = 0 + } + metrics.FilesActive.Inc() metrics.HarvesterRunning.Inc() defer metrics.FilesActive.Dec() @@ -173,10 +177,21 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state { return state } -func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSource, offset int64) (reader.Reader, error) { - f, encoding, err := inp.openFile(log, fs.newPath, offset) +func (inp *filestream) open( + log *logp.Logger, + canceler input.Canceler, + fs fileSource, + offset int64, + src loginp.Source, +) (reader.Reader, bool, error) { + + f, encoding, truncated, err := inp.openFile(log, fs.newPath, offset, src) if err != nil { - return nil, err + return nil, truncated, err + } + + if truncated { + offset = 0 } ok 
:= false // used for cleanup @@ -201,12 +216,12 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo // don't require 'complicated' logic. logReader, err := newFileReader(log, canceler, f, inp.readerConfig, closerCfg) if err != nil { - return nil, err + return nil, truncated, err } dbgReader, err := debug.AppendReaders(logReader) if err != nil { - return nil, err + return nil, truncated, err } // Configure MaxBytes limit for EncodeReader as multiplied by 4 @@ -223,7 +238,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo MaxBytes: encReaderMaxBytes, }) if err != nil { - return nil, err + return nil, truncated, err } r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator) @@ -235,61 +250,73 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes) ok = true // no need to close the file - return r, nil + return r, truncated, nil } // openFile opens a file and checks for the encoding. In case the encoding cannot be detected // or the file cannot be opened because for example of failing read permissions, an error // is returned and the harvester is closed. The file will be picked up again the next time -// the file system is scanned -func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, encoding.Encoding, error) { +// the file system is scanned. +// +// openFile will also detect and hadle file truncation. If a file is truncated +// then the 3rd return value is true. +func (inp *filestream) openFile( + log *logp.Logger, + path string, + offset int64, + src loginp.Source, +) (*os.File, encoding.Encoding, bool, error) { fi, err := os.Stat(path) if err != nil { - return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err) + return nil, nil, false, fmt.Errorf("failed to stat source file %s: %w", path, err) } // it must be checked if the file is not a named pipe before we try to open it // if it is a named pipe os.OpenFile fails, so there is no need to try opening it. if fi.Mode()&os.ModeNamedPipe != 0 { - return nil, nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name()) + return nil, nil, false, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name()) } f, err := file.ReadOpen(path) if err != nil { - return nil, nil, fmt.Errorf("failed opening %s: %w", path, err) + return nil, nil, false, fmt.Errorf("failed opening %s: %w", path, err) } ok := false defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close)) fi, err = f.Stat() if err != nil { - return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err) + return nil, nil, false, fmt.Errorf("failed to stat source file %s: %w", path, err) } err = checkFileBeforeOpening(fi) if err != nil { - return nil, nil, err + return nil, nil, false, err } + truncated := false if fi.Size() < offset { + // if the file was truncated we need to reset the offset and notify + // all callers so they can also reset their offsets + truncated = true log.Infof("File was truncated. Reading file from offset 0. 
Path=%s", path) offset = 0 } err = inp.initFileOffset(f, offset) if err != nil { - return nil, nil, err + return nil, nil, truncated, err } encoding, err := inp.encodingFactory(f) if err != nil { if errors.Is(err, transform.ErrShortSrc) { - return nil, nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f) + return nil, nil, truncated, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f) } - return nil, nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err) + return nil, nil, truncated, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err) } ok = true // no need to close the file - return f, encoding, nil + return f, encoding, truncated, nil } func checkFileBeforeOpening(fi os.FileInfo) error { From dfa3d2337e7130a4e60a00cf01ecbc3c2ed673e9 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 20 Mar 2024 08:57:21 +0100 Subject: [PATCH 06/13] Add changelog and remove comments from tests --- CHANGELOG.next.asciidoc | 1 + .../integration/filestream_truncation_test.go | 16 ++-------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 37fc00bd926..c5160e8f3a4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -100,6 +100,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of un-parsed JSON in O365 module. {issue}37800[37800] {pull}38709[38709] - Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488] - Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780] +- Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416] *Heartbeat* diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index bcc57ee1901..b8fa11cfd84 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -40,7 +40,7 @@ path.home: %s logging: level: debug selectors: -# - file_watcher + - file_watcher - input.filestream - input.harvester metrics: @@ -59,10 +59,6 @@ func TestFilestreamLiveFileTruncation(t *testing.T) { registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json") filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir)) - // genLogCtx, cancel := context.WithCancel(context.Background()) - // t.Cleanup(cancel) - // integration.GenerateLogFile(genLogCtx, t, logFile, false) - // 1. Create a log file and let Filebeat harvest all contents writeLogFile(t, logFile, 200, false) filebeat.Start() @@ -70,9 +66,6 @@ func TestFilestreamLiveFileTruncation(t *testing.T) { filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") // 2. Truncate the file and wait Filebeat to close the file - // time.Sleep(10 * time.Second) - t.Log("sleeping done, truncating file") - // time.Sleep(1 * time.Second) if err := os.Truncate(logFile, 0); err != nil { t.Fatalf("could not truncate log file: %s", err) } @@ -87,22 +80,17 @@ func TestFilestreamLiveFileTruncation(t *testing.T) { filebeat.Stop() // Assert we offset in the registry - // TODO (Tiago): decide whether Filebeat can exit without - // updating the offset. 
Currently it does and it can be considered - // one of the root causes for the issue assertLastOffset(t, registryLogFile, 10_000) - // TODO: ensure data was read - // Open for appending because the file has already been truncated writeLogFile(t, logFile, 10, true) + // 5. Start Filebeat again. filebeat.Start() filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") assertLastOffset(t, registryLogFile, 500) - } func TestFilestreamOfflineFileTruncation(t *testing.T) { From f88d7c89f35935138b918df1c2b188657f02ca38 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 20 Mar 2024 08:58:37 +0100 Subject: [PATCH 07/13] Add license headers --- .../integration/filestream_truncation_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index b8fa11cfd84..6630a78f11a 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ //go:build integration package integration From cb0b68655955391f760323d2de6d6c3af3508665 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 20 Mar 2024 09:14:05 +0100 Subject: [PATCH 08/13] Remove unused parameters --- filebeat/input/filestream/input.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 2b9c7b2a266..0136b062b48 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -114,7 +114,7 @@ func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error { return fmt.Errorf("not file source") } - reader, _, err := inp.open(ctx.Logger, ctx.Cancelation, fs, 0, src) + reader, _, err := inp.open(ctx.Logger, ctx.Cancelation, fs, 0) if err != nil { return err } @@ -136,7 +136,7 @@ func (inp *filestream) Run( log := ctx.Logger.With("path", fs.newPath).With("state-id", src.Name()) state := initState(log, cursor, fs) - r, truncated, err := inp.open(log, ctx.Cancelation, fs, state.Offset, src) + r, truncated, err := inp.open(log, ctx.Cancelation, fs, state.Offset) if err != nil { log.Errorf("File could not be opened for reading: %v", err) return err @@ -182,10 +182,9 @@ func (inp *filestream) open( canceler input.Canceler, fs fileSource, offset int64, - src loginp.Source, ) (reader.Reader, bool, error) { - f, encoding, truncated, err := inp.openFile(log, fs.newPath, offset, src) + f, encoding, truncated, err := inp.openFile(log, fs.newPath, offset) if err != nil { return nil, truncated, err } @@ -264,7 +263,6 @@ func (inp *filestream) openFile( log *logp.Logger, path string, offset int64, - src loginp.Source, ) (*os.File, encoding.Encoding, bool, error) { fi, err := os.Stat(path) if err != nil { From 0315bad2ff800fd7c5951f43993a4f84bf66e8df Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 20 Mar 2024 09:23:31 +0100 Subject: [PATCH 09/13] Small refactoring and clean up --- .../integration/filestream_truncation_test.go | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index 6630a78f11a..ea799576a58 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -35,7 +35,7 @@ import ( var truncationCfg = ` filebeat.inputs: - type: filestream - id: id + id: a-unique-filestream-input-id enabled: true prospector.scanner.check_interval: 30s paths: @@ -181,16 +181,12 @@ func writeLogFile(t *testing.T, path string, count int, append bool) { } } else { file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) - } - defer func() { - assertFileSize(t, path, int64(count*50)) - if t.Failed() { - t.Log("Waiting a few seconds") - time.Sleep(time.Second * 2) - t.Log("asserting file size again") - assertFileSize(t, path, int64(count*50)) + if err != nil { + t.Fatalf("could not open or create file: '%s': %s", path, err) } - }() + } + + defer assertFileSize(t, path, int64(count*50)) defer func() { if err := file.Close(); err != nil { t.Fatalf("could not close file: %s", err) @@ -245,15 +241,17 @@ func readFilestreamRegistryLog(t *testing.T, path string) []registryEntry { t.Fatalf("could not read line '%s': %s", string(line), err) } - if e.K == "" { + // Skips registry log entries containing the operation ID like: + // '{"op":"set","id":46}' + if e.Key == "" { continue } entries = append(entries, registryEntry{ - 
Key: e.K, - Offset: e.V.Cursor.Offset, - TTL: e.V.TTL, - Filename: e.V.Meta.Source, + Key: e.Key, + Offset: e.Value.Cursor.Offset, + TTL: e.Value.TTL, + Filename: e.Value.Meta.Source, }) } @@ -261,16 +259,14 @@ func readFilestreamRegistryLog(t *testing.T, path string) []registryEntry { } type entry struct { - K string `json:"k"` - V struct { + Key string `json:"k"` + Value struct { Cursor struct { Offset int `json:"offset"` } `json:"cursor"` Meta struct { - Source string `json:"source"` - IdentifierName string `json:"identifier_name"` + Source string `json:"source"` } `json:"meta"` - TTL time.Duration `json:"ttl"` - Updated []any `json:"updated"` + TTL time.Duration `json:"ttl"` } `json:"v"` } From cb4f787ce6282e2bd9cdf582ddbbcd9901ddcee1 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 20 Mar 2024 09:31:55 +0100 Subject: [PATCH 10/13] [Refactoring] Move log file generation to the integration framework --- .../integration/filestream_truncation_test.go | 55 ++----------------- libbeat/tests/integration/framework.go | 54 ------------------ 2 files changed, 4 insertions(+), 105 deletions(-) diff --git a/filebeat/tests/integration/filestream_truncation_test.go b/filebeat/tests/integration/filestream_truncation_test.go index ea799576a58..98db9a6ad23 100644 --- a/filebeat/tests/integration/filestream_truncation_test.go +++ b/filebeat/tests/integration/filestream_truncation_test.go @@ -77,7 +77,7 @@ func TestFilestreamLiveFileTruncation(t *testing.T) { filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir)) // 1. Create a log file and let Filebeat harvest all contents - writeLogFile(t, logFile, 200, false) + integration.GenerateLogFile(t, logFile, 200, false) filebeat.Start() filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file") @@ -100,7 +100,7 @@ func TestFilestreamLiveFileTruncation(t *testing.T) { assertLastOffset(t, registryLogFile, 10_000) // Open for appending because the file has already been truncated - writeLogFile(t, logFile, 10, true) + integration.GenerateLogFile(t, logFile, 10, true) // 5. Start Filebeat again. filebeat.Start() @@ -123,7 +123,7 @@ func TestFilestreamOfflineFileTruncation(t *testing.T) { filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir)) // 1. Create a log file with some lines - writeLogFile(t, logFile, 10, false) + integration.GenerateLogFile(t, logFile, 10, false) // 2. Ingest the file and stop Filebeat filebeat.Start() @@ -138,7 +138,7 @@ func TestFilestreamOfflineFileTruncation(t *testing.T) { if err := os.Truncate(logFile, 0); err != nil { t.Fatalf("could not truncate log file: %s", err) } - writeLogFile(t, logFile, 5, true) + integration.GenerateLogFile(t, logFile, 5, true) // 5. 
Read the file again and stop Filebeat filebeat.Start() @@ -170,53 +170,6 @@ func assertLastOffset(t *testing.T, path string, offset int) { } } -// writeLogFile writes count lines to path, each line is 50 bytes -func writeLogFile(t *testing.T, path string, count int, append bool) { - var file *os.File - var err error - if !append { - file, err = os.Create(path) - if err != nil { - t.Fatalf("could not create file '%s': %s", path, err) - } - } else { - file, err = os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) - if err != nil { - t.Fatalf("could not open or create file: '%s': %s", path, err) - } - } - - defer assertFileSize(t, path, int64(count*50)) - defer func() { - if err := file.Close(); err != nil { - t.Fatalf("could not close file: %s", err) - } - }() - defer func() { - if err := file.Sync(); err != nil { - t.Fatalf("could not sync file: %s", err) - } - }() - now := time.Now().Format(time.RFC3339) - for i := 0; i < count; i++ { - if _, err := fmt.Fprintf(file, "%s %13d\n", now, i); err != nil { - t.Fatalf("could not write line %d to file: %s", count+1, err) - } - } -} - -func assertFileSize(t *testing.T, path string, size int64) { - t.Helper() - fi, err := os.Stat(path) - if err != nil { - t.Errorf("could not call Stat on '%s': %s", path, err) - } - - if fi.Size() != size { - t.Errorf("[%s] expecting size %d, got: %d", path, size, fi.Size()) - } -} - type registryEntry struct { Key string Offset int diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 7b4a3a85b05..6017c63fd87 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -719,58 +719,4 @@ func GenerateLogFile(t *testing.T, path string, count int, append bool) { t.Fatalf("could not write line %d to file: %s", count+1, err) } } - // ======= - // // GenerateLogFile generates a log file by appending the current - // // time to it every second. 
- // // TODO (Tiago): Find a better name - // func GenerateLogFile(ctx context.Context, t *testing.T, fullPath string, append bool) { - // var f *os.File - // var err error - // if !append { - // f, err = os.Create(fullPath) - // } else { - // f, err = os.OpenFile(fullPath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) - // } - // if err != nil { - // t.Fatalf("could not create file '%s': %s", fullPath, err) - // } - - // go func() { - // t.Helper() - // ticker := time.NewTicker(time.Second) - // t.Cleanup(ticker.Stop) - - // done := make(chan struct{}) - // t.Cleanup(func() { close(done) }) - - // defer func() { - // if err := f.Close(); err != nil { - // t.Errorf("could not close log file '%s': %s", fullPath, err) - // } - // }() - - // for { - // select { - // case <-ctx.Done(): - // return - // case <-done: - // return - // case now := <-ticker.C: - // fmt.Println(now.Format(time.RFC3339)) - // _, err := fmt.Fprintln(f, now.Format(time.RFC3339)) - // if err != nil { - // // The Go compiler does not allow me to call t.Fatalf from a non-test - // // goroutine, so just log it instead - // t.Errorf("could not write data to log file '%s': %s", fullPath, err) - // return - // } - // // make sure log lines are synced as quickly as possible - // if err := f.Sync(); err != nil { - // t.Errorf("could not sync file '%s': %s", fullPath, err) - // } - // } - // } - // }() - // - // >>>>>>> e1b34b611f (Test case for truncating the file while Filebeat is running) } From 5d1565ba89b1d58a953f282aaf187d89e85f84d0 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 27 Mar 2024 09:28:38 +0100 Subject: [PATCH 11/13] [CI] Add `refreshenv` to Windows script --- .ci/scripts/install-tools.bat | 1 + 1 file changed, 1 insertion(+) diff --git a/.ci/scripts/install-tools.bat b/.ci/scripts/install-tools.bat index 86572356da9..acd177ea40d 100644 --- a/.ci/scripts/install-tools.bat +++ b/.ci/scripts/install-tools.bat @@ -17,6 +17,7 @@ SET USERPROFILE=%OLD_USERPROFILE% echo "Upgrade chocolatey to latest version" choco upgrade chocolatey -y +refreshenv IF NOT EXIST C:\Python38\python.exe ( REM Install python 3.8 From aa8f9d6adc3a832d37571a9bf51656fab8ba4928 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 9 Apr 2024 07:39:42 +0200 Subject: [PATCH 12/13] Remove unnecessary line --- .ci/scripts/install-tools.bat | 1 - 1 file changed, 1 deletion(-) diff --git a/.ci/scripts/install-tools.bat b/.ci/scripts/install-tools.bat index acd177ea40d..86572356da9 100644 --- a/.ci/scripts/install-tools.bat +++ b/.ci/scripts/install-tools.bat @@ -17,7 +17,6 @@ SET USERPROFILE=%OLD_USERPROFILE% echo "Upgrade chocolatey to latest version" choco upgrade chocolatey -y -refreshenv IF NOT EXIST C:\Python38\python.exe ( REM Install python 3.8 From bbc3532686a868390f5e11ad155e3b8e743b03d6 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 11 Apr 2024 08:07:33 +0000 Subject: [PATCH 13/13] Adding padding to log file generation The generated log file did not have its size predictable because the RFC3339 may not include the timezone offset. This is fixed by looking at the time string size and adding padding if necessary. 
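To make the size problem concrete: Go's `time.RFC3339` layout ends in `Z07:00`, and that element renders as a single `Z` for UTC timestamps, so the formatted string can be shorter than the layout itself. A small illustrative snippet (not part of the patch) showing the length difference the padding compensates for:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// The layout itself is 25 bytes long: "2006-01-02T15:04:05Z07:00".
	fmt.Println(len(time.RFC3339)) // 25

	ts := time.Date(2024, 4, 11, 8, 7, 33, 0, time.UTC)

	// In UTC the "Z07:00" element is rendered as a single "Z",
	// so the formatted string is only 20 bytes long.
	fmt.Println(ts.Format(time.RFC3339), len(ts.Format(time.RFC3339))) // 2024-04-11T08:07:33Z 20

	// With a non-zero offset the full "+01:00" suffix is printed and the
	// string is as long as the layout again.
	cet := time.FixedZone("CET", 60*60)
	fmt.Println(ts.In(cet).Format(time.RFC3339), len(ts.In(cet).Format(time.RFC3339))) // 2024-04-11T09:07:33+01:00 25
}
```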
---
 libbeat/tests/integration/framework.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go
index 6017c63fd87..b3bb33883b9 100644
--- a/libbeat/tests/integration/framework.go
+++ b/libbeat/tests/integration/framework.go
@@ -714,6 +714,14 @@ func GenerateLogFile(t *testing.T, path string, count int, append bool) {
 		}
 	}()
 	now := time.Now().Format(time.RFC3339)
+	// If the length is different, e.g. when there is no offset from UTC,
+	// add some padding so the length is predictable.
+	if len(now) != len(time.RFC3339) {
+		paddingNeeded := len(time.RFC3339) - len(now)
+		for i := 0; i < paddingNeeded; i++ {
+			now += "-"
+		}
+	}
 	for i := 0; i < count; i++ {
 		if _, err := fmt.Fprintf(file, "%s %13d\n", now, i); err != nil {
 			t.Fatalf("could not write line %d to file: %s", count+1, err)
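For reference, the behavioural heart of this series is the `fi.Size() < offset` comparison added to `openFile` in patch 5: a stored offset larger than the current file size can only mean the file was truncated, so the harvester restarts from offset zero. A self-contained sketch of that check using only the standard library — the file name and sizes below are made up for illustration:

```go
package main

import (
	"fmt"
	"os"
)

// resumeOffset mimics the truncation check: if the file is now smaller than
// the offset recorded in the registry, treat it as truncated and read from
// the beginning again.
func resumeOffset(path string, stored int64) (int64, bool, error) {
	fi, err := os.Stat(path)
	if err != nil {
		return 0, false, err
	}
	if fi.Size() < stored {
		return 0, true, nil // truncated: reset the offset
	}
	return stored, false, nil
}

func main() {
	path := "example.log" // hypothetical file created only for this sketch
	if err := os.WriteFile(path, make([]byte, 500), 0o644); err != nil {
		panic(err)
	}
	defer os.Remove(path)

	// The registry says 500 bytes were already read; the file still has them.
	off, truncated, _ := resumeOffset(path, 500)
	fmt.Println(off, truncated) // 500 false

	// Truncate the file and check again: the offset must be reset to zero.
	if err := os.Truncate(path, 0); err != nil {
		panic(err)
	}
	off, truncated, _ = resumeOffset(path, 500)
	fmt.Println(off, truncated) // 0 true
}
```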