Skip to content

Commit

Permalink
[chore][pkg/stanza] Make fingerprint.FirstBytes private (open-telemet…
Browse files Browse the repository at this point in the history
…ry#31346)

Depends on open-telemetry#31251

This is in preparation for adding additional fields to the fingerprint
struct. The goal is to write the same data to storage while ensuring
consistency between the fields of the struct. This problem does not
present until there is a second field in the struct, but this PR
prepares for that problem without adding the new field. See open-telemetry#31317 for
additional detail.
  • Loading branch information
djaglowski authored and XinRanZhAWS committed Mar 13, 2024
1 parent a505a59 commit 18d5210
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 90 deletions.
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
return nil, nil
}

if len(fp.FirstBytes) == 0 {
if fp.Len() == 0 {
// Empty file, don't read it until we can compare its fingerprint
if err = file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
Expand Down
16 changes: 8 additions & 8 deletions pkg/stanza/fileconsumer/internal/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestNopEncodingDifferentLogSizes(t *testing.T) {
[]*reader.Metadata{
{
FileAttributes: make(map[string]any),
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Fingerprint: fingerprint.New([]byte("foo")),
Offset: 3,
},
},
Expand All @@ -63,12 +63,12 @@ func TestNopEncodingDifferentLogSizes(t *testing.T) {
[]*reader.Metadata{
{
FileAttributes: make(map[string]any),
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Fingerprint: fingerprint.New([]byte("foo")),
Offset: 3,
},
{
FileAttributes: make(map[string]any),
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")},
Fingerprint: fingerprint.New([]byte("barrrr")),
Offset: 6,
},
},
Expand All @@ -77,20 +77,20 @@ func TestNopEncodingDifferentLogSizes(t *testing.T) {
"other_fields",
[]*reader.Metadata{
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Fingerprint: fingerprint.New([]byte("foo")),
Offset: 3,
FileAttributes: map[string]any{
"hello": "world",
},
},
{
FileAttributes: make(map[string]any),
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("barrrr")},
Fingerprint: fingerprint.New([]byte("barrrr")),
Offset: 6,
HeaderFinalized: true,
},
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("ab")},
Fingerprint: fingerprint.New([]byte("ab")),
Offset: 2,
FileAttributes: map[string]any{
"hello2": "world2",
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestMigrateHeaderAttributes(t *testing.T) {
p := testutil.NewUnscopedMockPersister()
saveDeprecated(t, p, &deprecatedMetadata{
Metadata: reader.Metadata{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Fingerprint: fingerprint.New([]byte("foo")),
Offset: 3,
FileAttributes: map[string]any{
"HeaderAttributes": map[string]any{
Expand All @@ -134,7 +134,7 @@ func TestMigrateHeaderAttributes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []*reader.Metadata{
{
Fingerprint: &fingerprint.Fingerprint{FirstBytes: []byte("foo")},
Fingerprint: fingerprint.New([]byte("foo")),
Offset: 3,
FileAttributes: map[string]any{
"hello": "world",
Expand Down
15 changes: 3 additions & 12 deletions pkg/stanza/fileconsumer/internal/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,11 @@ func match[T Matchable](ele T, expect bool) func(t *testing.T, fileset *Fileset[
}
}

func newFingerprint(bytes []byte) *fingerprint.Fingerprint {
return &fingerprint.Fingerprint{
FirstBytes: bytes,
}
}
func newMetadata(bytes []byte) *reader.Metadata {
return &reader.Metadata{
Fingerprint: newFingerprint(bytes),
}
}

func newReader(bytes []byte) *reader.Reader {
return &reader.Reader{
Metadata: newMetadata(bytes),
Metadata: &reader.Metadata{
Fingerprint: fingerprint.New(bytes),
},
}
}

Expand Down
60 changes: 39 additions & 21 deletions pkg/stanza/fileconsumer/internal/fingerprint/fingerprint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package fingerprint // import "github.com/open-telemetry/opentelemetry-collector

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -18,46 +19,45 @@ const MinSize = 16 // bytes
// Fingerprint is used to identify a file
// A file's fingerprint is the first N bytes of the file
type Fingerprint struct {
FirstBytes []byte
firstBytes []byte
}

// New creates a new fingerprint from an open file
func New(file *os.File, size int) (*Fingerprint, error) {
buf := make([]byte, size)
func New(first []byte) *Fingerprint {
return &Fingerprint{firstBytes: first}
}

func NewFromFile(file *os.File, size int) (*Fingerprint, error) {
buf := make([]byte, size)
n, err := file.ReadAt(buf, 0)
if err != nil && !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("reading fingerprint bytes: %w", err)
}

fp := &Fingerprint{
FirstBytes: buf[:n],
}

return fp, nil
return New(buf[:n]), nil
}

// Copy creates a new copy of the fingerprint
func (f Fingerprint) Copy() *Fingerprint {
buf := make([]byte, len(f.FirstBytes), cap(f.FirstBytes))
n := copy(buf, f.FirstBytes)
return &Fingerprint{
FirstBytes: buf[:n],
}
buf := make([]byte, len(f.firstBytes), cap(f.firstBytes))
n := copy(buf, f.firstBytes)
return New(buf[:n])
}

func (f *Fingerprint) Len() int {
return len(f.firstBytes)
}

// Equal returns true if the fingerprints have the same FirstBytes,
// false otherwise. This does not compare other aspects of the fingerprints
// because the primary purpose of a fingerprint is to convey a unique
// identity, and only the FirstBytes field contributes to this goal.
func (f Fingerprint) Equal(other *Fingerprint) bool {
l0 := len(other.FirstBytes)
l1 := len(f.FirstBytes)
l0 := len(other.firstBytes)
l1 := len(f.firstBytes)
if l0 != l1 {
return false
}
for i := 0; i < l0; i++ {
if other.FirstBytes[i] != f.FirstBytes[i] {
if other.firstBytes[i] != f.firstBytes[i] {
return false
}
}
Expand All @@ -71,13 +71,31 @@ func (f Fingerprint) Equal(other *Fingerprint) bool {
// a fingerprint. As the file grows, its fingerprint is updated
// until it reaches a maximum size, as configured on the operator
func (f Fingerprint) StartsWith(old *Fingerprint) bool {
l0 := len(old.FirstBytes)
l0 := len(old.firstBytes)
if l0 == 0 {
return false
}
l1 := len(f.FirstBytes)
l1 := len(f.firstBytes)
if l0 > l1 {
return false
}
return bytes.Equal(old.FirstBytes[:l0], f.FirstBytes[:l0])
return bytes.Equal(old.firstBytes[:l0], f.firstBytes[:l0])
}

func (f *Fingerprint) MarshalJSON() ([]byte, error) {
m := marshal{FirstBytes: f.firstBytes}
return json.Marshal(&m)
}

func (f *Fingerprint) UnmarshalJSON(data []byte) error {
m := new(marshal)
if err := json.Unmarshal(data, m); err != nil {
return err
}
f.firstBytes = m.FirstBytes
return nil
}

type marshal struct {
FirstBytes []byte `json:"first_bytes"`
}
66 changes: 41 additions & 25 deletions pkg/stanza/fileconsumer/internal/fingerprint/fingerprint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func TestNewDoesNotModifyOffset(t *testing.T) {
_, err = temp.Seek(0, 0)
require.NoError(t, err)

fp, err := New(temp, len(fingerprint))
fp, err := NewFromFile(temp, len(fingerprint))
require.NoError(t, err)

// Validate the fingerprint is the correct size
require.Equal(t, len(fingerprint), len(fp.FirstBytes))
require.Equal(t, len(fingerprint), len(fp.firstBytes))

// Validate that reading the fingerprint did not adjust the
// file descriptor's internal offset (as using Seek does)
Expand All @@ -52,6 +52,11 @@ func TestNewDoesNotModifyOffset(t *testing.T) {
}

func TestNew(t *testing.T) {
fp := New([]byte("hello"))
require.Equal(t, []byte("hello"), fp.firstBytes)
}

func TestNewFromFile(t *testing.T) {
cases := []struct {
name string
fingerprintSize int
Expand Down Expand Up @@ -126,15 +131,15 @@ func TestNew(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tc.fileSize, int(info.Size()))

fp, err := New(temp, tc.fingerprintSize)
fp, err := NewFromFile(temp, tc.fingerprintSize)
require.NoError(t, err)

require.Equal(t, tc.expectedLen, len(fp.FirstBytes))
require.Equal(t, tc.expectedLen, len(fp.firstBytes))
})
}
}

func TestFingerprintCopy(t *testing.T) {
func TestCopy(t *testing.T) {
t.Parallel()
cases := []string{
"",
Expand All @@ -146,36 +151,36 @@ func TestFingerprintCopy(t *testing.T) {
}

for _, tc := range cases {
fp := &Fingerprint{FirstBytes: []byte(tc)}
fp := New([]byte(tc))

cp := fp.Copy()

// Did not change original
require.Equal(t, tc, string(fp.FirstBytes))
require.Equal(t, tc, string(fp.firstBytes))

// Copy is also good
require.Equal(t, tc, string(cp.FirstBytes))
require.Equal(t, tc, string(cp.firstBytes))

// Modify copy
cp.FirstBytes = append(cp.FirstBytes, []byte("also")...)
cp.firstBytes = append(cp.firstBytes, []byte("also")...)

// Still did not change original
require.Equal(t, tc, string(fp.FirstBytes))
require.Equal(t, tc, string(fp.firstBytes))

// Copy is modified
require.Equal(t, tc+"also", string(cp.FirstBytes))
require.Equal(t, tc+"also", string(cp.firstBytes))
}
}

func TestEqual(t *testing.T) {
empty := &Fingerprint{FirstBytes: []byte("")}
empty2 := &Fingerprint{FirstBytes: []byte("")}
hello := &Fingerprint{FirstBytes: []byte("hello")}
hello2 := &Fingerprint{FirstBytes: []byte("hello")}
world := &Fingerprint{FirstBytes: []byte("world")}
world2 := &Fingerprint{FirstBytes: []byte("world")}
helloworld := &Fingerprint{FirstBytes: []byte("helloworld")}
helloworld2 := &Fingerprint{FirstBytes: []byte("helloworld")}
empty := New([]byte(""))
empty2 := New([]byte(""))
hello := New([]byte("hello"))
hello2 := New([]byte("hello"))
world := New([]byte("world"))
world2 := New([]byte("world"))
helloworld := New([]byte("helloworld"))
helloworld2 := New([]byte("helloworld"))

require.True(t, empty.Equal(empty2))
require.True(t, hello.Equal(hello2))
Expand All @@ -193,10 +198,10 @@ func TestEqual(t *testing.T) {
}

func TestStartsWith(t *testing.T) {
empty := &Fingerprint{FirstBytes: []byte("")}
hello := &Fingerprint{FirstBytes: []byte("hello")}
world := &Fingerprint{FirstBytes: []byte("world")}
helloworld := &Fingerprint{FirstBytes: []byte("helloworld")}
empty := New([]byte(""))
hello := New([]byte("hello"))
world := New([]byte("world"))
helloworld := New([]byte("helloworld"))

// Empty never matches
require.False(t, hello.StartsWith(empty))
Expand Down Expand Up @@ -245,7 +250,7 @@ func TestStartsWith_FromFile(t *testing.T) {
_, err = fullFile.Write(content)
require.NoError(t, err)

fff, err := New(fullFile, fingerprintSize)
fff, err := NewFromFile(fullFile, fingerprintSize)
require.NoError(t, err)

partialFile, err := os.CreateTemp(tempDir, "")
Expand All @@ -263,7 +268,7 @@ func TestStartsWith_FromFile(t *testing.T) {
_, err = partialFile.Write(content[i:i])
require.NoError(t, err)

pff, err := New(partialFile, fingerprintSize)
pff, err := NewFromFile(partialFile, fingerprintSize)
require.NoError(t, err)

require.True(t, fff.StartsWith(pff))
Expand All @@ -278,3 +283,14 @@ func tokenWithLength(length int) []byte {
}
return b
}

func TestMarshalUnmarshal(t *testing.T) {
fp := New([]byte("hello"))
b, err := fp.MarshalJSON()
require.NoError(t, err)

fp2 := new(Fingerprint)
require.NoError(t, fp2.UnmarshalJSON(b))

require.Equal(t, fp, fp2)
}
19 changes: 14 additions & 5 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"bufio"
"errors"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -43,7 +44,7 @@ type Factory struct {
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
return fingerprint.New(file, f.FingerprintSize)
return fingerprint.NewFromFile(file, f.FingerprintSize)
}

func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
Expand All @@ -59,10 +60,6 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
}

func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
// Trim the fingerprint if user has reconfigured fingerprint_size
if len(m.Fingerprint.FirstBytes) > f.FingerprintSize {
m.Fingerprint.FirstBytes = m.Fingerprint.FirstBytes[:f.FingerprintSize]
}
r = &Reader{
Metadata: m,
logger: f.SugaredLogger.With("path", file.Name()),
Expand All @@ -76,6 +73,18 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
deleteAtEOF: f.DeleteAtEOF,
}

if r.Fingerprint.Len() > r.fingerprintSize {
// User has reconfigured fingerprint_size
shorter, rereadErr := fingerprint.NewFromFile(file, r.fingerprintSize)
if rereadErr != nil {
return nil, fmt.Errorf("reread fingerprint: %w", err)
}
if !r.Fingerprint.StartsWith(shorter) {
return nil, errors.New("file truncated")
}
m.Fingerprint = shorter
}

if !f.FromBeginning {
var info os.FileInfo
if info, err = r.file.Stat(); err != nil {
Expand Down
Loading

0 comments on commit 18d5210

Please sign in to comment.