diff --git a/.chloggen/pkg-stanza-reader-offset-update.yaml b/.chloggen/pkg-stanza-reader-offset-update.yaml new file mode 100755 index 000000000000..d49867d3cb79 --- /dev/null +++ b/.chloggen/pkg-stanza-reader-offset-update.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/filelog + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where file fingerprint could be corrupted while reading. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [22936] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 18ad9d84fc38..ec2a4cd555b7 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -151,18 +152,19 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli } readerFactory := reader.Factory{ - SugaredLogger: logger.With("component", "fileconsumer"), - FromBeginning: startAtBeginning, - FingerprintSize: int(c.FingerprintSize), - MaxLogSize: int(c.MaxLogSize), - Encoding: enc, - SplitFunc: splitFunc, - TrimFunc: trimFunc, - FlushTimeout: c.FlushPeriod, - EmitFunc: emit, - Attributes: c.Resolver, - HeaderConfig: hCfg, - DeleteAtEOF: c.DeleteAfterRead, + SugaredLogger: logger.With("component", "fileconsumer"), + FromBeginning: startAtBeginning, + FingerprintSize: int(c.FingerprintSize), + InitialBufferSize: scanner.DefaultBufferSize, + MaxLogSize: int(c.MaxLogSize), + Encoding: enc, + SplitFunc: splitFunc, + TrimFunc: trimFunc, + FlushTimeout: c.FlushPeriod, + EmitFunc: emit, + Attributes: c.Resolver, + HeaderConfig: hCfg, + DeleteAtEOF: c.DeleteAfterRead, } knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3) for i := 0; i < len(knownFiles); i++ { diff --git 
a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index 55447c9d6c4e..5153f596bcf2 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -28,17 +28,18 @@ const ( type Factory struct { *zap.SugaredLogger - HeaderConfig *header.Config - FromBeginning bool - FingerprintSize int - MaxLogSize int - Encoding encoding.Encoding - SplitFunc bufio.SplitFunc - TrimFunc trim.Func - FlushTimeout time.Duration - EmitFunc emit.Callback - Attributes attrs.Resolver - DeleteAtEOF bool + HeaderConfig *header.Config + FromBeginning bool + FingerprintSize int + InitialBufferSize int + MaxLogSize int + Encoding encoding.Encoding + SplitFunc bufio.SplitFunc + TrimFunc trim.Func + FlushTimeout time.Duration + EmitFunc emit.Callback + Attributes attrs.Resolver + DeleteAtEOF bool } func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) { @@ -58,16 +59,21 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader } func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) { + // Trim the fingerprint if user has reconfigured fingerprint_size + if len(m.Fingerprint.FirstBytes) > f.FingerprintSize { + m.Fingerprint.FirstBytes = m.Fingerprint.FirstBytes[:f.FingerprintSize] + } r = &Reader{ - Metadata: m, - logger: f.SugaredLogger.With("path", file.Name()), - file: file, - fileName: file.Name(), - fingerprintSize: f.FingerprintSize, - maxLogSize: f.MaxLogSize, - decoder: decode.New(f.Encoding), - lineSplitFunc: f.SplitFunc, - deleteAtEOF: f.DeleteAtEOF, + Metadata: m, + logger: f.SugaredLogger.With("path", file.Name()), + file: file, + fileName: file.Name(), + fingerprintSize: f.FingerprintSize, + initialBufferSize: f.InitialBufferSize, + maxLogSize: f.MaxLogSize, + decoder: decode.New(f.Encoding), + lineSplitFunc: f.SplitFunc, + deleteAtEOF: f.DeleteAtEOF, } if !f.FromBeginning { diff 
--git a/pkg/stanza/fileconsumer/internal/reader/factory_test.go b/pkg/stanza/fileconsumer/internal/reader/factory_test.go index 4f72341b6a05..c83537996406 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory_test.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory_test.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" @@ -28,13 +29,14 @@ const ( func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink) { cfg := &testFactoryCfg{ - fromBeginning: true, - fingerprintSize: fingerprint.DefaultSize, - maxLogSize: defaultMaxLogSize, - encoding: unicode.UTF8, - trimFunc: trim.Whitespace, - flushPeriod: defaultFlushPeriod, - sinkCallBufferSize: 100, + fromBeginning: true, + fingerprintSize: fingerprint.DefaultSize, + initialBufferSize: scanner.DefaultBufferSize, + maxLogSize: defaultMaxLogSize, + encoding: unicode.UTF8, + trimFunc: trim.Whitespace, + flushPeriod: defaultFlushPeriod, + sinkChanSize: 100, attributes: attrs.Resolver{ IncludeFileName: true, }, @@ -46,33 +48,35 @@ func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink splitFunc, err := cfg.splitCfg.Func(cfg.encoding, false, cfg.maxLogSize) require.NoError(t, err) - sink := emittest.NewSink(emittest.WithCallBuffer(cfg.sinkCallBufferSize)) + sink := emittest.NewSink(emittest.WithCallBuffer(cfg.sinkChanSize)) return &Factory{ - 
SugaredLogger: testutil.Logger(t), - FromBeginning: cfg.fromBeginning, - FingerprintSize: cfg.fingerprintSize, - MaxLogSize: cfg.maxLogSize, - Encoding: cfg.encoding, - SplitFunc: splitFunc, - TrimFunc: cfg.trimFunc, - FlushTimeout: cfg.flushPeriod, - EmitFunc: sink.Callback, - Attributes: cfg.attributes, + SugaredLogger: testutil.Logger(t), + FromBeginning: cfg.fromBeginning, + FingerprintSize: cfg.fingerprintSize, + InitialBufferSize: cfg.initialBufferSize, + MaxLogSize: cfg.maxLogSize, + Encoding: cfg.encoding, + SplitFunc: splitFunc, + TrimFunc: cfg.trimFunc, + FlushTimeout: cfg.flushPeriod, + EmitFunc: sink.Callback, + Attributes: cfg.attributes, }, sink } type testFactoryOpt func(*testFactoryCfg) type testFactoryCfg struct { - fromBeginning bool - fingerprintSize int - maxLogSize int - encoding encoding.Encoding - splitCfg split.Config - trimFunc trim.Func - flushPeriod time.Duration - sinkCallBufferSize int - attributes attrs.Resolver + fromBeginning bool + fingerprintSize int + initialBufferSize int + maxLogSize int + encoding encoding.Encoding + splitCfg split.Config + trimFunc trim.Func + flushPeriod time.Duration + sinkChanSize int + attributes attrs.Resolver } func withFingerprintSize(size int) testFactoryOpt { @@ -87,9 +91,15 @@ func withSplitConfig(cfg split.Config) testFactoryOpt { } } -func withMaxLogSize(maxLogSize int) testFactoryOpt { +func withInitialBufferSize(size int) testFactoryOpt { return func(c *testFactoryCfg) { - c.maxLogSize = maxLogSize + c.initialBufferSize = size + } +} + +func withMaxLogSize(size int) testFactoryOpt { + return func(c *testFactoryCfg) { + c.maxLogSize = size } } @@ -99,9 +109,9 @@ func withFlushPeriod(flushPeriod time.Duration) testFactoryOpt { } } -func withSinkBufferSize(n int) testFactoryOpt { +func withSinkChanSize(n int) testFactoryOpt { return func(c *testFactoryCfg) { - c.sinkCallBufferSize = n + c.sinkChanSize = n } } diff --git a/pkg/stanza/fileconsumer/internal/reader/fingerprint_test.go 
b/pkg/stanza/fileconsumer/internal/reader/fingerprint_test.go new file mode 100644 index 000000000000..6d72f5c07f22 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/reader/fingerprint_test.go @@ -0,0 +1,348 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reader + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" +) + +func TestReaderUpdateFingerprint(t *testing.T) { + bufferSizes := []int{2, 3, 5, 8, 10, 13, 20, 50} + testCases := []updateFingerprintTest{ + { + name: "new_file", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("1234567890")}, + expectOffset: 11, + expectFingerprint: []byte("1234567890"), + }, + { + name: "existing_partial_line_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("foo1234567890")}, + expectOffset: 14, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "existing_partial_line", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("1234567890")}, + expectOffset: 14, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "existing_full_line_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo\n"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("foo"), []byte("1234567890")}, + expectOffset: 15, + expectFingerprint: []byte("foo\n123456"), + }, + { + name: "existing_full_line", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: 
[]byte("foo\n"), + moreBytes: []byte("1234567890\n"), + expectTokens: [][]byte{[]byte("1234567890")}, + expectOffset: 15, + expectFingerprint: []byte("foo\n123456"), + }, + { + name: "split_none_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890"), + expectTokens: [][]byte{}, + expectOffset: 0, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "split_none", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890"), + expectTokens: [][]byte{}, + expectOffset: 3, + expectFingerprint: []byte("foo1234567"), + }, + { + name: "split_mid_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890"), + expectTokens: [][]byte{[]byte("foo12345")}, + expectOffset: 9, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "split_mid", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890"), + expectTokens: [][]byte{[]byte("12345")}, + expectOffset: 9, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "clean_end_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("foo12345"), []byte("67890")}, + expectOffset: 15, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "clean_end", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("12345"), []byte("67890")}, + expectOffset: 15, + expectFingerprint: []byte("foo12345\n6"), + }, + { + name: "full_lines_only_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte("foo\n"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("foo"), []byte("12345"), []byte("67890")}, + 
expectOffset: 16, + expectFingerprint: []byte("foo\n12345\n"), + }, + { + name: "full_lines_only", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte("foo\n"), + moreBytes: []byte("12345\n67890\n"), + expectTokens: [][]byte{[]byte("12345"), []byte("67890")}, + expectOffset: 16, + expectFingerprint: []byte("foo\n12345\n"), + }, + { + name: "small_max_log_size_from_start", + fingerprintSize: 20, + maxLogSize: 4, + fromBeginning: true, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\nbar\nhelloworld\n"), + expectTokens: [][]byte{[]byte("foo1"), []byte("2345"), []byte("6789"), []byte("0"), []byte("bar"), []byte("hell"), []byte("owor"), []byte("ld")}, + expectOffset: 29, + expectFingerprint: []byte("foo1234567890\nbar\nhe"), + }, + { + name: "small_max_log_size", + fingerprintSize: 20, + maxLogSize: 4, + initBytes: []byte("foo"), + moreBytes: []byte("1234567890\nbar\nhelloworld\n"), + expectTokens: [][]byte{[]byte("1234"), []byte("5678"), []byte("90"), []byte("bar"), []byte("hell"), []byte("owor"), []byte("ld")}, + expectOffset: 29, + expectFingerprint: []byte("foo1234567890\nbar\nhe"), + }, + { + name: "leading_empty_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte(""), + moreBytes: []byte("\n12345\n67890\n"), + expectTokens: [][]byte{[]byte(""), []byte("12345"), []byte("67890")}, + expectOffset: 13, + expectFingerprint: []byte("\n12345\n678"), + }, + { + name: "leading_empty", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("\n12345\n67890\n"), + expectTokens: [][]byte{[]byte(""), []byte("12345"), []byte("67890")}, + expectOffset: 13, + expectFingerprint: []byte("\n12345\n678"), + }, + { + name: "multiple_empty_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890\n\n"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte(""), []byte("67890"), 
[]byte("")}, + expectOffset: 16, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + { + name: "multiple_empty", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890\n\n"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte(""), []byte("67890"), []byte("")}, + expectOffset: 16, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + { + name: "multiple_empty_partial_end_from_start", + fingerprintSize: 10, + maxLogSize: 100, + fromBeginning: true, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte("")}, + expectOffset: 9, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + { + name: "multiple_empty_partial_end", + fingerprintSize: 10, + maxLogSize: 100, + initBytes: []byte(""), + moreBytes: []byte("\n\n12345\n\n67890"), + expectTokens: [][]byte{[]byte(""), []byte(""), []byte("12345"), []byte("")}, + expectOffset: 9, + expectFingerprint: []byte("\n\n12345\n\n6"), + }, + } + + for _, tc := range testCases { + for _, bufferSize := range bufferSizes { + t.Run(fmt.Sprintf("%s/bufferSize:%d", tc.name, bufferSize), tc.run(bufferSize)) + } + } +} + +type updateFingerprintTest struct { + name string + fingerprintSize int + maxLogSize int + fromBeginning bool + initBytes []byte + moreBytes []byte + expectTokens [][]byte + expectOffset int64 + expectFingerprint []byte +} + +func (tc updateFingerprintTest) run(bufferSize int) func(*testing.T) { + return func(t *testing.T) { + opts := []testFactoryOpt{ + withFingerprintSize(tc.fingerprintSize), + withInitialBufferSize(bufferSize), + withMaxLogSize(tc.maxLogSize), + withFlushPeriod(0), + } + if !tc.fromBeginning { + opts = append(opts, fromEnd()) + } + f, sink := testFactory(t, opts...) 
+ + temp := filetest.OpenTemp(t, t.TempDir()) + _, err := temp.Write(tc.initBytes) + require.NoError(t, err) + + fi, err := temp.Stat() + require.NoError(t, err) + require.Equal(t, int64(len(tc.initBytes)), fi.Size()) + + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + r, err := f.NewReader(temp, fp) + require.NoError(t, err) + require.Same(t, temp, r.file) + + if tc.fromBeginning { + assert.Equal(t, int64(0), r.Offset) + } else { + assert.Equal(t, int64(len(tc.initBytes)), r.Offset) + } + assert.Equal(t, tc.initBytes, r.Fingerprint.FirstBytes) + + i, err := temp.Write(tc.moreBytes) + require.NoError(t, err) + require.Equal(t, i, len(tc.moreBytes)) + + r.ReadToEnd(context.Background()) + + sink.ExpectTokens(t, tc.expectTokens...) + + assert.Equal(t, tc.expectOffset, r.Offset) + assert.Equal(t, tc.expectFingerprint, r.Fingerprint.FirstBytes) + } +} + +// TestReadingWithLargeFingerPrintSizeAndFileLargerThanScannerBuf tests reading of a log file when: +// - the fingerprint size is larger than the scanner's default buffer (scanner.DefaultBufferSize) +// - the log file is larger than that buffer but smaller than the fingerprint size +func TestReadingWithLargeFingerPrintSizeAndFileLargerThanScannerBuf(t *testing.T) { + t.Parallel() + tempDir := t.TempDir() + + // Generate log lines + body := "abcdefghijklmnopqrstuvwxyz1234567890" + fileContent := "" + expected := [][]byte{} + fingerPrintSize := scanner.DefaultBufferSize + 2*1024 + + for i := 0; len(fileContent) < fingerPrintSize-1024; i++ { + log := fmt.Sprintf("line %d log %s, end of line %d", i, body, i) + fileContent += fmt.Sprintf("%s\n", log) + expected = append(expected, []byte(log)) + } + + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, fileContent) + + f, sink := testFactory(t, + withFingerprintSize(fingerPrintSize), + withMaxLogSize(defaultMaxLogSize), + withSinkChanSize(1000), + ) + + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + + r, err := f.NewReader(temp, fp) + require.NoError(t, err) + 
+ initialFingerPrintSize := len(r.Fingerprint.FirstBytes) + r.ReadToEnd(context.Background()) + require.Equal(t, initialFingerPrintSize, len(r.Fingerprint.FirstBytes)) + + sink.ExpectTokens(t, expected...) +} diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 3c1111a43e2b..f5eefe99df0f 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -30,18 +30,20 @@ type Metadata struct { // Reader manages a single file type Reader struct { *Metadata - logger *zap.SugaredLogger - fileName string - file *os.File - fingerprintSize int - maxLogSize int - lineSplitFunc bufio.SplitFunc - splitFunc bufio.SplitFunc - decoder *decode.Decoder - headerReader *header.Reader - processFunc emit.Callback - emitFunc emit.Callback - deleteAtEOF bool + logger *zap.SugaredLogger + fileName string + file *os.File + fingerprintSize int + initialBufferSize int + maxLogSize int + lineSplitFunc bufio.SplitFunc + splitFunc bufio.SplitFunc + decoder *decode.Decoder + headerReader *header.Reader + processFunc emit.Callback + emitFunc emit.Callback + deleteAtEOF bool + needsUpdateFingerprint bool } // ReadToEnd will read until the end of the file @@ -51,7 +53,13 @@ func (r *Reader) ReadToEnd(ctx context.Context) { return } - s := scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc) + defer func() { + if r.needsUpdateFingerprint { + r.updateFingerprint() + } + }() + + s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc) // Iterate over the tokenized file, emitting entries as we go for { @@ -144,32 +152,16 @@ func (r *Reader) close() { } // Read from the file and update the fingerprint if necessary -func (r *Reader) Read(dst []byte) (int, error) { - // Skip if fingerprint is already built - // or if fingerprint is behind Offset - if len(r.Fingerprint.FirstBytes) == r.fingerprintSize || int(r.Offset) > 
len(r.Fingerprint.FirstBytes) { - return r.file.Read(dst) - } - n, err := r.file.Read(dst) - appendCount := min0(n, r.fingerprintSize-int(r.Offset)) - // return for n == 0 or r.Offset >= r.fingerprintSize - if appendCount == 0 { - return n, err +func (r *Reader) Read(dst []byte) (n int, err error) { + n, err = r.file.Read(dst) + if n == 0 || err != nil { + return } - // for appendCount==0, the following code would add `0` to fingerprint - r.Fingerprint.FirstBytes = append(r.Fingerprint.FirstBytes[:r.Offset], dst[:appendCount]...) - return n, err -} - -func min0(a, b int) int { - if a < 0 || b < 0 { - return 0 - } - if a < b { - return a + if !r.needsUpdateFingerprint && len(r.Fingerprint.FirstBytes) < r.fingerprintSize { + r.needsUpdateFingerprint = true } - return b + return } func (r *Reader) NameEquals(other *Reader) bool { @@ -194,3 +186,18 @@ func (r *Reader) Validate() bool { func (m Metadata) GetFingerprint() *fingerprint.Fingerprint { return m.Fingerprint } + +func (r *Reader) updateFingerprint() { + r.needsUpdateFingerprint = false + if r.file == nil { + return + } + refreshedFingerprint, err := fingerprint.New(r.file, r.fingerprintSize) + if err != nil { + return + } + if len(r.Fingerprint.FirstBytes) > 0 && !refreshedFingerprint.StartsWith(r.Fingerprint) { + return // fingerprint tampered, likely due to truncation + } + r.Fingerprint.FirstBytes = refreshedFingerprint.FirstBytes +} diff --git a/pkg/stanza/fileconsumer/internal/reader/reader_test.go b/pkg/stanza/fileconsumer/internal/reader/reader_test.go index 58baeea97277..bca9221c0249 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader_test.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader_test.go @@ -60,7 +60,7 @@ func TestFingerprintGrowsAndStops(t *testing.T) { temp := filetest.OpenTemp(t, tempDir) tempCopy := filetest.OpenFile(t, temp.Name()) - f, _ := testFactory(t, withSinkBufferSize(3*fpSize/lineLen), withFingerprintSize(fpSize)) + f, _ := testFactory(t, 
withSinkChanSize(3*fpSize/lineLen), withFingerprintSize(fpSize)) fp, err := f.NewFingerprint(temp) require.NoError(t, err) require.Equal(t, []byte(""), fp.FirstBytes) @@ -110,14 +110,14 @@ func TestFingerprintChangeSize(t *testing.T) { - // Use prime numbers to ensure variation in - // whether or not they are factors of fpSize - lineLens := []int{3, 5, 7, 11, 13, 17, 19, 23, 27} + // Use a mix of prime and composite line lengths to ensure variation in + // whether or not they are factors of fpSize + lineLens := []int{3, 4, 5, 6, 7, 8, 11, 12, 13, 17, 19, 23, 27, 36} for _, lineLen := range lineLens { lineLen := lineLen t.Run(fmt.Sprintf("%d", lineLen), func(t *testing.T) { t.Parallel() - f, _ := testFactory(t, withSinkBufferSize(3*fpSize/lineLen), withFingerprintSize(fpSize)) + f, _ := testFactory(t, withSinkChanSize(3*fpSize/lineLen), withFingerprintSize(fpSize)) tempDir := t.TempDir() temp := filetest.OpenTemp(t, tempDir) @@ -151,7 +151,8 @@ func TestFingerprintChangeSize(t *testing.T) { } // Recreate the factory with a larger fingerprint size - f, _ = testFactory(t, withSinkBufferSize(3*fpSize/lineLen), withFingerprintSize(fpSize*lineLen/3)) + fpSizeUp := fpSize * 2 + f, _ = testFactory(t, withSinkChanSize(3*fpSize/lineLen), withFingerprintSize(fpSizeUp)) // Recreate the reader with the new factory reader, err = f.NewReaderFromMetadata(filetest.OpenFile(t, temp.Name()), reader.Close()) @@ -162,10 +163,11 @@ func TestFingerprintChangeSize(t *testing.T) { filetest.WriteString(t, temp, line) reader.ReadToEnd(context.Background()) - require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes) + require.Equal(t, fileContent[:fpSizeUp], reader.Fingerprint.FirstBytes) // Recreate the factory with a smaller fingerprint size - f, _ = testFactory(t, withSinkBufferSize(3*fpSize/lineLen), withFingerprintSize(fpSize/2)) + fpSizeDown := fpSize / 2 + f, _ = testFactory(t, withSinkChanSize(3*fpSize/lineLen), withFingerprintSize(fpSizeDown)) // Recreate the reader with the new factory reader, err = f.NewReaderFromMetadata(filetest.OpenFile(t, temp.Name()), reader.Close()) @@ -176,7 +178,7 
@@ func TestFingerprintChangeSize(t *testing.T) { filetest.WriteString(t, temp, line) reader.ReadToEnd(context.Background()) - require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes) + require.Equal(t, fileContent[:fpSizeDown], reader.Fingerprint.FirstBytes) }) } } diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go index 1906aed08b3f..3cf5d4f5c2ce 100644 --- a/pkg/stanza/trim/trim.go +++ b/pkg/stanza/trim/trim.go @@ -16,6 +16,9 @@ func WithFunc(splitFunc bufio.SplitFunc, trimFunc Func) bufio.SplitFunc { } return func(data []byte, atEOF bool) (advance int, token []byte, err error) { advance, token, err = splitFunc(data, atEOF) + if advance == 0 && token == nil && err == nil { + return 0, nil, nil + } return advance, trimFunc(token), err } }