Fix handling of truncated files for Filestream input #38416

Merged · 13 commits · Apr 12, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -100,6 +100,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix handling of un-parsed JSON in O365 module. {issue}37800[37800] {pull}38709[38709]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
- Fix indexing failures by re-enabling event normalisation in netflow input. {issue}38703[38703] {pull}38780[38780]
- Fix handling of truncated files in Filestream {issue}38070[38070] {pull}38416[38416]

*Heartbeat*

65 changes: 45 additions & 20 deletions filebeat/input/filestream/input.go
@@ -114,7 +114,7 @@ func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error {
return fmt.Errorf("not file source")
}

reader, err := inp.open(ctx.Logger, ctx.Cancelation, fs, 0)
reader, _, err := inp.open(ctx.Logger, ctx.Cancelation, fs, 0)
if err != nil {
return err
}
@@ -136,12 +136,16 @@ func (inp *filestream) Run(
log := ctx.Logger.With("path", fs.newPath).With("state-id", src.Name())
state := initState(log, cursor, fs)

r, err := inp.open(log, ctx.Cancelation, fs, state.Offset)
r, truncated, err := inp.open(log, ctx.Cancelation, fs, state.Offset)
if err != nil {
log.Errorf("File could not be opened for reading: %v", err)
return err
}

if truncated {
state.Offset = 0
}

metrics.FilesActive.Inc()
metrics.HarvesterRunning.Inc()
defer metrics.FilesActive.Dec()
@@ -173,10 +177,20 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
return state
}

func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSource, offset int64) (reader.Reader, error) {
f, encoding, err := inp.openFile(log, fs.newPath, offset)
func (inp *filestream) open(
log *logp.Logger,
canceler input.Canceler,
fs fileSource,
offset int64,
) (reader.Reader, bool, error) {

f, encoding, truncated, err := inp.openFile(log, fs.newPath, offset)
if err != nil {
return nil, err
return nil, truncated, err
}

if truncated {
offset = 0
}

ok := false // used for cleanup
@@ -201,12 +215,12 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
// don't require 'complicated' logic.
logReader, err := newFileReader(log, canceler, f, inp.readerConfig, closerCfg)
if err != nil {
return nil, err
return nil, truncated, err
}

dbgReader, err := debug.AppendReaders(logReader)
if err != nil {
return nil, err
return nil, truncated, err
}

// Configure MaxBytes limit for EncodeReader as multiplied by 4
@@ -223,7 +237,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
MaxBytes: encReaderMaxBytes,
})
if err != nil {
return nil, err
return nil, truncated, err
}

r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
@@ -235,61 +249,72 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

ok = true // no need to close the file
return r, nil
return r, truncated, nil
}

// openFile opens a file and checks for the encoding. In case the encoding cannot be detected
// or the file cannot be opened (for example, due to failing read permissions), an error
// is returned and the harvester is closed. The file will be picked up again the next time
// the file system is scanned
func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, encoding.Encoding, error) {
// the file system is scanned.
//
// openFile will also detect and handle file truncation. If a file is truncated
// then the third return value is true.
func (inp *filestream) openFile(
log *logp.Logger,
path string,
offset int64,
) (*os.File, encoding.Encoding, bool, error) {
fi, err := os.Stat(path)
if err != nil {
return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
return nil, nil, false, fmt.Errorf("failed to stat source file %s: %w", path, err)
}

// it must be checked if the file is not a named pipe before we try to open it
// if it is a named pipe os.OpenFile fails, so there is no need to try opening it.
if fi.Mode()&os.ModeNamedPipe != 0 {
return nil, nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
return nil, nil, false, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
}

f, err := file.ReadOpen(path)
if err != nil {
return nil, nil, fmt.Errorf("failed opening %s: %w", path, err)
return nil, nil, false, fmt.Errorf("failed opening %s: %w", path, err)
}
ok := false
defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close))

fi, err = f.Stat()
if err != nil {
return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
return nil, nil, false, fmt.Errorf("failed to stat source file %s: %w", path, err)
}

err = checkFileBeforeOpening(fi)
if err != nil {
return nil, nil, err
return nil, nil, false, err
}

truncated := false
if fi.Size() < offset {
// if the file was truncated we need to reset the offset and notify
// all callers so they can also reset their offsets
truncated = true
log.Infof("File was truncated. Reading file from offset 0. Path=%s", path)
offset = 0
}
err = inp.initFileOffset(f, offset)
if err != nil {
return nil, nil, err
return nil, nil, truncated, err
}

encoding, err := inp.encodingFactory(f)
if err != nil {
if errors.Is(err, transform.ErrShortSrc) {
return nil, nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
return nil, nil, truncated, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
}
return nil, nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
return nil, nil, truncated, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
}

ok = true // no need to close the file
return f, encoding, nil
return f, encoding, truncated, nil
}

func checkFileBeforeOpening(fi os.FileInfo) error {
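The heart of the change is the size check in openFile: when the file on disk is smaller than the offset stored in the registry, the file must have been truncated, so the harvester restarts from the beginning and reports the truncation to its callers (Run then resets state.Offset to 0). Below is a minimal, self-contained sketch of that pattern; openAtOffset is a hypothetical helper written for illustration, not Filebeat's actual API.

package main

import (
	"fmt"
	"io"
	"os"
)

// openAtOffset opens path for reading and seeks to offset, unless the file
// on disk is now smaller than offset (i.e. it was truncated), in which case
// reading restarts at the beginning and truncated is reported as true.
func openAtOffset(path string, offset int64) (*os.File, bool, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, false, fmt.Errorf("failed opening %s: %w", path, err)
	}

	fi, err := f.Stat()
	if err != nil {
		f.Close()
		return nil, false, fmt.Errorf("failed to stat %s: %w", path, err)
	}

	// A size smaller than the saved offset means the old offset no longer
	// points at valid data: the file was truncated.
	truncated := fi.Size() < offset
	if truncated {
		offset = 0
	}

	if _, err := f.Seek(offset, io.SeekStart); err != nil {
		f.Close()
		return nil, truncated, fmt.Errorf("failed to seek in %s: %w", path, err)
	}
	return f, truncated, nil
}

func main() {
	f, truncated, err := openAtOffset("/tmp/log.log", 10_000)
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer f.Close()
	fmt.Println("restart from offset 0?", truncated)
}

Returning the truncated flag instead of silently seeking to 0 is what lets every caller that caches an offset, such as the cursor state in Run, invalidate its copy as well.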
225 changes: 225 additions & 0 deletions filebeat/tests/integration/filestream_truncation_test.go
@@ -0,0 +1,225 @@
@rdner (Member) commented on Mar 25, 2024:

@belimawr Did you make sure all these tests are failing without the fix applied?

@belimawr (Contributor, author) replied:

Yes, I wrote them before starting work on the fix. But it has been a while since I ran them, so I'll re-run them without the fix before merging, just to be on the safe side.

@belimawr (Contributor, author) replied:

--- FAIL: TestFilestreamLiveFileTruncation (8.91s)
    filestream_truncation_test.go:110: expecting offset 500 got 10500 instead

--- FAIL: TestFilestreamOfflineFileTruncation (4.28s)
    filestream_truncation_test.go:150: expecting offset 250 got 750 instead

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
"bufio"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/tests/integration"
)

var truncationCfg = `
filebeat.inputs:
- type: filestream
id: a-unique-filestream-input-id
enabled: true
prospector.scanner.check_interval: 30s
paths:
- %s
output:
file:
enabled: true
codec.json:
pretty: false
path: %s
filename: "output"
rotate_on_startup: true
queue.mem:
flush:
timeout: 1s
min_events: 32
filebeat.registry.flush: 1s
path.home: %s
logging:
level: debug
selectors:
- file_watcher
- input.filestream
- input.harvester
metrics:
enabled: false
`

func TestFilestreamLiveFileTruncation(t *testing.T) {
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)

tempDir := filebeat.TempDir()
logFile := path.Join(tempDir, "log.log")
registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json")
filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir))

// 1. Create a log file and let Filebeat harvest all contents
integration.GenerateLogFile(t, logFile, 200, false)
filebeat.Start()
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")

// 2. Truncate the file and wait for Filebeat to close it
if err := os.Truncate(logFile, 0); err != nil {
t.Fatalf("could not truncate log file: %s", err)
}

// 3. Ensure Filebeat detected the file truncation
filebeat.WaitForLogs("File was truncated as offset (10000) > size (0)", 20*time.Second, "file was not truncated")
filebeat.WaitForLogs("File was truncated, nothing to read", 20*time.Second, "reader loop did not stop")
filebeat.WaitForLogs("Stopped harvester for file", 20*time.Second, "harvester did not stop")
filebeat.WaitForLogs("Closing reader of filestream", 20*time.Second, "reader did not close")

// 4. Now we need to stop Filebeat before the next scan cycle
filebeat.Stop()

// Assert the offset stored in the registry
assertLastOffset(t, registryLogFile, 10_000)

// Open for appending because the file has already been truncated
integration.GenerateLogFile(t, logFile, 10, true)

// 5. Start Filebeat again.
filebeat.Start()
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")

assertLastOffset(t, registryLogFile, 500)
}

func TestFilestreamOfflineFileTruncation(t *testing.T) {
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)

tempDir := filebeat.TempDir()
logFile := path.Join(tempDir, "log.log")
registryLogFile := filepath.Join(tempDir, "data/registry/filebeat/log.json")
filebeat.WriteConfigFile(fmt.Sprintf(truncationCfg, logFile, tempDir, tempDir))

// 1. Create a log file with some lines
integration.GenerateLogFile(t, logFile, 10, false)

// 2. Ingest the file and stop Filebeat
filebeat.Start()
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.Stop()

// 3. Assert the offset is correctly set in the registry
assertLastOffset(t, registryLogFile, 500)

// 4. Truncate the file and write some data (less than before)
if err := os.Truncate(logFile, 0); err != nil {
t.Fatalf("could not truncate log file: %s", err)
}
integration.GenerateLogFile(t, logFile, 5, true)

// 5. Read the file again and stop Filebeat
filebeat.Start()
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.WaitForLogs("End of file reached", 30*time.Second, "Filebeat did not finish reading the log file")
filebeat.Stop()

// 6. Assert the registry offset matches the new, smaller file size.
assertLastOffset(t, registryLogFile, 250)
}

func assertLastOffset(t *testing.T, path string, offset int) {
t.Helper()
entries := readFilestreamRegistryLog(t, path)
lastEntry := entries[len(entries)-1]
if lastEntry.Offset != offset {
t.Errorf("expecting offset %d got %d instead", offset, lastEntry.Offset)
t.Log("last registry entries:")

max := len(entries)
if max > 10 {
max = 10
}
for _, e := range entries[:max] {
t.Logf("%+v\n", e)
}

t.FailNow()
}
}

type registryEntry struct {
Key string
Offset int
Filename string
TTL time.Duration
}

func readFilestreamRegistryLog(t *testing.T, path string) []registryEntry {
file, err := os.Open(path)
if err != nil {
t.Fatalf("could not open file '%s': %s", path, err)
}
defer file.Close()

entries := []registryEntry{}
s := bufio.NewScanner(file)

for s.Scan() {
line := s.Bytes()

e := entry{}
if err := json.Unmarshal(line, &e); err != nil {
t.Fatalf("could not read line '%s': %s", string(line), err)
}

// Skips registry log entries containing the operation ID like:
// '{"op":"set","id":46}'
if e.Key == "" {
continue
}

entries = append(entries, registryEntry{
Key: e.Key,
Offset: e.Value.Cursor.Offset,
TTL: e.Value.TTL,
Filename: e.Value.Meta.Source,
})
}

return entries
}

type entry struct {
Key string `json:"k"`
Value struct {
Cursor struct {
Offset int `json:"offset"`
} `json:"cursor"`
Meta struct {
Source string `json:"source"`
} `json:"meta"`
TTL time.Duration `json:"ttl"`
} `json:"v"`
}
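For context on what readFilestreamRegistryLog parses: the registry log stores each state update as a pair of JSON lines, an operation record such as {"op":"set","id":46} followed by the entry itself, which is why lines without a "k" field are skipped above. The following runnable sketch decodes two such lines; the key and field values shown are made up for illustration, not copied from a real registry.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Two illustrative registry log lines: an operation record followed
	// by the state entry it refers to (all values are made up).
	lines := []string{
		`{"op":"set","id":46}`,
		`{"k":"filestream::a-unique-filestream-input-id::native::132-34","v":{"cursor":{"offset":500},"meta":{"source":"/tmp/log.log"},"ttl":1800000000000}}`,
	}

	// Same shape as the entry struct in the test above.
	type entry struct {
		Key   string `json:"k"`
		Value struct {
			Cursor struct {
				Offset int `json:"offset"`
			} `json:"cursor"`
			Meta struct {
				Source string `json:"source"`
			} `json:"meta"`
		} `json:"v"`
	}

	for _, line := range lines {
		var e entry
		if err := json.Unmarshal([]byte(line), &e); err != nil {
			panic(err)
		}
		if e.Key == "" {
			continue // operation records carry no "k" field
		}
		fmt.Printf("key=%s source=%s offset=%d\n", e.Key, e.Value.Meta.Source, e.Value.Cursor.Offset)
	}
}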