Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Delete unnecessary FingerprintUpdatingReader #202

Merged
merged 4 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,65 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
require.Equal(t, []byte("testlog1\n"), reader.Fingerprint.FirstBytes)
}

// Test that a fingerprint:
// - Starts empty
// - Updates as a file is read
// - Stops updating when the max fingerprint size is reached
// - Stops exactly at max fingerprint size, regardless of content
func TestFingerprintGrowsAndStops(t *testing.T) {
t.Parallel()

// Use a number with many factors.
// Sometimes fingerprint length will align with
// the end of a line, sometimes not. Test both.
maxFP := 360

// Use prime numbers to ensure variation in
// whether or not they are factors of maxFP
lineLens := []int{3, 5, 7, 11, 13, 17, 19, 23, 27}

for _, lineLen := range lineLens {
t.Run(fmt.Sprintf("%d", lineLen), func(t *testing.T) {
t.Parallel()
operator, _, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
cfg.FingerprintSize = helper.ByteSize(maxFP)
}, nil)
defer operator.Stop()

temp := openTemp(t, tempDir)
tempCopy := openFile(t, temp.Name())
fp, err := operator.NewFingerprint(temp)
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

// keep track of what has been written to the file
fileContent := []byte{}

// keep track of expected fingerprint size
expectedFP := 0

// Write lines until file is much larger than the length of the fingerprint
for len(fileContent) < 2*maxFP {
expectedFP += lineLen
if expectedFP > maxFP {
expectedFP = maxFP
}

line := stringWithLength(lineLen-1) + "\n"
fileContent = append(fileContent, []byte(line)...)

writeString(t, temp, line)
reader.ReadToEnd(context.Background())
require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)
}
})
}
}

func TestEncodings(t *testing.T) {
t.Parallel()
cases := []struct {
Expand Down
39 changes: 8 additions & 31 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bufio"
"context"
"fmt"
"io"
"os"
"path/filepath"

Expand Down Expand Up @@ -107,7 +106,6 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error {
}
f.Offset = info.Size()
}

return nil
}

Expand All @@ -118,8 +116,7 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
return
}

fr := NewFingerprintUpdatingReader(f.file, f.Offset, f.Fingerprint, f.fileInput.fingerprintSize)
scanner := NewPositionalScanner(fr, f.fileInput.MaxLogSize, f.Offset, f.fileInput.SplitFunc)
scanner := NewPositionalScanner(f, f.fileInput.MaxLogSize, f.Offset, f.fileInput.SplitFunc)

// Iterate over the tokenized file, emitting entries as we go
for {
Expand Down Expand Up @@ -215,34 +212,14 @@ func getScannerError(scanner *PositionalScanner) error {
return nil
}

// NewFingerprintUpdatingReader creates a new FingerprintUpdatingReader starting starting at the given offset
func NewFingerprintUpdatingReader(r io.Reader, offset int64, f *Fingerprint, fingerprintSize int) *FingerprintUpdatingReader {
return &FingerprintUpdatingReader{
fingerprint: f,
fingerprintSize: fingerprintSize,
reader: r,
offset: offset,
}
}

// FingerprintUpdatingReader wraps another reader, and updates the fingerprint
// with each read in the first fingerPrintSize bytes
type FingerprintUpdatingReader struct {
fingerprint *Fingerprint
fingerprintSize int
reader io.Reader
offset int64
}

// Read reads from the wrapped reader, saving the read bytes to the fingerprint
func (f *FingerprintUpdatingReader) Read(dst []byte) (int, error) {
if len(f.fingerprint.FirstBytes) == f.fingerprintSize {
return f.reader.Read(dst)
// Read from the file and update the fingerprint if necessary
func (f *Reader) Read(dst []byte) (int, error) {
if len(f.Fingerprint.FirstBytes) == f.fileInput.fingerprintSize {
return f.file.Read(dst)
}
n, err := f.reader.Read(dst)
appendCount := min0(n, f.fingerprintSize-int(f.offset))
f.fingerprint.FirstBytes = append(f.fingerprint.FirstBytes[:f.offset], dst[:appendCount]...)
f.offset += int64(n)
n, err := f.file.Read(dst)
appendCount := min0(n, f.fileInput.fingerprintSize-int(f.Offset))
f.Fingerprint.FirstBytes = append(f.Fingerprint.FirstBytes[:f.Offset], dst[:appendCount]...)
return n, err
}

Expand Down